欢迎来到 Golang 系列教程的第 23 篇。
什么是缓冲信道?
在上一教程里,我们讨论的主要是无缓冲信道。我们在信道的教程里详细讨论了,无缓冲信道的发送和接收过程是阻塞的。
我们还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。
通过向 make
函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。
ch := make(chan type, capacity)
要让一个信道有缓冲,上面语法中的 capacity
应该大于 0。无缓冲信道的容量默认为 0,因此我们在上一教程创建信道时,省略了容量参数。
我们开始编写代码,创建一个缓冲信道。
示例一
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
在上面程序里的第 9 行,我们创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。在第 10 行和第 11 行,我们向信道写入两个字符串,该信道并没有发生阻塞。我们又在第 12 行和第 13 行分别读取了这两个字符串。该程序输出:
naveen
paul
示例二
我们再看一个缓冲信道的示例,其中有一个并发的 Go 协程来向信道写入数据,而 Go 主协程负责读取数据。该示例帮助我们进一步理解,在向缓冲信道写入数据时,什么时候会发生阻塞。
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
在上面的程序中,第 16 行在 Go 主协程中创建了容量为 2 的缓冲信道 ch
,而第 17 行把 ch
传递给了 write
协程。接下来 Go 主协程休眠了两秒。在这期间,write
协程在并发地运行。write
协程有一个 for 循环,依次向信道 ch
写入 0~4。而缓冲信道的容量为 2,因此 write
协程里立即会向 ch
写入 0 和 1,接下来发生阻塞,直到 ch
内的值被读取。因此,该程序立即打印出下面两行:
successfully wrote 0 to ch
successfully wrote 1 to ch
打印上面两行之后,write
协程中向 ch
的写入发生了阻塞,直到 ch
有值被读取到。而 Go 主协程休眠了两秒后,才开始读取该信道,因此在休眠期间程序不会打印任何结果。主协程结束休眠后,在第 19 行使用 for range 循环,开始读取信道 ch
,打印出了读取到的值后又休眠两秒,这个循环一直到 ch
关闭才结束。所以该程序在两秒后会打印下面两行:
read value 0 from ch
successfully wrote 2 to ch
该过程会一直进行,直到信道读取完所有的值,并在 write
协程中关闭信道。最终输出如下:
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
死锁
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
在上面程序里,我们向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时(第 11 行),由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic,信息如下:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100
长度 vs 容量
缓冲信道的容量是指信道可以存储的值的数量。我们在使用 make
函数创建缓冲信道的时候会指定容量大小。
缓冲信道的长度是指信道中当前排队的元素个数。
代码可以把一切解释得很清楚。:)
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
在上面的程序里,我们创建了一个容量为 3 的信道,于是它可以保存 3 个字符串。接下来,我们分别在第 9 行和第 10 行向信道写入了两个字符串。于是信道有两个字符串排队,因此其长度为 2。在第 13 行,我们又从信道读取了一个字符串。现在该信道内只有一个字符串,因此其长度变为 1。该程序会输出:
capacity is 3
length is 2
read value naveen
new length is 1
WaitGroup
在本教程的下一节里,我们会讲到工作池(Worker Pools)。而 WaitGroup
用于实现工作池,因此要理解工作池,我们首先需要学习 WaitGroup
。
WaitGroup
用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup
来实现。
理论说完了,我们编写点儿代码吧。:)
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
WaitGroup 是一个结构体类型,我们在第 18 行创建了 WaitGroup
类型的变量,其初始值为零值。WaitGroup
使用计数器来工作。当我们调用 WaitGroup
的 Add
并传递一个 int
时,WaitGroup
的计数器会加上 Add
的传参。要减少计数器,可以调用 WaitGroup
的 Done()
方法。Wait()
方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。
上述程序里,for 循环迭代了 3 次,我们在循环内调用了 wg.Add(1)
(第 20 行)。因此计数器变为 3。for 循环同样创建了 3 个 process
协程,然后在第 23 行调用了 wg.Wait()
,确保 Go 主协程等待计数器变为 0。在第 13 行,process
协程内调用了 wg.Done
,可以让计数器递减。一旦 3 个子协程都执行完毕(即 wg.Done()
调用了 3 次),那么计数器就变为 0,于是主协程会解除阻塞。
在第 21 行里,传递 wg
的地址是很重要的。如果没有传递 wg
的地址,那么每个 Go 协程将会得到一个 WaitGroup
值的拷贝,因而当它们执行结束时,main
函数并不会知道。
该程序输出:
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。:)
工作池的实现
缓冲信道的重要应用之一就是实现工作池。
一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。
我们工作池的核心功能如下:
- 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
- 将作业添加到该输入型缓冲信道中。
- 作业完成后,再将结果写入一个输出型缓冲信道。
- 从输出型缓冲信道读取并打印结果。
我们会逐步编写这个程序,让代码易于理解。
第一步就是创建一个结构体,表示作业和结果。
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
所有 Job
结构体变量都会有 id
和 randomno
两个字段,randomno
用于计算其每位数之和。
而 Result
结构体有一个 job
字段,表示所对应的作业,还有一个 sumofdigits
字段,表示计算的结果(每位数字之和)。
第二步是分别创建用于接收作业和写入结果的缓冲信道。
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
工作协程(Worker Goroutine)会监听缓冲信道 jobs
里更新的作业。一旦工作协程完成了作业,其结果会写入缓冲信道 results
。
如下所示,digits
函数的任务实际上就是计算整数的每一位之和,最后返回该结果。为了模拟出 digits
在计算过程中花费了一段时间,我们在函数内添加了两秒的休眠时间。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
然后,我们写一个创建工作协程的函数。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
上面的函数创建了一个工作者(Worker),读取 jobs
信道的数据,根据当前的 job
和 digits
函数的返回值,创建了一个 Result
结构体变量,然后将结果写入 results
缓冲信道。worker
函数接收了一个 WaitGroup
类型的 wg
作为参数,当所有的 jobs
完成的时候,调用了 Done()
方法。
createWorkerPool
函数创建了一个 Go 协程的工作池。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
上面函数的参数是需要创建的工作协程的数量。在创建 Go 协程之前,它调用了 wg.Add(1)
方法,于是 WaitGroup
计数器递增。接下来,我们创建工作协程,并向 worker
函数传递 wg
的地址。创建了需要的工作协程后,函数调用 wg.Wait()
,等待所有的 Go 协程执行完毕。所有协程完成执行之后,函数会关闭 results
信道。因为所有协程都已经执行完毕,于是不再需要向 results
信道写入数据了。
现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
上面的 allocate
函数接收所需创建的作业数量作为输入参数,生成了最大值为 998 的伪随机数,并使用该随机数创建了 Job
结构体变量。这个函数把 for 循环的计数器 i
作为 id,最后把创建的结构体变量写入 jobs
信道。当写入所有的 job
时,它关闭了 jobs
信道。
下一步是创建一个读取 results
信道和打印输出的函数。
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
result
函数读取 results
信道,并打印出 job
的 id
、输入的随机数、该随机数的每位数之和。result
函数也接受 done
信道作为参数,当打印所有结果时,done
会被写入 true。
现在一切准备充分了。我们继续完成最后一步,在 main()
函数中调用上面所有的函数。
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
我们首先在 main
函数的第 2 行,保存了程序的起始时间,并在最后一行(第 12 行)计算了 endTime
和 startTime
的差值,显示出程序运行的总时间。由于我们想要通过改变协程数量,来做一点基准指标(Benchmark),所以需要这么做。
我们把 noOfJobs
设置为 100,接下来调用了 allocate
,向 jobs
信道添加作业。
我们创建了 done
信道,并将其传递给 result
协程。于是该协程会开始打印结果,并在完成打印时发出通知。
通过调用 createWorkerPool
函数,我们最终创建了一个有 10 个协程的工作池。main
函数会监听 done
信道的通知,等待所有结果打印结束。
为了便于参考,下面是整个程序。我还引用了必要的包。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
为了更精确地计算总时间,请在你的本地机器上运行该程序。
该程序输出:
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds
程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。你的输出会和我的不同,因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。在我的例子中,运行程序大约花费了 20 秒。
现在我们把 main
函数里的 noOfWorkers
增加到 20。我们把工作者的数量加倍了。由于工作协程增加了(准确说来是两倍),因此程序花费的总时间会减少(准确说来是一半)。在我的例子里,程序会打印出 10.004364685 秒。
...
total time taken 10.004364685 seconds
现在我们可以理解了,随着工作协程数量增加,完成作业的总时间会减少。你们可以练习一下:在 main
函数里修改 noOfJobs
和 noOfWorkers
的值,并试着去分析一下结果。
本教程到此结束。祝你愉快。
上一教程 - 信道
下一教程 - Select
via: https://golangbot.com/buffered-channels-worker-pools/
作者:Nick Coghlan 译者:Noluye 校对:polaris1119
本文由 GCTT 原创翻译,Go语言中文网 首发。也想加入译者行列,为开源做一些自己的贡献么?欢迎加入 GCTT!
翻译工作和译文发表仅用于学习和交流目的,翻译工作遵照 CC-BY-NC-SA 协议规定,如果我们的工作有侵犯到您的权益,请及时联系我们。
欢迎遵照 CC-BY-NC-SA 协议规定 转载,敬请在正文中标注并保留原文/译文链接和作者/译者等信息。
文章仅代表作者的知识和看法,如有不同观点,请楼下排队吐槽
有疑问加站长微信联系(非本文作者))

func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ { randomno := rand.Intn(999) job := Job{i, randomno} jobs <- job } close(jobs) } 如果for循环结束后,jobs里面的数据worker还来不及取走,这时执行到close,会不会导致works取数据失败?或者取不到足额的任务? 谢谢!!
这里
close
和关闭文件的概念不同。golang 里的
close
只是用于通知信道的接收方,所有数据都已经发送完毕,信道没有真正关闭。若用
for range
接收数据时,对于关闭了的信道,会接收完剩下的有效数据,并退出循环。如果没有close
提示数据发送完毕的话,for range
会接收完剩下所有有效数据后发生阻塞。所以接收方
worker
是可以把jobs
剩下的数据取走的。后面垃圾收集器会自动回收掉该信道的内存。可以参见《the go programming language》的说法:
不会的,close相当于给这个chan掷了一个标志,forrange会自动把chan里的数据取完,如果不用forrange而是用<-chan的话取到的则是零值,并且会有个ok的bool返回
个人认为done这个chan是多余的,在createWorkerPool中的wait时已经阻塞了
这个是用来等待消费者,也就是等待 go result 协程去完成任务以后才可以推出程序,否则可能出现提前退出的情况,也就是还没有消费完,go 主进程已经结束(守护进程)
createWorkerPool 里面的等待只是用来等待一些列子协程完成各自任务,完成任务并投放到results通道
sync.Pool
了解一哈哦对是这样
你好!若是把 第68行 go result(done) 放在 createWorkerPool 之后调用,会发生死锁,是因为 wg.Wait() 阻塞了 导致 go result(done) 执行不了 引起的吗?
因该是吧,createworkerpool不是协程,main函数走进去,创建完所有worker协程后走到这就阻塞了,而result只有写入,没人读取,就死锁了吧
由于 1.results的缓冲区只有10条 2.createworkerpool 阻塞了后续读取结果操作 所以造成了死锁 你可以尝试 把 results 的缓冲区设置为100 这样就不会死锁了
我有一个问题比较困惑,一个程序跑下来会有很多不同的地方通过go xxMethod()来启动协程, 但是应该在哪个上面用 sync.WaitGroup 呢? 有没有一句话可以总结的规律?
done这个chan是多余的,for result := range results {}这个已经可以判断 results close的状态结速
9楼 @zhqu1148980644 是的,如果修改成
go createWorkerPool(noOfWorkers)
之后go result(done)
就可以挪到最后了,但是done这个chan就必须要有,否则主进程会退出是的,这个我觉得本意可能是要应对
go createWorkerPool(noOfWorkers)
,这样的话这个done才生效这样
go allocate(noOfJobs)
noOfWorkers := 20
go createWorkerPool(noOfWorkers)
result()
就不需要done这个chan了,在result里已经通过for result := range results
阻塞了那个
<-done
应该是这个样子的go result(done)
的位置不对为什么信道done是有必要的,不能去掉?
<-done肯定不是多余的。 这里的go result(done)函数是在另一个协程中进行的,只会阻塞住当前的协程,不会阻塞主协程。所以12楼14楼@skyfly 解释不正确。 15楼通过去掉result的go让其阻塞主协程也ok。不过没有解释到<-done的原因。 ... ...
文中代码尝试去掉 <-done还会运行成功0~99的id数据,主要原因是跑在主协程的createWorkerPool(noOfWorkers)中wait函数会等所有携程执行完毕,而所有携程执行完毕则表示所有的数据已经从 jobs搬运到results中。 由于go result(done)函数执行很快没什么耗时,所以go result(done)协程几乎是和主协程同步完成。
那是不是在go result(done)模拟耗时场景就能复现问题了? bingo 函数提前结束,而协程go result(done)还未执行完毕。
问题场景复现如下:
1、去掉30行代码time.Sleep(2 time.Second) 主要是为了快速调试减少不必要耗时
2、58行后添加time.Sleep(1 time.Second) 模拟耗时场景
3、65行代码noOfJobs := 12 原因同1
此时观察去掉<-done不阻塞的情况下,主协程会提前结束。
加上<-done, 会打印12条数据,程序正常。
有一点一直绕不过来弯,jobs与results的容量都是10,最后怎么能打印出来一百条数据的,超过10条不就应该死锁了吗
初学者。感谢您提供的工作池的教程。确实是学到了!
但是初看您的教程简直可以用一脸懵逼来形容我自己。只有自己在打完代码执行后才能完全理解您的意思。我发现,如果将函数的书写顺序改变一下,应该可以更好地帮助其他初学者的理解其中的含义。
以下是我的拙见:
茅塞顿开