[译]Go Concurrency Patterns: Pipelines and cancellation - 1

TIEDPAG · · 913 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

英文原文:https://blog.golang.org/pipelines #### 简介 go语言的并发原语支持使得开发者构建流式数据`pipeline(管道)`变得容易,这些管道可以有效的利用I/O和多cup。本文介绍了这类pipeline的示例,重点介绍了操作失败时出现的细微差别,和如何优雅的处理错误的技术 #### What is a pipeline go语言中没有对`pipeline`正式的定义;`pipeline`只是多种并发模式中的一种。通常而言,`pipeline`是由`channel`连接的一系列阶段,其中每个阶段是由运行相同方法的`goroutine`。在每个阶段中,`goroutine`会完成 - 从`上游channel`接收值 - 对数据进行一些处理,通常会产生新的值 - 向`下游channel`发送值 除第一个阶段和最后一个阶段外,所有的阶段中都可以有任意数量的`上游channel`和`下游channel`,它们仅有`上游channel`或`下游channel`。第一个阶段通常也被称为`生产者`或`源`,最后一个阶段则被称为`消费者`或`接收器` 下面我们将从一个简单的`pipeline`开始来解释这些思想和技术。后面,我们会提供一个更现实的示例。 #### Squaring numbers 构建一个具有三个阶段的管道 对于第一个阶段,gen函数,它将一个整数列表转化为一个`channel`,然后启动一个`goroutine`将整数列表中的数据向`channel`中发送,并在所有值发送完成后关闭`channel` ```golang func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } }() return out } ``` 在第二个阶段中,sq函数,从`上游channel`中接收整数,并返回一个新的`channel`作为`下游channel`,并向`下游channel`发送接收到的整数的平方。在`上游channel`关闭后,且该阶段所有的值都发送到`下游channel`后,关闭`下游channel`: ```golang func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } ``` main函数启动`pipeline`并运行最后阶段:从第二阶段接收值,并打印出来,直到关闭`channel`: ```golang func main() { // 组装pipeline c := gen(2, 3) out := sq(c) // 消费输出 fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 } ``` 由于sq的`上游channel`和`下游channel`是相同类型,所以我们还能进行任意次的组合(译者:套娃警告)。另外,main函数也可以使用`for range`循环: ```golang func main() { // 组装pipeline 并消费 输出 for n := range sq(sq(gen(2,3))) { fmt.Println(n) // 16\n18 } } ``` #### Fan-out, fan-in 多个方法从同一个通道读取数据,直到通道关闭,这种方式被称为`fan-out`。这提供了一种新的分配方式,可以帮助我们并行的使用cpu和I/O。 一个方法可以从多个`上游channel`读取数据,并合并为一个`下游channel`,并且在所有的`上游channel`关闭后,将`下游channel`关闭,这种方式被称为`fan-in` 我们改变一下上面的管道组装方式,运行两个sq实例,每个实例均从同一个`channel`中读取。这里需要引入一个新的函数:`merge`,来实现`fan-in`打印出结果: ```golang func main() { in := gen(2, 3) // 启动两个sq函数区读取in c1 := sq(in) c2 := sq(in) // 消费c1、c2合并后的输出 for n:= range merge(c1, c2) { fmt.Println(n) // 4\n9 或 9\n4 } } ``` `merge`函数为每个`上游channel`启动了一个`goroutine`来将所有的`上游channel`转为为单个`channel`,`goroutine`将值复制到唯一的`下游channel`。在所有负责复制值发送至`下游channel`的goroutine`启动后,再启动一个`goroutine`等待所有`上游channel`数据转发完毕后关闭`下游channel` 向一个close的`channel`发送数据将导致`panic`,所以必须保证调用close之前所有发送已完成。`sync.WaitGroup`提供保持这种同步的简单方式: ```golang func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为cs的每个channel启动一个goroutine // output将值从c复制到out,直到c关闭后调用wg.Done output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // 启动一个goroutine负责在所有输出goroutine完成后关闭out // 必须在wg.Add后调用 go func() { wg.Wait() close(out) }() return out } ``` 未完....

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

913 次点击  
加入收藏 微博
2 回复  |  直到 2020-04-12 11:04:22
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传