前言
Golang在并发编程上有两大利器,分别是channel和goroutine,这篇文章我们先聊聊channel。熟悉Golang的人都知道一句名言:“使用通信来共享内存,而不是通过共享内存来通信”。这句话有两层意思,Go语言确实在sync包中提供了传统的锁机制,但更推荐使用channel来解决并发问题。这篇文章会先从channel的用法、channel的原理两部分对channel做一个较为深入的探究。
channel用法
什么是channel
从字面上看,channel的意思大概就是管道的意思。channel是一种go协程用以接收或发送消息的安全的消息队列,channel就像两个go协程之间的导管,来实现各种资源的同步。可以用下图示意:
channel的用法很简单:
func main() {
ch := make(chan int, 1) // 创建一个类型为int,缓冲区大小为1的channel
ch <- 2 // 将2发送到ch
n, ok := <- ch // n接收从ch发出的值
if ok {
fmt.Println(n) // 2
}
close(ch) // 关闭channel
}
使用channel时有几个注意点:
向一个nil channel发送消息,会一直阻塞;
向一个已经关闭的channel发送消息,会引发运行时恐慌(panic);
channel关闭后不可以继续向channel发送消息,但可以继续从channel接收消息;
当channel关闭并且缓冲区为空时,继续从从channel接收消息会得到一个对应类型的零值。
Unbuffered channels与Buffered channels
Unbuffered channels是指缓冲区大小为0的channel,这种channel的接收者会阻塞直至接收到消息,发送者会阻塞直至接收者接收到消息,这种机制可以用于两个goroutine进行状态同步;Buffered channels拥有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞。
引用The Nature Of Channels In Go中的两张图来说明Unbuffered channels与Buffered channels, 非常形象,读者可自行体会一下:
Unbuffered channels
Buffered channels:
channel的遍历
for range
channel支持 for range 的方式进行遍历:
packagemain
import"fmt"
funcmain(){
ci :=make(chanint,5)
fori :=1; i <=5; i++ {
ci <- i
}
close(ci)
fori :=rangeci {
fmt.Println(i)
}
}
值得注意的是,在遍历时,如果channel没有关闭,那么会一直等待下去,出现deadlock的错误;如果在遍历时channel已经关闭,那么在遍历完数据后自动退出遍历。也就是说,for range的遍历方式时阻塞型的遍历方式。
for select
select可以处理非阻塞式消息发送、接收及多路选择。
packagemain
import"fmt"
funcmain(){
ci :=make(chanint,2)
fori :=1; i <=2; i++ {
ci <- i
}
close(ci)
cs :=make(chanstring,2)
cs <-"hi"
cs <-"golang"
close(cs)
ciClosed, csClosed :=false,false
for{
ifciClosed && csClosed {
return
}
select{
casei, ok := <-ci:
ifok {
fmt.Println(i)
}else{
ciClosed =true
fmt.Println("ci closed")
}
cases, ok := <-cs:
ifok {
fmt.Println(s)
}else{
csClosed =true
fmt.Println("cs closed")
}
default:
fmt.Println("waiting...")
}
}
}
select中有case代码块,用于channel发送或接收消息,任意一个case代码块准备好时,执行其对应内容;多个case代码块准备好时,随机选择一个case代码块并执行;所有case代码块都没有准备好,则等待;还可以有一个default代码块,所有case代码块都没有准备好时执行default代码块。
channel原理
先贴一下channel的源码地址,读者可以对照来看。
数据结构
先看channel的结构体:
typehchanstruct{
qcountuint// total data in the queue
dataqsizuint// size of the circular queue
buf unsafe.Pointer// points to an array of dataqsiz elements
// channel中元素大小
elemsizeuint16
// 是否已关闭
closeduint32
// channel中元素类型
elemtype *_type// element type
sendxuint// send index
recvxuint// receive index
recvq waitq// list of recv waiters
sendq waitq// list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
channel的缓冲区其实是一个环形队列,qcount表示队列中元素的数量,dataqsiz表示环形队列的总大小,buf表示一个指向循环数组的指针;sendx和recvx分别用来标识当前发送和接收的元素在循环队列中的位置;recvq和sendq都是一个列表,分别用于存储当前处于等待接收和等待发送的Goroutine。
再看一下waitq的数据结构:
typewaitqstruct{
first *sudog
last *sudog
}
typesudogstruct{
// 当前goroutine
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelectbool
next *sudog
prev *sudog
elem unsafe.Pointer// data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretimeint64
releasetimeint64
ticketuint32
parent *sudog// semaRoot binary tree
waitlink *sudog// g.waiting list or semaRoot
waittail *sudog// semaRoot
c *hchan// channel
}
其中sudog表示处于等待列表中的Goroutine封装,包含了一些上下文信息,first和last分别指向等待列表的首位的Goroutine。
编译分析
在分析channel的原理之前,我们先使用go tool分析以下代码,看看channel的各种操作在底层调用了什么运行时方法:
ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
fmt.Println(n)
}
close(ch)
编译
go build test.go
go tool objdump -s "main\.main" test | grep CALL
把CALL过滤出来:
test.go:118 0x1092f55 e81612f7ff CALL runtime.makechan(SB)
test.go:119 0x1092f74 e82714f7ff CALL runtime.chansend1(SB)
test.go:120 0x1092f8e e80d14f7ff CALL runtime.chansend1(SB)
test.go:121 0x1092fa5 e8361ff7ff CALL runtime.chanrecv1(SB)
test.go:122 0x1092fbd e85e1ff7ff CALL runtime.chanrecv2(SB)
test.go:126 0x1092fd7 e8841cf7ff CALL runtime.closechan(SB)
test.go:124 0x1092fea e8b156f7ff CALL runtime.convT64(SB)
print.go:275 0x1093041 e88a98ffff CALL fmt.Fprintln(SB)
test.go:47 0x1093055 e896c1fbff CALL runtime.morestack_noctxt(SB)
创建
从上面的编译分析可以看出在创建channel时调用了运行时方法makechan:
funcmakechan(t *chantype, sizeint)*hchan{
elem := t.elem
// compiler checks this but be safe.
ifelem.size >=1<<16{
throw("makechan: invalid channel element type")
}
ifhchanSize%maxAlign !=0|| elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
mem, overflow := math.MulUintptr(elem.size,uintptr(size))
ifoverflow || mem > maxAlloc-hchanSize || size <0{
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
varc *hchan
switch{
casemem ==0:
// 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize,nil,true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
caseelem.kind&kindNoPointers !=0:
// 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem,nil,true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指针,为hchan和缓冲区分别分配空间
// Elements contain pointers.
c =new(hchan)
c.buf = mallocgc(mem, elem,true)
}
c.elemsize =uint16(elem.size)
c.elemtype = elem
c.dataqsiz =uint(size)
ifdebugChan {
print("makechan: chan=", c,"; elemsize=", elem.size,"; elemalg=", elem.alg,"; dataqsiz=", size,"\n")
}
returnc
}
makechan的代码逻辑还是比较简单的,首先校验元素类型和缓冲区空间大小,然后创建hchan,分配所需空间。这里有三种情况:当缓冲区大小为0,或者channel中元素大小为0时,只需分配channel必需的空间即可;当channel元素类型不是指针时,则只需要为hchan和缓冲区分配一片连续内存空间,空间大小为缓冲区数组空间加上hchan必需的空间;默认情况,缓冲区包含指针,则需要为hchan和缓冲区分别分配内存。最后更新hchan的其他字段,包括elemsize,elemtype,dataqsiz。
发送
channel的发送操作调用了运行时方法chansend1, 在
chansend1内部又调用了chansend,直接来看chansend的实现:
funcchansend(c *hchan, ep unsafe.Pointer, blockbool, callerpcuintptr)bool{
// channel为nil
ifc ==nil{
// 如果是非阻塞,直接返回发送不成功
if!block {
returnfalse
}
// 否则,当前Goroutine阻塞挂起
gopark(nil,nil, waitReasonChanSendNilChan, traceEvGoStop,2)
throw("unreachable")
}
ifdebugChan {
print("chansend: chan=", c,"\n")
}
ifraceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 对于非阻塞且channel未关闭,如果无缓冲区且没有等待接收的Goroutine,或者有缓冲区且缓冲区已满,那么都直接返回发送不成功
if!block && c.closed ==0&& ((c.dataqsiz ==0&& c.recvq.first ==nil) ||
(c.dataqsiz >0&& c.qcount == c.dataqsiz)) {
returnfalse
}
vart0int64
ifblockprofilerate >0{
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果channel已关闭
ifc.closed !=0{
// 解锁,直接panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 除了以上情况,当channel未关闭时,就有以下几种情况:
// 1、当存在等待接收的Goroutine
ifsg := c.recvq.dequeue(); sg !=nil{
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 那么直接把正在发送的值发送给等待接收的Goroutine
send(c, sg, ep,func(){ unlock(&c.lock) },3)
returntrue
}
// 2、当缓冲区未满时
ifc.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 获取指向缓冲区数组中位于sendx位置的元素的指针
qp := chanbuf(c, c.sendx)
ifraceenabled {
raceacquire(qp)
racerelease(qp)
}
// 将当前发送的值拷贝到缓冲区
typedmemmove(c.elemtype, qp, ep)
// sendx索引加一
c.sendx++
// 因为是循环队列,sendx等于队列长度时置为0
ifc.sendx == c.dataqsiz {
c.sendx =0
}
// 队列中元素总数加一,并解锁,返回发送成功
c.qcount++
unlock(&c.lock)
returntrue
}
// 3、当既没有等待接收的Goroutine,缓冲区也没有剩余空间,如果是非阻塞的发送,那么直接解锁,返回发送失败
if!block {
unlock(&c.lock)
returnfalse
}
// Block on the channel. Some receiver will complete our operation for us.
// 4、如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里
gp := getg()
mysg := acquireSudog()
mysg.releasetime =0
ift0 !=0{
mysg.releasetime =-1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink =nil
mysg.g = gp
mysg.isSelect =false
mysg.c = c
gp.waiting = mysg
gp.param =nil
c.sendq.enqueue(mysg)
// 调用goparkunlock将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend,3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 被唤醒之后执行清理工作并释放sudog结构体
ifmysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting =nil
ifgp.param ==nil{
ifc.closed ==0{
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param =nil
ifmysg.releasetime >0{
blockevent(mysg.releasetime-t0,2)
}
mysg.c =nil
releaseSudog(mysg)
returntrue
}
chansend的执行逻辑,上面的注释已经写得很清楚了,我们再来梳理一下。对于非阻塞发送或者channel已经关闭条件下的几种发送失败的情况,处理逻辑比较简单,读者可以对照注释来看;这里我们重点关注channel未关闭时几种常规情况:
存在等待接收的Goroutine
如果等待接收的队列recvq中存在Goroutine,那么直接把正在发送的值发送给等待接收的Goroutine。示意图如下:
具体看一下send方法:
funcsend(c *hchan, sg *sudog, ep unsafe.Pointer, unlockffunc(),skipint){
...
ifsg.elem !=nil{
// 将发送的值直接拷贝到接收值(比如v = <-ch 中的v)的内存地址
sendDirect(c.elemtype, sg, ep)
sg.elem =nil
}
// 获取等待接收数据的Goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
ifsg.releasetime !=0{
sg.releasetime = cputicks()
}
// 唤醒之前等待接收数据的Goroutine
goready(gp, skip+1)
}
这里有必要说明一下Goroutine在调度过程中的几种状态:
_Gidle =iota// goroutine刚刚分配,还没有初始化
_Grunnable// goroutine处于运行队列中, 还没有运行,没有自己的栈
_Grunning// goroutine在运行中,拥有自己的栈,被分配了M(线程)和P(调度上下文)
_Gsyscall// goroutine在执行系统调用
_Gwaiting// goroutine被阻塞
_Gdead// goroutine没有被使用,可能是刚刚退出,或者正在初始化中
_Gcopystack// 表示g当前的栈正在被移除并分配新栈
当调用goready时,将Goroutine的状态从_Gwaiting置为_Grunnable,等待下一次调度再次执行。
当缓冲区未满时
当缓冲区未满时,找到sendx所指向的缓冲区数组的位置,将正在发送的值拷贝到该位置,并增加sendx索引以及释放锁,示意图如下:
阻塞发送
如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里。示意图如下:
之后则调用goparkunlock将当前Goroutine设置为_Gwaiting状态并解锁,进入阻塞状态等待被唤醒(调用goready);如果被调度器唤醒,执行清理工作并最终释放对应的sudog结构体。
接收
channel的接收有两种形式:
<-ch
n, ok := <-ch
这两种方式分别调用运行时方法chanrecv1和chanrecv2:
funcchanrecv1(c *hchan, elem unsafe.Pointer){
chanrecv(c, elem,true)
}
funcchanrecv2(c *hchan, elem unsafe.Pointer)(receivedbool){
_, received = chanrecv(c, elem,true)
return
}
这两个方法最终都会调用chanrecv方法:
funcchanrecv(c *hchan, ep unsafe.Pointer, blockbool)(selected, receivedbool){
ifdebugChan {
print("chanrecv: chan=", c,"\n")
}
// channel为nil
ifc ==nil{
// 非阻塞直接返回(false, false)
if!block {
return
}
// 阻塞接收,则当前Goroutine阻塞挂起
gopark(nil,nil, waitReasonChanReceiveNilChan, traceEvGoStop,2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 非阻塞模式,对于以下两种情况:
// 1、无缓冲区且等待发送队列也为空
// 2、有缓冲区但缓冲区数组为空且channel未关闭
// 这两种情况都是接收失败, 直接返回(false, false)
if!block && (c.dataqsiz ==0&& c.sendq.first ==nil||
c.dataqsiz >0&& atomic.Loaduint(&c.qcount) ==0) &&
atomic.Load(&c.closed) ==0{
return
}
vart0int64
ifblockprofilerate >0{
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果channel已关闭,并且缓冲区无元素
ifc.closed !=0&& c.qcount ==0{
ifraceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 有等待接收的变量(即 v = <-ch中的v)
ifep !=nil{
//根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值
typedmemclr(c.elemtype, ep)
}
// 返回(true, false),即接收到值,但不是从channel中接收的有效值
returntrue,false
}
// 除了以上非常规情况,还有有以下几种常见情况:
// 1、等待发送的队列sendq里存在Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满
ifsg := c.sendq.dequeue(); sg !=nil{
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部
recv(c, sg, ep,func(){ unlock(&c.lock) },3)
// 接收成功
returntrue,true
}
// 2、缓冲区buf中有元素
ifc.qcount >0{
// Receive directly from queue
// 从recvx指向的位置获取元素
qp := chanbuf(c, c.recvx)
ifraceenabled {
raceacquire(qp)
racerelease(qp)
}
ifep !=nil{
// 将从buf中取出的元素拷贝到当前协程
typedmemmove(c.elemtype, ep, qp)
}
// 同时将取出的数据所在的内存清空
typedmemclr(c.elemtype, qp)
// 接收索引 +1
c.recvx++
ifc.recvx == c.dataqsiz {
c.recvx =0
}
// buf元素总数 -1
c.qcount--
// 解锁,返回接收成功
unlock(&c.lock)
returntrue,true
}
// 3、非阻塞模式,且没有数据可以接受
if!block {
// 解锁,直接返回接收失败
unlock(&c.lock)
returnfalse,false
}
// no sender available: block on this channel.
// 4、阻塞模式,获取当前Goroutine,打包一个sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime =0
ift0 !=0{
mysg.releasetime =-1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink =nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect =false
mysg.c = c
gp.param =nil
// 加入到channel的等待接收队列recvq中
c.recvq.enqueue(mysg)
// 挂起当前Goroutine,设置为_Gwaiting状态并解锁,进入休眠等待被唤醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv,3)
// someone woke us up
// 被唤醒之后执行清理工作并释放sudog结构体
ifmysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting =nil
ifmysg.releasetime >0{
blockevent(mysg.releasetime-t0,2)
}
closed := gp.param ==nil
gp.param =nil
mysg.c =nil
releaseSudog(mysg)
returntrue, !closed
}
chanrecv方法的处理逻辑与chansend非常类似,我们这里仍然只分析几种常见情况,其他情况上述注释也解释得比较清楚了,读者可对照相应代码和注释查看。
存在等待发送的Goroutine
如果等待发送的队列sendq里存在挂起的Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满。从sendq中取出最先阻塞的Goroutine,然后调用recv方法:
funcrecv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockffunc(),skipint){
ifc.dataqsiz ==0{
// 无缓冲区
ifraceenabled {
racesync(c, sg)
}
ifep !=nil{
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
}else{
// 缓冲区已满
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
ifraceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
ifep !=nil{
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
ifc.recvx == c.dataqsiz {
c.recvx =0
}
c.sendx = c.recvx// c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem =nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
ifsg.releasetime !=0{
sg.releasetime = cputicks()
}
// 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable,等待下一次调度。
goready(gp, skip+1)
}
1、如果无缓冲区,那么直接从sender接收数据;
2、如果缓冲区已满,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部;
3、最后调用goready函数将等待发送数据的Goroutine的状态从_Gwaiting置为_Grunnable,等待下一次调度。
下图示意了当缓冲区已满时的处理过程:
缓冲区buf中还有数据
如果缓冲区buf中还有元素,那么就走正常的接收,将从buf中取出的元素拷贝到当前协程的接收数据目标内存地址中。值得注意的是,即使此时channel已经关闭,仍然可以正常地从缓冲区buf中接收数据。这种情况比较简单,示意图就不画了。
阻塞接收
如果是阻塞模式,且当前没有数据可以接收,那么就需要将当前Goroutine打包成一个sudog加入到channel的等待接收队列recvq中,将当前Goroutine的状态置为_Gwaiting,等待唤醒。示意图如下:
如果之后当前Goroutine被调度器唤醒,则执行清理工作并最终释放对应的sudog结构体。
关闭
说完收发数据,最后就是关闭channel了:
funcclosechan(c *hchan){
// nil channel检查
ifc ==nil{
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 已关闭的channel不能再次关闭
ifc.closed !=0{
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
ifraceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 设置关闭状态为1
c.closed =1
varglist glist
// release all readers
// 遍历recvq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
for{
sg := c.recvq.dequeue()
ifsg ==nil{
break
}
ifsg.elem !=nil{
typedmemclr(c.elemtype, sg.elem)
sg.elem =nil
}
ifsg.releasetime !=0{
sg.releasetime = cputicks()
}
gp := sg.g
gp.param =nil
ifraceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 遍历sendq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
for{
sg := c.sendq.dequeue()
ifsg ==nil{
break
}
sg.elem =nil
ifsg.releasetime !=0{
sg.releasetime = cputicks()
}
gp := sg.g
gp.param =nil
ifraceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度
for!glist.empty() {
gp := glist.pop()
gp.schedlink =0
goready(gp,3)
}
}
1、关闭channel时,会遍历recvq和sendq(实际只有recvq或者sendq),取出sudog中挂起的Goroutine加入到glist列表中,并清除sudog上的一些信息和状态。
2、然后遍历glist列表,为每个Goroutine调用goready函数,将所有Goroutine置为_Grunnable状态,等待调度。
3、当Goroutine被唤醒之后,会继续执行chansend和chanrecv函数中当前Goroutine被唤醒后的剩余逻辑。
总结
总结一下,本文先通过channel的基本用法对channel的定义、用法细节进行了介绍,然后对channel的基本操作包括发送、接收和关闭进行了较为详细和深入的探究。细心的读者应该也会发现channel的操作跟协程的调度关系密切,不过这篇文章关于goroutine的调度只是一笔带过,后续时机成熟会对这部分内容再作探究。
以上内容都是我自己的一些感想,分享出来欢迎大家指正,顺便求一波关注
作者:Turling_hu
链接:https://juejin.im/post/5decff136fb9a016544bce67
来源:掘金
有疑问加站长微信联系(非本文作者)