> Don't communicate by sharing memory, share memory by communicating.
**不要通过共享内存来通信,而要通过通信来实现内存共享。**
## 数据结构
我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列,那么如何实现这种队列
**channel 的底层数据结构是一个 `*hchan`,在编译时期会将 make(chan int) 语句转换成 makechan 函数调用**
### hchan
```go
// runtime/chan.go
type hchan struct {
lock mutex // lock 用来保护 hchan 上所有的字段
// 缓冲区实际是一个循环队列
buf unsafe.Pointer // 指向缓冲区的指针
dataqsiz uint // 缓冲区循环队列的大小
sendx uint // 缓冲区循环队列接收下一个元素的索引
recvx uint // 缓冲区循环队列中下一个会返回的元素的索引
qcount uint // 当前 hchan 缓存的元素数量
closed uint32 // hchan 是否关闭
elemsize uint16 // hchan 的元素大小
elemtype *_type // hchan 的元素类型
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
}
```
可以看出 channel 的底层数据结构
* 缓冲区 `buf` 底层是一个循环队列,`dataqsiz` 和 `qcount` 分别记录了缓冲区的大小和当前缓冲的元素数量,`sendx`,`recvx` 用来记录位置索引
* `elemsize` 和 `elemtype` 表示元素大小和类型
* `recvq` 和 `sendq` 来记录被发送接收阻塞的 goroutine 队列
* `closed` 用来记录是否关闭
* `lock` 用来保护hchan中的字段,更新其他字段的时候都需要加锁
对于无缓冲 channel 是不需要和缓冲区相关的字段的
**channel 在实现中依然使用到了锁,Go 所说的 *使用通信来实现共享内存*,实际上依然在底层使用锁来保证读写的原子性,实现出了一个面向数据流式的数据结构**
### 待发送者和待接收者
注意到 `recvq` 和 `sendq` 类型 `waitq` 是一个双向链表,提供了等待 goroutine 的出队入队
```go
// runtime/chan.go
type waitq struct {
first *sudog
last * sudog
}
func(q *waitq) enqueue(sgp *sudog){
// ...
}
func (q *waitq) dequeue(sgp *sudog){
// ...
}
```
`sudog` 是对被阻塞的 goroutine 的封装,简单看一下 channel 会使用到的一些字段
```go
// runtime/runtime2.go
type sudog struct {
g *g //阻塞的 goroutine
elem unsafe.Pointer
c *hchan // 阻塞的 channel
```
`elem` 字段是一个指针,在 channel 会被用来指向待发送者要发送的数据或者待接收者的接收位置
```go
// 从 ch 接收数据被阻塞,那么 sudog.elem 会指向 x
x <- ch
// 向 ch 发送数据被阻塞,那么 sudog.elem 会指向 y
ch <- y
```
### makechan 创建 channel
channel 分为无缓冲 channel 和 缓冲 channel,虽然两种 channel 的创建方式不同,但是都是调用 `makechan`
```go
ch := make(chan int) // 无缓冲 channel
ch := make(chan int, 10)// 有缓冲 channel
```
`makechan` 函数会接受元素的类型和缓冲的大小,如果 `size` 为 0,就是无缓冲 channel 了
```go
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan{
elem := t.elem
// 检查 elem size,align
// 计算出缓冲区的大小,如果是非缓冲 channel 或者元素为 struct{},那么 mem 就是 0
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0{
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch{
// 非缓冲 channel 或者 缓冲区元素 为 struct{}
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 如果是非缓冲,则buf并没有用
// 如果缓冲元素类型为 struct{}, 则只会用到 sendx 和 recvx, 并不会真正拷贝数据到缓冲区
c.buf = unsafe.Pointer(&c.buf)
// channel 中元素不包含指针
case elem.ptrdata == 0:
// 将 hchan 结构和缓冲区的内存一起分配
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// buf 指向 hchan 后边的地址
c.buf = add(unsafe.Pointer(c), hchanSize)
// 默认,分别分配 chan 和 buf 的内存
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 设置 hchan 的其他字段
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 底层循环队列长度
c.datasiz = uint(size)
return c
```
通过 `makechan` 函数,可以总结出 hchan 结构的特点
* **无缓冲或者缓冲的元素类型为 struct{} 时,并不会为缓冲区(hcha.buf)分配内存**
* **缓冲的元素结构中不包含指针时,会将 hchan 和 缓冲区buf 是一块连续的内存**
### make 与 makechan
`make` 函数在编译阶段又是如何转换成 `makechan` 函数调用的呢
**首先编译器会将 `make` 的调用转换成 `OMAKE` 类型的节点,然后判断 `make` 的对象类型,如果是 `TCHAN` 的话,将节点类型置为 `OMAKECHAN`,并且检查 `make` 的第二个参数,也就是缓冲区大小**
```go
// src/cmd/compile/internal/gc/typecheck.go
func typecheck1(n *Node, top int) (res *Node) {
// ...
switch n.Op{
case OMAKE:
switch t.Etype {
case TCHAN:
l = nil
if i < len(args){
// ... 对缓冲区大小进行检测
n.Left = l // 带缓冲区,赋值缓冲区大小
}else{
n.Left = nodintconst(0) // 不带缓冲区
}
n.Op = OMAKECHAN
}
}
}
```
**然后OMAKECHAN 节点会在 walkexpr 函数中转换成调用 makechan 或者 makechan64 函数**
```go
// src/cmd/compile/internal/gc/walk.go
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
```
## 发送数据
向 channel 发送数据的语句会在编译期间转换成 chansend 函数
```go
ch := make(chan int)
ch <- 10
```
发送语句非常简单,但是真正的函数执行会区分很多的情况,做一些小的优化,可以称为特性
### 发送操作的特性
* **向 nil channel 发送数据会被永久阻塞,并且不会被 select 语句选中**
* **如果 channel 未关闭,非缓冲并且没有待接收的 goroutine,或者缓冲区已满,那么不会被 select 语句选中**
* **向关闭的 channel 发送数据,会 panic ,并且可以被 select 语句选中,意味着 select 语句中可能会 panic**
* **如果有待接收者,那么会将发送的数据直接 copy 到待接收者的接收位置,然后唤醒接收者**
* **如果有缓冲区,并且缓冲区未满,那么就把发送的数据 copy 到缓冲区中**
* **如果 channel 未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前 goroutine, 等待被唤醒**
* **发送者被阻塞后,可以被关闭 channel 操作或者被接收操作唤醒,关闭 channel 导致发送者被唤醒后,会panic**
* **当 channel 中有待接收 goroutine,那么 channel 的状态必然是 非缓冲或者缓冲区为空**
##### 发送数据,可以被 select 选中的情况
* **channel 已关闭**
* **channel 未关闭,channel有待接收的 goroutine,或者缓冲区不为空并且缓冲区未满**
### 深入源码
`ch <- i` 发送语句实际会被转换为 `chansend1`
```go
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
```
chansend1 会直接调用 chansend 来发送数据,并且 `block` 为 true,说明 `ch <- i` 语句可以被阻塞
```go
// src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
```
`c` 表示操作的 channel
`ep` 是一个指针,指向发送的数据 `ch <- i`
`block` 表示是否是阻塞调用,在 select case 语句中才会设置为 false
`callerpc` 暂时不需要关心
返回值是个 `bool` 类型,表示是否发送成功,未发送成功的操作也不会被 `select` 语句选中
**首先看一下 channel 为 nil 的情况,这时并不需要加锁**
```go
if c == nil{
if !block {
// block 为 false, 则直接返回 false, 表示发送失败
return false
}
// 对于 nil channel,直接挂起当前 goroutine,并永久阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
// 不会执行到这一步
throw("unreadable")
}
```
如果是非阻塞调用,也就是 select case 语句中调用,那么直接返回 false,意味着向 nil channel 发送数据不会被选中
阻塞调用就被 gopark 挂起,永久阻塞
****
**在 channel 加锁之前,对于非阻塞并且未关闭的情况会有一步快速检测的判断,可以快速返回**
```go
// 快速检测,非阻塞时,有些情况不需要获取锁就可以直接返回
// 非阻塞,未关闭,非缓冲+没有等待接收的 goroutine 或者 缓冲+缓冲区已满
if !block && c.closed == 0 &&
((c.dataqsiz == 0 && c.recvq.first == nil) ||
((c.dataqsiz < 0 && c.qcount == c.dataqsiz)) {
// 返回 false,表示未发送成功
return false
}
```
缓冲区没有空间,并且待接收的 goroutine 时,可以直接返回未发送成功
****
**加锁,判断 channel 是否关闭,如果已关闭,直接 panic**
```go
// 加锁
lock(&c.lock)
// 如果 channel 已关闭,则 panic
if c.closed != 0{
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
```
****
**channel 待接收队列中有等待的 goroutine**
```go
lock(&c.lock)
// ...
// 从待接收队列中获取等待的 goroutine
if sg := c.recvq.dequeue(); seq != nil {
// 只要可以从待接收队列中获取到 goroutine,那么发送操作都是只需要 copy 一次
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
```
如果待接收队列中有等待的接收者的话,说明 channel 的缓冲区为空
调用 send 函数,无论是否是`无缓冲 channel`,都直接复制给待接收者
```go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// sg.elem 是指向待接收 goroutine 中接收数据的指针 s <- ch
// 如果待接收 goroutine 需要接收具体的数据,那么直接将数据 copy 到 sg.elem
if sg.elem != nil{
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf() // unlock(&c.lock)
// 赋值 param,待接收者被唤醒后会根据 param 来判断是否是被发送者唤醒的
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1) // 唤醒待接收者
}
```
会判断一下接收者是否需要接收数据,也就是 `sudog.elem` 是否为 nil
如果不为 nil,就调用 sendDirect 把发送的数据(ep 指向的数据) 复制到 `sudog.elem`
```go
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 是发送的数据源地址,dst 是接收数据的地址
// src 在当前的 goroutine 栈中,而 dst 在其他栈上
dst := sg.elem
// 使用 memove 直接进行内存 copy
// 因为 dst 指向其他 goroutine 的栈,如果它发生了栈收缩,那么就没有修改真正的 dst 位置
// 所以会加读写前加一个屏障
typebitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memove(dst, src, t.size)
```
*sendDirect* 在进行跨 goroutine 内存 copy 时,调用 *typebitsBulkBarrier* 来加上了写屏障
因为 GC 会假设对栈的写操作只会发生在 goroutine 正在运行时,并且是由当前 goroutine 写的,
而 *sendDirect* 跨 goroutine 的栈读写会违背这个假设,为了避免出现问题,需要加上写屏障
****
**缓冲区未满,直接将数据发送到缓冲区中**
```go
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// 获取缓冲发送数据的指针
// add(c.buf, uintptr(i)*uintptr(c.elemsize))
qp := chanbuf(c, c.sendx)
// copy 数据,ep, gp 都是指针,分别指向数据源和数据目的地
typedmemove(c.elemtype, qp, ep)
// 递增存放发送数据的索引
c.sendx++
if c.sendx == c.dataqsiz{
// 缓冲区是一个循环数组,调整索引
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
```
chanbuf 函数通过 `hchan.sendx` 获取到缓冲区存放发送的数据的地址,然后调整循环数组的`sendx` 索引
****
**channel 未关闭,对于非缓冲 channel,待接收队列为空,对于缓冲 channel,缓冲区已满**
逻辑依次执行到这里:
```go
lock(&c.lock)
// ...
// 如果非阻塞发送,那么可以直接解锁返回,未发送成功
if !block{
unlock(&c.lock)
return false
}
// 阻塞发送,那么就挂起当前 goroutine
gp := getg()
// 生成配置 sudo,省略部分赋值操作
mysg := acquireSudog()
mysg.elem = ep // 将指向发送数据的指针保存到 elem 中
mysg.g = gp
mysg.c = c // 当前阻塞的 channel
gp.wait = mysg
// param 可以用来传递数据,其他 goroutine 唤醒该 goroutine 时可以设置该字段,然后根据该字段做一些判断
pg.param = nil
// 入队待发送队列
c.sendq.enqueue(mysg)
// 挂起goroutine,等待唤醒
// chanparkcommit 函数会解锁 ch.lock
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
```
非阻塞的话,会直接返回为发送成功
阻塞调用,则会构建 sudog 对象,然后添加到待发送队列,解锁,挂起当前 goroutine
会被唤醒的情况有两种
* 关闭 channel
* 发生接收操作,接收者可能会唤醒该发送者
```go
// 被唤醒,执行检查清理操作
// ...
// param 字段为 nil 表示是由于 close channel 导致的关闭,panic
// close channel 和接收操作都可能唤醒等待发送的 goroutine, 但是他们设置 param 不一样
if gp.param == nil {
if c.closed = 0 {
throw("chansend: suprious wakeup")
}
panic(plainError("send on closed channel"))
}
// 清理,释放 sudog
pg.param == nil
mysq.c = nil
releaseSudog(mysg)
// 发送成功
return true
}
```
被唤醒后会判断 `g.param` 是否为 nil,因为关闭 channel 时会将待发送 goroutine 的 `param` 字段置为 nil,会根据这个字段决定是否 panic
### select & 发送操作
golang 会对 select 语句进行一些优化
#### 单个发送 case
```go
select {
case ch <- i:
// ...
}
// 会被优化为
if ch == nil {
block()
}
ch <- i
```
会在编译期间转换为阻塞发送语句
#### 非阻塞操作,发送 + default
```go
select {
case ch <- i:
// ...
default:
// ...
}
// =====>
if selectnbsend(ch, i) {
// ...
} else {
// ...
}
```
非阻塞操作实际调用 `selectnbsend`,根据函数返回值决定是否执行 `default` 逻辑
```go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
// block 参数为 false,非阻塞调用
return chansend(c,elem, false, getcallerpc())
}
```
返回 false 表示未发送成功,`select` 便会执行 `default`
### 思考:为什么向关闭的 channel 发送数据需要 panic
## 接收数据
如何从 channel 中接收数据
```go
// 接收单个值,如果 channel 被关闭后,会返回 channel 中元素的零值
i <- ch // 调用 `chanrecv1` 函数
// 如果 channel 被关闭并且缓冲区为空,那么 ok 的值就是 false
i, ok <- ch // 调用 `chanrecv2` 函数
```
`i` 是接收操作的`接收值`,`ok` 表示是否从 channel 中接收到有效的数据,**即使 channel 已经关闭,但是缓冲区中依然存在数据,那么 `ok` 也会是 true**
### 接收操作的特性
* **从 nil channel 中接收数据会永久阻塞,而且不会被select 语句选中**
* **如果 channel 未关闭,没有待发送者或者缓冲 channel 的缓冲区为空的话,不会被 select 语句选中**
* **从已关闭并且缓冲区为空的 channel 中接收数据的话,会把`接收值`置为空值,而且可以被 `select` 语句选中**
* **如果待发送队列不为空,说明无缓冲或者缓冲已满,对于无缓冲直接从待发送者复制数据到`接收值`,如果缓冲区已满,那么先将缓冲区中数据复制给接收者,然后将待发送者的数据复制到缓冲区中并唤醒发送者**
* **只要缓冲区不为空,即使channel已关闭,依然可以从缓冲区中获取到数据**
* **如果缓冲为空并且没有待发送者,不会被 select 语句选中,如果是阻塞接收操作的话,会被阻塞直到 channel 被关闭或者被发送者唤醒**
* **接收者被关闭操作唤醒,那么`接收值`会被置为空值**
##### 接收操作被 select 语句选中的情况
* channel 已关闭
* 缓冲区中有数据
* 待发送队列不为空
### 深入源码
**单值的接收语句实际调用 `chanrecv1`**
```go
// src/runtime/chan.go
i <- ch
// ===>
func chanrecv1(c *hchan, elem unsafe.Pointer){
chanrecv(c, elem, true)
}
```
**接收两个值实际调用 `chanrecv2`**
```go
i, ok <- ch
// ===>
func chanrecv2(c *hchan, elem unsafe.Pointer)(received bool) {
_, received = chanrecv(c, elem, true)
}
```
`chanrecv1` 和 `chanrecv2` 实际都是调用 `chanrecv` ,他们两个之间的区别就是是否返回接收到有效数据
***
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
```
`c` 表示接收操作的 channel
`ep` 是一个指针,指向`接收值`,`i <- ch`语句 `ep` 就是 `接收值 i` 的地址
`block` 是否是阻塞操作,`chanrecv1` 和 `chanrecv2` 函数中`block`为 true,说明是阻塞操作
返回值 `selected` 表示是否可以被 `select` 语句选中
返回值 `received` 表示是否可以接收到有效数据
****
**channel 在加锁前会判断一下是否为 nil **
```go
if c == nil {
// 非阻塞下会直接返回
if !block {
return
}
// 永久挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
```
阻塞接收会被永久阻塞,非阻塞的话就直接返回,而且不会被 select 选中
****
**阻塞接收时,对于未关闭 channel 满足一些条件不需要加锁就可以直接返回**
```go
// 快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回
// 非阻塞并且未关闭,非缓冲+没有待发送者或者有缓冲+缓冲为空
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
```
* 非缓冲 channel 如果没有待发送者
* 缓冲 channel 但是缓冲区为空
****
**加锁,首先判断 channel 是否已关闭,缓冲区中是否还有数据**
```go
lock(&c.lock)
// channel 处于关闭,并且缓冲区已空
if c.closed != 0 && c.qcount == 0{
unlock(&c.lock)
if ep != nil{
// 如果接收的值需要赋值到变量 x <- ch
// 将接收的值置为空值
typedmemclr(c.elemtype, ep)
}
// 可被 select 语句选中,但是未接收到有效数据
return true, false
}
```
channel 已经关闭,而且缓冲区没有数据,如果 `ep` 不为nil ,也就是说存在`接收值`,那么就把接收值置为空值
> ep 为空的情况是 `<- chan` 接收操作没有`接收值`
`selected` 返回 true,表示可以被 `select` 语句选中
****
**待发送队列不为空,存在待发送者**
```go
lock(&c.lock)
// ...
// 待发送队列中有 goroutine,说明是非缓冲 channel 或者 缓冲已满的 channel
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func(){ unlock(&c.lock) }, 3)
return true, true // 可被选中,并且接收成功
}
```
如果待发送队列中有等待发送的 goroutine,说明 channel 是非缓冲channel,或者缓冲区已经满了
* 非缓冲channel,会将数据从待发送者复制给接收者
* 缓冲区已满的话,会先从缓冲区中接收数据,然后将待发送者的数据发送到缓冲区中
> 这里和发送操作时,channel 的待接收队列不为空的情况不一样,因为待接收队列不为空,说明缓冲区肯定是没有数据的,可以跳过缓冲区,直接将数据发送到等待接收的 goroutine
因为要区分 channel 的类型所以 `recv` 函数的逻辑就会有一点复杂
**对于非缓冲 channel,如果有`接收值`,直接调用 `recvDirect` 从待发送者复制值**
```go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 无缓冲 channel
if c.dataqsiz == 0 {
// 如果ep 不为 nil,那么直接从发送 goroutine 中将数据 copy 到接收位置
if ep != nil{
recvDirect(c.elemtype, sg, ep)
}
}
```
对于缓冲区有数据的情况
* 先从缓冲区复制数据到`接收值`,也就是 ep 指向的地址
* 然后将待发送者要发送的数据复制到缓冲区中
* 调整缓冲区循环数据的接收索引 `recvx`
```go
} else {
// 获取缓冲区中待接收的地址
gp := chanbuf(c, c.recvx)
if ep != nil {
// 将待接收数据复制到接收位置
typedmemmove(c.elemtype, ep, qp)
}
// 将待发送者发送的数据复制到相应缓冲区的位置
typedmemmove(c.elemtype, qp,sq.elem)
// 调整 recvx
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 由于缓冲区已满,sendx 和 recvx 必然相等
c.sendx = c.recvx
}
```
**无论是缓冲还是非缓冲 channel,`recv` 函数最后都会唤醒发送者**
```go
// 赋值发送者的 param,发送者被唤醒后会根据 param 来判断是否是关闭唤醒的
sg.elem = nil
gp := sg,g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
```
接收操作会赋值发送者 goroutine 的 `param` 字段,发送者被唤醒后,会根据 param 参数来判断是有接收操作唤醒还是被关闭 channel 操作唤醒
***
**缓冲区中有数据,无论 channel 被关闭,都会发送给接收者**
```go
lock(&c.lock)
// ...
// 如果缓冲区不为空,依然有未发送的数据
// 需要注意,这时 channel 可能已经处于关闭状态了,但是依然可以从关闭的缓冲区中接收到数据
if c.qcount > 0{
// 获取指向缓冲区中待接收数据的指针
gp ;= chanbuf(c, c.recvx)
if ep != nil{
// 如果接收操作有接收值,那么直接 copy 到 ep
typedmemmove(c.elemtype, ep, gp)
}
// 清理缓冲区中已接收到的数据内存
typedememclr(c.elemtype, gp)
// 调整待接收索引
c.recv++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
// 可以被选中,并且接收成功
return true, true
}
```
这一部分的逻辑就比较简单
* 获取缓冲区的待接收数据的地址 `gp`,如果有`接收者`,便将数据复制给`接收者`
* 调整缓冲区循环数据的待接收索引`recvx`
***
**channel 未关闭, 缓冲区没有元素,并且没有待接收者**
非阻塞操作,可以直接解锁返回,并且不会被 `select` 语句选中
```go
lock(&c.lock)
// ...
// 缓冲区没有元素并且没有待发送者
if !block {
unblock(&c.block)
// 不会被选中,并且没有接收到有效数据
return false, false
}
```
阻塞操作,挂起当前 goroutine,等待被发送操作或者关闭操作唤醒
```go
lock(&c.lock)
// ...
gp = getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.param = nil
// 入队到待发送者队列中
c.recvq.enqueue(mysg)
// 挂起 goroutine,等待由关闭操作或者发送操作唤醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 被唤醒,做一些检测,和清理操作
// 根据 param 判断是否是由关闭唤醒的
// 有 closed 唤醒时,param 会被置为 nil
closed := gp.param == nil
pg.param = nil
mysg.c = nil
releaseSudog(mysg)
// 可以被选中,但是 closed 反应是否接受到有效数据
return true, !closed
}
```
**被唤醒后会根据 `param` 字段,判断是否是由关闭操作唤醒**
****
#### select 与 接收操作
##### 单个接收 case
```go
select {
case i <- ch:
}
// ====>
if ch == nil{
block()
}
i <- ch
```
##### 非阻塞接收
```go
select {
case v <- ch: // case v, received <- ch:
// ...
default:
// ...
}
// ===>
// if ch != nil && selectnbrecv2(&v, &ok, ch) {
if selectnbrecv(&v, ch) {
// ...
} else {
// ...
}
```
非阻塞接收会调用 `selectnbrecv` 和 `selectnbrecv2`
```go
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
elected, *received = chanrecv(c, elem, false)
return
}
```
## 关闭 channel
关闭 channel 直接调用 `close` 函数即可,但是贸然关闭 channel 会引发很多的问题
```go
ch := make(chan int)
// 关闭 goroutine
close(ch)
```
### 关闭操作的特性
* **关闭 nil channel 会 panic**
* **关闭已关闭的 channel 会 panic**
* **关闭操作会将待接收者的接收值置为空值,唤醒所有待发送者和待接收者**
关于如何优雅的关闭 channel,可以看一下 [go101](https://gfw.go101.org/article/101.html) 中 [如何优雅地关闭通道](https://gfw.go101.org/article/channel-closing.html)
### 深入源码
**关闭 nil channel 会panic**
```go
func closechan(c *hchan) {
// 关闭 nil channel 会 panic
if c == nil{
panic(plainError("close of nil channel"))
}
```
***
**重复关闭 channel,也会 panic**
```go
// 加锁
lock(&c.lock)
if c.closed != 0 {
// 重复关闭会 panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
```
**需要注意关闭操作中,判断 channel 是否关闭前会加锁**
***
**处理待接收者,如果有`接收者`,那么就置为空值**
```go
c.closed = 1
var glist gList
// 处理待接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
// 将待接收位置置为空值
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil // 清理 elem 指针
}
gp := sg.g
// param 置为 nil,接收者被唤醒后会返回未接收到有效数据
gp.param = nil
glist.push(gp)
}
```
***
**处理待发送者**
```go
// 处理待发送的队列
for {
sg := c.sendq.dequeue()
if sg == nil {
// 没有待发送的goroutine了
break
}
sg.elem = nil
gp := sg.g
// 将 param 置为 nil, 待发送者被唤醒后,会 panic
gp.param = nil
glist.push(gp)
}
```
***
**解锁,唤醒所有待发送者和待接收者**
```go
unlock(&c.lock)
// 唤醒所有阻塞的 goroutine
for !glist.empty(){
gp := glist.pop()
gpready(gp, 3)
}
}
```
### 关闭操作唤醒 channel 中阻塞的 goroutine
在处理待发送者和待接收者时,都会将 goroutine 的 `param` 字段置为 nil,然后当被唤醒后待发送者和待接收者就能区分如何被唤醒的
#### 发送操作
```go
// runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// 阻塞,挂起 goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
if gp.param == nil {
if c.closed = 0 {
throw("chansend: suprious wakeup")
}
panic(plainError("send on closed channel"))
}
// ...
```
可以看到发送操作被唤醒后会判断 `param` 字段
**如果是由于 channel 关闭导致被唤醒,那么直接 panic**
* **关闭操作唤醒,goroutine param 字段为 nil**
```go
func closechan(c *hchan) {
// ...
for {
sg := c.recvq.dequeue()
// ...
pg := sg.pg
gp.param = nil
// ...
}
// ... 唤醒 goroutine
}
```
* **接收操作唤醒,goroutine param 不为 nil**
```go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ... 数据复制
pg := sg.g
pg.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
```
#### 接收操作
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// 阻塞,挂起当前 goroutine
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 被唤醒
// ...
closed := gp.parma == nil
// ...
return true, !closed
```
**接收操作在关闭后并不会 panic,而是会作为 received 返回,表示是否接收到有效的数据**
# 参考资料
**[深度解密Go语言之channel](https://qcrao.com/2019/07/22/dive-into-go-channel/)**
**[Go 语言设计与实现 —— Channel](https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/)**
### 推荐阅读
**[Go101 通道](https://gfw.go101.org/article/channel.html)**
**[如何优雅地关闭通道](https://gfw.go101.org/article/channel-closing.html)**
**[浅谈 Go 语言 select 的实现原理](https://juejin.im/entry/5ca15911518825550b35bf98)**
**[图解Go的channel底层原理](https://mp.weixin.qq.com/s/40uxAPdubIk0lU321LmfRg)**
**[走进Golang之Channel的使用](https://mp.weixin.qq.com/s/7Hoa5fX8U8xSNXYOB3g4jg)**
**[走进Golang之Channel的数据结构](https://mp.weixin.qq.com/s/X-245bTIHQGG9GqfyEIHXw)**
有疑问加站长微信联系(非本文作者))