[译]Go Concurrency Patterns: Pipelines and cancellation - 2

TIEDPAG · · 639 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

接上一节翻译,[点此看前一部分](https://studygolang.com/articles/27916) #### Stopping short 我们创建的管道有固定的模式: - 所有发送操作完成后,由该阶段关闭`下游channel` - 所有阶段均从`上游channel`接收值,直至`上游channel`关闭 这种模式允许每一个接收者使用`for range`循环读取值,并在所有值成功发送到`下游channel`后,退出当前的`goroutine` 但在真实的`pipeline`应用场景中,对应的阶段并不都会接收所有的`上游channel`发送的值。有时是因为设计的原因:接收者只需要值的子集就可以完成它的任务。还有更多情况,当`上游channel`给出了一个错误值,当前阶段需要尽快的退出。无论是那种情况,接收者都不需要等待后续的值到达,我们希望阶段尽快停止不需要的处理。 在我们的示例`pipeline`中,如果某个阶段不接收所有的`上游channel`值,则尝试发送值的`goroutine`将被阻塞: ```golang // 仅接收第一个值 out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // 因为我们只接收了第一个值 // 则负责输出的其中一个goroutine在尝试发送值时将被挂起 } ``` 这里产生了资源泄漏:`goroutine`消耗内存和运行时资源,`goroutine`堆栈中的堆引用将会组织数据被gc回收。`goroutine`不会被回收,它们必须自己退出(译者注:由于主groutine退出后,进程将会结束,进程所消耗的资源将会被os回收,但实际上,这确实是一种泄漏,实际应用中,很有可能程序不会退出,这样程序跑着跑着,泄漏的问题就会出现,原文中只是一个简单的例子) 所以,即使下游阶段无法接收所有入站值的情况下,我们也需要`pipeline`的上游阶段退出。一种方式是将`下游channel`加上缓存区。缓存区可以容纳固定数量的值;如果缓存区中还有空间,发送操作将不会被阻塞: ```golang c := make(chan int, 2) // 缓存区大小为2 c <- 1 // 立即返回 c <- 2 // 立即返回 c <- 3 // 阻塞直至其他goroutine调用 <- c 接收1 ``` 当通道创建时已经直到要发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写`gen`函数,将整数列表写入到下游`channel`的缓冲区中,避免创建`goroutine`: ```golang func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out } ``` 为了不返回阻塞的`goroutine`,我们可以在`merge`函数的`下游channel`添加缓存区: ```golang func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // 给未处理的输入足够的缓冲区 // ... 其他部分不变 ... ``` 虽然这可以避免程序中`goroutine`被阻塞,但这是错误的代码。缓冲区大小设为1是因为我们知道上下游`channel`的情况。这是非常脆弱的程序,一旦我们向`gen`方法多传递一个值,那将继续阻塞`goroutine` 相反,正确的做法应是为下游的阶段提供一种方法,告诉发送方,我们将不再接收输入 #### Explicit cancellation 当`main`函数决定不接受来自out的值直接退出时,它必须告诉上游阶段的`goroutine`放弃发送的值。我们将使用一个名为done的`channel`上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送端: ```golang func main() { in := gen(2, 3) // 启动两个sq函数区读取in c1 := sq(in) c2 := sq(in) // 仅接收第一个值 done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // 告诉上游发送者,我们将要退出 done <- struct{}{} done <- struct{}{} } ``` 负责发送的`goroutine`将会使用`select`语句替换发送操作,`select`将在发送完成或从done接收到值时继续执行。done的值类型是空的结构体,因为发送的值没有意义,它只是作为一个通知,指示应该放弃发送。负责输出的`goroutine`将继续从`上游channel`读取数据,因此不会阻塞上游阶段(我们稍后将会讨论如何让此循环尽早返回) ```golang func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为cs的每个channel启动一个goroutine // output将值从c复制到out或done接收到值后调用wg.Done output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... 其他的没有变化 ... ``` 这种方式带来一个新的问题:每一个下游接收者都需要知道可能被阻塞的上游发送者的数量,并在返回前尽早的向发送着发送信号。处理这种计数是无聊且容易出错的。 我们需要在不知道数量的`goroutine`数量的情况下,通知所有`goroutine`停止向下游发送值。在go语言中,我们能使用关闭`channel`的方式来实现,因为但我们关闭一个通道时,会[在关闭的`channel`上执行接收值的操作将会立即返回,并获得对应数据的零值](https://golang.org/ref/spec#Receive_operator) 这意味着`main`方法可以通过关闭`done channel`来解除所有阻塞的发送者。这个关闭操作实际上是产生了一个广播信息。我们将扩展一下每个`pipeline`的处理函数,新增一个done作为参数,并通过defer语句来进行关闭,以便main函数以任何一种方式退出前,都会将退出信号发送给`pipeline`的所有阶段 ```golang func main() { // 设置一个done channel,当我们退出时,关闭这个channel // 通知所有的goroutine我们开始退出了 done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // 启动两个sq函数区读取in c1 := sq(done, in) c2 := sq(done, in) // 仅接收第一个值 out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // done将会在defer调用中关闭 } ``` 现在,我们的`pipeline`中的每个阶段都可以在`done channel`关闭后马上返回。而`merge`方法中调用`output`的`goroutine`可以在不读取`上游channel`的所有数据的情况下退出,因为它知道上游发送方sq将在`done channel`关闭是停止尝试发送。`output`将使用defer调用确保退出前调用`wg.Done`: ```golang func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为cs的每个channel启动一个goroutine // output将值从c复制到out,直到c关闭后或done被关闭时退出, // 调用前defer将调用wg.Done output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... 其他的没有变化 ... ``` 同样,`sq`方法可以在`done channel`关闭后立刻返回。`sq`也通过defer语句来实现对关闭`out channel`: ```golang func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out } ``` 以下是使用`pipeline`模式的一些指南: - 所有发送操作完成后,每个阶段都应关闭其`下游channel` - 阶段会一直从`上游channel`接收值,直到`上游channel`被关闭或发送者被阻塞为止 `pipeline`可以通过预留足够大的缓冲区来存储所有发送的数据,或者在接收方可能放弃时,显式的向发送方发送信号,解除发送方的阻塞。 #### Digesting a tree 我们来看一个更真实的`pipeline` MD5时一种消息摘要算法(message-digest algorithm),可以用作为文件校验和(checksum)。使用命令行程序md5sum打印出来的值如下: ```shell % md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go ``` 我们的示例程序就像md5sum一样,但是取一个目录作为参数,并打印该目录下每个普通文件的md5值,并按路径名排序 ```shell % go run serial.go . d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go ``` 我们的程序中mian函数调用了一个辅助方法`MD5All`,它由路径名返回所有文件的md5值,然后对结果进行排序和打印: ```golang func main() { // 计算目录下所有的文件的md5 // 根据路径排序后打印出来 m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } } ``` `MD5All`方法时我们讨论的重点。在[serial.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/serial.go)的实现中,不使用并发,只是遍历树时计算每个文件的md5 ```golang // MD5All读取以root为根的文件树中的所有文件, // 并返回文件路径到文件md5值的map // 如果目录遍历失败或任何读取失败,则MD5All返回错误 func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil } ``` #### Parallel digestion 在[parallel.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/parallel.go)的实现中,我们将MD5All拆分为由两个阶段组成的`pipeline`。在第一个阶段中,sumFiles方法,遍历文件树,创建一个新的`goroutine`处理新的文件,并把计算结果发送到一个接收result类型的`channel`上: ```golang type result struct { path string sum [md5.Size]byte err error } ``` `sumFiles`函数返回两个`channel`:一个用于发送result,另一个用于返回`filepath.Walk`调用产生的错误。`walk`方法启动一个新的`goroutine`来处理每个普通文件,然后检查`done`。如果`done`被关闭了,则立即返回: ```golang func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // 对于每个文件,启动一个goroutine去计算文件md5,并将结果发送到c // 在errc上发送错误 c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk方法已经返回,此时所有调用都完成了wg.Add // 当所有发送完成后,关闭c go func() { wg.Wait() close(c) }() // 因为errc是带缓冲的,所以这里不需要select errc <- err }() return c, errc } ``` MD5All从c接收md5值。MD5All在发送错误可以尽快返回,并通过defer调用关闭`done channel`: ```golang func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All在返回前关闭done // 它可能会在c和errc接收完前退出 done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil } ``` #### Bounded parallelism 在[parallel.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/parallel.go)的实现中,MD5All为每个文件启动了一个新的`goroutine`。在文件较大较多的目录中,这种分配方式可能会需要比计算机可用内存更大的内存。 我们可以限制并行读取的文件数来限制这种分配。在[bounded.go](https://github.com/TIEDPAG/golang.org/blob/master/pipelines/bounded.go)中,我们通过创建固定数量的`goroutine`来读取文件实现这种控制。我们的`pipeline`现在需要包含三个阶段:遍历树、读取文件并计算md5、汇总md5值 第一个阶段,`walkFiles`,发送文件树中普通文件的路径: ```golang func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // 在遍历完成后关闭paths defer close(paths) // errc带有缓冲,此处不需要select errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() retur ``` 中间阶段启动固定数量的`goroutine`运行`digester`方法,它们从`paths`上接收文件名,并将结果发送到`c channel`上: ```golang func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { for path := range paths { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: return } } } ``` 和我们的示例程序不一样的是,`digester`方法不会关闭`下游channel`,因为有多个`goroutine`在共享这个`channel`。相反,`MD5All`中的代码需要在所有`digester`完成后关闭它: ```golang // 启动固定数量的goroutine来读取和计算文件md5 c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }() ``` 我们也可以让每一个`digester`方法创建并返回自己的`下游channel`,但我们需要在后面使用一个新的`goroutine`去对结果进行`fan-in` 最后阶段,从c中接收所有结果,然后从errc中检查是否产生错误。这个检查不能放在更前的地方进行,因为在这之前`walkFiles`可能后阻塞在向`下游channel`发送的地方: ```golang m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // 检查遍历是否失败 if err := <-errc; err != nil { return nil, err } return m, nil } ``` #### Conclusion 本文介绍了在Go中构建流式数据`pipeline`。处理此类`pipeline`的bug非常棘手,因为在`pipeline`下游阶段不再关心传入的数据时,上游阶段都可能会阻塞在向下游阶段发送值的过程中。我们展示了如何使用`close channel`的方式向所有`goroutine`广播`done`信号,并定义了正确构建管道的准则。 #### 译者ps 原文链接:https://blog.golang.org/pipelines

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

639 次点击  
加入收藏 微博
1 回复  |  直到 2020-04-12 16:16:54
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传