[译]Go Concurrency Patterns: Pipelines and cancellation - 2

TIEDPAG · 2020-04-12 16:13:54 · 800 次点击 · 预计阅读时间 13 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2020-04-12 16:13:54 的文章,其中的信息可能已经有所发展或是发生改变。

接上一节翻译,点此看前一部分

Stopping short

我们创建的管道有固定的模式:

  • 所有发送操作完成后,由该阶段关闭下游channel
  • 所有阶段均从上游channel接收值,直至上游channel关闭 这种模式允许每一个接收者使用for range循环读取值,并在所有值成功发送到下游channel后,退出当前的goroutine

但在真实的pipeline应用场景中,对应的阶段并不都会接收所有的上游channel发送的值。有时是因为设计的原因:接收者只需要值的子集就可以完成它的任务。还有更多情况,当上游channel给出了一个错误值,当前阶段需要尽快的退出。无论是那种情况,接收者都不需要等待后续的值到达,我们希望阶段尽快停止不需要的处理。 在我们的示例pipeline中,如果某个阶段不接收所有的上游channel值,则尝试发送值的goroutine将被阻塞:

    // 仅接收第一个值
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // 因为我们只接收了第一个值
    // 则负责输出的其中一个goroutine在尝试发送值时将被挂起
}

这里产生了资源泄漏:goroutine消耗内存和运行时资源,goroutine堆栈中的堆引用将会组织数据被gc回收。goroutine不会被回收,它们必须自己退出(译者注:由于主groutine退出后,进程将会结束,进程所消耗的资源将会被os回收,但实际上,这确实是一种泄漏,实际应用中,很有可能程序不会退出,这样程序跑着跑着,泄漏的问题就会出现,原文中只是一个简单的例子)

所以,即使下游阶段无法接收所有入站值的情况下,我们也需要pipeline的上游阶段退出。一种方式是将下游channel加上缓存区。缓存区可以容纳固定数量的值;如果缓存区中还有空间,发送操作将不会被阻塞:

c := make(chan int, 2) // 缓存区大小为2
c <- 1  // 立即返回
c <- 2  // 立即返回
c <- 3  // 阻塞直至其他goroutine调用 <- c 接收1

当通道创建时已经直到要发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写gen函数,将整数列表写入到下游channel的缓冲区中,避免创建goroutine

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

为了不返回阻塞的goroutine,我们可以在merge函数的下游channel添加缓存区:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // 给未处理的输入足够的缓冲区
    // ... 其他部分不变 ...

虽然这可以避免程序中goroutine被阻塞,但这是错误的代码。缓冲区大小设为1是因为我们知道上下游channel的情况。这是非常脆弱的程序,一旦我们向gen方法多传递一个值,那将继续阻塞goroutine 相反,正确的做法应是为下游的阶段提供一种方法,告诉发送方,我们将不再接收输入

Explicit cancellation

main函数决定不接受来自out的值直接退出时,它必须告诉上游阶段的goroutine放弃发送的值。我们将使用一个名为done的channel上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送端:

func main() {
    in := gen(2, 3)

    // 启动两个sq函数区读取in
    c1 := sq(in)
    c2 := sq(in)

    // 仅接收第一个值
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 告诉上游发送者,我们将要退出
    done <- struct{}{}
    done <- struct{}{}
}

负责发送的goroutine将会使用select语句替换发送操作,select将在发送完成或从done接收到值时继续执行。done的值类型是空的结构体,因为发送的值没有意义,它只是作为一个通知,指示应该放弃发送。负责输出的goroutine将继续从上游channel读取数据,因此不会阻塞上游阶段(我们稍后将会讨论如何让此循环尽早返回)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为cs的每个channel启动一个goroutine 
    // output将值从c复制到out或done接收到值后调用wg.Done
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... 其他的没有变化 ...

这种方式带来一个新的问题:每一个下游接收者都需要知道可能被阻塞的上游发送者的数量,并在返回前尽早的向发送着发送信号。处理这种计数是无聊且容易出错的。

我们需要在不知道数量的goroutine数量的情况下,通知所有goroutine停止向下游发送值。在go语言中,我们能使用关闭channel的方式来实现,因为但我们关闭一个通道时,会在关闭的channel上执行接收值的操作将会立即返回,并获得对应数据的零值

这意味着main方法可以通过关闭done channel来解除所有阻塞的发送者。这个关闭操作实际上是产生了一个广播信息。我们将扩展一下每个pipeline的处理函数,新增一个done作为参数,并通过defer语句来进行关闭,以便main函数以任何一种方式退出前,都会将退出信号发送给pipeline的所有阶段

func main() {
    // 设置一个done channel,当我们退出时,关闭这个channel
    // 通知所有的goroutine我们开始退出了
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // 启动两个sq函数区读取in
    c1 := sq(done, in)
    c2 := sq(done, in)

    // 仅接收第一个值
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done将会在defer调用中关闭
}

现在,我们的pipeline中的每个阶段都可以在done channel关闭后马上返回。而merge方法中调用outputgoroutine可以在不读取上游channel的所有数据的情况下退出,因为它知道上游发送方sq将在done channel关闭是停止尝试发送。output将使用defer调用确保退出前调用wg.Done:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为cs的每个channel启动一个goroutine  
    // output将值从c复制到out,直到c关闭后或done被关闭时退出,
    // 调用前defer将调用wg.Done
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... 其他的没有变化 ...

同样,sq方法可以在done channel关闭后立刻返回。sq也通过defer语句来实现对关闭out channel

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

以下是使用pipeline模式的一些指南:

  • 所有发送操作完成后,每个阶段都应关闭其下游channel
  • 阶段会一直从上游channel接收值,直到上游channel被关闭或发送者被阻塞为止

pipeline可以通过预留足够大的缓冲区来存储所有发送的数据,或者在接收方可能放弃时,显式的向发送方发送信号,解除发送方的阻塞。

Digesting a tree

我们来看一个更真实的pipeline MD5时一种消息摘要算法(message-digest algorithm),可以用作为文件校验和(checksum)。使用命令行程序md5sum打印出来的值如下:

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的示例程序就像md5sum一样,但是取一个目录作为参数,并打印该目录下每个普通文件的md5值,并按路径名排序

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的程序中mian函数调用了一个辅助方法MD5All,它由路径名返回所有文件的md5值,然后对结果进行排序和打印:

func main() {
    // 计算目录下所有的文件的md5
    // 根据路径排序后打印出来
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All方法时我们讨论的重点。在serial.go的实现中,不使用并发,只是遍历树时计算每个文件的md5

// MD5All读取以root为根的文件树中的所有文件,
// 并返回文件路径到文件md5值的map
// 如果目录遍历失败或任何读取失败,则MD5All返回错误
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

Parallel digestion

parallel.go的实现中,我们将MD5All拆分为由两个阶段组成的pipeline。在第一个阶段中,sumFiles方法,遍历文件树,创建一个新的goroutine处理新的文件,并把计算结果发送到一个接收result类型的channel上:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles函数返回两个channel:一个用于发送result,另一个用于返回filepath.Walk调用产生的错误。walk方法启动一个新的goroutine来处理每个普通文件,然后检查done。如果done被关闭了,则立即返回:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // 对于每个文件,启动一个goroutine去计算文件md5,并将结果发送到c
    // 在errc上发送错误
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk方法已经返回,此时所有调用都完成了wg.Add
        // 当所有发送完成后,关闭c
        go func() {
            wg.Wait()
            close(c)
        }()
        // 因为errc是带缓冲的,所以这里不需要select
        errc <- err
    }()
    return c, errc
}

MD5All从c接收md5值。MD5All在发送错误可以尽快返回,并通过defer调用关闭done channel:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All在返回前关闭done
    // 它可能会在c和errc接收完前退出
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Bounded parallelism

parallel.go的实现中,MD5All为每个文件启动了一个新的goroutine。在文件较大较多的目录中,这种分配方式可能会需要比计算机可用内存更大的内存。

我们可以限制并行读取的文件数来限制这种分配。在bounded.go中,我们通过创建固定数量的goroutine来读取文件实现这种控制。我们的pipeline现在需要包含三个阶段:遍历树、读取文件并计算md5、汇总md5值 第一个阶段,walkFiles,发送文件树中普通文件的路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // 在遍历完成后关闭paths
        defer close(paths)
        // errc带有缓冲,此处不需要select
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    retur

中间阶段启动固定数量的goroutine运行digester方法,它们从paths上接收文件名,并将结果发送到c channel上:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

和我们的示例程序不一样的是,digester方法不会关闭下游channel,因为有多个goroutine在共享这个channel。相反,MD5All中的代码需要在所有digester完成后关闭它:

     // 启动固定数量的goroutine来读取和计算文件md5
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我们也可以让每一个digester方法创建并返回自己的下游channel,但我们需要在后面使用一个新的goroutine去对结果进行fan-in

最后阶段,从c中接收所有结果,然后从errc中检查是否产生错误。这个检查不能放在更前的地方进行,因为在这之前walkFiles可能后阻塞在向下游channel发送的地方:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // 检查遍历是否失败
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Conclusion

本文介绍了在Go中构建流式数据pipeline。处理此类pipeline的bug非常棘手,因为在pipeline下游阶段不再关心传入的数据时,上游阶段都可能会阻塞在向下游阶段发送值的过程中。我们展示了如何使用close channel的方式向所有goroutine广播done信号,并定义了正确构建管道的准则。

译者ps

原文链接:https://blog.golang.org/pipelines


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

800 次点击  
加入收藏 微博
1 回复  |  直到 2020-04-12 16:16:54
TIEDPAG
TIEDPAG · #1 · 5年之前

一边读原文一边翻译

读完发现自己之前设计的一个pipeline有bug....

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传