Go 深入源码 —— channel

Iceber · · 402 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

> Don't communicate by sharing memory, share memory by communicating. **不要通过共享内存来通信,而要通过通信来实现内存共享。** ## 数据结构 我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列,那么如何实现这种队列 **channel 的底层数据结构是一个 `*hchan`,在编译时期会将 make(chan int) 语句转换成 makechan 函数调用** ### hchan ```go // runtime/chan.go type hchan struct { lock mutex // lock 用来保护 hchan 上所有的字段 // 缓冲区实际是一个循环队列 buf unsafe.Pointer // 指向缓冲区的指针 dataqsiz uint // 缓冲区循环队列的大小 sendx uint // 缓冲区循环队列接收下一个元素的索引 recvx uint // 缓冲区循环队列中下一个会返回的元素的索引 qcount uint // 当前 hchan 缓存的元素数量 closed uint32 // hchan 是否关闭 elemsize uint16 // hchan 的元素大小 elemtype *_type // hchan 的元素类型 recvq waitq // 等待接收的 goroutine 队列 sendq waitq // 等待发送的 goroutine 队列 } ``` 可以看出 channel 的底层数据结构 * 缓冲区 `buf` 底层是一个循环队列,`dataqsiz` 和 `qcount` 分别记录了缓冲区的大小和当前缓冲的元素数量,`sendx`,`recvx` 用来记录位置索引 * `elemsize` 和 `elemtype` 表示元素大小和类型 * `recvq` 和 `sendq` 来记录被发送接收阻塞的 goroutine 队列 * `closed` 用来记录是否关闭 * `lock` 用来保护hchan中的字段,更新其他字段的时候都需要加锁 对于无缓冲 channel 是不需要和缓冲区相关的字段的 **channel 在实现中依然使用到了锁,Go 所说的 *使用通信来实现共享内存*,实际上依然在底层使用锁来保证读写的原子性,实现出了一个面向数据流式的数据结构** ### 待发送者和待接收者 注意到 `recvq` 和 `sendq` 类型 `waitq` 是一个双向链表,提供了等待 goroutine 的出队入队 ```go // runtime/chan.go type waitq struct { first *sudog last * sudog } func(q *waitq) enqueue(sgp *sudog){ // ... } func (q *waitq) dequeue(sgp *sudog){ // ... } ``` `sudog` 是对被阻塞的 goroutine 的封装,简单看一下 channel 会使用到的一些字段 ```go // runtime/runtime2.go type sudog struct { g *g //阻塞的 goroutine elem unsafe.Pointer c *hchan // 阻塞的 channel ``` `elem` 字段是一个指针,在 channel 会被用来指向待发送者要发送的数据或者待接收者的接收位置 ```go // 从 ch 接收数据被阻塞,那么 sudog.elem 会指向 x x <- ch // 向 ch 发送数据被阻塞,那么 sudog.elem 会指向 y ch <- y ``` ### makechan 创建 channel channel 分为无缓冲 channel 和 缓冲 channel,虽然两种 channel 的创建方式不同,但是都是调用 `makechan` ```go ch := make(chan int) // 无缓冲 channel ch := make(chan int, 10)// 有缓冲 channel ``` `makechan` 函数会接受元素的类型和缓冲的大小,如果 `size` 为 0,就是无缓冲 channel 了 ```go // src/runtime/chan.go func makechan(t *chantype, size int) *hchan{ elem := t.elem // 检查 elem size,align // 计算出缓冲区的大小,如果是非缓冲 channel 或者元素为 struct{},那么 mem 就是 0 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0{ panic(plainError("makechan: size out of range")) } var c *hchan switch{ // 非缓冲 channel 或者 缓冲区元素 为 struct{} case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) // 如果是非缓冲,则buf并没有用 // 如果缓冲元素类型为 struct{}, 则只会用到 sendx 和 recvx, 并不会真正拷贝数据到缓冲区 c.buf = unsafe.Pointer(&c.buf) // channel 中元素不包含指针 case elem.ptrdata == 0: // 将 hchan 结构和缓冲区的内存一起分配 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // buf 指向 hchan 后边的地址 c.buf = add(unsafe.Pointer(c), hchanSize) // 默认,分别分配 chan 和 buf 的内存 default: c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 设置 hchan 的其他字段 c.elemsize = uint16(elem.size) c.elemtype = elem // 底层循环队列长度 c.datasiz = uint(size) return c ``` 通过 `makechan` 函数,可以总结出 hchan 结构的特点 * **无缓冲或者缓冲的元素类型为 struct{} 时,并不会为缓冲区(hcha.buf)分配内存** * **缓冲的元素结构中不包含指针时,会将 hchan 和 缓冲区buf 是一块连续的内存** ### make 与 makechan `make` 函数在编译阶段又是如何转换成 `makechan` 函数调用的呢 **首先编译器会将 `make` 的调用转换成 `OMAKE` 类型的节点,然后判断 `make` 的对象类型,如果是 `TCHAN` 的话,将节点类型置为 `OMAKECHAN`,并且检查 `make` 的第二个参数,也就是缓冲区大小** ```go // src/cmd/compile/internal/gc/typecheck.go func typecheck1(n *Node, top int) (res *Node) { // ... switch n.Op{ case OMAKE: switch t.Etype { case TCHAN: l = nil if i < len(args){ // ... 对缓冲区大小进行检测 n.Left = l // 带缓冲区,赋值缓冲区大小 }else{ n.Left = nodintconst(0) // 不带缓冲区 } n.Op = OMAKECHAN } } } ``` **然后OMAKECHAN 节点会在 walkexpr 函数中转换成调用 makechan 或者 makechan64 函数** ```go // src/cmd/compile/internal/gc/walk.go func walkexpr(n *Node, init *Nodes) *Node { switch n.Op { case OMAKECHAN: size := n.Left fnname := "makechan64" argtype := types.Types[TINT64] if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 { fnname = "makechan" argtype = types.Types[TINT] } n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype)) } } ``` ## 发送数据 向 channel 发送数据的语句会在编译期间转换成 chansend 函数 ```go ch := make(chan int) ch <- 10 ``` 发送语句非常简单,但是真正的函数执行会区分很多的情况,做一些小的优化,可以称为特性 ### 发送操作的特性 * **向 nil channel 发送数据会被永久阻塞,并且不会被 select 语句选中** * **如果 channel 未关闭,非缓冲并且没有待接收的 goroutine,或者缓冲区已满,那么不会被 select 语句选中** * **向关闭的 channel 发送数据,会 panic ,并且可以被 select 语句选中,意味着 select 语句中可能会 panic** * **如果有待接收者,那么会将发送的数据直接 copy 到待接收者的接收位置,然后唤醒接收者** * **如果有缓冲区,并且缓冲区未满,那么就把发送的数据 copy 到缓冲区中** * **如果 channel 未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前 goroutine, 等待被唤醒** * **发送者被阻塞后,可以被关闭 channel 操作或者被接收操作唤醒,关闭 channel 导致发送者被唤醒后,会panic** * **当 channel 中有待接收 goroutine,那么 channel 的状态必然是 非缓冲或者缓冲区为空** ##### 发送数据,可以被 select 选中的情况 * **channel 已关闭** * **channel 未关闭,channel有待接收的 goroutine,或者缓冲区不为空并且缓冲区未满** ### 深入源码 `ch <- i` 发送语句实际会被转换为 `chansend1` ```go func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) } ``` chansend1 会直接调用 chansend 来发送数据,并且 `block` 为 true,说明 `ch <- i` 语句可以被阻塞 ```go // src/runtime/chan.go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool ``` `c` 表示操作的 channel `ep` 是一个指针,指向发送的数据 `ch <- i` `block` 表示是否是阻塞调用,在 select case 语句中才会设置为 false `callerpc` 暂时不需要关心 返回值是个 `bool` 类型,表示是否发送成功,未发送成功的操作也不会被 `select` 语句选中 **首先看一下 channel 为 nil 的情况,这时并不需要加锁** ```go if c == nil{ if !block { // block 为 false, 则直接返回 false, 表示发送失败 return false } // 对于 nil channel,直接挂起当前 goroutine,并永久阻塞 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 不会执行到这一步 throw("unreadable") } ``` 如果是非阻塞调用,也就是 select case 语句中调用,那么直接返回 false,意味着向 nil channel 发送数据不会被选中 阻塞调用就被 gopark 挂起,永久阻塞 **** **在 channel 加锁之前,对于非阻塞并且未关闭的情况会有一步快速检测的判断,可以快速返回** ```go // 快速检测,非阻塞时,有些情况不需要获取锁就可以直接返回 // 非阻塞,未关闭,非缓冲+没有等待接收的 goroutine 或者 缓冲+缓冲区已满 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || ((c.dataqsiz < 0 && c.qcount == c.dataqsiz)) { // 返回 false,表示未发送成功 return false } ``` 缓冲区没有空间,并且待接收的 goroutine 时,可以直接返回未发送成功 **** **加锁,判断 channel 是否关闭,如果已关闭,直接 panic** ```go // 加锁 lock(&c.lock) // 如果 channel 已关闭,则 panic if c.closed != 0{ unlock(&c.lock) panic(plainError("send on closed channel")) } ``` **** **channel 待接收队列中有等待的 goroutine** ```go lock(&c.lock) // ... // 从待接收队列中获取等待的 goroutine if sg := c.recvq.dequeue(); seq != nil { // 只要可以从待接收队列中获取到 goroutine,那么发送操作都是只需要 copy 一次 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ``` 如果待接收队列中有等待的接收者的话,说明 channel 的缓冲区为空 调用 send 函数,无论是否是`无缓冲 channel`,都直接复制给待接收者 ```go func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // sg.elem 是指向待接收 goroutine 中接收数据的指针 s <- ch // 如果待接收 goroutine 需要接收具体的数据,那么直接将数据 copy 到 sg.elem if sg.elem != nil{ sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() // unlock(&c.lock) // 赋值 param,待接收者被唤醒后会根据 param 来判断是否是被发送者唤醒的 gp.param = unsafe.Pointer(sg) goready(gp, skip+1) // 唤醒待接收者 } ``` 会判断一下接收者是否需要接收数据,也就是 `sudog.elem` 是否为 nil 如果不为 nil,就调用 sendDirect 把发送的数据(ep 指向的数据) 复制到 `sudog.elem` ```go func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src 是发送的数据源地址,dst 是接收数据的地址 // src 在当前的 goroutine 栈中,而 dst 在其他栈上 dst := sg.elem // 使用 memove 直接进行内存 copy // 因为 dst 指向其他 goroutine 的栈,如果它发生了栈收缩,那么就没有修改真正的 dst 位置 // 所以会加读写前加一个屏障 typebitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memove(dst, src, t.size) ``` *sendDirect* 在进行跨 goroutine 内存 copy 时,调用 *typebitsBulkBarrier* 来加上了写屏障 因为 GC 会假设对栈的写操作只会发生在 goroutine 正在运行时,并且是由当前 goroutine 写的, 而 *sendDirect* 跨 goroutine 的栈读写会违背这个假设,为了避免出现问题,需要加上写屏障 **** **缓冲区未满,直接将数据发送到缓冲区中** ```go lock(&c.lock) // ... if c.qcount < c.dataqsiz { // 获取缓冲发送数据的指针 // add(c.buf, uintptr(i)*uintptr(c.elemsize)) qp := chanbuf(c, c.sendx) // copy 数据,ep, gp 都是指针,分别指向数据源和数据目的地 typedmemove(c.elemtype, qp, ep) // 递增存放发送数据的索引 c.sendx++ if c.sendx == c.dataqsiz{ // 缓冲区是一个循环数组,调整索引 c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } ``` chanbuf 函数通过 `hchan.sendx` 获取到缓冲区存放发送的数据的地址,然后调整循环数组的`sendx` 索引 **** **channel 未关闭,对于非缓冲 channel,待接收队列为空,对于缓冲 channel,缓冲区已满** 逻辑依次执行到这里: ```go lock(&c.lock) // ... // 如果非阻塞发送,那么可以直接解锁返回,未发送成功 if !block{ unlock(&c.lock) return false } // 阻塞发送,那么就挂起当前 goroutine gp := getg() // 生成配置 sudo,省略部分赋值操作 mysg := acquireSudog() mysg.elem = ep // 将指向发送数据的指针保存到 elem 中 mysg.g = gp mysg.c = c // 当前阻塞的 channel gp.wait = mysg // param 可以用来传递数据,其他 goroutine 唤醒该 goroutine 时可以设置该字段,然后根据该字段做一些判断 pg.param = nil // 入队待发送队列 c.sendq.enqueue(mysg) // 挂起goroutine,等待唤醒 // chanparkcommit 函数会解锁 ch.lock gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ``` 非阻塞的话,会直接返回为发送成功 阻塞调用,则会构建 sudog 对象,然后添加到待发送队列,解锁,挂起当前 goroutine 会被唤醒的情况有两种 * 关闭 channel * 发生接收操作,接收者可能会唤醒该发送者 ```go // 被唤醒,执行检查清理操作 // ... // param 字段为 nil 表示是由于 close channel 导致的关闭,panic // close channel 和接收操作都可能唤醒等待发送的 goroutine, 但是他们设置 param 不一样 if gp.param == nil { if c.closed = 0 { throw("chansend: suprious wakeup") } panic(plainError("send on closed channel")) } // 清理,释放 sudog pg.param == nil mysq.c = nil releaseSudog(mysg) // 发送成功 return true } ``` 被唤醒后会判断 `g.param` 是否为 nil,因为关闭 channel 时会将待发送 goroutine 的 `param` 字段置为 nil,会根据这个字段决定是否 panic ### select & 发送操作 golang 会对 select 语句进行一些优化 #### 单个发送 case ```go select { case ch <- i: // ... } // 会被优化为 if ch == nil { block() } ch <- i ``` 会在编译期间转换为阻塞发送语句 #### 非阻塞操作,发送 + default ```go select { case ch <- i: // ... default: // ... } // =====> if selectnbsend(ch, i) { // ... } else { // ... } ``` 非阻塞操作实际调用 `selectnbsend`,根据函数返回值决定是否执行 `default` 逻辑 ```go func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { // block 参数为 false,非阻塞调用 return chansend(c,elem, false, getcallerpc()) } ``` 返回 false 表示未发送成功,`select` 便会执行 `default` ### 思考:为什么向关闭的 channel 发送数据需要 panic ## 接收数据 如何从 channel 中接收数据 ```go // 接收单个值,如果 channel 被关闭后,会返回 channel 中元素的零值 i <- ch // 调用 `chanrecv1` 函数 // 如果 channel 被关闭并且缓冲区为空,那么 ok 的值就是 false i, ok <- ch // 调用 `chanrecv2` 函数 ``` `i` 是接收操作的`接收值`,`ok` 表示是否从 channel 中接收到有效的数据,**即使 channel 已经关闭,但是缓冲区中依然存在数据,那么 `ok` 也会是 true** ### 接收操作的特性 * **从 nil channel 中接收数据会永久阻塞,而且不会被select 语句选中** * **如果 channel 未关闭,没有待发送者或者缓冲 channel 的缓冲区为空的话,不会被 select 语句选中** * **从已关闭并且缓冲区为空的 channel 中接收数据的话,会把`接收值`置为空值,而且可以被 `select` 语句选中** * **如果待发送队列不为空,说明无缓冲或者缓冲已满,对于无缓冲直接从待发送者复制数据到`接收值`,如果缓冲区已满,那么先将缓冲区中数据复制给接收者,然后将待发送者的数据复制到缓冲区中并唤醒发送者** * **只要缓冲区不为空,即使channel已关闭,依然可以从缓冲区中获取到数据** * **如果缓冲为空并且没有待发送者,不会被 select 语句选中,如果是阻塞接收操作的话,会被阻塞直到 channel 被关闭或者被发送者唤醒** * **接收者被关闭操作唤醒,那么`接收值`会被置为空值** ##### 接收操作被 select 语句选中的情况 * channel 已关闭 * 缓冲区中有数据 * 待发送队列不为空 ### 深入源码 **单值的接收语句实际调用 `chanrecv1`** ```go // src/runtime/chan.go i <- ch // ===> func chanrecv1(c *hchan, elem unsafe.Pointer){ chanrecv(c, elem, true) } ``` **接收两个值实际调用 `chanrecv2`** ```go i, ok <- ch // ===> func chanrecv2(c *hchan, elem unsafe.Pointer)(received bool) { _, received = chanrecv(c, elem, true) } ``` `chanrecv1` 和 `chanrecv2` 实际都是调用 `chanrecv` ,他们两个之间的区别就是是否返回接收到有效数据 *** ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) ``` `c` 表示接收操作的 channel `ep` 是一个指针,指向`接收值`,`i <- ch`语句 `ep` 就是 `接收值 i` 的地址 `block` 是否是阻塞操作,`chanrecv1` 和 `chanrecv2` 函数中`block`为 true,说明是阻塞操作 返回值 `selected` 表示是否可以被 `select` 语句选中 返回值 `received` 表示是否可以接收到有效数据 **** **channel 在加锁前会判断一下是否为 nil ** ```go if c == nil { // 非阻塞下会直接返回 if !block { return } // 永久挂起 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } ``` 阻塞接收会被永久阻塞,非阻塞的话就直接返回,而且不会被 select 选中 **** **阻塞接收时,对于未关闭 channel 满足一些条件不需要加锁就可以直接返回** ```go // 快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回 // 非阻塞并且未关闭,非缓冲+没有待发送者或者有缓冲+缓冲为空 if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } ``` * 非缓冲 channel 如果没有待发送者 * 缓冲 channel 但是缓冲区为空 **** **加锁,首先判断 channel 是否已关闭,缓冲区中是否还有数据** ```go lock(&c.lock) // channel 处于关闭,并且缓冲区已空 if c.closed != 0 && c.qcount == 0{ unlock(&c.lock) if ep != nil{ // 如果接收的值需要赋值到变量 x <- ch // 将接收的值置为空值 typedmemclr(c.elemtype, ep) } // 可被 select 语句选中,但是未接收到有效数据 return true, false } ``` channel 已经关闭,而且缓冲区没有数据,如果 `ep` 不为nil ,也就是说存在`接收值`,那么就把接收值置为空值 > ep 为空的情况是 `<- chan` 接收操作没有`接收值` `selected` 返回 true,表示可以被 `select` 语句选中 **** **待发送队列不为空,存在待发送者** ```go lock(&c.lock) // ... // 待发送队列中有 goroutine,说明是非缓冲 channel 或者 缓冲已满的 channel if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func(){ unlock(&c.lock) }, 3) return true, true // 可被选中,并且接收成功 } ``` 如果待发送队列中有等待发送的 goroutine,说明 channel 是非缓冲channel,或者缓冲区已经满了 * 非缓冲channel,会将数据从待发送者复制给接收者 * 缓冲区已满的话,会先从缓冲区中接收数据,然后将待发送者的数据发送到缓冲区中 > 这里和发送操作时,channel 的待接收队列不为空的情况不一样,因为待接收队列不为空,说明缓冲区肯定是没有数据的,可以跳过缓冲区,直接将数据发送到等待接收的 goroutine 因为要区分 channel 的类型所以 `recv` 函数的逻辑就会有一点复杂 **对于非缓冲 channel,如果有`接收值`,直接调用 `recvDirect` 从待发送者复制值** ```go func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 无缓冲 channel if c.dataqsiz == 0 { // 如果ep 不为 nil,那么直接从发送 goroutine 中将数据 copy 到接收位置 if ep != nil{ recvDirect(c.elemtype, sg, ep) } } ``` 对于缓冲区有数据的情况 * 先从缓冲区复制数据到`接收值`,也就是 ep 指向的地址 * 然后将待发送者要发送的数据复制到缓冲区中 * 调整缓冲区循环数据的接收索引 `recvx` ```go } else { // 获取缓冲区中待接收的地址 gp := chanbuf(c, c.recvx) if ep != nil { // 将待接收数据复制到接收位置 typedmemmove(c.elemtype, ep, qp) } // 将待发送者发送的数据复制到相应缓冲区的位置 typedmemmove(c.elemtype, qp,sq.elem) // 调整 recvx c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 由于缓冲区已满,sendx 和 recvx 必然相等 c.sendx = c.recvx } ``` **无论是缓冲还是非缓冲 channel,`recv` 函数最后都会唤醒发送者** ```go // 赋值发送者的 param,发送者被唤醒后会根据 param 来判断是否是关闭唤醒的 sg.elem = nil gp := sg,g unlockf() gp.param = unsafe.Pointer(sg) goready(gp, skip+1) } ``` 接收操作会赋值发送者 goroutine 的 `param` 字段,发送者被唤醒后,会根据 param 参数来判断是有接收操作唤醒还是被关闭 channel 操作唤醒 *** **缓冲区中有数据,无论 channel 被关闭,都会发送给接收者** ```go lock(&c.lock) // ... // 如果缓冲区不为空,依然有未发送的数据 // 需要注意,这时 channel 可能已经处于关闭状态了,但是依然可以从关闭的缓冲区中接收到数据 if c.qcount > 0{ // 获取指向缓冲区中待接收数据的指针 gp ;= chanbuf(c, c.recvx) if ep != nil{ // 如果接收操作有接收值,那么直接 copy 到 ep typedmemmove(c.elemtype, ep, gp) } // 清理缓冲区中已接收到的数据内存 typedememclr(c.elemtype, gp) // 调整待接收索引 c.recv++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) // 可以被选中,并且接收成功 return true, true } ``` 这一部分的逻辑就比较简单 * 获取缓冲区的待接收数据的地址 `gp`,如果有`接收者`,便将数据复制给`接收者` * 调整缓冲区循环数据的待接收索引`recvx` *** **channel 未关闭, 缓冲区没有元素,并且没有待接收者** 非阻塞操作,可以直接解锁返回,并且不会被 `select` 语句选中 ```go lock(&c.lock) // ... // 缓冲区没有元素并且没有待发送者 if !block { unblock(&c.block) // 不会被选中,并且没有接收到有效数据 return false, false } ``` 阻塞操作,挂起当前 goroutine,等待被发送操作或者关闭操作唤醒 ```go lock(&c.lock) // ... gp = getg() mysg := acquireSudog() mysg.elem = ep mysg.g = gp mysg.c = c gp.param = nil // 入队到待发送者队列中 c.recvq.enqueue(mysg) // 挂起 goroutine,等待由关闭操作或者发送操作唤醒 goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // 被唤醒,做一些检测,和清理操作 // 根据 param 判断是否是由关闭唤醒的 // 有 closed 唤醒时,param 会被置为 nil closed := gp.param == nil pg.param = nil mysg.c = nil releaseSudog(mysg) // 可以被选中,但是 closed 反应是否接受到有效数据 return true, !closed } ``` **被唤醒后会根据 `param` 字段,判断是否是由关闭操作唤醒** **** #### select 与 接收操作 ##### 单个接收 case ```go select { case i <- ch: } // ====> if ch == nil{ block() } i <- ch ``` ##### 非阻塞接收 ```go select { case v <- ch: // case v, received <- ch: // ... default: // ... } // ===> // if ch != nil && selectnbrecv2(&v, &ok, ch) { if selectnbrecv(&v, ch) { // ... } else { // ... } ``` 非阻塞接收会调用 `selectnbrecv` 和 `selectnbrecv2` ```go func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return } func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { elected, *received = chanrecv(c, elem, false) return } ``` ## 关闭 channel 关闭 channel 直接调用 `close` 函数即可,但是贸然关闭 channel 会引发很多的问题 ```go ch := make(chan int) // 关闭 goroutine close(ch) ``` ### 关闭操作的特性 * **关闭 nil channel 会 panic** * **关闭已关闭的 channel 会 panic** * **关闭操作会将待接收者的接收值置为空值,唤醒所有待发送者和待接收者** 关于如何优雅的关闭 channel,可以看一下 [go101](https://gfw.go101.org/article/101.html) 中 [如何优雅地关闭通道](https://gfw.go101.org/article/channel-closing.html) ### 深入源码 **关闭 nil channel 会panic** ```go func closechan(c *hchan) { // 关闭 nil channel 会 panic if c == nil{ panic(plainError("close of nil channel")) } ``` *** **重复关闭 channel,也会 panic** ```go // 加锁 lock(&c.lock) if c.closed != 0 { // 重复关闭会 panic unlock(&c.lock) panic(plainError("close of closed channel")) } ``` **需要注意关闭操作中,判断 channel 是否关闭前会加锁** *** **处理待接收者,如果有`接收者`,那么就置为空值** ```go c.closed = 1 var glist gList // 处理待接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { // 将待接收位置置为空值 typedmemclr(c.elemtype, sg.elem) sg.elem = nil // 清理 elem 指针 } gp := sg.g // param 置为 nil,接收者被唤醒后会返回未接收到有效数据 gp.param = nil glist.push(gp) } ``` *** **处理待发送者** ```go // 处理待发送的队列 for { sg := c.sendq.dequeue() if sg == nil { // 没有待发送的goroutine了 break } sg.elem = nil gp := sg.g // 将 param 置为 nil, 待发送者被唤醒后,会 panic gp.param = nil glist.push(gp) } ``` *** **解锁,唤醒所有待发送者和待接收者** ```go unlock(&c.lock) // 唤醒所有阻塞的 goroutine for !glist.empty(){ gp := glist.pop() gpready(gp, 3) } } ``` ### 关闭操作唤醒 channel 中阻塞的 goroutine 在处理待发送者和待接收者时,都会将 goroutine 的 `param` 字段置为 nil,然后当被唤醒后待发送者和待接收者就能区分如何被唤醒的 #### 发送操作 ```go // runtime/chan.go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ... // 阻塞,挂起 goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) if gp.param == nil { if c.closed = 0 { throw("chansend: suprious wakeup") } panic(plainError("send on closed channel")) } // ... ``` 可以看到发送操作被唤醒后会判断 `param` 字段 **如果是由于 channel 关闭导致被唤醒,那么直接 panic** * **关闭操作唤醒,goroutine param 字段为 nil** ```go func closechan(c *hchan) { // ... for { sg := c.recvq.dequeue() // ... pg := sg.pg gp.param = nil // ... } // ... 唤醒 goroutine } ``` * **接收操作唤醒,goroutine param 不为 nil** ```go func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // ... 数据复制 pg := sg.g pg.param = unsafe.Pointer(sg) goready(gp, skip+1) } ``` #### 接收操作 ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ... // 阻塞,挂起当前 goroutine goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // 被唤醒 // ... closed := gp.parma == nil // ... return true, !closed ``` **接收操作在关闭后并不会 panic,而是会作为 received 返回,表示是否接收到有效的数据** # 参考资料 **[深度解密Go语言之channel](https://qcrao.com/2019/07/22/dive-into-go-channel/)** **[Go 语言设计与实现 —— Channel](https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/)** ### 推荐阅读 **[Go101 通道](https://gfw.go101.org/article/channel.html)** **[如何优雅地关闭通道](https://gfw.go101.org/article/channel-closing.html)** **[浅谈 Go 语言 select 的实现原理](https://juejin.im/entry/5ca15911518825550b35bf98)** **[图解Go的channel底层原理](https://mp.weixin.qq.com/s/40uxAPdubIk0lU321LmfRg)** **[走进Golang之Channel的使用](https://mp.weixin.qq.com/s/7Hoa5fX8U8xSNXYOB3g4jg)** **[走进Golang之Channel的数据结构](https://mp.weixin.qq.com/s/X-245bTIHQGG9GqfyEIHXw)**

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

402 次点击  ∙  2 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传