深度解析sync WaitGroup源码及其实现原理

memo012 · · 1020 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

![在这里插入图片描述](https://img-blog.csdnimg.cn/img_convert/dd80a6028a66bd592130590f7c1fac58.png#pic_center) # 目录 - WaitGroup介绍 - WaitGroup的实现 - Add - Done - Wait # WaitGroup介绍 `waitGroup` ,也是在go语言并发中比较常用的语法,所以在这里我们一起剖析 waitGroup 的使用方式及其源码解读。 `WaitGroup` 也是sync 包下一份子,用来解决任务编排的一个并发原语。它主要解决了并发-等待问题:比如现在有三个`goroutine`,分别为`goroutineA`,`goroutineB`,`goroutineC`,而`goroutineA`需要等待`goroutineB`和`goroutineC`这一组goroutine全部执行完毕后,才可以执行后续业务逻辑。此时就可以使用 `WaitGroup` 轻松解决。 在这个场景中,`goroutineA`为主goroutine,`goroutineB`和`goroutineC`为子goroutine。`goroutineA`则需要在**检查点(checkout point)** 等待`goroutineB`和`goroutineC`全部执行完毕,如果在执行任务的`goroutine`还没全部完成,那么`goroutineA`就会阻塞在检查点,直到所有`goroutine`都完成后才能继续执行。 **代码实现:** ```go package main import ( "fmt" "sync" ) func goroutineB(wg *sync.WaitGroup) { defer wg.Done() fmt.Println("goroutineB Execute") time.Sleep(time.Second) } func goroutineC(wg *sync.WaitGroup) { defer wg.Done() fmt.Println("goroutineC Execute") time.Sleep(time.Second) } func main() { var wg sync.WaitGroup wg.Add(2) go goroutineB(&wg) go goroutineC(&wg) wg.Wait() fmt.Println("goroutineB and goroutineC finished...") } ``` **运行结果:** ```go goroutineC Execute goroutineB Execute goroutineB and goroutineC finished... ``` 上述就是**WaitGroup** 的简单操作,它的语法也是比较简单,提供了三个方法,如下所示: ```go func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait() ``` - Add:用来设置WaitGroup的计数值(子goroutine的数量) - Done:用来将WaitGroup的计数值减1,起始就是调用Add(-1) - Wait:调用这个方法的goroutine会一直阻塞,直到WaitGroup的技术值变为0 接下来,我们进行剖析 WaitGroup 的源码实现,让其无处可遁,它源码比较少,除去注释,也就几十行,对新手来说也是一种不错的选择。 # WaitGroup的实现 首先,我们看看 WaitGroup 的数据结构,它包括了一个noCopy 的辅助字段,一个具有复合意义的state1字段。 - noCopy 的辅助字段:主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段 - state1:具有复合意义的字段,包含WaitGroup计数值,阻塞在检查点的主gooutine和信号量 ```go type WaitGroup struct { // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则 noCopy noCopy // 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数 // 另外32bit是用作信号量的 // 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法 // 总之,会找到对齐的那64bit作为state,其余的32bit做信号量 state1 [3]uint32 } // 得到state的地址和信号量的地址 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { // 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } } ``` 因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。 在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/img_convert/367c0ea5ead347acc6cf779554d9727c.png#pic_center) 在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/img_convert/334a529b22c44f4a5a77ebfffe9ecf48.png#pic_center) 接下里,我们一一看 Add 方法、 Done 方法、 Wait 方法的实现原理。 ## Add **Add方法实现思路:** Add方法主要操作的state1字段中计数值部分。当Add方法被调用时,首先会将delta参数值左移32位(计数值在高32位),然后内部通过原子操作将这个值加到计数值上。需要注意的是,delta的取值范围可正可负,因为调用Done()方法时,内部通过Add(-1)方法实现的。 **代码实现如下:** ```go func (wg *WaitGroup) Add(delta int) { // statep表示wait数和计数值 // 低32位表示wait数,高32位表示计数值 statep, semap := wg.state() // uint64(delta)<<32 将delta左移32位 // 因为高32位表示计数值,所以将delta左移32,增加到技术上 state := atomic.AddUint64(statep, uint64(delta)<<32) // 当前计数值 v := int32(state >> 32) // 阻塞在检查点的wait数 w := uint32(state) if v > 0 || w == 0 { return } // 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量 // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } } ``` ## Done 内部就是调用Add(-1)方法,这里就不细讲了。 ```go // Done方法实际就是计数器减1 func (wg *WaitGroup) Done() { wg.Add(-1) } ``` ## Wait **wait实现思路:** 不断检查state值。如果其中的计数值为零,则说明所有的子goroutine已全部执行完毕,调用者不必等待,直接返回。如果计数值大于零,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己。 **代码实现如下:** ```go func (wg *WaitGroup) Wait() { // statep表示wait数和计数值 // 低32位表示wait数,高32位表示计数值 statep, semap := wg.state() for { state := atomic.LoadUint64(statep) // 将state右移32位,表示当前计数值 v := int32(state >> 32) // w表示waiter等待值 w := uint32(state) if v == 0 { // 如果当前计数值为零,表示当前子goroutine已全部执行完毕,则直接返回 return } // 否则使用原子操作将state值加一。 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 阻塞休眠等待 runtime_Semacquire(semap) // 被唤醒,不再阻塞,返回 return } } } ``` 到此,waitGroup的基本使用和实现原理已介绍完毕了,相信大家已有不一样的收获,咱们下期见。 > 文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/2021022314195433.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxMDY2MDY2,size_16,color_FFFFFF,t_70#pic_center)

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1020 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传