探究sync.Mutex代码流程细节

何裕发 · · 588 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

探究目的

互斥锁对于日常使用来说非常简单, 但是sync.Mutex里的状态变更, 并发控制, 原子操作, 循环体等表示很复杂, 让我探究一下里面是什么葫芦药呢!

Lock

mutex.Lock()里的流程很简单, 只是判断m.state能不能用atomic.CompareAndSwapInt32上锁, 可以就直接退出, 不能则执行lockSlow()函数, 如下图:

lockSlow()是个既复杂又重要的函数, 只要不是即时能获取锁的都会到这里来.

在开始时先初始化几个变量: waitStartTime

waitStartTime int64	    // 开始等待时间(纳秒), 用于判断是新来的g还是唤醒的g, 还用于判断能不能切换饥饿模式.
starving := false       // 当前是否饥饿
awoke := false          // 当前是否已唤醒
iter := 0              // 自旋次数
old := m.state         // 最近一次获取的状态
复制代码
接下来进入循环体, 逻辑复杂只能拆分来分析:

上码:

if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    ...(同步局部变量唤醒)
    runtime_doSpin()
    iter++
    old = m.state
    continue
}
复制代码

这段代码用来执行自旋, 不过执行前要先判断能不能自旋, 条件比较苛刻: 当前是正常模式且已锁定 (old&(mutexLocked|mutexStarving) == mutexLocked) 自旋次数小于5次 (runtime_canSpin(iter)) cpu核数大于1个 (同上) P大于1 (同上) 有一个正在运行的P并且runq为空. (同上) 执行自旋并且更新最近状态, 直到不允许自旋.

new := old
if old&mutexStarving == 0 { // 非饥饿模式下才能加锁
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 已加锁或者在饥饿模式下, 累计加上一个等待的g
    new += 1 << mutexWaiterShift		 // 十进制: new += 8 (第四位开始就是等待数量)
}
if starving && old&mutexLocked != 0 { // 准许切换饥饿模式并且已锁定
    new |= mutexStarving // 设置饥饿模式
}
if awoke {
    if new&mutexWoken == 0 {
        // 唤醒状态不一致
        throw("sync: inconsistent mutex state")
    }
    new &^= mutexWoken // mutexWoken位由 1 => 0 // 重置唤醒状态
}
复制代码

new为即将要改变状态的变量, 对下的4个判断用来对new的计算. 涉及到承上启下及并发逻辑, 第一次看应该比较混乱.

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // ...下面代码
} else{ 
    old = m.state
}
复制代码

如果对比交换值m.state失败, 则代表m.state被其它修改, 只能赋上新的状态并重新循环一次. 如果成功则进入以下代码:

if old&(mutexLocked|mutexStarving) == 0 { // 此处表示饥饿模式下不会获取锁
    break // 已利用CAS获取锁
}
// waitStartTime 开始等待时间
// queueLifo 是否后入先出, 唤醒的g后入先出, 新来的g则排在队列后面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
}

runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 运行时信号量互斥
// 等待唤醒
复制代码

waitStartTime不等于0表示唤醒的g, 否则表示新来的g runtime_SemacquireMutex()进入内部信号量互斥(不开放), 实际上跟channel的阻塞原理是一样的, 都是通过goparkunlock实现. (具体看runtime.sync_runtime_SemacquireMutex())

// 如果大于1毫秒, 准可切换饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs 
old = m.state // 获取最新状态, 睡眠前与唤醒后的状态有可能不一致
if old&mutexStarving != 0 { // 如果已经是饥饿模式
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
        // 状态不一致
        throw("sync: inconsistent mutex state")
    }
    delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7: 减小一个等待者并设置为锁定状态
    if !starving || old>>mutexWaiterShift == 1 {     // 如果当前g不准可饥饿模式且只有一个等待者
        delta -= mutexStarving  // -11: 在delta上再退出饥饿模式
    }
    atomic.AddInt32(&m.state, delta)
    break   // 退出循环(即当前g已获取锁)
}
awoke = true // 代表当前已是唤醒后的g
iter = 0    // 重置自旋次数
复制代码

中间的判断只要是进入饥饿模式都能获取锁, 新来的g永远排在后面.

Unlock

Lock()Unlock()都容易阅读理解, new表示为有多个等待者. unlockSlow()也相对比lockSlow()简单多了.

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || 
            old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}
复制代码

非饥饿模式下: 如果没有等待者, 或者m.state带有状态(新g抢到锁), 直接返回. 否则唤醒一个g继续执行.

处于饥饿模式下: rumtime_Semreleasehandoff为真, 表示需要阻塞其他的g, 并以优先级执行等待队列的g.

小结

原理很简单, 实现确很难. 并发时控制m.state既复杂又重要的事情, 看多几次源码和调试就能知道处理并发时的画面, 调试时注意看m.state的变化. sync.Mutex的锁实现依赖着信号量, 都在这个文件实现rumtime/sema.go, semacquire1semrelease1分别是获取锁和释放锁, 代码比sync.Mutex更加复杂.


有疑问加站长微信联系(非本文作者)

本文来自:掘金

感谢作者:何裕发

查看原文:探究sync.Mutex代码流程细节

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

588 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传