作者:Sameer Ajmani
原文:https://blog.golang.org/pipelines
介绍
Go并发模型使构建能高效利用IO和多核CPU的实时流式数据的pipeline非常方便。这篇文章将对此进行介绍,同时会着重强调一些在实践中的易犯错误以及对应的解决方法。
什么是Pipeline
在GO中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多的goroutine。这些goroutine
- 从输入channel接收数据
- 对数据作相应处理,例如在此基础上产生新数据
- 再通过输出channel把数据发送出去
除了开始和结束,每个阶段都会包含任意多个输入和输出channel。开始阶段只有输出channel,结束阶段只有输入channel。相应地,开始阶段可被称为生产者,结束阶段可被称为消费者。
我们先通过一个简单的例子来说明。
并发计算平方数
首先来举一个涉及三阶段的pipeline例子
第一阶段,创建输入参数为可变长int整数的gen函数,它通过goroutine发送所有输入参数,并在发送完成后关闭相应channel:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二阶段,sq函数,负责从输入channel中接收数据并作平方处理再发送到输出channel中。在输入channel关闭并把所有数据都成功发送至输出channel,关闭输出channel:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
主函数main中创建pipeline并执行了最后阶段的任务,即从第二阶段接收数据并打印出来:
func main() {
// Set up the pipeline
c := gen(2, 3)
out := sql(c)
// Consume the output
fmt.Println(<-out)
fmt.Println(<-out)
}
此处sq函数的输入和输出参数类型相同,为channel。因此我们可以对其进行组合。重写main函数,如下:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n)
}
}
此处相等于在pipeline中增加一阶段,即涉及三个阶段,其中2、3阶段的goroutine由同一函数产生。
Fan-out和Fan-in (扇出和扇入)
多个goroutine同时从同一channel中读取数据,直到channel关闭,称为`fan-out`。这为我们提供了一种将任务分发给多个worker而实现多CPU和IO的高效利用的途径。
通过多goroutine从多个输入channel接收信息并发送给单个输出channel,直到所有的输入channel都关闭才会停止数据读取。这个称作 `fan-in`。
重写之前的main:我们调用两次`sq`,且两次都从相同channel中读取数据。这里我们将引入一个新函数,通过fan-in方式获取数据:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in
c1 := sq(in)
c2 := sq(in)
// Comsume the merged output from c1 and c2
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge函数为每个输入channel启动一个goroutine,将数据发送至同一个channel,从而实现将多channel转化为单channel。一旦所有的输出channel(生产者)启动,merge就会启动一或多个goroutine接收所有数据并在结束后关闭生产者的channel。
需要注意,向已关闭的channel发送数据会导致panic,因此需保证在关闭channel前所有数据都发送完毕是非常重要的。sync.WaitGroup为我们提供了一种实现该同步的方式。示例如下:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs)
for _, c := range(cs) {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
goroutine突然停止
注意一些使用原则,如下:
- 在数据发送完毕后,关闭输出channel;
- 持续不停地从输入中接收数据,直到channel关闭;
我们可使用for循环来接收数据,一旦数据所有数据接收完毕将会自动退出。
但在真实场景中,并非所有情况都需接收完所有数据。有时设计的是我们只需接收一部分的数据并可运行。更常见地,如果上游早早就抛出了一个错误,它便会早早地退出。还有一些情况,如接收者不再等待数据接收,此时也需停止生产发送数据。
在我们的例子中,如果一个阶段没有成功接收完数据就退出,我们的goroutine仍会尝试发送数据,这将会导致channel进入无限期的阻塞。
func main() {
// Consume the first value from the output
out := merge(c1, c2)
fmt.Println(<-out)
return
// Since we didn't receive the second value from out,
// one of the output goroutine is hung attempting to send it
}
这是资源泄露:goroutine会继续消耗内存、运行时资源,而且在栈中的堆引用也不能被回收。goroutine必须退出,才能启动垃圾回收机制。
当下游不能完全接收所有数据时,我们需要准备将上游goroutine退出。一种方式,我们可以把上游的channel设定为一个buffer。当buffer还有空间时,发送操作将会立刻完成。
c := make(chan int, 2)
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
当channel被创建时,如果我们已经知道数据的大小,可以如此来简化我们的代码。比如我们重写gen函数,拷贝数据到buffer channel中就可以避免创建新的goroutine。
func gen(nums ...int) int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
我们可以考虑给merge函数输出channel指定固定大小空间。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan, int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
}
虽然这解决了程序中goroutine阻塞的问题,但并不是优秀的代码。指定buffer的长度的前提是,我们需知道merge将接收值的长度。如果下游读取少量数据便结束,goroutine依然会阻塞。
我们需要一种使下游通知上游以表明它们不再接收信息。
明确取消
当主函数在没有从输出channel中接收完所有值便退出时,它必须告诉上游停止数据发送。可以通过向done channel发送停止信号实现。此处有两个可能阻塞的goroutine,所以需发两个值。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutine that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out)
// Tell the remaining senders we're leaving
done <- struct{}{}
done <- struct{}{}
}
merge中的发送goroutine用select语句取代了原来的发送操作,它将负责将数据发出和接收done channel的消息。Done将接收的值是空结构体,因为该值没有任何意义:它仅仅是用来表明应该停止向输出channel发送数据了。该goroutine将会不停循环地从输入channel中接收数据,以确保上游不被阻塞。(待会我们将会讨论怎么提早从循环退出)。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
}
这种方式的问题是:每个下游都需知道上游将发送的数据量,以便向其发送消息实现提早退出。但毫无疑问,时刻监控已发送数量是非常荒诞,也是非常容易出错的。
我们需要一种在上游goroutine数量未知或无限大的情况下使其停止的方式。在GO中,我们可以通过关闭channel来实现,因为在已关闭的channel上接收数据会被立刻处理并返回一个零值。
这意味着main函数中可仅仅通过关闭done channel来使发送方解除阻塞。该关闭操作会产生一个有效的广播信号并传递给发送方。我们可以扩展pipeline中的函数,使其可以多接受一个done参数,然后通过defer语句对执行关闭以便于在main退出时发送给各阶段完成信号来实现退出。
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
一旦done channel关闭,各个阶段就可以成功返回退出。当done被关闭,merge就会知道上游会停止发送数据,merge函数就会停止从输入channel接收数据并返回。输出channel通过defer语句确保所有的wg.Done在函数时能被调用。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
}
相似地,只要done channel一关闭,sq函数也会立刻返回。通过defer语句,sql函数确保它们输出channel一定能被顺利关闭。
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
pipeline几个指导原则:
- 各个阶段在所有的发送操作完成便会关闭输出channels;
- 各个阶段会不停的接收数据,直到这些channel都被关闭或者发送方不再阻塞;
Pipelines中可以通过为数据发送提供足够的buffer大小或在接收方确定放弃继续接收数据时发送完成信号来解除发送方的阻塞。
目录树摘要
让我们来看一个更实际的例子.
MD5是一种消息摘要算法,在checksum校验文件方面非常有用。通过命令行工具md5sum,我们打印了一系列文件的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的例子是一个类似于md5sum的程序,它接受单一目录作为参数,并打印该目录下每个文件的摘要值。文件是按文件名升序排列打印。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
程序的main主函数调用了一个名为MD5All的函数,它返回的是一个map,key为路径名,value为摘要值。最后,对结果进行了排序和打印。
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5All函数我们将重点讨论。在serial.go文件中,并没有使用并发技术,仅仅是依次读取每个文件内容并对其调用md5.Sum。
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
并行摘要
在parellel.go文件中,我们把MD5ALL拆成了两阶段。第一阶段,在sumFiles函数中,它遍历目录并在各个goroutine中执行文件摘要,最后将结果发送给channel。
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles函数返回了两个channels:一个用于传输结果,另一个用于返回filepath.Walk的错误。walk函数为每个文件启动了一个新的goroutine来处理它们,同时也检查是否done 。如果done被关闭,walk函数将立刻返回。
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}
MD5All函数从c(channel)中接收摘要值。但发现错误,它会提早返回,并通过defer语句关闭done。
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
并行限制
parallel.go中的MD5All通过为每个文件系统一个新的goroutine实现。如果一个目录中有太多的大文件,这可能会导致分配的内存超过机器的可用内存。
我们可以通过限制并行读取文件的数量来限制内容分配。在bounded.go文件中,我们创建了固定数量的goroutines来读取文件。现在我们的pipeline涉及了三个阶段:遍历目录树、读取文件并执行摘要和收集摘要结果。
第一阶段,walkFiles,负责发送目录树中文件路径:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
第二阶段,我们为digester函数启动了固定数量的goroutine,它将从paths中接收文件名处理并发送摘要结果给channel c:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
不同于之前的例子,digester不会关闭输出channel,因为太多的goroutine共用了一个channel。取而代之,在所有digester执行完毕,MD5All会着手关闭channel。
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
我们可以为每个digester创建独立的channel,然后返回。但是这样我们就需要增加额外的goroutines来对结果进行合并。
最后阶段,我们从channel c中接收所有的结果并检查errc是否返回了错误。该检查无法过早执行,因为过早检查,可能会导致walkFile阻塞。
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
总结
这篇文章给我们介绍了在GO中构建流式数据pipeline的方法。pipeline中的失败处理是需要一定技巧,每个尝试给下游发送数据的阶段都可能阻塞,下游可能不再接收上游的输入数据。我们展示了如何通过关闭channel来实现给所有goroutine发送 "done" 信号,以及定义了如何正确构建pipeline的原则。
有疑问加站长微信联系(非本文作者)