Go Mutex 源码学习

大二小的宝 · · 665 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

概述

互斥锁是并发程序中对共享资源进行访问控制的主要手段,Mutex是go语言提供的简单易用的互斥锁。Mutex的结构很简单,暴露的方法也只有2个,一个加锁 一个解锁。那么我们每天用的Mutex互斥锁是如何实现的呢?
其实使用的是go语言automic包中的院子操作,具体如何使用可以参考之前写的文章
在Mutex中的state是状态码,在mutex中把state分成4段。如下图:
图片描述

  • Locked:表示是否上锁 上锁为1 未上锁为0
  • Woken:表示是否被唤醒,唤醒为1 未唤醒为0
  • Starving:表示是否为饥饿模式,饥饿模式为1 非饥饿模式为0
  • waiter:剩余的29位则为等待的goroutine数量

互斥锁的实现其实就是争夺Locked,当goroutineA 抢到了锁之后,第二个GoroutineB获取锁则会被阻塞等到GoroutineA释放锁之后GoroutineB将会被唤醒。当然具体实现则不会有这么简单,其中还有饥饿模式,自旋函数等一些概念。

前置概念

自旋

什么是自旋

加锁时,如果当前Locked位为1,说明该锁当前由其他协程持有,尝试加锁的协程并不是马上转入阻塞,而是会持续的探测Locked位是否变为0,这个过程即为自旋过程。自旋时间很短,但如果在自旋过程中发现锁已被释放,那么协程可以立即获取锁。此时即便有协程被唤醒也无法获取锁,只能再次阻塞。
自旋的好处是,当加锁失败时不必立即转入阻塞,有一定机会获取到锁,这样可以避免协程的切换。

自旋的问题

如果自旋过程中获得锁,则马上执行该goroutine。如果永远在自旋模式中那么之前阻塞的goroutine则很难获得锁,这样一来一些goroutine则会被阻塞时间过长。如何解决这个问题,go mutex中引入了两种模式,具体请看下文。

Mutex的两种模式

普通模式

在普通模式下等待者以 FIFO 的顺序排队来获取锁,但被唤醒的等待者发现并没有获取到 mutex,并且还要与新到达的 goroutine 们竞争 mutex 的所有权。

饥饿模式

在饥饿模式下,mutex 的所有权直接从对 mutex 执行解锁的 goroutine 传递给等待队列前面的等待者。新到达的 goroutine 们不要尝试去获取 mutex,即使它看起来是在解锁状态,也不要试图自旋。

源码分析

Mutex对象

type Mutex struct {
    // 状态码
    state int32
    // 信号量,用于向处于 Gwaitting 的 G 发送信号
    sema  uint32
}

const(
    // 值=1 表示是否锁住 1=锁 0=未锁
    mutexLocked = 1 << iota // mutex is locked
    // 值=2 表示是否被唤醒 1=唤醒  0=未唤醒
    mutexWoken
    // 是否为饥渴模式(等待超过1秒则为饥渴模式)
    mutexStarving
    // 右移3位,为等待的数量
    mutexWaiterShift = iota
    // 饥饿模式的时间 
    starvationThresholdNs = 1e6
)

加锁

func (m *Mutex) Lock() {
    // 利用atomic包中的cas操作判断是否上锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // 判断是否启用了race检测
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        // m.state = 1 直接返回 其他goroutine调用lock会发现已经被上锁
        return
    }
    
    // 等待时间
    var waitStartTime int64
    // 饥饿模式标志位
    starving := false
    // 唤醒标志位
    awoke := false
    // 自旋迭代的次数
    iter := 0
    // 保存 mutex 当前状态
    old := m.state
    // 循环
    for {
        // 判断 如果不是饥饿模式并且是否能够执行自旋函数(判断自旋次数)
        // old&(0001|0100) == 0001 ==> old&0101 
        // 当old为0001为非饥饿模式  0001 == 0001 true     当old为0101饥饿模式  0101 == 0001 false  
        // runtime_canSpin 判断自旋少于4次,并且是多核机器上并且GOMAXPROCS>1
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 判断条件:
            // 未被唤醒 && 等待数量不为0 && 使用CAS设置状态为已唤醒 
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                // 设置激活为true
                awoke = true
            }
            // 自旋函数 自旋次数+1
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        
        // 如果不能执行自旋函数 记录一个new状态 然后判断改变new 最终使用CAS替换尝试设置state属性
        new := old
        // 当前的mutex.state处于正常模式,则将new的锁位设置为1
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 如果当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // 如果starving变量为true并且处于锁定状态,则new的饥饿状态位打开
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        // 对于状态的验证
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // new已经判断设置完,如果mutex的state没有变动过的话 则替换成new 
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果未被锁定并且并不是出于饥饿状态 退出循环 goroutine获取到锁
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 如果当前的 goroutine 之前已经在排队了,就排到队列的前面。
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 进入休眠状态,等待信号唤醒后重新开始循环 如果queueLifo为true,则将等待goroutine插入到队列的前面
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // 计算等待时间 确定 mutex 当前所处模式
            // 此时这个goroutine已经被唤醒 
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 判断被唤醒的goroutine是否为饥饿状态
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 当不是饥饿状态或者等待数只有一个,则退出饥饿模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 如果不是饥饿模式 让新到来的 goroutine 先获取锁,继续循环
            awoke = true
            iter = 0
        } else {
            // 如果CAS替换未能成功 则继续循环
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

解锁

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 利用原子操作 设置state锁位置为0
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 判断状态,给未加锁的mutex解锁,抛出错误
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // 判断是否为饥饿模式
    if new&mutexStarving == 0 {
        // 正常状态
        old := new
        for {
            // 如果等待的goroutine为零 || 已经被锁定、唤醒、或者已经变成饥饿状态
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 更新new的值,减去等待数量
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            // 使用CAS 替换旧值
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果替换成功 则恢复挂起的goroutine.r如果为 true表明将唤醒第一个阻塞的goroutine
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 恢复挂起的goroutine.r如果为 true表明将唤醒第一个阻塞的goroutine
        runtime_Semrelease(&m.sema, true)
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:大二小的宝

查看原文:Go Mutex 源码学习

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

665 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传