英文原文:https://blog.golang.org/pipelines
#### 简介
go语言的并发原语支持使得开发者构建流式数据`pipeline(管道)`变得容易,这些管道可以有效的利用I/O和多cup。本文介绍了这类pipeline的示例,重点介绍了操作失败时出现的细微差别,和如何优雅的处理错误的技术
#### What is a pipeline
go语言中没有对`pipeline`正式的定义;`pipeline`只是多种并发模式中的一种。通常而言,`pipeline`是由`channel`连接的一系列阶段,其中每个阶段是由运行相同方法的`goroutine`。在每个阶段中,`goroutine`会完成
- 从`上游channel`接收值
- 对数据进行一些处理,通常会产生新的值
- 向`下游channel`发送值
除第一个阶段和最后一个阶段外,所有的阶段中都可以有任意数量的`上游channel`和`下游channel`,它们仅有`上游channel`或`下游channel`。第一个阶段通常也被称为`生产者`或`源`,最后一个阶段则被称为`消费者`或`接收器`
下面我们将从一个简单的`pipeline`开始来解释这些思想和技术。后面,我们会提供一个更现实的示例。
#### Squaring numbers
构建一个具有三个阶段的管道
对于第一个阶段,gen函数,它将一个整数列表转化为一个`channel`,然后启动一个`goroutine`将整数列表中的数据向`channel`中发送,并在所有值发送完成后关闭`channel`
```golang
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
}()
return out
}
```
在第二个阶段中,sq函数,从`上游channel`中接收整数,并返回一个新的`channel`作为`下游channel`,并向`下游channel`发送接收到的整数的平方。在`上游channel`关闭后,且该阶段所有的值都发送到`下游channel`后,关闭`下游channel`:
```golang
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
```
main函数启动`pipeline`并运行最后阶段:从第二阶段接收值,并打印出来,直到关闭`channel`:
```golang
func main() {
// 组装pipeline
c := gen(2, 3)
out := sq(c)
// 消费输出
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
```
由于sq的`上游channel`和`下游channel`是相同类型,所以我们还能进行任意次的组合(译者:套娃警告)。另外,main函数也可以使用`for range`循环:
```golang
func main() {
// 组装pipeline 并消费 输出
for n := range sq(sq(gen(2,3))) {
fmt.Println(n) // 16\n18
}
}
```
#### Fan-out, fan-in
多个方法从同一个通道读取数据,直到通道关闭,这种方式被称为`fan-out`。这提供了一种新的分配方式,可以帮助我们并行的使用cpu和I/O。
一个方法可以从多个`上游channel`读取数据,并合并为一个`下游channel`,并且在所有的`上游channel`关闭后,将`下游channel`关闭,这种方式被称为`fan-in`
我们改变一下上面的管道组装方式,运行两个sq实例,每个实例均从同一个`channel`中读取。这里需要引入一个新的函数:`merge`,来实现`fan-in`打印出结果:
```golang
func main() {
in := gen(2, 3)
// 启动两个sq函数区读取in
c1 := sq(in)
c2 := sq(in)
// 消费c1、c2合并后的输出
for n:= range merge(c1, c2) {
fmt.Println(n) // 4\n9 或 9\n4
}
}
```
`merge`函数为每个`上游channel`启动了一个`goroutine`来将所有的`上游channel`转为为单个`channel`,`goroutine`将值复制到唯一的`下游channel`。在所有负责复制值发送至`下游channel`的goroutine`启动后,再启动一个`goroutine`等待所有`上游channel`数据转发完毕后关闭`下游channel`
向一个close的`channel`发送数据将导致`panic`,所以必须保证调用close之前所有发送已完成。`sync.WaitGroup`提供保持这种同步的简单方式:
```golang
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs的每个channel启动一个goroutine
// output将值从c复制到out,直到c关闭后调用wg.Done
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 启动一个goroutine负责在所有输出goroutine完成后关闭out
// 必须在wg.Add后调用
go func() {
wg.Wait()
close(out)
}()
return out
}
```
未完....
有疑问加站长微信联系(非本文作者))