【golang源码分析】sync.Mutex
概述
Mutex只有两个阻塞方法Lock和Unlock,有两种工作模式:normal和starvation。
goroutine在抢占锁时,会先自旋一段时间,如果抢占失败会被放在一个FIFO等待队列中休眠,当锁被释放时,会优先唤醒队首的goroutine。
在normal模式中,被唤醒的waiter会跟新到的goroutine竞争锁,一般来说新的goroutine已经在cpu中进行了,并且可能有不止一个goroutine,在这种情况下刚被唤醒的goroutine有很大概率会失败,而被重新放回FIFO队首。如果当一个waiter等待获取锁的时间超过1ms,会将mutex转换成starvation模式。
在starvation模式,当mutex被unlock时,持有权会直接移交到队首的goroutine中。新加入的goroutine不会再参与锁的竞争,而是直接放入等待队列的尾部。
mutex通过一个state变量来维护锁的状态信息。
- 低一位mutexLocked:表示锁是否被锁定
- 低二位mutexWoken:表示是否有goroutine在竞争锁,这个主要应用于unlock时判断是否有必要唤醒等待队列中的goroutine
- 低三位mutexStarving:表示锁的模式starvation或normal,剩余高位用于标志等待队列的长度。
代码分析
定义
Mutex实现了Locker接口,维护两个变量——state用来维护锁的状态,同时通过操作sema来唤醒和休眠等待goroutine
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
Lock
完整代码
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
详解
- 首先默认state为0,通过cas操作将锁的状态更新为mutexLocked,尝试快速获取锁。在并发度较小的情况下,mutex多数处于初始状态,可以提供加锁性能。如果快速获取锁失败,goroutine会不断地进行自旋抢锁、休眠、唤醒抢锁,直到抢到锁后break。在这期间,需要通过cas对state进行更新,只有更新成功的goroutine才能进行下一阶段的操作。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
- 竞争锁的goroutine会先自旋一段时间直到mutex被unlocked、或切换到starvation模式、或自旋次数达到上限。自旋过程中,goroutine也会尝试通过cas将mutexWoken位置1,以通知Unlock操作不必唤醒等待队列中的goroutine。starvatione模式是不需要自旋的,因为锁的持有权会直接移交给等待队列队首的goroutine,自旋操作没有实际意义。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 当前goroutine未更新成功过woken位,woken位为0,等待队列非空,通过cas尝试更新woken位
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // pause一段时间
iter++
old = m.state
continue
}
- 从前面代码可以看到,退出自旋有三种原因,及其对应的处理如下:
- mutex被Unlock:将mutexLocked置1,代表尝试抢锁,然后cas尝试更新state
- mutex切换到starvation模式:不改变mutexLocked,waiter数量+1,cas更新state成功后后直接加入到等待队列里。
- 自旋次数达到上限:不改变mutexLocked,waiter数量+1,如果starving变量为true,则将mutexStarving位置1。
starving是goroutine被唤醒时通过计算等待时间获取的,大于1ms则为true,表示需要切换到starvation模式。
这里需要注意,当mutex为unlocked的时候,不需要转换mutex模式。因为starving为true,说明该goroutine是normal模式下等待队列中刚被唤醒的waiter(原因需要看后面的代码,在starvation模式。会直接将mutex的持有权交给唤醒的goroutine,走不到这一步)。如果此时做了状态切换,可能会出现在starvation模式,等待队列为空的情况。因此,只有自旋抢锁失败的情况下,才会由等待时间超过阈值的goroutine将mutex模式切换到starvation。
new := old
// 锁被unlocked,将mutexLocked置1,代表尝试抢锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// starving或自旋次数到上限, 等待数量+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果已经是starvation状态,这一步没什么意义
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 把awoke位给清掉
new &^= mutexWoken
}
- cas 更新mutex状态,失败则重复上述操作,直到状态更新成功。
- 如果是获取锁的动作,则直接break。
- 否则将goroutine放到等待队列中,若waitStartTime不为0,则说明是waiter抢锁失败,应该重新返回队首。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 获取锁成功,break。接下来要处理starving和自旋次数达到上限的情况
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// waitStartTime说明是被唤醒的goroutine抢锁失败了,应该重新放回队首(queueLifo=true)
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 休眠
runtime_SemacquireMutex(&m.sema, queueLifo)
- 被唤醒后,如果当前starving为true,或等待时间大于阈值,则将starving置为true,下一次自旋抢锁失败会将mutex切换到starvation模式。在starvation模式,该goroutine直接获取锁,否则的话要重复前面的操作跟新来的goroutine抢锁
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// starving模式不应该出现mutex被locked或woken,等待队列长度也不能为0,有的话说明这段代码逻辑有bug
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 获取锁并将等待数-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 等待时间小于阈值,或该goroutine为等待队列中最后一个waiter,切换到normal模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 把状态更新过去,获取锁并退出。这里不用cas,因为前面操作保证了低三位都不会被更新,高位本来是原子计数,所以直接add就可以
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
Unlock
// mutex允许一个goroutine加锁,另一个goroutine解锁,不要求两个操作保证同一个goroutine
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 直接解锁,然后判断一下状态,如果不一致的话则抛异常
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// starvation模式,则直接唤醒blocked队列队首goroutine
// normal模式,如果等待队列为空,或此时锁的状态发生了变化,则不用唤醒等待队列中的goroutine
// 否则的话,将等待数减1,cas更新state,成功则唤醒一个waiter
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// starving模式,直接将持有权交给下一个waiter。mutexLocked是在队首waiter被唤醒后更新的,所以此时mutexLocked还是0,但是在starving
// 模式下,新来的goroutine不会更新mutexLocked位。配合Lock代码看。
runtime_Semrelease(&m.sema, true)
}
}
\
practice/golang/源码分析
有疑问加站长微信联系(非本文作者)