mutex.go

killtl · · 920 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

概述

mutex.go是golang中针对互斥锁的实现,内部仅提供两个方法,分别是Lock()Unlock,同时定义了几个常量和一个Mutex结构,如下

type Mutex struct {
    state int32     // 互斥锁上锁状态枚举值如下所示
    sema  uint32    // 信号量,向处于Gwaitting的G发送信号
}

const (
    mutexLocked = 1 << iota // 1 互斥锁是锁定的
    mutexWoken              // 2 唤醒锁
    mutexWaiterShift = iota // 2 统计阻塞在这个互斥锁上的goroutine数目需要移位的数值
)

如果对Mutex进行复制,可能会导致锁失效,因为内部都是值复制,相当于复制了一把新锁,mutexLocked标识Mutex.state的最低位的值,mutexWoken标识Mutex.state的倒数第二低位的值,mutexWaiterShift标识阻塞等待锁的goroutine的数量(计算方式为 Mutex.state >> mutexWaiterShift),所以可以用于表示阻塞数量的二进制位数为32-2=30

前置知识

  • doc.go中的原子操作,参见上一篇博文doc.go
  • runtime_canSpin golang中实现的自旋(类似发动机空转),在Mutex中主要用于短暂占用cpu时间避免当前goroutine进入睡眠状态(因为大量的Mutex使用场景都是在小片段代码,锁和解锁的操作间隔很短,新的goroutine可以自旋一段时间尝试获取锁)
  • runtime_doSpin 让CPU pause一段时间,配合runtime_canSpin使用
  • runtime_SemacquireMutex 睡眠
  • runtime_Semrelease 唤醒

小实现

基于锁的原理,我们可以自己通过原子操作实现一把非常简单的锁,如下代码

package main

import (
    "sync/atomic"
    "time"
    "sync"
)

type Mutex struct {
    state int32  // 锁状态 0未锁/1已锁
}

func (m *Mutex) Lock() {
    for {
                // 原子cas操作
        if atomic.CompareAndSwapInt32(&m.state, 0, 1) {
            break
        }
                // 睡眠一秒
        time.Sleep(time.Second)
    }
}

func (m *Mutex) Unlock() {
        // 重复解锁或者未锁状态解锁报异常
    if !atomic.CompareAndSwapInt32(&m.state, 1, 0) {
        panic("lock state error")
    }
}

func main() {
    var wg sync.WaitGroup
    var mu Mutex
    wg.Add(100)

    f := func(index int) {
        defer wg.Done()
        mu.Lock()
        time.Sleep(time.Microsecond * 10)
        mu.Unlock()
    }

    for i := 100; i > 0; i-- {
        go f(i)
    }

    wg.Wait()
}

上面这个实现是十分简陋的,后面可以看到google的大神们是怎么玩出花来的

Lock

原型

func (m *Mutex) Lock()

源码

func (m *Mutex) Lock() {
        // 先使用CAS尝试获取锁
        // 上面我们的简版实现就用到了 CompareAndSwapInt32
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // 这里不需要管它,是用于竞争检测的
        // 下同 
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        // 成功获取返回
        return
    }

    awoke := false  // 唤醒标记
    iter := 0       // 自旋计数器
    for {
        old := m.state // 获取当前锁状态
        // 将当前状态最后一位指定1
        // 没拿到就是锁住了,拿到了也会锁住,反正都是锁住
        new := old | mutexLocked  
        if old&mutexLocked != 0 {  // 如果被锁住了
            if runtime_canSpin(iter) {  // 检查是否可以进入自旋锁
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { 
                    awoke = true  // 设置唤醒标记为true,主要配合Unlock
                } 
                runtime_doSpin()  // 等一会 
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift  // 没有获取到锁,当前goroutine进入等待队列
        }
        if awoke {
            // 检查状态是否一致,是否真的被Unlock唤醒
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
                        //清除标记
            new &^= mutexWoken
        }
        // 更新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // old锁是未锁定,new锁前面已经设置成锁定并通过原子操作更新成功,那就是当前goroutine获取到锁了
            if old&mutexLocked == 0 {
                break
            }
                         
            // 锁请求失败,进入休眠状态,等待信号唤醒后重新开始循环
            runtime_SemacquireMutex(&m.sema)
            // 被唤醒,设置唤醒标志
            awoke = true
            // 清零自旋次数 
            iter = 0
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

总结下上面一些有意思的点

  • 使用自旋
  • 针对写锁状态,务必使用原子操作
  • 使用锁状态的old版本进行判断,使用new版本进行更新,更新过程务必保证当前锁状态=old且更新过程必须是原子操作

Unlock

原型

func (m *Mutex) Unlock()

源码

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 移除标记
    new := atomic.AddInt32(&m.state, -mutexLocked)
       // 判断是否重复unlock
       // 这里先原子更改再判断更改后的值
       // 比先判断后更改更安全且更简单,不信你可以试试,注意考虑并发场景
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        //当休眠队列内的等待计数为0或者已有goroutine获得锁或者已有运行中的获取锁goroutine
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        // 没有goroutine主动来获取锁
        // 好吧,只有我来唤醒睡眠中的来取锁了
        // 等待队列数量减1,设置唤醒标志位
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
                        // 发送释放信号
            runtime_Semrelease(&m.sema)
            return
        }
        // 上面都没命中,循环搞
        old = m.state
    }
}

Unlock相对来说更简单点


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:killtl

查看原文:mutex.go

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

920 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传