我们创建了一大堆线程, 现在我们想要实现线程间的同步, 这其中的关键就是chan(通道)的使用, 如果没有通道, 你应该怎么去做线程间同步呢? time.Sleep吗?
Introduction
type hchan struct {
dataq_size uint // 缓冲槽大小
buf unsafe.Pointer // 缓冲槽本体
elem_type *_type // 槽内数据类型
}
复制代码
缓冲槽的工作方式就是上图那样, 每当你往通道里写消息, 消息会先存到缓冲槽里, 而后才被取出来. 这种常规的工作模式也叫异步模式, 因为收发工作不是同步进行的, 你可以先发, 发完你走人, 随后收件人再去管道里取.
同样还有一种用法是不设置缓冲槽, 或者说你直接把缓冲槽大小设置成0, 这种工作模式下, 收发双方必须同时守在管道旁, 否则先到的人一定会堵塞在管道哪儿, 等待后到的人, 然后通信才能开始, 这种也叫同步模式
仔细想一想通道给我们带来了什么, 如果将通道的功能点拆开, 通道的核心功能点就是: 可以在两个不同的G之间相互发消息, 同时还有一套阻塞/唤醒的机制.
通道的工作模式
首先分析一个关键点, 你需要知道有哪些G正守着这个通道, 然后把管道里的参数拷贝给这些堵塞着的G, 最后去解除他们的阻 . 有了这个前提条件, 通道工作围绕的对象就一定是G,
type hchan struct {
recv_q waitq // 接收者队列
send_q waitq // 发送者队列
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g // 想要收发消息的g本体
elem unsafe.Pointer // 要发的消息本体
}
复制代码
每个通道都会维护一个发送者队里以及一个接收者队列, 队列里的元素是一个包装过的G(G本体+消息). 我们通过一个发送与接收的过程, 先说说同步通道是如何工作的, 异步通道于此类似
收发 - 同步模式
chan <- data
的操作会被编译器翻译成去执行chansend
函数, 执行的对象是名为chan
的通道, 携带一个消息结构体. 检查chan的缓冲槽长度为0, 进入通道的同步模式工作:
- 找一个接收者, 检查chan的接收者队列
- chan.recv_q为空, 按照同步队列的工作模式, 因为没人接消息, 这个G必须阻塞(沉睡), 你这个G, 连同你的消息, 一起被打包成一个sudog, 放到chan的发送者队列里
- 然后通过gopark把你放入沉睡模式
- 但如果有接收者, 取出这个sudog对象, 然后把这条消息拷贝到sudog.elem里面, 这代表我这条消息已经发给你了.
- 最后因为这个接收者之前因为没收到消息一直在沉睡中, 通过goready唤醒
ok以上说明了几件事, 通道是如何知道消息应该发给谁, 消息是怎么发送过去的, 以及我们看到的阻塞效果是怎么实现的.
仔细想想, 这阻塞? 在某种条件达到以后自动解除阻塞? 这个场景好像在哪里见过? 你小子在暗示sync.WaitGroup
!! 等wg.Count
变成0了之后自动解除阻塞, wg使用的阻塞效果也正是通过gopark实现的! 这种沉睡/阻塞最大的特点就是, 某个G被放入沉睡以后, 必须由你手动唤醒, 在我们的场景中这个条件就是找到了接收方, 在wg的场景中这个条件就是wg.Count变成0了, 条件一命中, 我手动立刻帮你唤醒并解除阻塞.
收发 - 异步模式
想象一下异步模式与同步模式的区别在哪? 唯一的区别, 仅仅是在管道填满了才会产生堵塞, 不然你发完/收完就走人.
剩下来原理基本一样, chan <- data
操作同样被翻译成chansend
函数的调用, 发现缓冲槽大小不为0以后进入异步工作模式, 开始检查缓冲槽的剩余舱位
- 如果还有剩余舱位, 将消息通过memmove拷贝到管道内, 然后如果发现还有接收者正在堵塞中, 通过goready唤醒
- 如果没有剩余舱位, 通过gopark进入休眠模式, 在被唤醒以后检查有没有数据
关闭
到了这儿你已经对这一套工作模式非常了解了, 所谓的关闭其实就是遍历通道的发送者队列+接收者队列, 在他们的数据区发送一条nil消息, 然后执行goready唤醒他们中的每一个人
Select的工作模式
经常与通道一起出现的就是Select, Select的功能只是: 从所有的通道case中随机挑一个能用的, 否则就一直堵塞直到出现一个能用的. 我们解析一下这种特性是怎么做到的.
随机序
type hselect struct {
ncases uint16 // 总数
poll_order *uint16 // 随机序号
cases []scase // 按照初始化顺序的case队列
}
type scase struct {
c *hchan // case的本体, 一个通道
kind uint16 // 通道类型
}
复制代码
一个select在初始化的时候, 然后把所有的通道从chan类型包装成scase
类型, 添加上一个字段叫做Kind,这个字段可以是"接收者通道"/"发送者通道", 最后还有一个"default"类型通道, 表明这是一个default case.
然后会生成一个随机序号存到poll_order字段中去, 这代表一个随机数, 然后等程序运行到select的位置的时候, 调用select_go
函数, 开始找可以用的通道:
for i,_ := range [0...ncases] {
random_case_id := poll_order[i]
random_chanel := cases[random_id]
if check(random_chanel) {
return
}
}
if default_case != nil {
execute(default_case)
return
}
复制代码
我们按照以上的方法去执行随机序, 在所有的case都遍历完了以后, 如果没用能用的, 检查有没有能用的default用
沉睡的select
我们已经知道通道的沉睡与唤醒是怎么实现的, 针对select有意思的一点是, 如果子通道被唤醒, 则自己这个selectG也同时被唤醒了. 这点很神奇, 怎么做到的
for i,_ := range [0...ncases] {
random_case_id := poll_order[i]
random_chanel := cases[random_id]
if random_chanel.kind == recv_chanel {
random_chanel.recvq.append(selG)
}
if random_chanel.kind == send_chanel {
random_chanel.sendq.append(selG)
}
}
gopark(selectG)
复制代码
同样的, 我们也是遍历select下的所有通道, 把自己添加到通道的消息队列中去
- 如果是它是发送类型通道, 那就在它的发送者队列中添加自己这个selectG
- 如果是它是接收类型通道, 那就在它的接收者队列中添加自己这个selectG
想一想这样做会发生什么, 自己这个SelectG协程会同时出现在很多通道的消息队列里, 其中任何一个通道被goready
唤醒的时候, 自己这个SelectG也会被通知到, 自己也会跟这个通道一起被唤醒. 这就实现了select会一直堵塞直到其中任何一个通道畅通为止的特性
有疑问加站长微信联系(非本文作者)