golang并发编程——goroutine使用指南

skh2015java · · 1049 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

并发是golang最有核心竞争力的功能,golang的并发依赖的并不是线程,而是协程。协程和线程有什么区别呢?最大的区别就是协程比线程更为轻量。默认情况中一个进程最大可以启动254个线程,这个数值也可以改为无限制,但主机资源消耗就会非常严重。而使用协程就不同了,一个进程可以轻轻松松启动上万个协程而毫无压力。

因此本篇文章就来说说在golang中如何创建使用协程。

golang设计协程的目的,一方面是为了提高并发效率,另外一方面就是尽可能发挥多核CPU的能力。golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。通过这样的设计,把每个CPU都充分调动起来,减少CPU空闲时间,提高了CPU吞吐量,无形当中也提高了I/O效率。

提到golang的协程,就不得不提到一个名词:管道(pipeline)。这里的管道和Linux系统中的pipe不是同一个意思,这里的管道指的是使用channel将多个处理步骤相连,形成的具有多级channel的数据流。一般来说,管道都是通过流入口读取数据,从流出口发送数据,读取数据之后都会调用某些函数来处理这些数据。

管道中的每一级都可以拥有多个流入口和流出口,但管道的首级和末级一般情况只有一个流入口或者流出口。拥有流出口的首级一般称之为数据源或者生产者,拥有流入口的末级一般称之为终点或者消费者。

这些技术解释,看上去很枯燥,我们通过一些简单的实例逐渐深入讲解。首先看下面的实例。在这个实例中,共分有三步来处理数据,第一步gen函数负责将传入的数据放到channel之中,当数据传完之后,关闭channel。代码如下:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二步,sq函数从channel中读取数据,并对每个数值进行相乘运算,然后再将运算后的数据发送到下一个channel当中。代码如下:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

最后一步,就是main函数了。main函数接受二阶段中发送的数据,然后输出这些数据知道channel关闭。代码如下:

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

因为sq函数的参数类型和返回类型一致,所以sq函数可以合并处理,修改后的代码如下:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

到这里,以上三步就完成了一个非常基本的golang并发模型。但还存在很多缺陷,我们继续对它进行优化。首先第一步,就是将每一步处理单个channel,改为处理多个channel。

在golang并发模型中,存在两个概念:Fan-in(扇入)和Fan-out(扇出)。扇入指的是一个程序可以同时从多个channel中读取数据并且对其进行处理,直到收到明确的停止信号或者所有的channel被关闭。 
扇出指的是多个程序可以同时从一个channel中读取数据并且对其进行处理,直到channel关闭。扇出值越大,CPU利用率越高,IO使用率也就越高。

下面的优化,就是针对扇入和扇出入手的。

我们将调用一次sq函数变为调用两次sq函数,同时引入一个merge函数来扇入处理结果数据。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge函数会通过启动一个协程将多个channel中的数据合并到一个channel之中。Golang语言中,向一个已经关闭的channel中发送数据会引发一个运行时异常,所以有必要在发送数据之前需要确保channel未被关闭。这里,我们使用sync.WaitGroup 做同步,只有数据发送完了,才会关闭channel。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

现在我们已经有了一个这样的模型:

  • 只有所有的数据都发送完成之后,才会关闭channel
  • 其它协程会一直接受数据,直到所有channel被关闭。

通过这个模型,我们可以循环接受并且处理数据。但我们的脚步不会就此停止,让我们继续往下优化。

目前所有的协程都是独立运行的,负责发送的协程可以不停的发送数据,接受数据的协程会不停的接受数据。那如果接受数据的协程不再需要这些数据了,那么又该如何通知上游的协程呢?

在上面的示例中,如果某一个阶段发生异常而退出,那么其他协程无法获知这个事件,就会发生一些资源泄漏。

    // Consume the first value from output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}

所以下一步优化的方向就是协程之间的协同工作。先拿channel开刀,因为channel是可以带缓冲的。所以我们声明一个带有缓冲的channel:

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1

channel的buffer是2,所以一次只能放入两个值,只有这两个值被处理了之后才能继续往里面放入新值。

这样,我们就可以修改一个gen函数。

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到merge函数中,我们也可以考虑在merge函数中使用一个带有缓冲的channel:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...

直接声明一个buffer=1的channel,不是一个好主意。因为目前这个值是已知的,但以后如果发生变化,那么还要修改代码,所以最好写成通用代码。但现在先这样用着吧。

这些貌似和协同工作,没关系。那下面就是有关系的代码了,加入main函数中准备要退出了,也就是不再接受数据了。main函数需要通知上游的协程停止发送数据,main函数如何做到这点呢?

main函数使用另外一个channel来完成这件事情,当需要退出时,main就通过done这个新增的channel发送消息,如下:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

main给done发送了一个空的结构体,但这个没有关系,我们关心的是done里面是否有值,而不是有什么值。其它协程如果需要接受信号,那么就需要使用select来处理done。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

这种方法虽然可以实现通知的目的,但还有问题:main函数需要明确知道一共有多少个协程需要通知到,因此done <- struct{}{}需要不停的调用,直到所有的协程都被通知到位。如果有一些协程没有被通知到,呵呵,等着看异常吧。

为了解决这个问题,我们通过关闭done的方式来通知所有的协程。因为从一个已经关闭的channel中接受数据,会使当前协程立即退出。所以main函数中关闭了done,那么所有等待着从done接受关闭信号的协程们,会老老实实的自动退出。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}

这样,merge函数就可以明确得知其下游已经不再需要处理数据了,merge就可以放心退出了。而sq也可以通过得知done已经关闭,而不再向外发送数据了。但这些函数再退出之时都会调用wg.Done来告诉main,它们都已经合法退出。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...
func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

到这里,才算是真正的完成了协程之间的协同工作。



原文地址:http://blog.csdn.net/vikings_1001


有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:skh2015java

查看原文:golang并发编程——goroutine使用指南

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:1006366459

1049 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传