这篇文章较长将会分两篇来翻译。原文地址:https://blog.golang.org/pipelines
介绍
Go并发模型使构建能有效利用IO和多核CPU的实时流式数据的pipeline非常方便。这篇文章将对此进行介绍,同时会着重强调一些在实践中的易犯错误以及对应的解决方法。
什么是Pipeline
在GO中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多的goroutine。这些goroutine
- 从输入channel接收数据
- 对数据作相应处理,例如在此基础上产生新数据
- 再通过输出channel把数据发送出去
除了开始和结束,每个阶段都会包含任意多个输入和输出channel。开始阶段只有输出channel,结束阶段只有输入channel。相应地,开始阶段可被称为生产者,结束阶段可被称为消费者。
我们先通过一个简单的例子来说明。
并发计算平方数
首先来举一个三阶段pipeline的例子
第一阶段,创建输入参数为可变长int整数的gen
函数,它通过goroutine发送所有输入参数,并在发送完成后关闭相应channel:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
reutrn out
}
第二阶段,sq函数,负责从输入channel中接收数据并作平方处理再发送到输出channel中。在输入channel关闭并把所有数据都成功发送至输出channel,关闭输出channel:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
主函数main
中创建了pipeline,并执行了最后阶段的任务,从管道中接收了第二阶段的数据并打印了出来:
func main() {
// Set up the pipeline
c := gen(2, 3)
out := sql(c)
// Consume the output
fmt.Println(<-out)
fmt.Println(<-out)
}
此处sq
函数的输入和输出参数为相同类型的channel,因此我们可以对其进行组合。重写main
函数,如下:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n)
}
}
此处相等于在pipeline中增加了一个阶段,即涉及到了三个阶段,其中2、3阶段的goroutine由同一函数产生。
Fan-out和Fan-in (扇出和扇入)
多个函数可同时从同一个channel中读取数据,直到channel关闭,称为fan-out
。这为我们提供了一种将任务分发给多个worker的途径,从而实现CPU和I/O的高效利用。
通过多路复用技术将多个channel合并到单个channel实现从多个输入读取数据的能力,只有当所有的输入都关闭,才会停止数据的读取。这个称作 fan-in
。
重写之前的main
,我们调用两次sq
,且两次都从同一个channel中读取数据。我们将引入一个新的函数,通过fan-in方式获取数据:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in
c1 := sq(in)
c2 := sq(in)
// Comsume the merged output from c1 and c2
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge函数通过为每个输入channel启动一个goroutine实现将数据发送同一个channel中,从完成将channel列表转化为单个channel的功能。一旦所有的输出channel(生产者)启动,merge
就会启动一或多个goroutine接收所有数据并在结束后关闭对应channel。
在已关闭的channel发送数据会导致panic,因此保证关闭channel前所有数据都发送完毕是非常重要的。sync.WaitGroup
为我们提供了一种实现该同步的方式。示例如下:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs)
for _, c := range(cs) {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
goroutine突然停止
一些使用原则:
- 在数据发送完毕后,关闭输出channel;
- 持续不停地从输入中接收数据,直到channel关闭;
我们可使用for循环来接收数据,一旦数据所有数据接收完毕将会自动退出。
但在真实场景中,并非所有情况都需要接收完所有的数据。有时设计如此,我们只需一部分的数据并可运行。更常见地,如果输入早早就抛出了一个错误,这时该阶段便会早早地退出。还有一些情况,如接收者不再等待数据接收,此时也需停止数据的生产。
在我们的例子中,如果一个阶段没有成功接收完数据就退出,我们的goroutine仍会尝试发送数据,这将会导致channel进入无限期的阻塞。
func main() {
// Consume the first value from the output
out := merge(c1, c2)
fmt.Println(<-out)
return
// Since we didn't receive the second value from out,
// one of the output goroutine is hung attempting to send it
}
这是资源泄露:goroutine会继续消耗内存、运行时资源,而且在栈中的堆引用也不能被回收。goroutine必须退出,才能启动垃圾回收机制。
当下游不能完全接收所有数据时,我们需要准备将上游goroutine退出。一种方式,我们可以把上游的channel设定为一个buffer。当buffer还有空间时,发送操作将会立刻完成。
c := make(chan int, 2)
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
当channel被创建时,如果我们已经知道数据的大小,可以如此来简化我们的代码。比如,我们重写gen函数,拷贝数据到buffer channel中,可以避免创建新的goroutine。
func gen(nums ...int) int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
我们可以考虑给merge函数输出channel指定固定大小空间。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan, int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
}
虽然这解决了程序中goroutine阻塞的问题,但并不是好代码。指定buffer的长度我们需知道merge将接收值的数量。如果下游读取少量数据便结束,依然会阻塞。
我们需要一种方式,使下游通知上游以表明它们不再接收信息。