go channel实现浅析

Leonard · 2018-03-02 10:25:50 · 923 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2018-03-02 10:25:50 的主题,其中的信息可能已经有所发展或是发生改变。

qrcode_for_gh_5fcf50a6cc50_344.jpg

实现: go1.10/src/runtime/chan.go CSP模型的实现

带缓冲的channel:

    c.qcount>0 表示c.recvq是empty的,可以接收数据。

    c.qcount < c.dataqsiz意味着c.sendq是empty的,可以发送数据。

hchan结构体使用一个环形队列来保存groutine之间传递的数据(如果是缓存channel的话),使用两个list保存向该chan发送和从该chan接收数据的goroutine,还有一个mutex来保证操作这些结构的安全。

type hchan struct { qcount uint //队列中的总数据量 // 环形队列的大小,channel的cap

  dataqsiz uint           

  // 指向环形队列的数组(hchan指针)

  //分配的一块连续的内存空间,可以看成是字节数组

  buf      unsafe.Pointer 
  elemsize uint16   //channel元素类型大小
  closed   uint32   //是否关闭,关闭的时候 closed==1
  elemtype *_type //channel元素类型
  sendx    uint  // send index,当sendx==dataqsiz时候,sendx清零,回到对列头       recvx    uint   // receive index,recvx==dataqsiz时候,recvx清零,回到队列头
  recvq    waitq  //因recv而阻塞的等待队列 <-ch
  sendq   waitq  //因send而阻塞的等待队列 ch<-



  //使得结构体和阻塞在该channel上的一些sudog中的字段是并发安全的。

  //当协程持有锁的时候不要修改其他协程的状态(尤其是不要将其他协程的状态变成ready),因为这可能导致堆栈减少而死锁。

 //mutex是互斥锁,没有竞争的情况跟spin lock(自旋锁)一样快(只是几条用户级别的命令),发生竞争的时候会 sleep in kernel。

   lock mutex

}

//go1.10/src/runtime/type.go

type chantype struct {

  typ  _type

  elem *_type

  //channel的方向

  //调用go1.10/src/cmd/compile/internal/syntax/parser.go解析语法的时候,

  //可以知道channel的方向信息。

  //go1.10/src/cmd/compile/internal/syntax/nodes.go

  // chan Elem可读可写

  // <-chan Elem 只读
 // chan<- Elem  只写
 //ChanType struct {
 //      Dir  ChanDir // 0 表示没有方向,可读可写
 //      Elem Expr
 //      expr

 // }    

 //dir的值种类:  

//const (
//     _ ChanDir = iota //默认0
//    SendOnly            //1 只写
//    RecvOnly            //2 只读

//)

dir uintptr

}

makechan实现:

//有同学问为什么channel是引用类型,从返回值可以看出返回的是*hchan,

//只是通过内置的make(chan elemtype,size)函数在堆上分配的内存把底层屏蔽了。

func makechan(t chantype, size int) hchan {//元素类型,channel容量 elem := t.elem if elem.size >= 1<<16 {//元素类型大小>=64k throw("makechan: invalid channel element type") }

  ......
   //校验size,必须非负数,且受系统内存限制
  if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > _MaxMem-hchanSize {
         panic(plainError("makechan: size out of range"))
  }

  var c *hchan

  switch {

        case size == 0 || elem.size == 0://队列或元素大小为0
             c = (*hchan)(mallocgc(hchanSize, nil, true))
             c.buf = unsafe.Pointer(c)//buf是hchan指针
        case elem.kind&kindNoPointers != 0://非指针类型元素

              //分配一块内存给队列,小对象从P关联的缓存free list分配

              //大对象(>32kB)直接从heap中分配
             c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))

             c.buf = add(unsafe.Pointer(c), hchanSize)
        default://指针类型元素
             c = new(hchan)
             c.buf = mallocgc(uintptr(size)*elem.size, elem, true)

  }

  c.elemsize = uint16(elem.size)//元素大小
  c.elemtype = elem//元素类型
  c.dataqsiz = uint(size)//队列容量

  if debugChan {
         print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  }
  return c

}

//用于获取buf指向的数组的第i个元素

func chanbuf(c hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)uintptr(c.elemsize)) }

//sender: ch<-1

//ep是取变量v的地址,block 是select时候使用的表示是否阻塞,

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    ......

    //channel接收数据之前获取互斥锁

    lock(&c.lock)

    if c.closed != 0 {

          unlock(&c.lock)

          //如果channel已经关闭,继续发送数据会panic

          panic(plainError("send on closed channel"))

    }

    //从环形队列中获取一个*sudog,

    //sudog代表waiting状态goroutine的双向链表。

    if sg := c.recvq.dequeue(); sg != nil {

           //c是empty的channel

           //ep被拷贝进了sg.elem中,然后唤醒goroutine,

           //此时goroutine是ready状态可以被P(调度器上下文可以理解为cpu)调用。

           //最后解锁c, sg必须从recvq删除。

          send(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true
    }

    if c.qcount < c.dataqsiz {
        //获取空间

        qp := chanbuf(c, c.sendx)

      ......

        //将ep指向的类型内存拷贝进qp地址空间

        typedmemmove(c.elemtype, qp, ep)
        c.sendx++//数组索引增加
        if c.sendx == c.dataqsiz {//channel已经填满
             c.sendx = 0
        }
        c.qcount++//channel中元素数量增加
        unlock(&c.lock)
        return true

    }

    //channel已满,或者无缓冲的channel

    //使用当前waiting状态的goroutine,ep(元素地址) 构造*sudog

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
          mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c

    gp.param = nil



    //*sudog进入sender队列

    c.sendq.enqueue(mysg)

    //将当前goroutine状态置成waiting状态,然后解锁c

    //goroutine通过调用goready(gp) 变成runnable。

    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    ......        

}

//receiver: elem,ok:=<-ch

//从c接收数据并将数据写入到ep,如果ep是nil接收到的数据被忽略

//if block == false 并且channel是empty,返回(false,false)

//如果channel已经关闭,ep存的是类型零值,返回(true,false)

//否则ep存储元素地址,返回(true,true)

//非nil的ep指向堆或者调用者的帧栈。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    ......

    //channel发送数据之前获取互斥锁

    lock(&c.lock)

    //channel未关闭并且channel没有元素

    if c.closed != 0 && c.qcount == 0 {
         ......
         unlock(&c.lock)
         if ep != nil {
             typedmemclr(c.elemtype, ep)//清空ep指向的内存
         }
         return true, false

    }



    if sg := c.sendq.dequeue(); sg != nil {

           //如果是无缓冲的channel,直接将sudog.elem拷贝进ep

           //否则通过计算recvx得到qp,将ep拷贝进qp地址空间,

           //然后唤醒goroutine,此时goroutine是ready状态可以被P(调度器上下文可以理解为cpu)调用。

          //最后解锁c, sg必须从sendq删除。

          recv(c, sg, ep, func() { unlock(&c.lock) }, 3)

          return true,true

    }

//如果channel中有元素,直接从内存数组qp中移动到ep中,

//之后将qp指向的内存清空,recvx++

if c.qcount > 0 {

        //根据recv index获取内存地址

        qp := chanbuf(c, c.recvx)

        ......

        if ep != nil {

               //将qp指向的内存拷贝到ep

              typedmemmove(c.elemtype, ep, qp)

        }        

        //清空qp指向的内存

        typedmemclr(c.elemtype, qp)

        c.recvx++

        if c.recvx == c.dataqsiz {//队列已空
              c.recvx = 0
        }
        c.qcount--//队列中的元素数量减一
        unlock(&c.lock)//解锁channel
        return true, true

}

//阻塞在<-ch操作上,

//使用当前waiting状态的goroutine,ep(元素地址) 构造*sudog

gp := getg()

mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
      mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c

gp.param = nil

//*sudog进入receiver队列

c.recvq.enqueue(mysg)



 //将当前goroutine状态置成waiting状态,然后解锁c

 //goroutine通过调用goready(gp) 变成runnable。

 goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

......

}

//close(ch)

func closechan(c *hchan) {

if c == nil {//channel是nil,调用close panic
      panic(plainError("close of nil channel"))

}

//channel加锁

lock(&c.lock)
if c.closed != 0 {//channel已经closed

      unlock(&c.lock)//解锁channel

      //同一个channel只可以close一次
      panic(plainError("close of closed channel"))
}

......

c.closed = 1//标记成closed

var glist *g//MPG中的g

//接下来是顺序执行的两个for循环:

//1:清空recerver queqe,将sg.elem指向的内存清空。

//2:清空sender queue, sg.elem=nil,解锁channel,

//并将receiver queue和 sender queue中获得的goroutine全部置成runnable状态,可以被P调度。

}

更多内容: 关注公众号 Leonard18701702934


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

923 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传