上篇文章讲了 channel
的基本使用,讲了一些使用时需要注意的事项,本文将重点介绍 channel
中的两个数据结构:循环队列 与 双端链表 。
channel 的需求描述
为了理解这些数据结构解决了什么问题,我们先来做个简单的回顾,看看为什么需要这两个数据结构,他们解决了什么问题。我们知道 goroutine 是用户态的线程,不同的 goroutine 之间是有消息传递这个需求的。在原始的进程与线程(系统线程)编程中我们会采用管道的方式,而 channel
就是用户态线程传递消息的管道实现,并且是类型安全的。
既然 channel
是一个管道,用来满足不同 goroutine 间交换消息的。那么实现这样一个管道要怎么做呢?
来看看我们日常传递消息的需求:
能够有多个 goroutine 向 channel
进行读写,并且保证没有竞争问题,需要用 队列 来管理阻塞的 goroutine,解决竞争问题;需要管理发送到 channel
的消息(有缓冲、无缓冲),对于有缓存的channel
可以采用 循环队列 来管理多个消息。
当然上面的需求是经过简化的,比如 channel
还需要具备阻塞、唤醒 goroutine 的能力,不过为了本文我们更加专注焦点问题,先只关注上面两个问题。
channel 的数据结构
接下来我们分析一下 channel
在实际运行中,它的结构体是怎么样的。当然这又分为两种类型,有缓冲与无缓冲的。我们先来看一个无缓冲的情况。
无缓冲
先把示例代码贴出来。就是两个读的 goroutine 被阻塞在一个无缓冲的 channel 上。
func main() {
ch := make(chan int) // 无缓冲
go goRoutineA(ch)
go goRoutineB(ch)
ch <- 1
time.Sleep(time.Second * 1)
}
func goRoutineA(ch chan int) {
v := <-ch
fmt.Printf("A received data: %d\n", v)
}
func goRoutineB(ch chan int) {
v := <-ch
fmt.Printf("B received data: %d\n", v)
}
复制代码
来看看当代码执行到 ch <- 1
这一行之后 channel
的结构体被填充成什么样子了!
注意其中 buf
字段可存储的长度是0,这是因为 无缓冲 channel 不会用到循环队列来存储数据。它一定是等读、写 goroutine 都准备好了,然后直接把数据交给对方。我们用一副图来看一下无缓冲的数据交换过程。
上图描述的是数据交换过程,再看一下读 goroutine 被阻塞的结构示意图。被阻塞的 goroutine 会挂载到对应的队列上,该队列是一个双端队列。
上面的例子,由于两个读 goroutine 在启动的时候,写还没有准备好,因此读全部被挂起在队列中;当有写goroutine准备好的时候,由于此时读已经就绪,因此写不会阻塞,挂起放到 sendq
中。大家可以修改上面的代码,自己看一下写阻塞,读立马执行的情况。
有缓冲
我们将上面的代码改成有缓冲的通道,然后再来看看有缓冲的情况。
func main() {
ch := make(chan int, 3) // 有缓冲
// 都不会阻塞
ch <- 1
ch <- 2
ch <- 3
// 会阻塞,被挂起到 sendq 中
go func() {
ch <- 4
}()
// 只是为了debug
var a int
fmt.Println(a)
go goRoutineA(ch)
go goRoutineA(ch)
go goRoutineB(ch)
go goRoutineB(ch) // 猜猜这里会被挂起吗?
time.Sleep(time.Second * 2)
}
func goRoutineA(ch chan int) {
v := <-ch
fmt.Printf("A received data: %d\n", v)
}
func goRoutineB(ch chan int) {
v := <-ch
fmt.Printf("B received data: %d\n", v)
}
复制代码
贴出执行到第一行的 go goRoutineA(ch)
时, hchan
的结构填充情况。
在这里可以看到缓冲的大小是3,由于增加了缓冲,只要写 goroutine 没有把缓冲写满,则不会导致协程阻塞。但是一旦缓冲没有多余的空间,则会把写 goroutine 挂起到 sendq
中,直到有空间时将他唤醒(还有其它唤醒的场景,这一略过)。
其实有缓冲的 channel,就是把同步的通信变为了异步的通信。写的 channel 不需要关注读 channel,只要有空间它就写;而读也一样,只要有数据就正常读就可以,如果没有就挂起到队列中,等待被唤醒。下图形象的展示了有缓冲 channel 是如何交换数据的。
我们再来用图的形式看一下此时结构体的样子,这里图有些偷懒,只是在上面图的基础上增加了循环队列部分的描述,实际到该例子中,读 goroutine时不会被阻塞的,看的时候需要注意这一点。
循环队列
今天最重要的是理解 channel 中两个关键的数据结构。为了下一讲阅读源码做准备,我把 channel 中的循环队列部分的代码抽象出来了。
// 队列满了
var ErrQFull = errors.New("circular is full")
// 没有值
var ErrQEmpty = errors.New("circular is empty")
// 定义循环队列
// 如何确定队空,还是队满?q.sendx = (q.sendx+1) % q.dataqsiz
type queue struct {
buf []int // 队列元素存储
dataqsiz uint // circular 队列长度
qcount uint // 有多少元素在buf中 qcount = len(buf)
sendx uint // 可以理解为队尾指针,向队列写入数据
recvx uint // 可以理解为队头指针,从队列读取数据
}
func makeQ(size int) *queue {
q := &queue{
dataqsiz: uint(size),
buf: nil,
}
q.buf = make([]int, q.dataqsiz)
return q
}
// 向buf中写入数据
// 请看 chansend 函数
func (c *queue) insert(ele int) error {
// 检查队列是否有空间
if c.dataqsiz > 0 && c.qcount == c.dataqsiz {
return ErrQFull
}
// 存入数据
c.buf[c.sendx] = ele
c.sendx++ // 尾指针后移
if c.sendx == c.dataqsiz { // 如果相等,说明队列写满了,sendx放到开始位置
c.sendx = 0
}
c.qcount++
return nil
}
// 从buf中读取数据
func (c *queue) read() (int, error) {
// 队列中没有数据了
if c.dataqsiz > 0 && c.qcount == 0 {
return 0, ErrQEmpty
}
ret := c.buf[c.recvx] // 取出元素
c.buf[c.recvx] = 0
c.recvx++
if c.recvx == c.dataqsiz { // 如果相等,说明写到末尾了,recvx放到开始位置
c.recvx = 0
}
c.qcount--
return ret, nil
}
复制代码
上面的代码基本上就是 channel
的循环队列部分的实现。这个队列的实现与我们平常实现的循环队列稍微有些不一样。一般我们为了方便判空,会浪费一个buf的空间来方便判空,公式是: (tail+1)%n=head
;但是在 channel 这里的循环队列,由于有了一个循环队列元素的计数,确保了这个空间不会被浪费,并且同时也能够满足 O(1) 时间复杂度计算有缓冲 channel 元素个数。
总结
总结一下今天的主要信息。
channel 中用到了两个数据结构:循环队列 和 双端链表; 循环队列 只有在有缓冲 channel 中才会使用,它主要是做为消息的缓冲、保证消息的有序性; 双端链表 是用来挂起阻塞的读、写 goroutine 的,在被唤醒时会按照入队顺序公平的进行通知; 无缓冲的 channel 不会用到 循环队列 相关的结构,它必须读写 goroutine 都准备好后才能进行消息交换; 做为缓冲消息的 循环队列 通过一个当前元素个数字段的标记,避免了浪费一个数据空间。
下一章我们就尝试阅读一下 channel
的源码,想要尝试录制一个视频来讲这部分源码!
参考资料
个人公众号:dayuTalk
GitHub:github.com/helei112g
有疑问加站长微信联系(非本文作者)