waitq 为双向链表,sudog 代表一个封装的 goroutine,其参数 g 为 goroutine 实例结,构如下图:
02. 新建 chan:
在 go 中,通过如下代码创建 chan
1
c := make(chanint, 4)
以上代码,对应的是源码:
1
funcmakechan(t *chantype, size int) *hchan
逻辑流程如下:
graph TD
A[makechan] -->|t, size| B{安全检查}
B -->|N| ZR[ERROR]
B -->|Y| E{size 或 t.elem.size 是否为0?}
E -->|Y| F[mallocgc 默认大小 hchanSize 内存]
E -->|N| G{数据类型是否为指针?}
G -->|Y| H[通过new单独分配chan内存]
G -->|N| I[mallocgc 内存 hchanSize + mem]
H --> Z
F --> Z
I --> Z
Z[chan 赋值属性]
Z --> ZB[END]
funcmakechan(t *chantype, size int) *hchan { elem := t.elem // 安全检查,数据项大小不超过 16K if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 获取要分配的内存 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // size 为 0 的情况,分配 hchan 结构体大小的内存,64位系统为 96 Byte. c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // 数据项不为指针类型,调用 mallocgc 一次性分配内存大小,hchan 结构体大小 + 数据总量大小 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 数据项为指针类型,hchan 和 buf 分开分配内存,GC 中指针类型判断 reachable and unreadchable. c = new(hchan) c.buf = mallocgc(mem, elem, true) } // chan 赋值属性, 数据项大小、数据项类型、缓存数据的容量 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c }
03.读写chan
在 go 中,写入 chan 的代码如下:
1 2 3
v := 1 c := make(chanint) c <- v
读取 chan 的代码如下:
1 2 3
var v int c := make(chanint) c -> v
c <- v 操作对应的源码为 runtime 中的
1
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool
而 c -> v 操作对应源码为 runtime 中的
1
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool)
其中 c 为 chansend 的 c, v 的地址为 chansend 的 ep.
逻辑流程如下:
graph TD
A[chansend 或 chanrecv] -->|hchan, ep| B{校验}
B --> |Y| C[加锁 lock]
B --> |N| D["gopark(), 阻塞当前 goroutine 和 throw error"]
C --> E{chan close?}
E --> |Y| F[unlock和panic]
E --> |N| G[取出 recvq 或 sendq 队列]
G --> H{是否等待的sudog?}
H --> |Y| I["send()或recv()"]
I --> J["goready(), 运行 sudog 的 goroutine"]
H --> |N| K{存在剩余缓冲区?}
K --> |Y| L["数据放入缓冲区 buf, unlock"]
K --> |N| M["打包成sudog,加入sendq或recvq队列"]
M --> O["gopark(),阻塞当前goroutine等待被接受者唤醒"]
O -."如果被唤醒,说明数据已经被接收,回收sudog".-> P["保存context,运行别的 goroutine"]
P --> Z
L --> Z
Z[End]
graph TD
A[closechan] --c--> B{检查}
B --> |Y| C[加锁 lock]
B --> |N| D[panic]
C --> E{chan是否已close}
E --> |Y| D
E --> |N| F[置 hchan.close = 1]
F --> G[释放recvq的所有等待接收者]
G --> H[释放sendq的所有等待发送者]
H -.发送者会panic.-> D
H --> I[unlock]
I --> J[唤醒recvq和sendq的所有goroutine]
J --> End
waitq 为双向链表,sudog 代表一个封装的 goroutine,其参数 g 为 goroutine 实例结,构如下图:
02. 新建 chan:
在 go 中,通过如下代码创建 chan
1
c := make(chanint, 4)
以上代码,对应的是源码:
1
funcmakechan(t *chantype, size int) *hchan
逻辑流程如下:
graph TD
A[makechan] -->|t, size| B{安全检查}
B -->|N| ZR[ERROR]
B -->|Y| E{size 或 t.elem.size 是否为0?}
E -->|Y| F[mallocgc 默认大小 hchanSize 内存]
E -->|N| G{数据类型是否为指针?}
G -->|Y| H[通过new单独分配chan内存]
G -->|N| I[mallocgc 内存 hchanSize + mem]
H --> Z
F --> Z
I --> Z
Z[chan 赋值属性]
Z --> ZB[END]
funcmakechan(t *chantype, size int) *hchan { elem := t.elem // 安全检查,数据项大小不超过 16K if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 获取要分配的内存 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // size 为 0 的情况,分配 hchan 结构体大小的内存,64位系统为 96 Byte. c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // 数据项不为指针类型,调用 mallocgc 一次性分配内存大小,hchan 结构体大小 + 数据总量大小 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 数据项为指针类型,hchan 和 buf 分开分配内存,GC 中指针类型判断 reachable and unreadchable. c = new(hchan) c.buf = mallocgc(mem, elem, true) } // chan 赋值属性, 数据项大小、数据项类型、缓存数据的容量 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c }
03.读写chan
在 go 中,写入 chan 的代码如下:
1 2 3
v := 1 c := make(chanint) c <- v
读取 chan 的代码如下:
1 2 3
var v int c := make(chanint) c -> v
c <- v 操作对应的源码为 runtime 中的
1
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool
而 c -> v 操作对应源码为 runtime 中的
1
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool)
其中 c 为 chansend 的 c, v 的地址为 chansend 的 ep.
逻辑流程如下:
graph TD
A[chansend 或 chanrecv] -->|hchan, ep| B{校验}
B --> |Y| C[加锁 lock]
B --> |N| D["gopark(), 阻塞当前 goroutine 和 throw error"]
C --> E{chan close?}
E --> |Y| F[unlock和panic]
E --> |N| G[取出 recvq 或 sendq 队列]
G --> H{是否等待的sudog?}
H --> |Y| I["send()或recv()"]
I --> J["goready(), 运行 sudog 的 goroutine"]
H --> |N| K{存在剩余缓冲区?}
K --> |Y| L["数据放入缓冲区 buf, unlock"]
K --> |N| M["打包成sudog,加入sendq或recvq队列"]
M --> O["gopark(),阻塞当前goroutine等待被接受者唤醒"]
O -."如果被唤醒,说明数据已经被接收,回收sudog".-> P["保存context,运行别的 goroutine"]
P --> Z
L --> Z
Z[End]
graph TD
A[closechan] --c--> B{检查}
B --> |Y| C[加锁 lock]
B --> |N| D[panic]
C --> E{chan是否已close}
E --> |Y| D
E --> |N| F[置 hchan.close = 1]
F --> G[释放recvq的所有等待接收者]
G --> H[释放sendq的所有等待发送者]
H -.发送者会panic.-> D
H --> I[unlock]
I --> J[唤醒recvq和sendq的所有goroutine]
J --> End