01.chan 的数据结构:
golang 中 chan 的源码在 src/runtime/chan.go 文件中,hchan
则为 chan 的结构体
hchan:
type hchan struct {
qcount uint // 当前缓存数据的总量
dataqsiz uint // 缓存数据的容量
buf unsafe.Pointer // 缓存数据,为一个循环数组,容量大小为 dataqsiz,当前大小为 qcount
elemsize uint16 // 数据类型的大小,比如 int 为 4
closed uint32 // 标记是否关闭
elemtype *_type // 数据的类型
sendx uint // 发送队列 sendq 的长度
recvx uint // 接收队列 recvq 的长度
recvq waitq // 阻塞的接收 goroutine 的队列
sendq waitq // 阻塞的发送 goroutine 的队列
lock mutex // 锁,用于并发控制队列操作
}
waitq:
type waitq struct {
first *sudog
last *sudog
}
waitq 为双向链表,sudog 代表一个封装的 goroutine,其参数 g 为 goroutine 实例结,构如下图:
02. 新建 chan:
在 go 中,通过如下代码创建 chan
c := make(chan int, 4)
以上代码,对应的是源码:
func makechan(t *chantype, size int) *hchan
逻辑流程如下:
graph TD
A[makechan] -->|t, size| B{安全检查}
B -->|N| ZR[ERROR]
B -->|Y| E{size 或 t.elem.size 是否为0?}
E -->|Y| F[mallocgc 默认大小 hchanSize 内存]
E -->|N| G{数据类型是否为指针?}
G -->|Y| H[通过new单独分配chan内存]
G -->|N| I[mallocgc 内存 hchanSize + mem]
H --> Z
F --> Z
I --> Z
Z[chan 赋值属性]
Z --> ZB[END]
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 安全检查,数据项大小不超过 16K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 获取要分配的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// size 为 0 的情况,分配 hchan 结构体大小的内存,64位系统为 96 Byte.
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 数据项不为指针类型,调用 mallocgc 一次性分配内存大小,hchan 结构体大小 + 数据总量大小
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 数据项为指针类型,hchan 和 buf 分开分配内存,GC 中指针类型判断 reachable and unreadchable.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// chan 赋值属性, 数据项大小、数据项类型、缓存数据的容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
03.读写chan
在 go 中,写入 chan 的代码如下:
v := 1
c := make(chan int)
c <- v
读取 chan 的代码如下:
var v int
c := make(chan int)
c -> v
c <- v
操作对应的源码为 runtime 中的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
而 c -> v
操作对应源码为 runtime 中的
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
其中 c 为 chansend 的 c, v 的地址为 chansend 的 ep.
逻辑流程如下:
graph TD
A[chansend 或 chanrecv] -->|hchan, ep| B{校验}
B --> |Y| C[加锁 lock]
B --> |N| D["gopark(), 阻塞当前 goroutine 和 throw error"]
C --> E{chan close?}
E --> |Y| F[unlock和panic]
E --> |N| G[取出 recvq 或 sendq 队列]
G --> H{是否等待的sudog?}
H --> |Y| I["send()或recv()"]
I --> J["goready(), 运行 sudog 的 goroutine"]
H --> |N| K{存在剩余缓冲区?}
K --> |Y| L["数据放入缓冲区 buf, unlock"]
K --> |N| M["打包成sudog,加入sendq或recvq队列"]
M --> O["gopark(),阻塞当前goroutine等待被接受者唤醒"]
O -."如果被唤醒,说明数据已经被接收,回收sudog".-> P["保存context,运行别的 goroutine"]
P --> Z
L --> Z
Z[End]
有疑问加站长微信联系(非本文作者)