并发是golang最有核心竞争力的功能,golang的并发依赖的并不是线程,而是协程。协程和线程有什么区别呢?最大的区别就是协程比线程更为轻量。默认情况中一个进程最大可以启动254个线程,这个数值也可以改为无限制,但主机资源消耗就会非常严重。而使用协程就不同了,一个进程可以轻轻松松启动上万个协程而毫无压力。
因此本篇文章就来说说在golang中如何创建使用协程。
golang设计协程的目的,一方面是为了提高并发效率,另外一方面就是尽可能发挥多核CPU的能力。golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。通过这样的设计,把每个CPU都充分调动起来,减少CPU空闲时间,提高了CPU吞吐量,无形当中也提高了I/O效率。
提到golang的协程,就不得不提到一个名词:管道(pipeline)。这里的管道和Linux系统中的pipe不是同一个意思,这里的管道指的是使用channel将多个处理步骤相连,形成的具有多级channel的数据流。一般来说,管道都是通过流入口读取数据,从流出口发送数据,读取数据之后都会调用某些函数来处理这些数据。
管道中的每一级都可以拥有多个流入口和流出口,但管道的首级和末级一般情况只有一个流入口或者流出口。拥有流出口的首级一般称之为数据源或者生产者,拥有流入口的末级一般称之为终点或者消费者。
这些技术解释,看上去很枯燥,我们通过一些简单的实例逐渐深入讲解。首先看下面的实例。在这个实例中,共分有三步来处理数据,第一步gen函数负责将传入的数据放到channel之中,当数据传完之后,关闭channel。代码如下:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二步,sq函数从channel中读取数据,并对每个数值进行相乘运算,然后再将运算后的数据发送到下一个channel当中。代码如下:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
最后一步,就是main函数了。main函数接受二阶段中发送的数据,然后输出这些数据知道channel关闭。代码如下:
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
因为sq函数的参数类型和返回类型一致,所以sq函数可以合并处理,修改后的代码如下:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
到这里,以上三步就完成了一个非常基本的golang并发模型。但还存在很多缺陷,我们继续对它进行优化。首先第一步,就是将每一步处理单个channel,改为处理多个channel。
在golang并发模型中,存在两个概念:Fan-in(扇入)和Fan-out(扇出)。扇入指的是一个程序可以同时从多个channel中读取数据并且对其进行处理,直到收到明确的停止信号或者所有的channel被关闭。
扇出指的是多个程序可以同时从一个channel中读取数据并且对其进行处理,直到channel关闭。扇出值越大,CPU利用率越高,IO使用率也就越高。
下面的优化,就是针对扇入和扇出入手的。
我们将调用一次sq函数变为调用两次sq函数,同时引入一个merge函数来扇入处理结果数据。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge函数会通过启动一个协程将多个channel中的数据合并到一个channel之中。Golang语言中,向一个已经关闭的channel中发送数据会引发一个运行时异常,所以有必要在发送数据之前需要确保channel未被关闭。这里,我们使用sync.WaitGroup 做同步,只有数据发送完了,才会关闭channel。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
现在我们已经有了一个这样的模型:
- 只有所有的数据都发送完成之后,才会关闭channel
- 其它协程会一直接受数据,直到所有channel被关闭。
通过这个模型,我们可以循环接受并且处理数据。但我们的脚步不会就此停止,让我们继续往下优化。
目前所有的协程都是独立运行的,负责发送的协程可以不停的发送数据,接受数据的协程会不停的接受数据。那如果接受数据的协程不再需要这些数据了,那么又该如何通知上游的协程呢?
在上面的示例中,如果某一个阶段发生异常而退出,那么其他协程无法获知这个事件,就会发生一些资源泄漏。
// Consume the first value from output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}
所以下一步优化的方向就是协程之间的协同工作。先拿channel开刀,因为channel是可以带缓冲的。所以我们声明一个带有缓冲的channel:
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
channel的buffer是2,所以一次只能放入两个值,只有这两个值被处理了之后才能继续往里面放入新值。
这样,我们就可以修改一个gen函数。
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
回到merge函数中,我们也可以考虑在merge函数中使用一个带有缓冲的channel:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
直接声明一个buffer=1的channel,不是一个好主意。因为目前这个值是已知的,但以后如果发生变化,那么还要修改代码,所以最好写成通用代码。但现在先这样用着吧。
这些貌似和协同工作,没关系。那下面就是有关系的代码了,加入main函数中准备要退出了,也就是不再接受数据了。main函数需要通知上游的协程停止发送数据,main函数如何做到这点呢?
main函数使用另外一个channel来完成这件事情,当需要退出时,main就通过done这个新增的channel发送消息,如下:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}
main给done发送了一个空的结构体,但这个没有关系,我们关心的是done里面是否有值,而不是有什么值。其它协程如果需要接受信号,那么就需要使用select来处理done。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
这种方法虽然可以实现通知的目的,但还有问题:main函数需要明确知道一共有多少个协程需要通知到,因此done <- struct{}{}需要不停的调用,直到所有的协程都被通知到位。如果有一些协程没有被通知到,呵呵,等着看异常吧。
为了解决这个问题,我们通过关闭done的方式来通知所有的协程。因为从一个已经关闭的channel中接受数据,会使当前协程立即退出。所以main函数中关闭了done,那么所有等待着从done接受关闭信号的协程们,会老老实实的自动退出。
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
这样,merge函数就可以明确得知其下游已经不再需要处理数据了,merge就可以放心退出了。而sq也可以通过得知done已经关闭,而不再向外发送数据了。但这些函数再退出之时都会调用wg.Done来告诉main,它们都已经合法退出。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
到这里,才算是真正的完成了协程之间的协同工作。
原文地址:http://blog.csdn.net/vikings_1001
有疑问加站长微信联系(非本文作者)