信号量,锁和 golang 相关源码分析

mb5fdb0a1b25659 · · 283 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

  • 1资源同步

    • 1.1 解决方案

  • 2 信号量

    • 2.1 共享变量

    • 2.2 信号量

  • 3 锁

    • 3.1 死锁

    • 3.2 活锁

    • 3.3 饥饿

  • 4 Golang sync 包

    • 4.4.1 数据结构

    • 4.4.2 NewCond函数

    • 4.4.3 Wait方法

    • 4.4.4 Singal方法

    • 4.4.5 Broadcast方法

    • 4.3.1 数据结构

    • 4.3.2 Add和Done方法

    • 4.3.3 Wait方法

    • 4.2.1 常量和结构

    • 4.2.1 RLock和RUnlock方法

    • 4.2.2 Lock和Unlock方法

    • 4.1.1 接口和结构

    • 4.1.2 Lock 方法

    • 4.1.3 Unlock方法

    • 4.1 sync.mutex.go

    • 4.2 sync.rwmutex.go

    • 4.3 sync.waitgroup.go

    • 4.4 sync.cond.go

1资源同步

并发已经成为现代程序设计中的重要考虑内容,但是并发涉及到一个很重要的内容就是资源同步。当两个或者多个线程访问同样的资源的时候,运行的结果取决于线程运行时精确的时序。这样导致结果与期望的结果大相径庭,因此我们需要对资源的访问顺序进行控制,已达到资源同步的目的。

1.1 解决方案

将共享资源(也就是共享内存)的程序片段成为临界区域,通过适当安排,使得两个者线程同时位于临界区域。对于临界区域访问解决方案,需要满足如下4个条件

  • 任何两个线程不能同时位于临界区

  • 不对CPU执行速度和时间做任何假设

  • 临界区外运行的线程不阻塞其他线程

  • 不能使线程无限期等待进入临界区

常见的互斥解决方案:

屏蔽中断线程的切换是由CPU中断机制提供的,如果一个线程进入临界区域后,CPU关闭中断响应;在离开临界区域后,再打开中断机制。那么在临界区域将不会有其他线程来竞争资源。 当时将屏蔽中断权利交给用户空间执行是不明智的,而且对于多核CPU而言没有效果。

锁变量几乎每一个编程语言都提供了资源同步方式:锁机制。该机制通过对资源进行LockUnlock,以达到对关键资源有序访问。

严格轮换法线程不停的执行CPU时间,连续测试某一个值是否出现。但是如果认为等待的时间非常短,可以使用该方式浪费CPU时间,用于等待的锁也成为自旋锁

2 信号量

2.1 共享变量

在理解信号量之前,先了解采用共享变量使用多线程会出现什么问题。下面是一个C代码片段

1for (i=0; i<niters; i++){
2    cnt ++;
3}

cnt为全局变量,一个线程执行该代码片段的时候的汇编代码如下:

 1   movq (%rdi), %rcx
2    testq %rcx, %rcx
3    jle .L2
4    movl $0, %eax
5.L3:
6    movq cnt(%rip), %rdx
7    addq %eax
8    movq %eax, cnt(%rip)
9    addq $1, %rax
10    cmpq %rcx, %rax
11    jne .L3
12.L2


2.2 信号量其中6-8行分别对应对应着加载cnt,更新cnt和存储cnt。将cnt变量从内存位置读出,加载到CPU寄存器中,在CPU运算器中加1,然后存储到cnt的内存位置。虽然代码中cnt++只有一行,但是转换为汇编代码的时候不只有一个操作,也就是说该语句不是原子操作。如果多个线程同时执行代码,按照之前的条件,不对CPU的执行顺序做任何假设,如果其中线程a在执行7行汇编代码,而线程b执行6行汇编代码,那么b将"看不到"线程a对全局变量cnt加1的操作,那么每次执行的结果cnt也不完全一致。


计算机领域先驱Dijkstra提出经典的解决上述问题的方法:信号量(semaphore)。它是一个非负整数的全局变量。而且该变量只能有两个特殊操作来处理: PV

  • P(s): 如果s非零,那么Ps1,并且立即返回。如果s为零,那么就挂起这个线程,知道s为非零。

  • V(s): V操作将s1。如果有任何线程阻塞在P操作等待s非零,那么V将重启其中线程中的一个。

Posix标准定义需要操作信号量的函数

1#include <semaphore.h>
2int sem_init(sem_t *sem, 0unsigned int value);
3int sem_wait(sem_t *s)/*P(s)*/
4int sem_post(sem_t *s)/*P(s)*/

那么如何使用信号量是的2.1小节出现同步问题解决呢?首先定义全局信号量

1volatile long cnt = 0/* global variable */
2sem_t mutex; /*global semaphore*/

初始化信号量,在这里初始值为1

1sem_init(&mutex, 0, 1);

最后使用信号量操作函数将临界区域代码包含起来

1for (i =0; i<niters; i++){
2    sem_wait(&mutex);
3    cnt++;
4    sem_post(&mutex);
5}

3 锁

3.1 死锁

首先看一下死锁的规范定义:

如果一个线程(进程)集合中的每一个线程(进程)都在等待只能由该线程(进程)集合中的其他线程(进程)才能引发的事件,那么该线程(进程)集合是死锁的。

举一个例子,如果线程 a 和线程 b 同是执行,线程a获取了资源r1,等待获取资源r2;而线程b获取了资源r2,等待获取资源r1。那么线程a和线程b组成的集合是死锁的。

预防死锁

  • 破坏占有等待条件

    对于需要获取多个资源的线程,一次性获取全部资源,而不是依次获取各个资源。

  • 破坏环路等待条件

    死锁集合的线程按照等占有线程和等待线程可以组成有向环图。那么如果对所有资源进行排序,所有线程按照资源顺序获取资源。

3.2 活锁

在某些情况下,当线程意识它不能获下一个资源的时候,它会“礼貌性”地释放已经获得的资源,然后等待1ms,在尝试一次。如果另一个线程也在相同的时候做了相同的操作, 那么同步的步调将导致两个线程都无法前进。

3.3 饥饿

在信号量小节中,当执行V操作后,将恢复挂起线程中的一个,那么问题出现了:如果有多个线程被挂起,那么选择哪个线程恢复呢?如果随机选择一个线程恢复,如果源源不断的线程到达临界区域并且挂起,那么很有可能出现某一个线程一直等待资源,而导致"饥饿"。当然也有好的FILO调度策略来解决调用问题。当时问题在于刚刚到达的线程有很好的局部性,也就是CPU的寄存器、缓存等包含了该线程的局部变量,如果程获得资源锁,很好的避免了线程上下文切换,对性能提高很有帮助。

go语言的互斥锁中采用结合上述两种策略,接下来小节中,将会仔细分析源码。

4 Golang sync 包

4.1 sync.mutex.go

4.1.1 接口和结构

包含了Locker接口和Mutex结构:

1type Locker interface {
2    Lock()
3    Unlock()
4}
5type Mutex struct {
6    state int32
7    sema  uint32
8}

Mutex实现了Locker接口,该结构包含了state的字段,用来表示该锁当前状态;sema则为一个信号量。state是一个32位的整数,不同比特位包含了不同的意义,其中源码中的有很详细的注释,该注释很好解释mutex如何工作:

互斥锁有两种状态:正常状态和饥饿状态。在正常状态下,所有挂起等待的goroutine按照FIFO顺序等待。唤醒的goroutine将会和刚刚到达的goroutine竞争互斥锁的拥有权,因为刚刚到达的goroutine具有优势:它刚刚正在CPU上执行,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。如果一个等待的goroutine超过1ms没有获取互斥锁,那么它将会把互斥锁转变为饥饿模式。在饥饿模式下,互斥锁的所有权将移交给等待队列中的第一个。新来的goroutine将不会尝试去获得互斥锁,也不会去尝试自旋操作,而是放在队列的最后一个。如果一个等待的goroutine获取的互斥锁,如何它满足一下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将互斥锁的转台转换为正常状态。正常状态有很好的性能表现,饥饿模式也是非常重要的的,因为它能阻止尾部延迟的现象。

1const (
2    mutexLocked = 1 << iota // mutex is locked
3    mutexWoken
4    mutexStarving
5    mutexWaiterShift = iota
6    starvationThresholdNs = 1e6
7)
  • mutexLocked

该值为1, 第一位比特位1,代表了该是否该互斥锁已经被锁住。mutex.state与它进行&操作,如果为1表示已经锁住,0则表示未被锁住。

  • mutexWoken

该值为2,第二位比特位1,代表了该互斥锁是否被唤醒,mutex.state与它进行&操作,如果为1表示已经被唤醒,0代表未被唤醒

  • mutexStarving

该值为4,第三位比特为1,代表了该互斥锁是否处于饥饿状态,mutex.state与它进行&操作,如果为1表示处于饥饿转态,0表示处于正常状态。

  • mutexWaiterShift

该值为3,表示mutex.state右移3位后为等待的goroutine的数量。

  • starvationThresholdNs

goroutine将互斥锁转换状态的时间等待的临界值:一百万纳秒,也就是1ms。

4.1.2 Lock 方法

1if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
2        if race.Enabled {
3            race.Acquire(unsafe.Pointer(m))
4        }
5        return
6}

CompareAndSwapInt32是一个原子操作,它判断是一个参数的值是否等于第二个参数,如果相等,则将第一个参数设置为第三个参数,并返回true;否则对一个参数不做任何操作并且返回false。这一段是代码是处理第一次goroutine进行尝试Lock操作,如果一切都是初始状态,则m.state.....0000001并且返回,进入临界区域代码,否则代码继续往下走。

1var waitStartTime int64
2starving := false
3awoke := false
4iter := 0
5old := m.state

首先定义了一下变量:goroutine等待时间,是否饥饿转台,是否唤醒和自旋迭代次数和保存当前互斥锁状态。接下来是一个for循环,只有退出循环才能进入临界区域代码,纵观代码只有两处使用break来退出循环。

 1for {
2    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
3            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
4                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
5                awoke = true
6            }
7            runtime_doSpin()
8            iter++
9            old = m.state
10            continue
11        }
12}

首先判断锁状态是被锁而且不是处于饥饿模式,加上还能自旋额次数,进入下一层判断。如果当前goroutine没有被唤醒其他goroutine也没有被唤醒等待的goroutine超过1可以将m.state设置为唤醒转态四个条件同时满足,将awoke设置true`。然后进行自旋操作,进行一轮循环。

 1new := old
2if old&mutexStarving == 0 {
3            new |= mutexLocked
4}
5if old&(mutexLocked|mutexStarving) != 0 {
6        new += 1 << mutexWaiterShift
7}
8if starving && old&mutexLocked != 0 {
9            new |= mutexStarving
10}

这三个判断条件做了如下工作:如果当前的mutex.state处于正常模式,则将new的锁位设置为1,如果当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1。如果starving变量为true并且处于锁定状态,则new的饥饿状态位打开。

1if awoke {
2    if new&mutexWoken == 0 {
3                throw("sync: inconsistent mutex state")
4    }
5    new &^= mutexWoken
6}

如果 goroutine已经被唤醒,则清空new的唤醒位。

1if atomic.CompareAndSawpInt32(&m.state, old, new){
2    //...
3}else{
4    //...
5}

如果更新m.state成功

1if old&(mutexLocked|mutexStarving) == 0 {
2                break 
3}

如果未被锁定并且并不是出于饥饿状态,到达第一个break,进入代码临界区域。

1queueLifo := waitStartTime != 0
2if waitStartTime == 0 {
3    waitStartTime = runtime_nanotime()
4}
5runtime_SemacquireMutex(&m.sema, queueLifo)

runtime_SemacquireMutex(s *uint32, lifo bool)函数类似P操作,如果lifotrue则将等待goroutine插入到队列的前面。在这里,对于每一个到达的goroutine,如果CompareAndSawpInt32成功,并且到达时候如果锁出于锁定状态,那么将该goroutine插入到等待队列的最后,否则插入到最前面。此时goroutine将会被挂起,等待UnlockV操作,将唤醒goroutines

 1starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
2old = m.state
3if old&mutexStarving != 0 {
4                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
5                    throw("sync: inconsistent mutex state")
6                }
7                delta := int32(mutexLocked - 1<<mutexWaiterShift)
8                if !starving |
| old>>mutexWaiterShift == 1 {
9                    delta -= mutexStarving
10                }
11                atomic.AddInt32(&m.state, delta)
12                break
13}
14

判断被唤醒的线程是否为达到饥饿状态,也就是等待时间超过1ms,如果之前的m.state不是饥饿状态,继续循环,给新到来goroutine让出互斥锁。如果已经饥饿状态,则修改等待goroutine数量和饥饿状态位,并返回进入临界代码区域。

4.1.3 Unlock方法

1new := atomic.AddInt32(&m.state, -mutexLocked)

首先创建变量new,该变量的锁位为0。接下来是饥饿状态判断

 1if new&mutexStarving == 0 {
2        old := new
3        for {
4                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
5                return
6            }
7            new = (old - 1<<mutexWaiterShift) | mutexWoken
8            if atomic.CompareAndSwapInt32(&m.state, old, new) {
9                runtime_Semrelease(&m.sema, false)
10                return
11            }
12            old = m.state
13        }
14    } else {
15        runtime_Semrelease(&m.sema, true)
16    }

如果是正常状态,则判断如果等待的goroutine为零,或者已经被锁定、唤醒、或者已经变成饥饿状态,返回,不需要唤醒任何其他被挂起的goroutine,因为互斥锁已经被其他goroutine抢占。否则更新new值(修改等待的goroutine数量)并设置唤醒为,如果CompareAndSwapInt32成功,则通过runtime_Semrelease(&m.sema, false)恢复挂起的goroutine.r如果为 true表明将唤醒第一个阻塞的goroutine,这第一点在else饥饿的分支中体现。

4.2 sync.rwmutex.go

读写锁也是一种常见的锁机制,它允许多个线程读共享资源,只有一个线程写共享资源,接下来看看go中如何实现读写锁。

4.2.1 常量和结构

1type RWMutex struct {
2    w           Mutex  
3    writerSem   uint32 
4    readerSem   uint32 
5    readerCount int32  
6    readerWait  int32 
7}
8const rwmutexMaxReaders = 1 << 30

RWMutex结构包含了如下的字段

  • goroutine数量。

4.2.1 RLock和RUnlock方法

1func (rw *RWMutex) RLock() {
2    // [...]
3    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
4        runtime_Semacquire(&rw.readerSem)
5    }
6//[...]
7}

首先是readerCount值+1, 如果小于零,则挂起goroutine等待readerSem。是不是很奇怪,为什么会小于零判断呢?在这里先卖一个关子,接下来会看到为什么是这样的设计逻辑。

 1func (rw *RWMutex) RUnlock() {
2    //[...]
3    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
4        if r+1 == 0 || r+1 == -rwmutexMaxReaders {
5            race.Enable()
6            throw("sync: RUnlock of unlocked RWMutex")
7        }
8        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
9            runtime_Semrelease(&rw.writerSem, false)
10        }
11    }
12    //[...]
13}

首先将readerCount减去1,如果小于零,再讲readWait减去1,如果是离开读的goroutine数量为零,则对writerSem信号量进行V操作。

4.2.2 Lock和Unlock方法

1func (rw *RWMutex) Lock() {
2    //[...]
3    rw.w.Lock()
4    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
5    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
6        runtime_Semacquire(&rw.writerSem)
7    }
8    //[...]
9}

首先rw.w.Lock操作,来防止其他goroutine对共享资源的写访问。然后将readerCount减去rwmutexMaxReaders,表明还剩多少goroutine可以进行读访问,这也解释在RLock中小于零的判断,如果还可以还可以进行读访问,则必须获得readerSem信号量。在Lock中接下来是对readWait判断,如果该数量不为零,则需要对writerSem进行P操作,而V操作只在RUnlock方法中,如果最后一个读goroutine离开,则进行V操作。

 1func (rw *RWMutex) Unlock() {
2    //[...]
3    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
4    if r >= rwmutexMaxReaders {
5        race.Enable()
6        throw("sync: Unlock of unlocked RWMutex")
7    }
8    for i := 0; i < int(r); i++ {
9        runtime_Semrelease(&rw.readerSem, false)
10    }
11    //[...]
12}

首先恢复readCounter为正数,然后对readerSem信号量进行rV操作,唤醒在RLock中被挂起的goroutine

4.3 sync.waitgroup.go

WaitGroup通常用在等待一组goroutine全部完成。调用Add方法指明要等待的goroutine的数量,调用Done方法说明该goroutine已经完成,而Wait方法是阻塞等待的goroutine

4.3.1 数据结构

1type WaitGroup struct {
2    noCopy noCopy
3    state1 [12]byte
4    sema   uint32
5}

noCopy字段说明WaitGroup不允许拷贝,而state1字段是一个非常tricky的方法,用其中的8个字节(64bit)来保存一些状态。高位的32bit用来表示需要等待的goroutine的数量,地位的32bit用来表示被挂起的goroutine的数量。至于为什么不直接使用64bit的数据主要是为了考虑32为编译器无法保证64位对齐。sema则是一个信号量。

1func (wg *WaitGroup) state() *uint64 {
2    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
3        return (*uint64)(unsafe.Pointer(&wg.state1))
4    } else {
5        return (*uint64)(unsafe.Pointer(&wg.state1[4]))
6    }
7}

该方法是一个辅助方法,用来获取state,一个64为的无符号整数。

4.3.2 Add和Done方法

1func (wg *WaitGroup) Done() {
2    wg.Add(-1)
3}

Done方法其实调用了Add(-1)方法,所以我们着重讨论Add方法。

 1func (wg *WaitGroup) Add(delta int) {
2    statep := wg.state()
3    //[...]
4    state := atomic.AddUint64(statep, uint64(delta)<<32)
5    v := int32(state >> 32)
6    w := uint32(state)
7    if race.Enabled && delta > 0 && v == int32(delta) {
8            race.Read(unsafe.Pointer(&wg.sema))
9    }
10    if v < 0 {
11        panic("sync: negative WaitGroup counter")
12    }
13    if w != 0 && delta > 0 && v == int32(delta) {
14        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
15    }
16    if v > 0 || w == 0 {
17        return
18    }
19    if *statep != state {
20        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
21    }
22    // Reset waiters count to 0.
23    *statep = 0
24    for ; w != 0; w-- {
25        runtime_Semrelease(&wg.sema, false)
26    }
27}

首先是获取state,然后将delta右移32位,加上等待的goroutine数量。vw分别代表了需要等待的goroutine和被阻塞的goroutine的数量。接下来v==int32(delta)判断条件表明如果是第一次Add操作,则必须与等待的goroutine同步,在Wait方法中可以看到同样的操作。接下来是一些抛异常操作,如果等待的数量为负数,如何第一次Add操作没有同步。if >0 || w==0条件表明如何v没有降到零,或者被阻塞的goroutine数量为零,直接返回。如何v为零,则按照w的数量,依次对信号量ws.sema进行V操作。

4.3.3 Wait方法

 1func (wg *WaitGroup) Wait() {
2    //[...]
3    for {
4        state := atomic.LoadUint64(statep)
5        v := int32(state >> 32)
6        w := uint32(state)
7        //[...]
8        // Increment waiters count.
9        if atomic.CompareAndSwapUint64(statep, state, state+1) {
10            if race.Enabled && w == 0 {
11                race.Write(unsafe.Pointer(&wg.sema))
12            }
13            runtime_Semacquire(&wg.sema)
14            if *statep != 0 {
15                panic("sync: WaitGroup is reused before previous Wait has returned")
16            }
17            //[...]
18            return
19        }
20    }
21}

Wait方法同样也是CAS算法,首先获取需要等待的goroutine的数量v和阻塞的goroutine数量w, 然后将阻塞的goroutine数量+1,如果之前的w为零,表示是第一次等待,则与Add操作进行同步,最后后对信号量wg.sema进行P操作。

4.4 sync.cond.go

在编程中使用Cond也叫管程(monitor),它可以用来使不同线程完成互斥条件,也可以使某个线程等待某个条件的发生。常见的使用模式如下:

 1var locker = new(sync.Mutex)
2var cond = sync.NewCond(locker)
3var condition = true
4// goroutine A
5cond.L.Lock()
6for condition {
7    cond.Wait()
8}
9// ...
10cond.L.Unlock()
11
12//goroutine B
13condiiton = false
14cond.Signal() 

为什么使用for循环作为判断进入Wait的条件而不是if呢?主要是防止为被唤醒的goroutine在返回Wait调用的时候,恰好有别的goroutine修改了conditon的值,所以需要使用for循环作为条件判断。

4.4.1 数据结构

1type Cond struct {
2    noCopy noCopy
3    L Locker
4    notify  notifyList
5    checker copyChecker
6}

Cond结构不允许拷贝,包含了Locker的接口字段,和一个notifyList的集合字段。

4.4.2 NewCond函数

1func NewCond(l Locker) *Cond {
2    return &Cond{L: l}
3}

实现Locker接口的类型都可以,一般为MutexRWMutex

4.4.3 Wait方法

1func (c *Cond) Wait() {
2    c.checker.check()
3    t := runtime_notifyListAdd(&c.notify)
4    c.L.Unlock()
5    runtime_notifyListWait(&c.notify, t)
6    c.L.Lock()
7}

在使用Wait方法之前,要调用c.L.Lock来进入临界区域,将当前等待的goroutine加入到通知队列中,然后调用c.L.Unlock()来退出临界区域,以便让其他goroutine可以进入等待区域。紧接着挂起goroutine,等待消息。

4.4.4 Singal方法

1func (c *Cond) Signal() {
2    c.checker.check()
3    runtime_notifyListNotifyOne(&c.notify)}

runtime_notifyListNotifyOne唤起其中的等待的goroutine

4.4.5 Broadcast方法

1func (c *Cond) Broadcast() {
2    c.checker.check()
3    runtime_notifyListNotifyAll(&c.notify)
4}

runtime_notifyListNotifyAll唤起全部等待的goroutine



有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mb5fdb0a1b25659

查看原文:信号量,锁和 golang 相关源码分析

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

283 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传