借鉴于Go夜读,加了个人理解:https://reading.developerlearning.cn/articles/sync/sync_mutex_source_code_analysis/
go版本:go1.12 windows/amd64
结构体
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 // 指代mutex锁当前的状态
sema uint32 // 信号量,用于唤醒gotoutine
}
这里图片颜色有误,末尾3个1依次代表:mutex是否被加锁,mutex是否被唤醒,mutex当前是否处于饥饿状态。
几个常量
const (
mutexLocked = 1 << iota
mutexWoken // 相当于 mutexWoken == 1<< 1
mutexStarving // 相当于 mutexStarving == 1<< 2
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
mutexLocked值为1, 根据
mutex.state & mutexLocked
得到 mutex的加锁状态,结果为1表示已加锁,0表示未加锁
mutexWoken值为2(二进制:10),根据mutex.state & mutexWoken
得到mutex的唤醒状态,结果为1表示已唤醒,0表示未唤醒
mutexStarving值为4(二进制:100),根据mutex.state & mutexStarving
得到mutex的饥饿状态,结果为1表示处于饥饿状态,0表示处于正常状态
mutexWaiterShift值为3( 注:iota在const关键字出现时将被重置为0(const内部的第一行之前),const中每新增一行常量声明将使iota计数一次(iota可理解为const语句块中的行索引))
,根据mutex.state >> mutexWaiterShift
得到当前等待的goroutine数目
starvationThresholdN值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdN,mutex进入饥饿模式。
饥饿模式与正常模式
Mutex有两种工作模式:正常模式和饥饿模式
在正常模式中,等待着按照FIFO的顺序排队获取锁,但是一个被唤醒的等待者有时候并不能获取mutex,它还需要和新到来的goroutine们竞争mutex的使用权。新到来的goroutine有一个优势,它们已经在CPU上运行且它们数量很多,因此一个被唤醒的等待者有很大的概率获取不到锁,在这种情况下它处在等待队列的前面。如果一个goroutine等待mutex释放的时间超过1ms,它就会将mutex切换到饥饿模式;
在饥饿模式中,mutex的所有权直接从解锁的goroutine递交到等待队列中排在最前方的goroutine。新到达的goroutine们不要尝试去获取mutex,即便它看起来是解锁状态,也不要尝试自旋,而是排到等待队列的尾部
如果一个等待者获取mutex的所有权,并且看到以下两种情况中的任一种: 1)它是等待队列中的最后一个, 或者2)它等待的时间少于1ms,它便将mutex切换回正常操作模式
——
函数
runtime_canSpin
自旋锁(spinlock)
:
是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。
获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成[busy-waiting]。
golang对于自旋锁的取舍做了一些限制:1.多核; 2.GOMAXPROCS>1; 3.至少有一个运行的P并且local的P队列为空。golang的自旋尝试只会做几次,并不会一直尝试下去,感兴趣的可以跟一下源码。
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq on on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
runtime_doSpin
会调用procyield函数,该函数也是汇编语言实现。函数内部[循环]调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。
runtime_SemacquireMutex
// 一个gotoutine的等待队列,如果lifo为true,则插入队列头,否则插入队尾
func runtime_SemacquireMutex(s *uint32, lifo bool)
runtime_Semrelease
// 唤醒被runtime_SemacquireMutex函数挂起的等待goroutine
// If handoff is true, pass count directly to the first waiter.
// 如果handoff为true,唤醒队列头第一个等待者,否则的话可能是随机
func runtime_Semrelease(s *uint32, handoff bool)
Lock
Lock方法申请对mutex加锁,Lock执行的时候,分三种情况
1.无冲突
通过CAS操作把当前状态设置为加锁状态
2.有冲突 开始runtime_canSpin自旋
,并等待锁释放,如果其他goroutine在这段时间内释放了该锁,直接获得该锁;如果没有释放进入3
3.有冲突,且已经过了自旋阶段
通过调用seamacquire函数来让当前goroutine进入等待状态
func (m *Mutex) Lock() {
// 查看 state 是否为0(空闲状态), 如果是则表示可以加锁,将其状态转换为1,当前
// goroutine加锁成功, 函数返回,获得锁
if atomic.CompareAndSwapInt32(&m.state,0,mutexLocked) {
return
}
var waitStartTime int64 // 当前goroutine开始等待时间
starving := false // mutex 当前所处的模式
awoke := false // 当前 goroutine 是否被唤醒
iter := 0 // 自旋迭代的次数
old := m.state // old 保存当前 mutex 的状态
for {
// 当mutex 处于加锁非饥饿工作模式且支持自旋操作的时候
if old &(mutexLocked | mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 将 mutex.state 的倒数第二位设置为1,用来告 Unlock 操作,存在 goroutine 即将得到锁,不需要唤醒其他 goroutine
if !awoke && old&muteWoken == 0 && old >> mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋循环
runtime_doSpin()
iter++
old = m.state
continue
}
// 1.能进到此处,则表明不为加锁模式
new := old
// 当 mutex 不处于饥饿状态的时候,将 new的第一位设置为 1,即 加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 当mutex 处于加锁或饥饿状态的时候,新到来的goroutine进入等待队列
// 2.此处需要判断是否为加锁状态,因为从1到2的时候可能mutex 重新被其他goroutine加锁了
if old&(mutexLocked|mutexStarving) != 0 {
new += 1<< mutexWaiterShift // 等待队列进1位
}
// 当前 goroutine 将 mutex 切换为饥饿状态,但如果当前 mutex 未加锁,则不需要切换 Unlock 操作希望饥饿模式存在等待者
// 3.starving条件 是为了防止 如果在2处判断mutex没有处于加锁,而在这里判断mutex却加锁了,这时候加入饥饿模式,可是goroutine没有入列
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 当前goroutine已经被唤醒
if awoke {
// 当前 goroutine 被唤醒,将 mutex.state 倒数第二位重置
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 调用 CAS 更新 state 状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// mutex既不加锁也不饥饿,正常模式下,当前gotoutine获得锁,直接跳出
if old& (mutexLocked | mutexStarving) == 0 {
break
}
// queueLifo 为 true 代表当前 goroutine 是等待状态的 goroutine
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
// 记录开始等待时间
waitStartTime = runtime_nanotime()
}
// 将被唤醒却没得到锁的 goroutine 插入当前等待队列的最前端
runtime_SemacquireMutex(&m.sema, queueLifo)
// 如果当前 goroutine 等待时间超过starvationThresholdNs,mutex 进入饥饿模式
starving = starving || runtime_nanotimne()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 如果为饥饿模式,但是不为加锁或者等待队列为0,抛异常
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 等待状态的 goroutine - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不是饥饿模式了或者当前等待着只剩下一个,退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新状态
atomic.AddInt32(&m.state, delta)
break
}
}else {
old = m.state
}
}
}
Unlock
Unlock方法释放所申请的锁
一个Mutex的lock方法并不跟一个特定的goroutine绑定,一个Mutex对象允许被一个goroutine lock,并被另一个goroutine unlock。
func (m *Mutex) Unlock() {
// mutex 的state减去1, 加锁状态 -> 未加锁
new := atomic.AddInt32(&m.state, -mutexLocked)
// 未 Lock 直接 Unlock(),报 panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// mutex 正常模式
if new&mutexStarving == 0 {
// 如果没有等待者,或者已经存在一个 goroutine 被唤醒或得到锁,或处于饥饿模式
// 无需唤醒任何处于等待状态的 goroutine
// 因为lock方法存在自旋一直在获取锁,所以可能解锁后就已经有goroutine获取到锁了
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 等待者数量减1,并将唤醒位改成1
new := (old - 1<<mutexWaitShift) | mutexWoken
if atomic.ComnpareAndSwapInt32(&m.state, old, new) {
// 唤醒一个阻塞的 goroutine,但不是唤醒第一个等待者
runtime_Semrelease(&m.sema, false)
return
}
}else {
// mutex 饥饿模式,直接将 mutex 拥有权移交给等待队列最前端的 goroutine
runtime_Semrelease(&m.sema, true)
}
}
有疑问加站长微信联系(非本文作者)