[译]Go Concurrency Patterns: Pipelines and cancellation - 1

TIEDPAG · 2020-04-11 23:39:35 · 1110 次点击 · 预计阅读时间 3 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2020-04-11 23:39:35 的文章,其中的信息可能已经有所发展或是发生改变。

英文原文:https://blog.golang.org/pipelines

简介

go语言的并发原语支持使得开发者构建流式数据pipeline(管道)变得容易,这些管道可以有效的利用I/O和多cup。本文介绍了这类pipeline的示例,重点介绍了操作失败时出现的细微差别,和如何优雅的处理错误的技术

What is a pipeline

go语言中没有对pipeline正式的定义;pipeline只是多种并发模式中的一种。通常而言,pipeline是由channel连接的一系列阶段,其中每个阶段是由运行相同方法的goroutine。在每个阶段中,goroutine会完成

  • 上游channel接收值
  • 对数据进行一些处理,通常会产生新的值
  • 下游channel发送值 除第一个阶段和最后一个阶段外,所有的阶段中都可以有任意数量的上游channel下游channel,它们仅有上游channel下游channel。第一个阶段通常也被称为生产者,最后一个阶段则被称为消费者接收器

下面我们将从一个简单的pipeline开始来解释这些思想和技术。后面,我们会提供一个更现实的示例。

Squaring numbers

构建一个具有三个阶段的管道 对于第一个阶段,gen函数,它将一个整数列表转化为一个channel,然后启动一个goroutine将整数列表中的数据向channel中发送,并在所有值发送完成后关闭channel

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

在第二个阶段中,sq函数,从上游channel中接收整数,并返回一个新的channel作为下游channel,并向下游channel发送接收到的整数的平方。在上游channel关闭后,且该阶段所有的值都发送到下游channel后,关闭下游channel:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main函数启动pipeline并运行最后阶段:从第二阶段接收值,并打印出来,直到关闭channel

func main() {
    // 组装pipeline
    c := gen(2, 3)
    out := sq(c)

    // 消费输出
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

由于sq的上游channel下游channel是相同类型,所以我们还能进行任意次的组合(译者:套娃警告)。另外,main函数也可以使用for range循环:

func main() {
    // 组装pipeline 并消费 输出
    for n := range sq(sq(gen(2,3))) {
        fmt.Println(n) // 16\n18
    }
}

Fan-out, fan-in

多个方法从同一个通道读取数据,直到通道关闭,这种方式被称为fan-out。这提供了一种新的分配方式,可以帮助我们并行的使用cpu和I/O。 一个方法可以从多个上游channel读取数据,并合并为一个下游channel,并且在所有的上游channel关闭后,将下游channel关闭,这种方式被称为fan-in 我们改变一下上面的管道组装方式,运行两个sq实例,每个实例均从同一个channel中读取。这里需要引入一个新的函数:merge,来实现fan-in打印出结果:

func main() {
    in := gen(2, 3)

    // 启动两个sq函数区读取in
    c1 := sq(in)
    c2 := sq(in)

    // 消费c1、c2合并后的输出
    for n:= range merge(c1, c2) {
        fmt.Println(n) // 4\n9 或 9\n4
    }
}

merge函数为每个上游channel启动了一个goroutine来将所有的上游channel转为为单个channelgoroutine将值复制到唯一的下游channel。在所有负责复制值发送至下游channel的goroutine启动后,再启动一个goroutine等待所有上游channel数据转发完毕后关闭下游channel向一个close的channel发送数据将导致panic,所以必须保证调用close之前所有发送已完成。sync.WaitGroup`提供保持这种同步的简单方式:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为cs的每个channel启动一个goroutine  
    // output将值从c复制到out,直到c关闭后调用wg.Done
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 启动一个goroutine负责在所有输出goroutine完成后关闭out
    // 必须在wg.Add后调用
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

未完....


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1110 次点击  
加入收藏 微博
2 回复  |  直到 2020-04-12 11:04:22
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传