英文原文:https://blog.golang.org/pipelines
简介
go语言的并发原语支持使得开发者构建流式数据pipeline(管道)
变得容易,这些管道可以有效的利用I/O和多cup。本文介绍了这类pipeline的示例,重点介绍了操作失败时出现的细微差别,和如何优雅的处理错误的技术
What is a pipeline
go语言中没有对pipeline
正式的定义;pipeline
只是多种并发模式中的一种。通常而言,pipeline
是由channel
连接的一系列阶段,其中每个阶段是由运行相同方法的goroutine
。在每个阶段中,goroutine
会完成
- 从
上游channel
接收值 - 对数据进行一些处理,通常会产生新的值
- 向
下游channel
发送值 除第一个阶段和最后一个阶段外,所有的阶段中都可以有任意数量的上游channel
和下游channel
,它们仅有上游channel
或下游channel
。第一个阶段通常也被称为生产者
或源
,最后一个阶段则被称为消费者
或接收器
下面我们将从一个简单的pipeline
开始来解释这些思想和技术。后面,我们会提供一个更现实的示例。
Squaring numbers
构建一个具有三个阶段的管道
对于第一个阶段,gen函数,它将一个整数列表转化为一个channel
,然后启动一个goroutine
将整数列表中的数据向channel
中发送,并在所有值发送完成后关闭channel
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
}()
return out
}
在第二个阶段中,sq函数,从上游channel
中接收整数,并返回一个新的channel
作为下游channel
,并向下游channel
发送接收到的整数的平方。在上游channel
关闭后,且该阶段所有的值都发送到下游channel
后,关闭下游channel
:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main函数启动pipeline
并运行最后阶段:从第二阶段接收值,并打印出来,直到关闭channel
:
func main() {
// 组装pipeline
c := gen(2, 3)
out := sq(c)
// 消费输出
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
由于sq的上游channel
和下游channel
是相同类型,所以我们还能进行任意次的组合(译者:套娃警告)。另外,main函数也可以使用for range
循环:
func main() {
// 组装pipeline 并消费 输出
for n := range sq(sq(gen(2,3))) {
fmt.Println(n) // 16\n18
}
}
Fan-out, fan-in
多个方法从同一个通道读取数据,直到通道关闭,这种方式被称为fan-out
。这提供了一种新的分配方式,可以帮助我们并行的使用cpu和I/O。
一个方法可以从多个上游channel
读取数据,并合并为一个下游channel
,并且在所有的上游channel
关闭后,将下游channel
关闭,这种方式被称为fan-in
我们改变一下上面的管道组装方式,运行两个sq实例,每个实例均从同一个channel
中读取。这里需要引入一个新的函数:merge
,来实现fan-in
打印出结果:
func main() {
in := gen(2, 3)
// 启动两个sq函数区读取in
c1 := sq(in)
c2 := sq(in)
// 消费c1、c2合并后的输出
for n:= range merge(c1, c2) {
fmt.Println(n) // 4\n9 或 9\n4
}
}
merge
函数为每个上游channel
启动了一个goroutine
来将所有的上游channel
转为为单个channel
,goroutine
将值复制到唯一的下游channel
。在所有负责复制值发送至下游channel
的goroutine启动后,再启动一个
goroutine等待所有
上游channel数据转发完毕后关闭
下游channel向一个close的
channel发送数据将导致
panic,所以必须保证调用close之前所有发送已完成。
sync.WaitGroup`提供保持这种同步的简单方式:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs的每个channel启动一个goroutine
// output将值从c复制到out,直到c关闭后调用wg.Done
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 启动一个goroutine负责在所有输出goroutine完成后关闭out
// 必须在wg.Add后调用
go func() {
wg.Wait()
close(out)
}()
return out
}
未完....
有疑问加站长微信联系(非本文作者))
