go-channel初识

GGBond_8488 · · 589 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

了解过go的都知道,go最为突出的优点就是它天然支持高并发,但是所有高并发情况都面临着一个很明显的问题,就是并发的多线程或多协程之间如何通信,而channel就是go中goroutine通信的‘管道’。

channel在go中时如何使用的

package main

import (
  "fmt"
  "os"
  "os/signal"
  "syscall"
  "time"
)

var exit = make(chan string, 1)

func main() {
  go dealSignal()
  exited := make(chan struct{}, 1)
  go channel1(exited)
  count := 0
  t := time.Tick(time.Second)
Loop:
  for {
    select {
    case <-t:
      count++
      fmt.Printf("main run %d\n", count)
    case <-exited:
      fmt.Println("main exit begin")
      break Loop
    }
  }
  fmt.Println("main exit end")
}

func dealSignal() {
  c := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-c
    exit <- "shutdown"
  }()
}

func channel1(exited chan<- struct{}) {
  t := time.Tick(time.Second)
  count := 0
  for {
    select {
    case <-t:
      count++
      fmt.Printf("channel1 run %d\n", count)
    case <-exit:
      fmt.Println("channel1 exit")
      close(exited)
      return
    }
  }
}

这个例子首先并发出一个dealsign方法,用来接收关闭信号,如果接收到关闭信号后往exit channel发送一条消息,然后并发运行channel1,channel1中定了一个ticker,正常情况下channel1每秒打印第一个case语句,如果接收到exit的信号,进入第二个case,然后关闭传入的exited channel,那么main中的Loop,接收到exited关闭的信号后,打印“main exit begin”, 然后退出循环,进程成功退出。这个例子演示了channel在goroutine中起到的传递消息的作用。这个例子是为了向大家展示channel在多个goroutine之间进行通信。

Channel在底层是什么样的


type hchan struct {
  qcount   uint           // total data in the queue;chan中的元素总数
  dataqsiz uint           // size of the circular queue;底层循环数组的size
  buf      unsafe.Pointer // points to an array of dataqsiz elements,指向底层循环数组的指针,只针对有缓冲的channel
  elemsize uint16  //chan中元素的大小
  closed   uint32  //chan是否关闭
  elemtype *_type // element type;元素类型
  sendx    uint   // send index;已发送元素在循环数组中的索引
  recvx    uint   // receive index;已接收元素在循环数组中的索引
  recvq    waitq  // list of recv waiters,等待接收消息的goroutine队列
  sendq    waitq  // list of send waiters,等待发送消息的goroutine队列

  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  //
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock mutex
}

type waitq struct {
  first *sudog
  last  *sudog
}

创建一个底层数组容量为5,元素类型为int,那么channel的数据结构如下图所示:


创建channel的时候到底发生了什么

创建channel的时候,其实底层是调用makechan方法,我们来看下源码:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // compiler checks this but be safe.
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }

  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }

  // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
  // buf points into the same allocation, elemtype is persistent.
  // SudoG's are referenced from their owning thread so they can't be collected.
  // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  var c *hchan
  switch {
  case mem == 0:
    // Queue or element size is zero.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // Race detector uses this location for synchronization.
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // Elements do not contain pointers.
    // Allocate hchan and buf in one call.
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  }
  return c
}

从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。

具体来看下代码:
可以看出makechan中其实主要的代码就是一个switch,针对不同的情况:

1、case mem == 0代表无缓冲型channel,只分配hchan本身结构体大小的内存
2、case elem.ptrdata==0 代表元素类型不含指针,只分配hchan本身结构体大小+元素大小*个数的内存,是连续的内存空间
3、default元素类型包括指针,两次分配内存的操作

channel的接收与发送

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

首先创建了一个无缓冲型的channel,然后启动两个goroutine去消费channel的数据,紧接着向channel中发送数据。我们一步一步来分析channel是如何接收和发送数据的,首先来看接收,golang中接收channel数据有两种方式:

i <- ch
i, ok <- ch

// 位于 src/runtime/chan.go

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 内容 …………

    // 如果是一个 nil 的 channel
    if c == nil {
        // 如果不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 否则,接收一个 nil 的 channel,goroutine 挂起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不会执行到这里
        throw("unreachable")
    }

    // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
    // 当我们观察到 channel 没准备好接收:
    // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
    // 2. 缓冲型,但 buf 里没有元素
    // 之后,又观察到 closed == 0,即 channel 未关闭。
    // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
    // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    // channel 已关闭,并且循环数组 buf 里没有元素
    // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
    // 也就是说即使是关闭状态,但在缓冲型的 channel,
    // buf 里有元素的情况下还能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解锁
        unlock(&c.lock)
        if ep != nil {
            // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
            // 那么接收的值将是一个该类型的零值
            // typedmemclr 根据类型清理相应地址的内存
            typedmemclr(c.elemtype, ep)
        }
        // 从一个已关闭的 channel 接收,selected 会返回true
        return true, false
    }

    // 等待发送队列里有 goroutine 存在,说明 buf 是满的
    // 这有可能是:
    // 1. 非缓冲型的 channel
    // 2. 缓冲型的 channel,但 buf 满了
    // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
    // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 缓冲型,buf 里有元素,可以正常接收
    if c.qcount > 0 {
        // 直接从循环数组里找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉循环数组里相应位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游标向前移动
        c.recvx++
        // 接收游标归零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 数组里的元素个数减 1
        c.qcount--
        // 解锁
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下来就是要被阻塞的情况了
    // 构造一个 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收数据的地址保存下来
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 进入channel 的等待接收队列
    c.recvq.enqueue(mysg)
    // 将当前 goroutine 挂起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被唤醒了,接着从这里继续执行一些扫尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
Step1

如果channel是nil:如果是非阻塞模式,直接返回(false,false);如果是阻塞模式,调用goprak挂起goroutine,会阻塞下去。

if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
Step2

快速操作(不用获取锁,快速返回),三组条件全部满足,快速返(false,false)

条件1:首先是在非阻塞模式下
条件2:如果是非缓冲型(datasiz=0)并且等待发送goroutine队列为空(sendq.first=nil,就是没人往channel写数据),或者缓冲型channel(datasiz>0)并且buf中没有数据;
条件3:channel未关闭


//##################step2####################
  if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    atomic.Load(&c.closed) == 0 {
    return
  }
Step3

首先加锁,如果channel已经关闭,并且buf中没有元素,返回对应类型的0值,但是received为false;两种情况

情形1:非缓冲型,channel已关闭
情形2:缓冲型,channel已关闭,并且buf无元素

也就是说即使是关闭状态,但在缓冲型的 channel,
buf 里有元素的情况下还能接收到元素


//##################step3####################
  lock(&c.lock)

  if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
step4

如果等待发送队列中有元素,证明channel已经满了,两种情形

情形1:非缓冲型,无buf
情形2:缓冲型,buf满了

//##################step4####################
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

两种情形都正常进入recv方法,我们来看下源码:


func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  //##################step4-1####################
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      // copy data from sender
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
     //##################step4-2####################
    // Queue is full. Take the item at the
    // head of the queue. Make the sender enqueue
    // its item at the tail of the queue. Since the
    // queue is full, those are both the same slot.
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
    }
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)(从发送者的栈copy到接收者的栈)
针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部.
然后唤醒等待发送队列中的goroutine,等待调度器调度。

step5

没有等待发送的队列,并且buf中有元素,直接把接收游标处的数据copy到接收数据的地址,然后改变hchan中元素数据。

if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
step6

如果是非阻塞,那么直接返回;如果是阻塞的,构造sudog,保存各种值;将sudog保存到channel的recvq中,调用goparkunlock将goroutine挂起

if !block {
    unlock(&c.lock)
    return false, false
  }
// no sender available: block on this channel.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
  c.recvq.enqueue(mysg)
  goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

  // someone woke us up
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  closed := gp.param == nil
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, !closed

非阻塞接收,解锁。selected 返回 false,因为没有接收到值

我们继续之前的例子。前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。

在程序的 17 行之前,chan 的整体数据结构如下:


buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvq 和 sendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段,g 表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。

此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。

再从整体上来看一下 chan 此时的状态:


当一个channel关闭后,我们依然可以从中读出数据,如果chan的buf中有元素,则读出的是chan中buf的数据,如果buf为空,则输出对应元素类型的零值。那么我们来看下如下的一段程序:

package main

import (
  "fmt"
  "os"
  "os/signal"
  "syscall"
  "time"
)

var exit1 = make(chan struct{}, 1)

func main() {
  go dealSignal1()
  count := 0
  t := time.Tick(time.Second)
  for {
    select {
    case <-t:
      count++
      fmt.Printf("main run %d\n", count)
    case <-exit1:
      fmt.Println("main exit begin")
    }
  }
  fmt.Println("main exit over")
}

func dealSignal1() {
  c := make(chan os.Signal, 2)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-c
    close(exit1)
  }()
}

发送

接着上面的例子,G1 和 G2 现在都在 recvq 队列里了。
17 行向 channel 发送了一个元素 3。

发送操作最终转化为 chansend 函数,直接上源码,同样大部分都注释了,可以看懂主流程:

// 位于 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞,直接返回 false,表示未发送成功
        if !block {
            return false
        }
        // 当前 goroutine 被挂起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相关……

    // 对于不阻塞的 send,快速检测失败场景
    //
    // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
    // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
    // 2. channel 是缓冲型的,但循环数组已经装满了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 锁住 channel,并发安全
    lock(&c.lock)

    // 如果 channel 关闭了
    if c.closed != 0 {
        // 解锁
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 对于缓冲型的 channel,如果还有缓冲空间
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 将数据从 ep 处拷贝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 发送游标值加 1
        c.sendx++
        // 如果发送游标值等于容量值,游标值归 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 缓冲区的元素数量加一
        c.qcount++

        // 解锁
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,则直接返回错误
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 满了,发送方会被阻塞。接下来会构造一个 sudog

    // 获取当前 goroutine 的指针
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 当前 goroutine 进入发送等待队列
    c.sendq.enqueue(mysg)

    // 当前 goroutine 被挂起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 从这里开始被唤醒了(channel 有机会可以发送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被唤醒后,channel 关闭了。坑爹啊,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上绑定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

我们继续往下走,G1、G2被挂起后,往channel中发送一个数据3,其实调用的是chansend方法,我们还是逐步的去讲解

step1

如果channel=nil,当前goroutine会被挂起


if c == nil {
    if !block {
      return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
step2

依然是一个不加锁的快速操作,三组条件

条件1:非阻塞
条件2:channel未关闭
条件3:channel是非缓冲型,并且等待接收队列为空;或者缓冲型,并且循环数组已经满了

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
  }
step3

加锁,如果channel已经关闭,直接panic

lock(&c.lock)

if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}
step4

如果等待接收队列不为空,说明什么?

情形1:非缓冲型,等待接收队列不为空
情形2:缓冲型,等待接收队列不为空(说明buf为空)

两种情形,都是直接将待发送数据直接copy到接收处

if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)//直接从ep copy到sg
    return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if raceenabled {
    if c.dataqsiz == 0 {
      racesync(c, sg)
    } else {
      // Pretend we go through the buffer, even though
      // we copy directly. Note that we need to increment
      // the head/tail locations only when raceenabled.
      qp := chanbuf(c, c.recvx)
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
        c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
  }
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

两种情形,都直接从一个用一个goroutine操作另一个goroutine的栈,因此在sendDirect方法中会有一次写屏障

step5

如果等待队列为空,并且缓冲区未满,肯定是缓冲型的channel


if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }

将元素放在sendx处,然后sendx加1,channel总量加1

step6

如果以上情况都没有命中,说明什么?说明channel已经满了,如果是非阻塞的直接返回,否则需要调用gopack将这个goroutine挂起,等待被唤醒。

if !block {
    unlock(&c.lock)
    return false
  }

  // Block on the channel. Some receiver will complete our operation for us.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  c.sendq.enqueue(mysg)
  goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  // Ensure the value being sent is kept alive until the
  // receiver copies it out. The sudog has a pointer to the
  // stack object, but sudogs aren't considered as roots of the
  // stack tracer.
  KeepAlive(ep)

我们对照程序分析下,在前一个小节G1、G2被挂起来了,等待sender的解救;这时候往ch中发送了一个3,(step4)这时sender发现ch的等待接收队列recvq中有receiver,就会出队一个sudog,然后将元素直接copy到sudog的elem处,然后调用goready将G1唤醒,继续执行G1原来的代码,打印出结果。如下图:



当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其他可能有的 goroutine。

这里其实涉及到一个协程写另一个协程栈的操作。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着通过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就可以了,效率高啊!

关闭

close一个channel会调用closechan方法,比较简单,我们也来看下

func closechan(c *hchan) {
    // 关闭一个 nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 上锁
    lock(&c.lock)
    // 如果 channel 已经关闭
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }

    // …………

    // 修改关闭状态
    c.closed = 1

    var glist *g

    // 将 channel 所有等待接收队列的里 sudog 释放
    for {
        // 从接收队列里出队一个 sudog
        sg := c.recvq.dequeue()
        // 出队完毕,跳出循环
        if sg == nil {
            break
        }

        // 如果 elem 不为空,说明此 receiver 未忽略接收数据
        // 给它赋一个相应类型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相连,形成链表
        gp.schedlink.set(glist)
        glist = gp
    }

    // 将 channel 等待发送队列里的 sudog 释放
    // 如果存在,这些 goroutine 将会 panic
    for {
        // 从发送队列里出队一个 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 发送者会 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成链表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解锁
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍历链表
    for glist != nil {
        // 取最后一个
        gp := glist
        // 向前走一步,下一个唤醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 唤醒相应 goroutine
        goready(gp, 3)
    }
}
step1

如果channel为nil,会直接panic

if c == nil {
    panic(plainError("close of nil channel"))
  }
step2

加锁,如果channel已经关闭,再次关闭会panic


lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
step3

首选将hchan对应close标志置为1,然后声明一个链表;将等待接收队列中的所有sudog加入到链表,并将其elem赋予一个相应类型的0值;


c.closed = 1

  var glist gList

  // release all readers
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
step4

向所有等待发送队列的sudog加入链表


// release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)
step5

唤醒sudog所有goroutine

for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }

close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。

close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。

唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。

总结

总结一下,发生 panic 的情况有三种:

1.向一个关闭的 channel 进行写操作;

  1. 关闭一个 nil 的 channel;
  2. 重复关闭一个 channel。

读、写一个 nil channel 都会被阻塞。

channel发送和接收元素的本质还是值得拷贝
channel是并发安全的(加锁)

参考:博客园-深度解密Go语言只channel
好未来Golang源码系列三:Channel实现原理分析


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:GGBond_8488

查看原文:go-channel初识

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

589 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传