问题
当main goroutine
为了等待work goroutine
都运行完毕,不得不在程序末尾使用time.Sleep()
来休眠一段时间,等待work goroutine
充分运行。
$ vim ./test/goroutine_test.go
package test
import (
"fmt"
"testing"
"time"
)
func TestGoRoutine(t *testing.T) {
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
time.Sleep(time.Second)
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN TestGoRoutine
9
3
1
2
4
5
6
7
8
0
--- PASS: TestGoRoutine (1.00s)
PASS
ok command-line-arguments 1.291s
但对于实际应用中,休眠1秒是完全不够的,同时大部分时间都无法预知for
循环内代码运行时间的长短,此时就不能使用time.Sleep()
来完成等待操作。
可以使用管道来完成上述操作
func TestGoRoutine(t *testing.T) {
count := 10
ch := make(chan bool, count)
for i := 0; i < count; i++ {
go func(i int) {
fmt.Println(i)
ch <- true
}(i)
}
for i := 0; i < count; i++ {
<-ch
}
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN TestGoRoutine
9
0
5
6
7
8
2
1
4
3
--- PASS: TestGoRoutine (0.00s)
PASS
ok command-line-arguments 0.304s
使用管道可以达到目的,但有些大材小用,因为管道被设计出来不仅仅只是在这里做简单的同步处理的,因此这里使用管道实际上是不合适的。假如有上万、上十万、上百万的循环,也要申请同样数量大小的管道,对内存会是一个不小的开销。
对于这种情况,Golang中有一种工具sync.WaitGroup
能更加方便地帮助达到目的。
sync.WaitGroup
Golang中除了使用Channel通道和Mutex互斥锁实现两个并发程序之间的同步外,还可以通过WaitGroup等待组实现多个任务的同步,WaitGroup可以保证在并发环境中完成指定数量的任务。
-
WaitGroup
在Golang中用于goroutine
同步,解决同步阻塞等外的问题。
通俗来讲goroutine
分为两类角色,一种gorouine
作为一个worker
小弟,老老实实的干活。另一种goroutine
作为master
管理者来监督小弟干活,当然master
自身也是一个worker
。
当有很多worker
干活时,master
没事干歇着,但同时master
又希望得到一个通知,了解所有worker
们什么时候干完。
从程序开发角度来看,就是维护一个worker
总数和一个channel
,每个worker
干完就向channel
发送一个空message
。master
阻塞在channel
的监听上,来一个message
就说明有一个worker
干完活了,记录下有多少message
,message
和worker
总数一致则说明全干完活。master
就可以关闭channel
,验收worker
的工作成果。
-
WaitGroup
是指等待(Wait)一系列执行(Group)完成后才会继续向下执行 -
WaitGroup
能一直等到所有的work goroutine
执行完毕,同时阻塞main goroutine
的执行,直到所有的goroutine
执行完成。 -
WaitGroup
类似发布订阅,只不过订阅者接收到的不是消息,而是一种事件信号。
计数器
WaitGroup
内部拥有一个计数器,最初从0开始。
type WaitGroup struct{
noCopy noCopy
state1 [3]byte
}
- Counter:Worker计数器
master gortouine
调用WaitGroup.Add(delta int)
时会增加delta
,调用WaitGroup.Done()
时会减少1。 - Waiter:Waiter计数器
调用WaitGroup.Wait()
时Waiter
计数器加1,worker goroutine
计数器降低到0时,会重置Waiter
计数器。 - Sema:信号量
用于阻塞master goroutine
,调用WaitGroup.Wait()
时会通过runtime_Semacquire
获取信号量。降低Waiter
计数器时,通过runtime_Semrelease
释放信号量。
方法
WaitGroup
拥有三个方法分别是Add()
、Done()
、Wait()
用来控制计数器的数量
-
Add()
将计数器设置为n
,用于增加或减少worker goroutine
的数量。
func (wg *WaitGroup) Add(delta int)
-
Done()
每次会将计数器减少1
func (wg *WaitGroup) Done()
WaitGroup.Done()
和WaitGroup.Add(-1)
完全等价
-
Wait()
会阻塞代码的运行,直到计数器的值减少为0。
func (wg *WaitGroup) Wait()
使用方法
-
master goroutine
通过调用WaitGroup.Add(delta int)
来设置worker goroutine
的个数,然后创建work goroutine
。 -
worker goroutine
执行结束后需调用WaitGroup.Done()
-
master goroutine
调用WaitGroup.Wait()
且被block
阻塞,直到所有的worker goroutine
全部执行结束后返回。
例如:
$ vim ./test/sync_test.go
package test
import (
"fmt"
"sync"
"testing"
)
func TestWaitGroup(t *testing.T) {
count := 10
//添加goroutine数量
wg := sync.WaitGroup{}
wg.Add(count)
//循环模拟并发
for i := 0; i < count; i++ {
go func(i int) {
fmt.Println(i)
wg.Done() //设置gorooutine为-1
}(i)
}
//执行main goroutine阻塞,直到所有WaitGroup数量为0。
wg.Wait()
}
$ go test -v -run TestWaitGroup sync_test.go
=== RUN TestWaitGroup
9
4
5
6
7
8
2
3
1
0
--- PASS: TestWaitGroup (0.00s)
PASS
ok command-line-arguments 0.294s
注意
- WaitGroup对象不是一个引用类型,函数传值时需使用地址(地址传值)。
- WaitGroup的计数器不能为负数,不能使用
Add()
给WaitGroup对象设置一个负值。
应用
需要一个用户的画像服务,当一个请求到来时需要
- 从请求中解析出用户ID和用户画像维度参数
- 根据用户ID从五个服务比如数据库、存储、RPC等拉取不同维度的数据
- 将读取到的数据进行整合返回给调用方
假如每个服务的响应时间是20ms到50ms,如果顺序调用服务读取数据不考虑数据整合消耗的时间,服务端整体的响应时间将会在100ms到250ms。先不说业务能不能接受,响应时间显然存在很大的优化空间。最直接的优化方向是取数逻辑总时间应该是单个服务最大消耗时间。
func TestTask(t *testing.T) {
var wg sync.WaitGroup
for _,task := range tasks{
task := task
wg.Add(1)
go func(){
defer wg.Done()
task()
}()
}
wg.Wait()
}
使用注意
-
WaitGroup.Done()
必须在所有WaitGroup.Add()
之后执行,要保证两个函数都在master goroutine
中调用。 -
WaitGroup.Done()
在worker goroutine
中调用,尤其要保证调用一次,不能因为panic
或任何原因导致没有执行,因此建议使用defer WaitGroup.Done()
。 -
WaitGroup.Done()
和WaitGroup.Wait()
在时序上没有先后顺序
task := task
由于Golang对切片遍历时runtime
会将tasks[i]
拷贝到task
的内存地址中,下标i
会变化,而task
的内存地址是不会改变的。如果不做此次赋值操作,所有的goroutine
可能读取到的都是最后一个task
。
例如:
func TestTask(t *testing.T) {
tasks := []func(){
func() { fmt.Printf("task1 ") },
func() { fmt.Printf("task2 ") },
}
for index, task := range tasks {
task()
fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
}
}
$ go test -v -run TestTask sync_test.go
=== RUN TestTask
task1 0xc000006040 0xc00003c500
task2 0xc000006040 0xc00003c508
--- PASS: TestTask (0.00s)
PASS
ok command-line-arguments 0.296s
执行结果说明
- 遍历时数据的内存地址不变
unsafe.Pointer(&task)
- 遍历时通过下标获取数据时内存地址不同
unsafe.Pointer(&tasks[index])
func TestTask(t *testing.T) {
tasks := []func(){
func() { fmt.Printf("task1 ") },
func() { fmt.Printf("task2 ") },
}
for index, task := range tasks {
task := task
task()
fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
}
}
$ go test -v -run TestTask sync_test.go
=== RUN TestTask
task1 0xc0000c0030 0xc0000884f0
task2 0xc0000c0038 0xc0000884f8
--- PASS: TestTask (0.00s)
PASS
ok command-line-arguments 0.320s
执行结果说明
- 遍历内部创建的局部变量,即使名称相同,内存地址也不会复用。
- 遍历时数据的内存地址不同
unsafe.Pointer(&task)
- 遍历时通过下标获取数据时内存地址不同
unsafe.Pointer(&tasks[index])
有疑问加站长微信联系(非本文作者)