探究目的
互斥锁对于日常使用来说非常简单, 但是sync.Mutex
里的状态变更, 并发控制, 原子操作, 循环体等表示很复杂, 让我探究一下里面是什么葫芦药呢!
Lock
mutex.Lock()
里的流程很简单, 只是判断m.state
能不能用atomic.CompareAndSwapInt32
上锁, 可以就直接退出, 不能则执行lockSlow()
函数, 如下图:
lockSlow()
是个既复杂又重要的函数, 只要不是即时能获取锁的都会到这里来.
在开始时先初始化几个变量:
waitStartTime
waitStartTime int64 // 开始等待时间(纳秒), 用于判断是新来的g还是唤醒的g, 还用于判断能不能切换饥饿模式.
starving := false // 当前是否饥饿
awoke := false // 当前是否已唤醒
iter := 0 // 自旋次数
old := m.state // 最近一次获取的状态
复制代码
接下来进入循环体, 逻辑复杂只能拆分来分析:
上码:
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
...(同步局部变量唤醒)
runtime_doSpin()
iter++
old = m.state
continue
}
复制代码
这段代码用来执行自旋, 不过执行前要先判断能不能自旋, 条件比较苛刻:
当前是正常模式且已锁定 (old&(mutexLocked|mutexStarving) == mutexLocked
)
自旋次数小于5次 (runtime_canSpin(iter)
)
cpu核数大于1个 (同上)
P大于1 (同上)
有一个正在运行的P并且runq为空. (同上)
执行自旋并且更新最近状态, 直到不允许自旋.
new := old
if old&mutexStarving == 0 { // 非饥饿模式下才能加锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 已加锁或者在饥饿模式下, 累计加上一个等待的g
new += 1 << mutexWaiterShift // 十进制: new += 8 (第四位开始就是等待数量)
}
if starving && old&mutexLocked != 0 { // 准许切换饥饿模式并且已锁定
new |= mutexStarving // 设置饥饿模式
}
if awoke {
if new&mutexWoken == 0 {
// 唤醒状态不一致
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // mutexWoken位由 1 => 0 // 重置唤醒状态
}
复制代码
new
为即将要改变状态的变量, 对下的4个判断用来对new
的计算. 涉及到承上启下及并发逻辑, 第一次看应该比较混乱.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// ...下面代码
} else{
old = m.state
}
复制代码
如果对比交换值m.state
失败, 则代表m.state
被其它修改, 只能赋上新的状态并重新循环一次. 如果成功则进入以下代码:
if old&(mutexLocked|mutexStarving) == 0 { // 此处表示饥饿模式下不会获取锁
break // 已利用CAS获取锁
}
// waitStartTime 开始等待时间
// queueLifo 是否后入先出, 唤醒的g后入先出, 新来的g则排在队列后面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 运行时信号量互斥
// 等待唤醒
复制代码
waitStartTime
不等于0表示唤醒的g, 否则表示新来的g
runtime_SemacquireMutex()
进入内部信号量互斥(不开放), 实际上跟channel
的阻塞原理是一样的, 都是通过goparkunlock
实现. (具体看runtime.sync_runtime_SemacquireMutex()
)
// 如果大于1毫秒, 准可切换饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state // 获取最新状态, 睡眠前与唤醒后的状态有可能不一致
if old&mutexStarving != 0 { // 如果已经是饥饿模式
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
// 状态不一致
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7: 减小一个等待者并设置为锁定状态
if !starving || old>>mutexWaiterShift == 1 { // 如果当前g不准可饥饿模式且只有一个等待者
delta -= mutexStarving // -11: 在delta上再退出饥饿模式
}
atomic.AddInt32(&m.state, delta)
break // 退出循环(即当前g已获取锁)
}
awoke = true // 代表当前已是唤醒后的g
iter = 0 // 重置自旋次数
复制代码
中间的判断只要是进入饥饿模式都能获取锁, 新来的g永远排在后面.
Unlock
Lock()
与Unlock()
都容易阅读理解, new
表示为有多个等待者. unlockSlow()
也相对比lockSlow()
简单多了.
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 ||
old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
复制代码
非饥饿模式下:
如果没有等待者, 或者m.state
带有状态(新g抢到锁), 直接返回.
否则唤醒一个g继续执行.
处于饥饿模式下:
rumtime_Semrelease
的handoff
为真, 表示需要阻塞其他的g, 并以优先级执行等待队列的g.
小结
原理很简单, 实现确很难. 并发时控制m.state
既复杂又重要的事情, 看多几次源码和调试就能知道处理并发时的画面, 调试时注意看m.state
的变化.
sync.Mutex
的锁实现依赖着信号量, 都在这个文件实现rumtime/sema.go
, semacquire1
和semrelease1
分别是获取锁和释放锁, 代码比sync.Mutex
更加复杂.
有疑问加站长微信联系(非本文作者)