sync包 mutex源码阅读

one_zheng · · 589 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

借鉴于Go夜读,加了个人理解:https://reading.developerlearning.cn/articles/sync/sync_mutex_source_code_analysis/

go版本:go1.12 windows/amd64

结构体

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32  // 指代mutex锁当前的状态
    sema  uint32 // 信号量,用于唤醒gotoutine
}
image.png

这里图片颜色有误,末尾3个1依次代表:mutex是否被加锁,mutex是否被唤醒,mutex当前是否处于饥饿状态。

几个常量

const (
    mutexLocked = 1 << iota
    mutexWoken // 相当于 mutexWoken == 1<< 1
    mutexStarving // 相当于 mutexStarving == 1<< 2
    mutexWaiterShift = iota
    starvationThresholdNs = 1e6
)

mutexLocked值为1, 根据mutex.state & mutexLocked得到 mutex的加锁状态,结果为1表示已加锁,0表示未加锁
mutexWoken值为2(二进制:10),根据mutex.state & mutexWoken得到mutex的唤醒状态,结果为1表示已唤醒,0表示未唤醒
mutexStarving值为4(二进制:100),根据mutex.state & mutexStarving得到mutex的饥饿状态,结果为1表示处于饥饿状态,0表示处于正常状态
mutexWaiterShift值为3( 注:iota在const关键字出现时将被重置为0(const内部的第一行之前),const中每新增一行常量声明将使iota计数一次(iota可理解为const语句块中的行索引)),根据 mutex.state >> mutexWaiterShift得到当前等待的goroutine数目
starvationThresholdN值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdN,mutex进入饥饿模式。

饥饿模式与正常模式

  Mutex有两种工作模式:正常模式和饥饿模式

  在正常模式中,等待着按照FIFO的顺序排队获取锁,但是一个被唤醒的等待者有时候并不能获取mutex,它还需要和新到来的goroutine们竞争mutex的使用权。新到来的goroutine有一个优势,它们已经在CPU上运行且它们数量很多,因此一个被唤醒的等待者有很大的概率获取不到锁,在这种情况下它处在等待队列的前面。如果一个goroutine等待mutex释放的时间超过1ms,它就会将mutex切换到饥饿模式;

  在饥饿模式中,mutex的所有权直接从解锁的goroutine递交到等待队列中排在最前方的goroutine。新到达的goroutine们不要尝试去获取mutex,即便它看起来是解锁状态,也不要尝试自旋,而是排到等待队列的尾部

  如果一个等待者获取mutex的所有权,并且看到以下两种情况中的任一种: 1)它是等待队列中的最后一个, 或者2)它等待的时间少于1ms,它便将mutex切换回正常操作模式
——

函数

runtime_canSpin

自旋锁(spinlock)
  是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。

  获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成[busy-waiting]。

  golang对于自旋锁的取舍做了一些限制:1.多核; 2.GOMAXPROCS>1; 3.至少有一个运行的P并且local的P队列为空。golang的自旋尝试只会做几次,并不会一直尝试下去,感兴趣的可以跟一下源码。

func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq on on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}
 
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

runtime_doSpin
   会调用procyield函数,该函数也是汇编语言实现。函数内部[循环]调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。

runtime_SemacquireMutex
// 一个gotoutine的等待队列,如果lifo为true,则插入队列头,否则插入队尾
func runtime_SemacquireMutex(s *uint32, lifo bool)

runtime_Semrelease
// 唤醒被runtime_SemacquireMutex函数挂起的等待goroutine
// If handoff is true, pass count directly to the first waiter.
// 如果handoff为true,唤醒队列头第一个等待者,否则的话可能是随机
func runtime_Semrelease(s *uint32, handoff bool)


Lock

  Lock方法申请对mutex加锁,Lock执行的时候,分三种情况

1.无冲突 通过CAS操作把当前状态设置为加锁状态

2.有冲突 开始runtime_canSpin自旋,并等待锁释放,如果其他goroutine在这段时间内释放了该锁,直接获得该锁;如果没有释放进入3

3.有冲突,且已经过了自旋阶段 通过调用seamacquire函数来让当前goroutine进入等待状态

func (m *Mutex) Lock() {
    // 查看 state 是否为0(空闲状态), 如果是则表示可以加锁,将其状态转换为1,当前 
    // goroutine加锁成功, 函数返回,获得锁
    if atomic.CompareAndSwapInt32(&m.state,0,mutexLocked) {
         return
     }
      
     var waitStartTime int64  // 当前goroutine开始等待时间
     starving := false             // mutex 当前所处的模式
     awoke := false               // 当前 goroutine 是否被唤醒 
     iter := 0                         // 自旋迭代的次数
     old := m.state               // old 保存当前 mutex 的状态
     for {
         // 当mutex 处于加锁非饥饿工作模式且支持自旋操作的时候
          if old &(mutexLocked | mutexStarving) == mutexLocked && runtime_canSpin(iter) {
               // 将 mutex.state 的倒数第二位设置为1,用来告 Unlock 操作,存在 goroutine 即将得到锁,不需要唤醒其他 goroutine
               if  !awoke && old&muteWoken == 0 && old >> mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                   awoke = true
              }
              // 自旋循环
              runtime_doSpin()
              iter++
              old = m.state
              continue
          }
          // 1.能进到此处,则表明不为加锁模式
          new := old
          // 当 mutex 不处于饥饿状态的时候,将 new的第一位设置为 1,即 加锁
          if old&mutexStarving == 0 {
              new |= mutexLocked
          } 
          // 当mutex 处于加锁或饥饿状态的时候,新到来的goroutine进入等待队列
          // 2.此处需要判断是否为加锁状态,因为从1到2的时候可能mutex 重新被其他goroutine加锁了
          if old&(mutexLocked|mutexStarving) != 0 {
              new += 1<< mutexWaiterShift  // 等待队列进1位
          }
          // 当前 goroutine 将 mutex 切换为饥饿状态,但如果当前 mutex 未加锁,则不需要切换 Unlock 操作希望饥饿模式存在等待者
          //  3.starving条件 是为了防止 如果在2处判断mutex没有处于加锁,而在这里判断mutex却加锁了,这时候加入饥饿模式,可是goroutine没有入列
          if starving && old&mutexLocked != 0 {
               new |= mutexStarving
          }
          // 当前goroutine已经被唤醒
          if awoke {
            // 当前 goroutine 被唤醒,将 mutex.state 倒数第二位重置
            if new&mutexWoken == 0 {
              throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // 调用 CAS 更新 state 状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // mutex既不加锁也不饥饿,正常模式下,当前gotoutine获得锁,直接跳出
            if old& (mutexLocked | mutexStarving) == 0 {
                  break
            }
           // queueLifo 为 true 代表当前 goroutine 是等待状态的 goroutine
           queueLifo := waitStartTime != 0
           if waitStartTime == 0 {
              // 记录开始等待时间
              waitStartTime = runtime_nanotime()
           }
           // 将被唤醒却没得到锁的 goroutine 插入当前等待队列的最前端
           runtime_SemacquireMutex(&m.sema, queueLifo)
           // 如果当前 goroutine 等待时间超过starvationThresholdNs,mutex 进入饥饿模式
           starving = starving || runtime_nanotimne()-waitStartTime > starvationThresholdNs
           old = m.state
           if old&mutexStarving != 0 {
                // 如果为饥饿模式,但是不为加锁或者等待队列为0,抛异常
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                } 
                // 等待状态的 goroutine - 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 如果不是饥饿模式了或者当前等待着只剩下一个,退出饥饿模式
                if !starving || old>>mutexWaiterShift == 1 {
                   delta -= mutexStarving
                }
              // 更新状态
              atomic.AddInt32(&m.state, delta)
              break
          }
       }else {
        old = m.state
       }
    }   
}

Unlock

Unlock方法释放所申请的锁

一个Mutex的lock方法并不跟一个特定的goroutine绑定,一个Mutex对象允许被一个goroutine lock,并被另一个goroutine unlock。

func (m *Mutex) Unlock() {
    // mutex 的state减去1, 加锁状态 -> 未加锁
    new := atomic.AddInt32(&m.state, -mutexLocked)
   // 未 Lock 直接 Unlock(),报 panic
   if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
   }
   // mutex 正常模式
   if new&mutexStarving == 0 {
              // 如果没有等待者,或者已经存在一个 goroutine 被唤醒或得到锁,或处于饥饿模式
         // 无需唤醒任何处于等待状态的 goroutine
             // 因为lock方法存在自旋一直在获取锁,所以可能解锁后就已经有goroutine获取到锁了
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
             return
         }
             // 等待者数量减1,并将唤醒位改成1
         new := (old - 1<<mutexWaitShift) | mutexWoken
         if atomic.ComnpareAndSwapInt32(&m.state, old, new) {
               // 唤醒一个阻塞的 goroutine,但不是唤醒第一个等待者
               runtime_Semrelease(&m.sema, false)
               return
         }
    }else {
          // mutex 饥饿模式,直接将 mutex 拥有权移交给等待队列最前端的 goroutine
          runtime_Semrelease(&m.sema, true)
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:one_zheng

查看原文:sync包 mutex源码阅读

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

589 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传