1资源同步
1.1 解决方案
2 信号量
2.1 共享变量
2.2 信号量
3 锁
3.1 死锁
3.2 活锁
3.3 饥饿
4 Golang sync 包
4.4.1 数据结构
4.4.2 NewCond函数
4.4.3 Wait方法
4.4.4 Singal方法
4.4.5 Broadcast方法
4.3.1 数据结构
4.3.2 Add和Done方法
4.3.3 Wait方法
4.2.1 常量和结构
4.2.1 RLock和RUnlock方法
4.2.2 Lock和Unlock方法
4.1.1 接口和结构
4.1.2 Lock 方法
4.1.3 Unlock方法
4.1 sync.mutex.go
4.2 sync.rwmutex.go
4.3 sync.waitgroup.go
4.4 sync.cond.go
1资源同步
并发已经成为现代程序设计中的重要考虑内容,但是并发涉及到一个很重要的内容就是资源同步。当两个或者多个线程访问同样的资源的时候,运行的结果取决于线程运行时精确的时序。这样导致结果与期望的结果大相径庭,因此我们需要对资源的访问顺序进行控制,已达到资源同步的目的。
1.1 解决方案
将共享资源(也就是共享内存)的程序片段成为临界区域
,通过适当安排,使得两个者线程同时位于临界区域。对于临界区域
访问解决方案,需要满足如下4个条件
任何两个线程不能同时位于临界区
不对CPU执行速度和时间做任何假设
临界区外运行的线程不阻塞其他线程
不能使线程无限期等待进入临界区
常见的互斥解决方案:
屏蔽中断线程的切换是由CPU中断机制提供的,如果一个线程进入临界区域后,CPU关闭中断响应;在离开临界区域后,再打开中断机制。那么在临界区域将不会有其他线程来竞争资源。 当时将屏蔽中断权利交给用户空间执行是不明智的,而且对于多核CPU而言没有效果。
锁变量几乎每一个编程语言都提供了资源同步方式:锁机制。该机制通过对资源进行Lock
和Unlock
,以达到对关键资源有序访问。
严格轮换法线程不停的执行CPU时间,连续测试某一个值是否出现。但是如果认为等待的时间非常短,可以使用该方式浪费CPU时间,用于等待的锁也成为自旋锁
。
2 信号量
2.1 共享变量
在理解信号量之前,先了解采用共享变量使用多线程会出现什么问题。下面是一个C代码片段
1for (i=0; i<niters; i++){
2 cnt ++;
3}
cnt
为全局变量,一个线程执行该代码片段的时候的汇编代码如下:
1 movq (%rdi), %rcx
2 testq %rcx, %rcx
3 jle .L2
4 movl $0, %eax
5.L3:
6 movq cnt(%rip), %rdx
7 addq %eax
8 movq %eax, cnt(%rip)
9 addq $1, %rax
10 cmpq %rcx, %rax
11 jne .L3
12.L2
2.2 信号量其中6-8
行分别对应对应着加载cnt
,更新cnt
和存储cnt
。将cnt
变量从内存位置读出,加载到CPU寄存器中,在CPU运算器中加1,然后存储到cnt
的内存位置。虽然代码中cnt++
只有一行,但是转换为汇编代码的时候不只有一个操作,也就是说该语句不是原子操作。如果多个线程同时执行代码,按照之前的条件,不对CPU的执行顺序做任何假设,如果其中线程a
在执行7
行汇编代码,而线程b
执行6
行汇编代码,那么b
将"看不到"线程a
对全局变量cnt
加1的操作,那么每次执行的结果cnt
也不完全一致。
计算机领域先驱Dijkstra
提出经典的解决上述问题的方法:信号量(semaphore)。它是一个非负整数的全局变量。而且该变量只能有两个特殊操作来处理: P
和V
。
P(s): 如果
s
非零,那么P
将s
减1
,并且立即返回。如果s
为零,那么就挂起这个线程,知道s
为非零。V(s):
V
操作将s
加1
。如果有任何线程阻塞在P
操作等待s
非零,那么V
将重启其中线程中的一个。
Posix
标准定义需要操作信号量的函数
1#include <semaphore.h>
2int sem_init(sem_t *sem, 0, unsigned int value);
3int sem_wait(sem_t *s); /*P(s)*/
4int sem_post(sem_t *s); /*P(s)*/
那么如何使用信号量是的2.1小节出现同步问题解决呢?首先定义全局信号量
1volatile long cnt = 0; /* global variable */
2sem_t mutex; /*global semaphore*/
初始化信号量,在这里初始值为1
1sem_init(&mutex, 0, 1);
最后使用信号量操作函数将临界区域代码包含起来
1for (i =0; i<niters; i++){
2 sem_wait(&mutex);
3 cnt++;
4 sem_post(&mutex);
5}
3 锁
3.1 死锁
首先看一下死锁的规范定义:
如果一个线程(进程)集合中的每一个线程(进程)都在等待只能由该线程(进程)集合中的其他线程(进程)才能引发的事件,那么该线程(进程)集合是死锁的。
举一个例子,如果线程 a
和线程 b
同是执行,线程a
获取了资源r1
,等待获取资源r2
;而线程b
获取了资源r2
,等待获取资源r1
。那么线程a
和线程b
组成的集合是死锁的。
预防死锁
破坏占有等待条件
对于需要获取多个资源的线程,一次性获取全部资源,而不是依次获取各个资源。
破坏环路等待条件
死锁集合的线程按照等占有线程和等待线程可以组成有向环图。那么如果对所有资源进行排序,所有线程按照资源顺序获取资源。
3.2 活锁
在某些情况下,当线程意识它不能获下一个资源的时候,它会“礼貌性”地释放已经获得的资源,然后等待1ms
,在尝试一次。如果另一个线程也在相同的时候做了相同的操作, 那么同步的步调将导致两个线程都无法前进。
3.3 饥饿
在信号量小节中,当执行V
操作后,将恢复挂起线程中的一个,那么问题出现了:如果有多个线程被挂起,那么选择哪个线程恢复呢?如果随机选择一个线程恢复,如果源源不断的线程到达临界区域并且挂起,那么很有可能出现某一个线程一直等待资源,而导致"饥饿"。当然也有好的FILO
调度策略来解决调用问题。当时问题在于刚刚到达的线程有很好的局部性,也就是CPU的寄存器、缓存等包含了该线程的局部变量,如果程获得资源锁,很好的避免了线程上下文切换,对性能提高很有帮助。
在go
语言的互斥锁中采用结合上述两种策略,接下来小节中,将会仔细分析源码。
4 Golang sync 包
4.1 sync.mutex.go
4.1.1 接口和结构
包含了Locker
接口和Mutex
结构:
1type Locker interface {
2 Lock()
3 Unlock()
4}
5type Mutex struct {
6 state int32
7 sema uint32
8}
Mutex
实现了Locker
接口,该结构包含了state
的字段,用来表示该锁当前状态;sema
则为一个信号量。state
是一个32位的整数,不同比特位包含了不同的意义,其中源码中的有很详细的注释,该注释很好解释mutex
如何工作:
互斥锁有两种状态:正常状态和饥饿状态。在正常状态下,所有挂起等待的goroutine按照
FIFO
顺序等待。唤醒的goroutine将会和刚刚到达的goroutine竞争互斥锁的拥有权,因为刚刚到达的goroutine具有优势:它刚刚正在CPU上执行,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。如果一个等待的goroutine超过1ms没有获取互斥锁,那么它将会把互斥锁转变为饥饿模式。在饥饿模式下,互斥锁的所有权将移交给等待队列中的第一个。新来的goroutine将不会尝试去获得互斥锁,也不会去尝试自旋操作,而是放在队列的最后一个。如果一个等待的goroutine获取的互斥锁,如何它满足一下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将互斥锁的转台转换为正常状态。正常状态有很好的性能表现,饥饿模式也是非常重要的的,因为它能阻止尾部延迟的现象。
1const (
2 mutexLocked = 1 << iota // mutex is locked
3 mutexWoken
4 mutexStarving
5 mutexWaiterShift = iota
6 starvationThresholdNs = 1e6
7)
mutexLocked
该值为1
, 第一位比特位1
,代表了该是否该互斥锁已经被锁住。mutex.state
与它进行&
操作,如果为1
表示已经锁住,0
则表示未被锁住。
mutexWoken
该值为2
,第二位比特位1
,代表了该互斥锁是否被唤醒,mutex.state
与它进行&
操作,如果为1
表示已经被唤醒,0
代表未被唤醒
mutexStarving
该值为4
,第三位比特为1
,代表了该互斥锁是否处于饥饿状态,mutex.state
与它进行&
操作,如果为1
表示处于饥饿转态,0
表示处于正常状态。
mutexWaiterShift
该值为3
,表示mutex.state
右移3位后为等待的goroutine
的数量。
starvationThresholdNs
goroutine
将互斥锁转换状态的时间等待的临界值:一百万纳秒,也就是1ms。
4.1.2 Lock 方法
1if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
2 if race.Enabled {
3 race.Acquire(unsafe.Pointer(m))
4 }
5 return
6}
CompareAndSwapInt32
是一个原子操作,它判断是一个参数的值是否等于第二个参数,如果相等,则将第一个参数设置为第三个参数,并返回true
;否则对一个参数不做任何操作并且返回false
。这一段是代码是处理第一次goroutine
进行尝试Lock
操作,如果一切都是初始状态,则m.state
为.....0000001
并且返回,进入临界区域代码,否则代码继续往下走。
1var waitStartTime int64
2starving := false
3awoke := false
4iter := 0
5old := m.state
首先定义了一下变量:goroutine
等待时间,是否饥饿转台,是否唤醒和自旋迭代次数和保存当前互斥锁状态。接下来是一个for
循环,只有退出循环才能进入临界区域代码,纵观代码只有两处使用break
来退出循环。
1for {
2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
3 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
4 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
5 awoke = true
6 }
7 runtime_doSpin()
8 iter++
9 old = m.state
10 continue
11 }
12}
首先判断锁状态是被锁而且不是处于饥饿模式,加上还能自旋额次数,进入下一层判断。如果当前goroutine没有被唤醒
,
其他goroutine也没有被唤醒,
等待的goroutine超过1和
可以将m.state设置为唤醒转态四个条件同时满足,将
awoke设置
true`。然后进行自旋操作,进行一轮循环。
1new := old
2if old&mutexStarving == 0 {
3 new |= mutexLocked
4}
5if old&(mutexLocked|mutexStarving) != 0 {
6 new += 1 << mutexWaiterShift
7}
8if starving && old&mutexLocked != 0 {
9 new |= mutexStarving
10}
这三个判断条件做了如下工作:如果当前的mutex.state
处于正常模式,则将new
的锁位设置为1,如果当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1。如果starving
变量为true
并且处于锁定状态,则new
的饥饿状态位打开。
1if awoke {
2 if new&mutexWoken == 0 {
3 throw("sync: inconsistent mutex state")
4 }
5 new &^= mutexWoken
6}
如果 goroutine
已经被唤醒,则清空new
的唤醒位。
1if atomic.CompareAndSawpInt32(&m.state, old, new){
2 //...
3}else{
4 //...
5}
如果更新m.state
成功
1if old&(mutexLocked|mutexStarving) == 0 {
2 break
3}
如果未被锁定并且并不是出于饥饿状态,到达第一个break
,进入代码临界区域。
1queueLifo := waitStartTime != 0
2if waitStartTime == 0 {
3 waitStartTime = runtime_nanotime()
4}
5runtime_SemacquireMutex(&m.sema, queueLifo)
runtime_SemacquireMutex(s *uint32, lifo bool)
函数类似P
操作,如果lifo
为true
则将等待goroutine
插入到队列的前面。在这里,对于每一个到达的goroutine
,如果CompareAndSawpInt32
成功,并且到达时候如果锁出于锁定状态,那么将该goroutine
插入到等待队列的最后,否则插入到最前面。此时goroutine
将会被挂起,等待Unlock
的V
操作,将唤醒goroutines
1starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
2old = m.state
3if old&mutexStarving != 0 {
4 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
5 throw("sync: inconsistent mutex state")
6 }
7 delta := int32(mutexLocked - 1<<mutexWaiterShift)
8 if !starving || old>>mutexWaiterShift == 1 {
9 delta -= mutexStarving
10 }
11 atomic.AddInt32(&m.state, delta)
12 break
13}
14
判断被唤醒的线程是否为达到饥饿状态,也就是等待时间超过1ms
,如果之前的m.state
不是饥饿状态,继续循环,给新到来goroutine
让出互斥锁。如果已经饥饿状态,则修改等待goroutine
数量和饥饿状态位,并返回进入临界代码区域。
4.1.3 Unlock方法
1new := atomic.AddInt32(&m.state, -mutexLocked)
首先创建变量new
,该变量的锁位为0
。接下来是饥饿状态判断
1if new&mutexStarving == 0 {
2 old := new
3 for {
4 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
5 return
6 }
7 new = (old - 1<<mutexWaiterShift) | mutexWoken
8 if atomic.CompareAndSwapInt32(&m.state, old, new) {
9 runtime_Semrelease(&m.sema, false)
10 return
11 }
12 old = m.state
13 }
14 } else {
15 runtime_Semrelease(&m.sema, true)
16 }
如果是正常状态,则判断如果等待的goroutine
为零,或者已经被锁定、唤醒、或者已经变成饥饿状态,返回,不需要唤醒任何其他被挂起的goroutine
,因为互斥锁已经被其他goroutine
抢占。否则更新new
值(修改等待的goroutine数量)并设置唤醒为,如果CompareAndSwapInt32
成功,则通过runtime_Semrelease(&m.sema, false)
恢复挂起的goroutine.r如果为 true
表明将唤醒第一个阻塞的goroutine
,这第一点在else
饥饿的分支中体现。
4.2 sync.rwmutex.go
读写锁也是一种常见的锁机制,它允许多个线程读共享资源,只有一个线程写共享资源,接下来看看go中如何实现读写锁。
4.2.1 常量和结构
1type RWMutex struct {
2 w Mutex
3 writerSem uint32
4 readerSem uint32
5 readerCount int32
6 readerWait int32
7}
8const rwmutexMaxReaders = 1 << 30
RWMutex
结构包含了如下的字段
goroutine
数量。
4.2.1 RLock和RUnlock方法
1func (rw *RWMutex) RLock() {
2 // [...]
3 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
4 runtime_Semacquire(&rw.readerSem)
5 }
6//[...]
7}
首先是readerCount
值+1, 如果小于零,则挂起goroutine
等待readerSem
。是不是很奇怪,为什么会小于零判断呢?在这里先卖一个关子,接下来会看到为什么是这样的设计逻辑。
1func (rw *RWMutex) RUnlock() {
2 //[...]
3 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
4 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: RUnlock of unlocked RWMutex")
7 }
8 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
9 runtime_Semrelease(&rw.writerSem, false)
10 }
11 }
12 //[...]
13}
首先将readerCount
减去1,如果小于零,再讲readWait
减去1,如果是离开读的goroutine
数量为零,则对writerSem
信号量进行V
操作。
4.2.2 Lock和Unlock方法
1func (rw *RWMutex) Lock() {
2 //[...]
3 rw.w.Lock()
4 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
5 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
6 runtime_Semacquire(&rw.writerSem)
7 }
8 //[...]
9}
首先rw.w.Lock
操作,来防止其他goroutine
对共享资源的写访问。然后将readerCount
减去rwmutexMaxReaders
,表明还剩多少goroutine
可以进行读访问,这也解释在RLock
中小于零的判断,如果还可以还可以进行读访问,则必须获得readerSem
信号量。在Lock
中接下来是对readWait
判断,如果该数量不为零,则需要对writerSem
进行P
操作,而V
操作只在RUnlock
方法中,如果最后一个读goroutine
离开,则进行V
操作。
1func (rw *RWMutex) Unlock() {
2 //[...]
3 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
4 if r >= rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: Unlock of unlocked RWMutex")
7 }
8 for i := 0; i < int(r); i++ {
9 runtime_Semrelease(&rw.readerSem, false)
10 }
11 //[...]
12}
首先恢复readCounter
为正数,然后对readerSem
信号量进行r
次V
操作,唤醒在RLock
中被挂起的goroutine
。
4.3 sync.waitgroup.go
WaitGroup
通常用在等待一组goroutine
全部完成。调用Add
方法指明要等待的goroutine
的数量,调用Done
方法说明该goroutine
已经完成,而Wait
方法是阻塞等待的goroutine
。
4.3.1 数据结构
1type WaitGroup struct {
2 noCopy noCopy
3 state1 [12]byte
4 sema uint32
5}
noCopy
字段说明WaitGroup
不允许拷贝,而state1
字段是一个非常tricky
的方法,用其中的8
个字节(64bit)来保存一些状态。高位的32bit用来表示需要等待的goroutine
的数量,地位的32
bit用来表示被挂起的goroutine
的数量。至于为什么不直接使用64bit
的数据主要是为了考虑32为编译器无法保证64位对齐。sema
则是一个信号量。
1func (wg *WaitGroup) state() *uint64 {
2 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
3 return (*uint64)(unsafe.Pointer(&wg.state1))
4 } else {
5 return (*uint64)(unsafe.Pointer(&wg.state1[4]))
6 }
7}
该方法是一个辅助方法,用来获取state
,一个64为的无符号整数。
4.3.2 Add和Done方法
1func (wg *WaitGroup) Done() {
2 wg.Add(-1)
3}
Done方法其实调用了Add(-1)
方法,所以我们着重讨论Add
方法。
1func (wg *WaitGroup) Add(delta int) {
2 statep := wg.state()
3 //[...]
4 state := atomic.AddUint64(statep, uint64(delta)<<32)
5 v := int32(state >> 32)
6 w := uint32(state)
7 if race.Enabled && delta > 0 && v == int32(delta) {
8 race.Read(unsafe.Pointer(&wg.sema))
9 }
10 if v < 0 {
11 panic("sync: negative WaitGroup counter")
12 }
13 if w != 0 && delta > 0 && v == int32(delta) {
14 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
15 }
16 if v > 0 || w == 0 {
17 return
18 }
19 if *statep != state {
20 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
21 }
22 // Reset waiters count to 0.
23 *statep = 0
24 for ; w != 0; w-- {
25 runtime_Semrelease(&wg.sema, false)
26 }
27}
首先是获取state
,然后将delta
右移32位,加上等待的goroutine
数量。v
和w
分别代表了需要等待的goroutine
和被阻塞的goroutine
的数量。接下来v==int32(delta)
判断条件表明如果是第一次Add
操作,则必须与等待的goroutine
同步,在Wait
方法中可以看到同样的操作。接下来是一些抛异常操作,如果等待的数量为负数,如何第一次Add
操作没有同步。if >0 || w==0
条件表明如何v
没有降到零,或者被阻塞的goroutine
数量为零,直接返回。如何v
为零,则按照w
的数量,依次对信号量ws.sema
进行V
操作。
4.3.3 Wait方法
1func (wg *WaitGroup) Wait() {
2 //[...]
3 for {
4 state := atomic.LoadUint64(statep)
5 v := int32(state >> 32)
6 w := uint32(state)
7 //[...]
8 // Increment waiters count.
9 if atomic.CompareAndSwapUint64(statep, state, state+1) {
10 if race.Enabled && w == 0 {
11 race.Write(unsafe.Pointer(&wg.sema))
12 }
13 runtime_Semacquire(&wg.sema)
14 if *statep != 0 {
15 panic("sync: WaitGroup is reused before previous Wait has returned")
16 }
17 //[...]
18 return
19 }
20 }
21}
Wait
方法同样也是CAS算法,首先获取需要等待的goroutine
的数量v
和阻塞的goroutine
数量w
, 然后将阻塞的goroutine
数量+1,如果之前的w
为零,表示是第一次等待,则与Add
操作进行同步,最后后对信号量wg.sema
进行P
操作。
4.4 sync.cond.go
在编程中使用Cond
也叫管程(monitor)
,它可以用来使不同线程完成互斥条件,也可以使某个线程等待某个条件的发生。常见的使用模式如下:
1var locker = new(sync.Mutex)
2var cond = sync.NewCond(locker)
3var condition = true
4// goroutine A
5cond.L.Lock()
6for condition {
7 cond.Wait()
8}
9// ...
10cond.L.Unlock()
11
12//goroutine B
13condiiton = false
14cond.Signal()
为什么使用for
循环作为判断进入Wait
的条件而不是if
呢?主要是防止为被唤醒的goroutine
在返回Wait
调用的时候,恰好有别的goroutine
修改了conditon
的值,所以需要使用for
循环作为条件判断。
4.4.1 数据结构
1type Cond struct {
2 noCopy noCopy
3 L Locker
4 notify notifyList
5 checker copyChecker
6}
Cond
结构不允许拷贝,包含了Locker
的接口字段,和一个notifyList
的集合字段。
4.4.2 NewCond函数
1func NewCond(l Locker) *Cond {
2 return &Cond{L: l}
3}
实现Locker
接口的类型都可以,一般为Mutex
和RWMutex
4.4.3 Wait方法
1func (c *Cond) Wait() {
2 c.checker.check()
3 t := runtime_notifyListAdd(&c.notify)
4 c.L.Unlock()
5 runtime_notifyListWait(&c.notify, t)
6 c.L.Lock()
7}
在使用Wait
方法之前,要调用c.L.Lock
来进入临界区域,将当前等待的goroutine
加入到通知队列中,然后调用c.L.Unlock()
来退出临界区域,以便让其他goroutine
可以进入等待区域。紧接着挂起goroutine
,等待消息。
4.4.4 Singal方法
1func (c *Cond) Signal() {
2 c.checker.check()
3 runtime_notifyListNotifyOne(&c.notify)}
runtime_notifyListNotifyOne
唤起其中的等待的goroutine
。
4.4.5 Broadcast方法
1func (c *Cond) Broadcast() {
2 c.checker.check()
3 runtime_notifyListNotifyAll(&c.notify)
4}
runtime_notifyListNotifyAll
唤起全部等待的goroutine
。
有疑问加站长微信联系(非本文作者)