go channel实现浅析

Leonard · · 444 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。
![qrcode_for_gh_5fcf50a6cc50_344.jpg](https://static.studygolang.com/180302/5316ebc60d4994ed92450c8b5da025f9.jpg) 实现: go1.10/src/runtime/chan.go CSP模型的实现 带缓冲的channel: c.qcount>0 表示c.recvq是empty的,可以接收数据。 c.qcount < c.dataqsiz意味着c.sendq是empty的,可以发送数据。 hchan结构体使用一个环形队列来保存groutine之间传递的数据(如果是缓存channel的话),使用两个list保存向该chan发送和从该chan接收数据的goroutine,还有一个mutex来保证操作这些结构的安全。 type hchan struct { qcount uint //队列中的总数据量 // 环形队列的大小,channel的cap dataqsiz uint // 指向环形队列的数组(hchan指针) //分配的一块连续的内存空间,可以看成是字节数组 buf unsafe.Pointer elemsize uint16 //channel元素类型大小 closed uint32 //是否关闭,关闭的时候 closed==1 elemtype *_type //channel元素类型 sendx uint // send index,当sendx==dataqsiz时候,sendx清零,回到对列头 recvx uint // receive index,recvx==dataqsiz时候,recvx清零,回到队列头 recvq waitq //因recv而阻塞的等待队列 <-ch sendq waitq //因send而阻塞的等待队列 ch<- //使得结构体和阻塞在该channel上的一些sudog中的字段是并发安全的。 //当协程持有锁的时候不要修改其他协程的状态(尤其是不要将其他协程的状态变成ready),因为这可能导致堆栈减少而死锁。 //mutex是互斥锁,没有竞争的情况跟spin lock(自旋锁)一样快(只是几条用户级别的命令),发生竞争的时候会 sleep in kernel。 lock mutex } //go1.10/src/runtime/type.go type chantype struct { typ _type elem *_type //channel的方向 //调用go1.10/src/cmd/compile/internal/syntax/parser.go解析语法的时候, //可以知道channel的方向信息。 //go1.10/src/cmd/compile/internal/syntax/nodes.go // chan Elem可读可写 // <-chan Elem 只读 // chan<- Elem 只写 //ChanType struct { // Dir ChanDir // 0 表示没有方向,可读可写 // Elem Expr // expr // } //dir的值种类: //const ( // _ ChanDir = iota //默认0 // SendOnly //1 只写 // RecvOnly //2 只读 //) dir uintptr } makechan实现: //有同学问为什么channel是引用类型,从返回值可以看出返回的是*hchan, //只是通过内置的make(chan elemtype,size)函数在堆上分配的内存把底层屏蔽了。 func makechan(t *chantype, size int) *hchan {//元素类型,channel容量 elem := t.elem if elem.size >= 1<<16 {//元素类型大小>=64k throw("makechan: invalid channel element type") } ...... //校验size,必须非负数,且受系统内存限制 if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > _MaxMem-hchanSize { panic(plainError("makechan: size out of range")) } var c *hchan switch { case size == 0 || elem.size == 0://队列或元素大小为0 c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = unsafe.Pointer(c)//buf是hchan指针 case elem.kind&kindNoPointers != 0://非指针类型元素 //分配一块内存给队列,小对象从P关联的缓存free list分配 //大对象(>32kB)直接从heap中分配 c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default://指针类型元素 c = new(hchan) c.buf = mallocgc(uintptr(size)*elem.size, elem, true) } c.elemsize = uint16(elem.size)//元素大小 c.elemtype = elem//元素类型 c.dataqsiz = uint(size)//队列容量 if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n") } return c } //用于获取buf指向的数组的第i个元素 func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) } //sender: ch<-1 //ep是取变量v的地址,block 是select时候使用的表示是否阻塞, func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ...... //channel接收数据之前获取互斥锁 lock(&c.lock) if c.closed != 0 { unlock(&c.lock) //如果channel已经关闭,继续发送数据会panic panic(plainError("send on closed channel")) } //从环形队列中获取一个*sudog, //sudog代表waiting状态goroutine的双向链表。 if sg := c.recvq.dequeue(); sg != nil { //c是empty的channel //ep被拷贝进了sg.elem中,然后唤醒goroutine, //此时goroutine是ready状态可以被P(调度器上下文可以理解为cpu)调用。 //最后解锁c, sg必须从recvq删除。 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { //获取空间 qp := chanbuf(c, c.sendx) ...... //将ep指向的类型内存拷贝进qp地址空间 typedmemmove(c.elemtype, qp, ep) c.sendx++//数组索引增加 if c.sendx == c.dataqsiz {//channel已经填满 c.sendx = 0 } c.qcount++//channel中元素数量增加 unlock(&c.lock) return true } //channel已满,或者无缓冲的channel //使用当前waiting状态的goroutine,ep(元素地址) 构造*sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil //*sudog进入sender队列 c.sendq.enqueue(mysg) //将当前goroutine状态置成waiting状态,然后解锁c //goroutine通过调用goready(gp) 变成runnable。 goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) ...... } //receiver: elem,ok:=<-ch //从c接收数据并将数据写入到ep,如果ep是nil接收到的数据被忽略 //if block == false 并且channel是empty,返回(false,false) //如果channel已经关闭,ep存的是类型零值,返回(true,false) //否则ep存储元素地址,返回(true,true) //非nil的ep指向堆或者调用者的帧栈。 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ...... //channel发送数据之前获取互斥锁 lock(&c.lock) //channel未关闭并且channel没有元素 if c.closed != 0 && c.qcount == 0 { ...... unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep)//清空ep指向的内存 } return true, false } if sg := c.sendq.dequeue(); sg != nil { //如果是无缓冲的channel,直接将sudog.elem拷贝进ep //否则通过计算recvx得到qp,将ep拷贝进qp地址空间, //然后唤醒goroutine,此时goroutine是ready状态可以被P(调度器上下文可以理解为cpu)调用。 //最后解锁c, sg必须从sendq删除。 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true,true } //如果channel中有元素,直接从内存数组qp中移动到ep中, //之后将qp指向的内存清空,recvx++ if c.qcount > 0 { //根据recv index获取内存地址 qp := chanbuf(c, c.recvx) ...... if ep != nil { //将qp指向的内存拷贝到ep typedmemmove(c.elemtype, ep, qp) } //清空qp指向的内存 typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz {//队列已空 c.recvx = 0 } c.qcount--//队列中的元素数量减一 unlock(&c.lock)//解锁channel return true, true } //阻塞在<-ch操作上, //使用当前waiting状态的goroutine,ep(元素地址) 构造*sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil //*sudog进入receiver队列 c.recvq.enqueue(mysg) //将当前goroutine状态置成waiting状态,然后解锁c //goroutine通过调用goready(gp) 变成runnable。 goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) ...... } //close(ch) func closechan(c *hchan) { if c == nil {//channel是nil,调用close panic panic(plainError("close of nil channel")) } //channel加锁 lock(&c.lock) if c.closed != 0 {//channel已经closed unlock(&c.lock)//解锁channel //同一个channel只可以close一次 panic(plainError("close of closed channel")) } ...... c.closed = 1//标记成closed var glist *g//MPG中的g //接下来是顺序执行的两个for循环: //1:清空recerver queqe,将sg.elem指向的内存清空。 //2:清空sender queue, sg.elem=nil,解锁channel, //并将receiver queue和 sender queue中获得的goroutine全部置成runnable状态,可以被P调度。 } # 更多内容: 关注公众号 Leonard18701702934

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077

444 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传