![qrcode_for_gh_5fcf50a6cc50_344.jpg](https://static.studygolang.com/180302/5316ebc60d4994ed92450c8b5da025f9.jpg)
实现: go1.10/src/runtime/chan.go CSP模型的实现
带缓冲的channel:
c.qcount>0 表示c.recvq是empty的,可以接收数据。
c.qcount < c.dataqsiz意味着c.sendq是empty的,可以发送数据。
hchan结构体使用一个环形队列来保存groutine之间传递的数据(如果是缓存channel的话),使用两个list保存向该chan发送和从该chan接收数据的goroutine,还有一个mutex来保证操作这些结构的安全。
type hchan struct {
qcount uint //队列中的总数据量
// 环形队列的大小,channel的cap
dataqsiz uint
// 指向环形队列的数组(hchan指针)
//分配的一块连续的内存空间,可以看成是字节数组
buf unsafe.Pointer
elemsize uint16 //channel元素类型大小
closed uint32 //是否关闭,关闭的时候 closed==1
elemtype *_type //channel元素类型
sendx uint // send index,当sendx==dataqsiz时候,sendx清零,回到对列头 recvx uint // receive index,recvx==dataqsiz时候,recvx清零,回到队列头
recvq waitq //因recv而阻塞的等待队列 <-ch
sendq waitq //因send而阻塞的等待队列 ch<-
//使得结构体和阻塞在该channel上的一些sudog中的字段是并发安全的。
//当协程持有锁的时候不要修改其他协程的状态(尤其是不要将其他协程的状态变成ready),因为这可能导致堆栈减少而死锁。
//mutex是互斥锁,没有竞争的情况跟spin lock(自旋锁)一样快(只是几条用户级别的命令),发生竞争的时候会 sleep in kernel。
lock mutex
}
//go1.10/src/runtime/type.go
type chantype struct {
typ _type
elem *_type
//channel的方向
//调用go1.10/src/cmd/compile/internal/syntax/parser.go解析语法的时候,
//可以知道channel的方向信息。
//go1.10/src/cmd/compile/internal/syntax/nodes.go
// chan Elem可读可写
// <-chan Elem 只读
// chan<- Elem 只写
//ChanType struct {
// Dir ChanDir // 0 表示没有方向,可读可写
// Elem Expr
// expr
// }
//dir的值种类:
//const (
// _ ChanDir = iota //默认0
// SendOnly //1 只写
// RecvOnly //2 只读
//)
dir uintptr
}
makechan实现:
//有同学问为什么channel是引用类型,从返回值可以看出返回的是*hchan,
//只是通过内置的make(chan elemtype,size)函数在堆上分配的内存把底层屏蔽了。
func makechan(t *chantype, size int) *hchan {//元素类型,channel容量
elem := t.elem
if elem.size >= 1<<16 {//元素类型大小>=64k
throw("makechan: invalid channel element type")
}
......
//校验size,必须非负数,且受系统内存限制
if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > _MaxMem-hchanSize {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case size == 0 || elem.size == 0://队列或元素大小为0
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = unsafe.Pointer(c)//buf是hchan指针
case elem.kind&kindNoPointers != 0://非指针类型元素
//分配一块内存给队列,小对象从P关联的缓存free list分配
//大对象(>32kB)直接从heap中分配
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default://指针类型元素
c = new(hchan)
c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
}
c.elemsize = uint16(elem.size)//元素大小
c.elemtype = elem//元素类型
c.dataqsiz = uint(size)//队列容量
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
//用于获取buf指向的数组的第i个元素
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
//sender: ch<-1
//ep是取变量v的地址,block 是select时候使用的表示是否阻塞,
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
//channel接收数据之前获取互斥锁
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
//如果channel已经关闭,继续发送数据会panic
panic(plainError("send on closed channel"))
}
//从环形队列中获取一个*sudog,
//sudog代表waiting状态goroutine的双向链表。
if sg := c.recvq.dequeue(); sg != nil {
//c是empty的channel
//ep被拷贝进了sg.elem中,然后唤醒goroutine,
//此时goroutine是ready状态可以被P(调度器上下文可以理解为cpu)调用。
//最后解锁c, sg必须从recvq删除。
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
//获取空间
qp := chanbuf(c, c.sendx)
......
//将ep指向的类型内存拷贝进qp地址空间
typedmemmove(c.elemtype, qp, ep)
c.sendx++//数组索引增加
if c.sendx == c.dataqsiz {//channel已经填满
c.sendx = 0
}
c.qcount++//channel中元素数量增加
unlock(&c.lock)
return true
}
//channel已满,或者无缓冲的channel
//使用当前waiting状态的goroutine,ep(元素地址) 构造*sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
//*sudog进入sender队列
c.sendq.enqueue(mysg)
//将当前goroutine状态置成waiting状态,然后解锁c
//goroutine通过调用goready(gp) 变成runnable。
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
......
}
//receiver: elem,ok:=<-ch
//从c接收数据并将数据写入到ep,如果ep是nil接收到的数据被忽略
//if block == false 并且channel是empty,返回(false,false)
//如果channel已经关闭,ep存的是类型零值,返回(true,false)
//否则ep存储元素地址,返回(true,true)
//非nil的ep指向堆或者调用者的帧栈。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
//channel发送数据之前获取互斥锁
lock(&c.lock)
//channel未关闭并且channel没有元素
if c.closed != 0 && c.qcount == 0 {
......
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)//清空ep指向的内存
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
//如果是无缓冲的channel,直接将sudog.elem拷贝进ep
//否则通过计算recvx得到qp,将ep拷贝进qp地址空间,
//然后唤醒goroutine,此时goroutine是ready状态可以被P(调度器上下文可以理解为cpu)调用。
//最后解锁c, sg必须从sendq删除。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true,true
}
//如果channel中有元素,直接从内存数组qp中移动到ep中,
//之后将qp指向的内存清空,recvx++
if c.qcount > 0 {
//根据recv index获取内存地址
qp := chanbuf(c, c.recvx)
......
if ep != nil {
//将qp指向的内存拷贝到ep
typedmemmove(c.elemtype, ep, qp)
}
//清空qp指向的内存
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {//队列已空
c.recvx = 0
}
c.qcount--//队列中的元素数量减一
unlock(&c.lock)//解锁channel
return true, true
}
//阻塞在<-ch操作上,
//使用当前waiting状态的goroutine,ep(元素地址) 构造*sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
//*sudog进入receiver队列
c.recvq.enqueue(mysg)
//将当前goroutine状态置成waiting状态,然后解锁c
//goroutine通过调用goready(gp) 变成runnable。
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
......
}
//close(ch)
func closechan(c *hchan) {
if c == nil {//channel是nil,调用close panic
panic(plainError("close of nil channel"))
}
//channel加锁
lock(&c.lock)
if c.closed != 0 {//channel已经closed
unlock(&c.lock)//解锁channel
//同一个channel只可以close一次
panic(plainError("close of closed channel"))
}
......
c.closed = 1//标记成closed
var glist *g//MPG中的g
//接下来是顺序执行的两个for循环:
//1:清空recerver queqe,将sg.elem指向的内存清空。
//2:清空sender queue, sg.elem=nil,解锁channel,
//并将receiver queue和 sender queue中获得的goroutine全部置成runnable状态,可以被P调度。
}
# 更多内容: 关注公众号 Leonard18701702934
有疑问加站长微信联系(非本文作者)