接上一节翻译,点此看前一部分
Stopping short
我们创建的管道有固定的模式:
- 所有发送操作完成后,由该阶段关闭
下游channel
- 所有阶段均从
上游channel
接收值,直至上游channel
关闭 这种模式允许每一个接收者使用for range
循环读取值,并在所有值成功发送到下游channel
后,退出当前的goroutine
但在真实的pipeline
应用场景中,对应的阶段并不都会接收所有的上游channel
发送的值。有时是因为设计的原因:接收者只需要值的子集就可以完成它的任务。还有更多情况,当上游channel
给出了一个错误值,当前阶段需要尽快的退出。无论是那种情况,接收者都不需要等待后续的值到达,我们希望阶段尽快停止不需要的处理。
在我们的示例pipeline
中,如果某个阶段不接收所有的上游channel
值,则尝试发送值的goroutine
将被阻塞:
// 仅接收第一个值
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// 因为我们只接收了第一个值
// 则负责输出的其中一个goroutine在尝试发送值时将被挂起
}
这里产生了资源泄漏:goroutine
消耗内存和运行时资源,goroutine
堆栈中的堆引用将会组织数据被gc回收。goroutine
不会被回收,它们必须自己退出(译者注:由于主groutine退出后,进程将会结束,进程所消耗的资源将会被os回收,但实际上,这确实是一种泄漏,实际应用中,很有可能程序不会退出,这样程序跑着跑着,泄漏的问题就会出现,原文中只是一个简单的例子)
所以,即使下游阶段无法接收所有入站值的情况下,我们也需要pipeline
的上游阶段退出。一种方式是将下游channel
加上缓存区。缓存区可以容纳固定数量的值;如果缓存区中还有空间,发送操作将不会被阻塞:
c := make(chan int, 2) // 缓存区大小为2
c <- 1 // 立即返回
c <- 2 // 立即返回
c <- 3 // 阻塞直至其他goroutine调用 <- c 接收1
当通道创建时已经直到要发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写gen
函数,将整数列表写入到下游channel
的缓冲区中,避免创建goroutine
:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
为了不返回阻塞的goroutine
,我们可以在merge
函数的下游channel
添加缓存区:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // 给未处理的输入足够的缓冲区
// ... 其他部分不变 ...
虽然这可以避免程序中goroutine
被阻塞,但这是错误的代码。缓冲区大小设为1是因为我们知道上下游channel
的情况。这是非常脆弱的程序,一旦我们向gen
方法多传递一个值,那将继续阻塞goroutine
相反,正确的做法应是为下游的阶段提供一种方法,告诉发送方,我们将不再接收输入
Explicit cancellation
当main
函数决定不接受来自out的值直接退出时,它必须告诉上游阶段的goroutine
放弃发送的值。我们将使用一个名为done的channel
上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送端:
func main() {
in := gen(2, 3)
// 启动两个sq函数区读取in
c1 := sq(in)
c2 := sq(in)
// 仅接收第一个值
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// 告诉上游发送者,我们将要退出
done <- struct{}{}
done <- struct{}{}
}
负责发送的goroutine
将会使用select
语句替换发送操作,select
将在发送完成或从done接收到值时继续执行。done的值类型是空的结构体,因为发送的值没有意义,它只是作为一个通知,指示应该放弃发送。负责输出的goroutine
将继续从上游channel
读取数据,因此不会阻塞上游阶段(我们稍后将会讨论如何让此循环尽早返回)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs的每个channel启动一个goroutine
// output将值从c复制到out或done接收到值后调用wg.Done
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... 其他的没有变化 ...
这种方式带来一个新的问题:每一个下游接收者都需要知道可能被阻塞的上游发送者的数量,并在返回前尽早的向发送着发送信号。处理这种计数是无聊且容易出错的。
我们需要在不知道数量的goroutine
数量的情况下,通知所有goroutine
停止向下游发送值。在go语言中,我们能使用关闭channel
的方式来实现,因为但我们关闭一个通道时,会在关闭的channel
上执行接收值的操作将会立即返回,并获得对应数据的零值
这意味着main
方法可以通过关闭done channel
来解除所有阻塞的发送者。这个关闭操作实际上是产生了一个广播信息。我们将扩展一下每个pipeline
的处理函数,新增一个done作为参数,并通过defer语句来进行关闭,以便main函数以任何一种方式退出前,都会将退出信号发送给pipeline
的所有阶段
func main() {
// 设置一个done channel,当我们退出时,关闭这个channel
// 通知所有的goroutine我们开始退出了
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// 启动两个sq函数区读取in
c1 := sq(done, in)
c2 := sq(done, in)
// 仅接收第一个值
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done将会在defer调用中关闭
}
现在,我们的pipeline
中的每个阶段都可以在done channel
关闭后马上返回。而merge
方法中调用output
的goroutine
可以在不读取上游channel
的所有数据的情况下退出,因为它知道上游发送方sq将在done channel
关闭是停止尝试发送。output
将使用defer调用确保退出前调用wg.Done
:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs的每个channel启动一个goroutine
// output将值从c复制到out,直到c关闭后或done被关闭时退出,
// 调用前defer将调用wg.Done
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... 其他的没有变化 ...
同样,sq
方法可以在done channel
关闭后立刻返回。sq
也通过defer语句来实现对关闭out channel
:
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
以下是使用pipeline
模式的一些指南:
- 所有发送操作完成后,每个阶段都应关闭其
下游channel
- 阶段会一直从
上游channel
接收值,直到上游channel
被关闭或发送者被阻塞为止
pipeline
可以通过预留足够大的缓冲区来存储所有发送的数据,或者在接收方可能放弃时,显式的向发送方发送信号,解除发送方的阻塞。
Digesting a tree
我们来看一个更真实的pipeline
MD5时一种消息摘要算法(message-digest algorithm),可以用作为文件校验和(checksum)。使用命令行程序md5sum打印出来的值如下:
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的示例程序就像md5sum一样,但是取一个目录作为参数,并打印该目录下每个普通文件的md5值,并按路径名排序
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的程序中mian函数调用了一个辅助方法MD5All
,它由路径名返回所有文件的md5值,然后对结果进行排序和打印:
func main() {
// 计算目录下所有的文件的md5
// 根据路径排序后打印出来
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5All
方法时我们讨论的重点。在serial.go的实现中,不使用并发,只是遍历树时计算每个文件的md5
// MD5All读取以root为根的文件树中的所有文件,
// 并返回文件路径到文件md5值的map
// 如果目录遍历失败或任何读取失败,则MD5All返回错误
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
Parallel digestion
在parallel.go的实现中,我们将MD5All拆分为由两个阶段组成的pipeline
。在第一个阶段中,sumFiles方法,遍历文件树,创建一个新的goroutine
处理新的文件,并把计算结果发送到一个接收result类型的channel
上:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles
函数返回两个channel
:一个用于发送result,另一个用于返回filepath.Walk
调用产生的错误。walk
方法启动一个新的goroutine
来处理每个普通文件,然后检查done
。如果done
被关闭了,则立即返回:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 对于每个文件,启动一个goroutine去计算文件md5,并将结果发送到c
// 在errc上发送错误
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk方法已经返回,此时所有调用都完成了wg.Add
// 当所有发送完成后,关闭c
go func() {
wg.Wait()
close(c)
}()
// 因为errc是带缓冲的,所以这里不需要select
errc <- err
}()
return c, errc
}
MD5All从c接收md5值。MD5All在发送错误可以尽快返回,并通过defer调用关闭done channel
:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All在返回前关闭done
// 它可能会在c和errc接收完前退出
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
Bounded parallelism
在parallel.go的实现中,MD5All为每个文件启动了一个新的goroutine
。在文件较大较多的目录中,这种分配方式可能会需要比计算机可用内存更大的内存。
我们可以限制并行读取的文件数来限制这种分配。在bounded.go中,我们通过创建固定数量的goroutine
来读取文件实现这种控制。我们的pipeline
现在需要包含三个阶段:遍历树、读取文件并计算md5、汇总md5值
第一个阶段,walkFiles
,发送文件树中普通文件的路径:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// 在遍历完成后关闭paths
defer close(paths)
// errc带有缓冲,此处不需要select
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
retur
中间阶段启动固定数量的goroutine
运行digester
方法,它们从paths
上接收文件名,并将结果发送到c channel
上:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
和我们的示例程序不一样的是,digester
方法不会关闭下游channel
,因为有多个goroutine
在共享这个channel
。相反,MD5All
中的代码需要在所有digester
完成后关闭它:
// 启动固定数量的goroutine来读取和计算文件md5
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
我们也可以让每一个digester
方法创建并返回自己的下游channel
,但我们需要在后面使用一个新的goroutine
去对结果进行fan-in
最后阶段,从c中接收所有结果,然后从errc中检查是否产生错误。这个检查不能放在更前的地方进行,因为在这之前walkFiles
可能后阻塞在向下游channel
发送的地方:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 检查遍历是否失败
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
Conclusion
本文介绍了在Go中构建流式数据pipeline
。处理此类pipeline
的bug非常棘手,因为在pipeline
下游阶段不再关心传入的数据时,上游阶段都可能会阻塞在向下游阶段发送值的过程中。我们展示了如何使用close channel
的方式向所有goroutine
广播done
信号,并定义了正确构建管道的准则。
译者ps
有疑问加站长微信联系(非本文作者))

一边读原文一边翻译
读完发现自己之前设计的一个
pipeline
有bug....