接上一节翻译,[点此看前一部分](https://studygolang.com/articles/27916)
#### Stopping short
我们创建的管道有固定的模式:
- 所有发送操作完成后,由该阶段关闭`下游channel`
- 所有阶段均从`上游channel`接收值,直至`上游channel`关闭
这种模式允许每一个接收者使用`for range`循环读取值,并在所有值成功发送到`下游channel`后,退出当前的`goroutine`
但在真实的`pipeline`应用场景中,对应的阶段并不都会接收所有的`上游channel`发送的值。有时是因为设计的原因:接收者只需要值的子集就可以完成它的任务。还有更多情况,当`上游channel`给出了一个错误值,当前阶段需要尽快的退出。无论是那种情况,接收者都不需要等待后续的值到达,我们希望阶段尽快停止不需要的处理。
在我们的示例`pipeline`中,如果某个阶段不接收所有的`上游channel`值,则尝试发送值的`goroutine`将被阻塞:
```golang
// 仅接收第一个值
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// 因为我们只接收了第一个值
// 则负责输出的其中一个goroutine在尝试发送值时将被挂起
}
```
这里产生了资源泄漏:`goroutine`消耗内存和运行时资源,`goroutine`堆栈中的堆引用将会组织数据被gc回收。`goroutine`不会被回收,它们必须自己退出(译者注:由于主groutine退出后,进程将会结束,进程所消耗的资源将会被os回收,但实际上,这确实是一种泄漏,实际应用中,很有可能程序不会退出,这样程序跑着跑着,泄漏的问题就会出现,原文中只是一个简单的例子)
所以,即使下游阶段无法接收所有入站值的情况下,我们也需要`pipeline`的上游阶段退出。一种方式是将`下游channel`加上缓存区。缓存区可以容纳固定数量的值;如果缓存区中还有空间,发送操作将不会被阻塞:
```golang
c := make(chan int, 2) // 缓存区大小为2
c <- 1 // 立即返回
c <- 2 // 立即返回
c <- 3 // 阻塞直至其他goroutine调用 <- c 接收1
```
当通道创建时已经直到要发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写`gen`函数,将整数列表写入到下游`channel`的缓冲区中,避免创建`goroutine`:
```golang
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
```
为了不返回阻塞的`goroutine`,我们可以在`merge`函数的`下游channel`添加缓存区:
```golang
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // 给未处理的输入足够的缓冲区
// ... 其他部分不变 ...
```
虽然这可以避免程序中`goroutine`被阻塞,但这是错误的代码。缓冲区大小设为1是因为我们知道上下游`channel`的情况。这是非常脆弱的程序,一旦我们向`gen`方法多传递一个值,那将继续阻塞`goroutine`
相反,正确的做法应是为下游的阶段提供一种方法,告诉发送方,我们将不再接收输入
#### Explicit cancellation
当`main`函数决定不接受来自out的值直接退出时,它必须告诉上游阶段的`goroutine`放弃发送的值。我们将使用一个名为done的`channel`上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送端:
```golang
func main() {
in := gen(2, 3)
// 启动两个sq函数区读取in
c1 := sq(in)
c2 := sq(in)
// 仅接收第一个值
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// 告诉上游发送者,我们将要退出
done <- struct{}{}
done <- struct{}{}
}
```
负责发送的`goroutine`将会使用`select`语句替换发送操作,`select`将在发送完成或从done接收到值时继续执行。done的值类型是空的结构体,因为发送的值没有意义,它只是作为一个通知,指示应该放弃发送。负责输出的`goroutine`将继续从`上游channel`读取数据,因此不会阻塞上游阶段(我们稍后将会讨论如何让此循环尽早返回)
```golang
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs的每个channel启动一个goroutine
// output将值从c复制到out或done接收到值后调用wg.Done
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... 其他的没有变化 ...
```
这种方式带来一个新的问题:每一个下游接收者都需要知道可能被阻塞的上游发送者的数量,并在返回前尽早的向发送着发送信号。处理这种计数是无聊且容易出错的。
我们需要在不知道数量的`goroutine`数量的情况下,通知所有`goroutine`停止向下游发送值。在go语言中,我们能使用关闭`channel`的方式来实现,因为但我们关闭一个通道时,会[在关闭的`channel`上执行接收值的操作将会立即返回,并获得对应数据的零值](https://golang.org/ref/spec#Receive_operator)
这意味着`main`方法可以通过关闭`done channel`来解除所有阻塞的发送者。这个关闭操作实际上是产生了一个广播信息。我们将扩展一下每个`pipeline`的处理函数,新增一个done作为参数,并通过defer语句来进行关闭,以便main函数以任何一种方式退出前,都会将退出信号发送给`pipeline`的所有阶段
```golang
func main() {
// 设置一个done channel,当我们退出时,关闭这个channel
// 通知所有的goroutine我们开始退出了
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// 启动两个sq函数区读取in
c1 := sq(done, in)
c2 := sq(done, in)
// 仅接收第一个值
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done将会在defer调用中关闭
}
```
现在,我们的`pipeline`中的每个阶段都可以在`done channel`关闭后马上返回。而`merge`方法中调用`output`的`goroutine`可以在不读取`上游channel`的所有数据的情况下退出,因为它知道上游发送方sq将在`done channel`关闭是停止尝试发送。`output`将使用defer调用确保退出前调用`wg.Done`:
```golang
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs的每个channel启动一个goroutine
// output将值从c复制到out,直到c关闭后或done被关闭时退出,
// 调用前defer将调用wg.Done
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... 其他的没有变化 ...
```
同样,`sq`方法可以在`done channel`关闭后立刻返回。`sq`也通过defer语句来实现对关闭`out channel`:
```golang
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
```
以下是使用`pipeline`模式的一些指南:
- 所有发送操作完成后,每个阶段都应关闭其`下游channel`
- 阶段会一直从`上游channel`接收值,直到`上游channel`被关闭或发送者被阻塞为止
`pipeline`可以通过预留足够大的缓冲区来存储所有发送的数据,或者在接收方可能放弃时,显式的向发送方发送信号,解除发送方的阻塞。
#### Digesting a tree
我们来看一个更真实的`pipeline`
MD5时一种消息摘要算法(message-digest algorithm),可以用作为文件校验和(checksum)。使用命令行程序md5sum打印出来的值如下:
```shell
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
```
我们的示例程序就像md5sum一样,但是取一个目录作为参数,并打印该目录下每个普通文件的md5值,并按路径名排序
```shell
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
```
我们的程序中mian函数调用了一个辅助方法`MD5All`,它由路径名返回所有文件的md5值,然后对结果进行排序和打印:
```golang
func main() {
// 计算目录下所有的文件的md5
// 根据路径排序后打印出来
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
```
`MD5All`方法时我们讨论的重点。在[serial.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/serial.go)的实现中,不使用并发,只是遍历树时计算每个文件的md5
```golang
// MD5All读取以root为根的文件树中的所有文件,
// 并返回文件路径到文件md5值的map
// 如果目录遍历失败或任何读取失败,则MD5All返回错误
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
```
#### Parallel digestion
在[parallel.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/parallel.go)的实现中,我们将MD5All拆分为由两个阶段组成的`pipeline`。在第一个阶段中,sumFiles方法,遍历文件树,创建一个新的`goroutine`处理新的文件,并把计算结果发送到一个接收result类型的`channel`上:
```golang
type result struct {
path string
sum [md5.Size]byte
err error
}
```
`sumFiles`函数返回两个`channel`:一个用于发送result,另一个用于返回`filepath.Walk`调用产生的错误。`walk`方法启动一个新的`goroutine`来处理每个普通文件,然后检查`done`。如果`done`被关闭了,则立即返回:
```golang
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 对于每个文件,启动一个goroutine去计算文件md5,并将结果发送到c
// 在errc上发送错误
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk方法已经返回,此时所有调用都完成了wg.Add
// 当所有发送完成后,关闭c
go func() {
wg.Wait()
close(c)
}()
// 因为errc是带缓冲的,所以这里不需要select
errc <- err
}()
return c, errc
}
```
MD5All从c接收md5值。MD5All在发送错误可以尽快返回,并通过defer调用关闭`done channel`:
```golang
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All在返回前关闭done
// 它可能会在c和errc接收完前退出
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
```
#### Bounded parallelism
在[parallel.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/parallel.go)的实现中,MD5All为每个文件启动了一个新的`goroutine`。在文件较大较多的目录中,这种分配方式可能会需要比计算机可用内存更大的内存。
我们可以限制并行读取的文件数来限制这种分配。在[bounded.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/bounded.go)中,我们通过创建固定数量的`goroutine`来读取文件实现这种控制。我们的`pipeline`现在需要包含三个阶段:遍历树、读取文件并计算md5、汇总md5值
第一个阶段,`walkFiles`,发送文件树中普通文件的路径:
```golang
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// 在遍历完成后关闭paths
defer close(paths)
// errc带有缓冲,此处不需要select
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
retur
```
中间阶段启动固定数量的`goroutine`运行`digester`方法,它们从`paths`上接收文件名,并将结果发送到`c channel`上:
```golang
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
```
和我们的示例程序不一样的是,`digester`方法不会关闭`下游channel`,因为有多个`goroutine`在共享这个`channel`。相反,`MD5All`中的代码需要在所有`digester`完成后关闭它:
```golang
// 启动固定数量的goroutine来读取和计算文件md5
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
```
我们也可以让每一个`digester`方法创建并返回自己的`下游channel`,但我们需要在后面使用一个新的`goroutine`去对结果进行`fan-in`
最后阶段,从c中接收所有结果,然后从errc中检查是否产生错误。这个检查不能放在更前的地方进行,因为在这之前`walkFiles`可能后阻塞在向`下游channel`发送的地方:
```golang
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 检查遍历是否失败
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
```
#### Conclusion
本文介绍了在Go中构建流式数据`pipeline`。处理此类`pipeline`的bug非常棘手,因为在`pipeline`下游阶段不再关心传入的数据时,上游阶段都可能会阻塞在向下游阶段发送值的过程中。我们展示了如何使用`close channel`的方式向所有`goroutine`广播`done`信号,并定义了正确构建管道的准则。
#### 译者ps
原文链接:https://blog.golang.org/pipelines
有疑问加站长微信联系(非本文作者))