源码级剖析Go标准库中的sync.RWMutex。
概述
RWMutex,读写锁,又称“读写互斥锁”。
读写锁简单来说就是可以由任意数量的读者同时使用,或者只由一个写者使用的锁。
读写锁和互斥量(Mutex
)类似,但是比起互斥量有着更高的并行性,它允许多个读者同时读取,因此有一些特殊的应用场景。
在并发编程的很多场景下,数据的读取可能比写入更加频繁,这时就要允许多个线程同时读取一块内容。
用例
Go中,RWMutex的零值是一个未加锁的互斥量。
RWMutex使用起来相对比较简单,这里举一个简单的例子:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rw := new(sync.RWMutex)
for i := 0; i < 2; i++ { // 建立两个写者
go func() {
for j := 0; j < 3; j++ {
rw.Lock()
// 写
rw.Unlock()
}
}()
}
for i := 0; i < 5; i++ { // 建立两个读者
go func() {
for j := 0; j < 3; j++ {
rw.RLock()
// 读
rw.RUnlock()
}
}()
}
time.Sleep(time.Second)
fmt.Println("Done")
}
一个(神奇)优秀的(大坑)特性
读者在读的时候,不能够假定别的读者也能够获得锁。因此,禁止读锁嵌套。
是不是有点儿绕?下面举个“七秒例”:?
- 第一秒:读者1在第1秒成功申请了读锁
- 第二秒:写者1在第2秒申请写锁,申请失败,阻塞,但它会防止新的读者获锁
- 第三秒:读者2在第3秒申请读锁,申请失败
- 第四秒:读者1释放读锁,写者1获得写锁
- 第五秒:写者1释放写锁,读者2获得读锁
- 第六秒:读者1再次申请读锁,申请成功,与读者2共享
- 第七秒:读者1、读者2释放读锁,结束
当写锁阻塞时,新的读锁是无法申请的,这可以有效防止写者饥饿。如果一个线程因为某种原因,导致得不到CPU运行时间,这种状态被称之为 饥饿。
然而,这种机制也禁止了读锁嵌套。读锁嵌套可能造成死锁:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rw := new(sync.RWMutex)
var deadLockCase time.Duration = 1
go func() {
time.Sleep(time.Second * deadLockCase)
fmt.Println("Writer Try")
rw.Lock()
fmt.Println("Writer Fetch")
time.Sleep(time.Second * 1)
fmt.Println("Writer Release")
rw.Unlock()
}()
fmt.Println("Reader 1 Try")
rw.RLock()
fmt.Println("Reader 1 Fetch")
time.Sleep(time.Second * 2)
fmt.Println("Reader 2 Try")
rw.RLock()
fmt.Println("Reader 2 Fetch")
time.Sleep(time.Second * 2)
fmt.Println("Reader 1 Release")
rw.RUnlock()
time.Sleep(time.Second * 1)
fmt.Println("Reader 2 Release")
rw.RUnlock()
time.Sleep(time.Second * 2)
fmt.Println("Done")
}
读者1和读者2是嵌套关系,按照这种时间安排,上述程序会导致死锁。
而有些死锁的可怕之处就在于,它不一定会发生。假设上面程序中的time.Sleep都是随机的时间,那么这一段代码每次的结果有可能不一致,这会给Debug带来极大的困难。
吾闻读锁莫嵌套,写锁嵌套长已矣。(读锁嵌套了还有概率成功,写锁嵌套了100%完蛋?)
源码剖析
(源码具体内容、行数,以版本go version 1.8.1
为例。)
为了方便理解,可以把所有的if race.Enabled {...}
扔掉不看。接下来,我们重述“七秒例”。?
第一秒,读者1请求读锁。
Line41:
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
读者数量readerCount
开始是0,这个时候加1,变成了1,不符合判负条件所以跳出,成功获得读锁一枚。
第二秒,写者尝试获取写锁。第85行获取w的锁。不管这个读写锁有没有获取成功,先排斥别的写者。
Line85:
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
刚才说了,一个写者阻塞在这里的时候,也不会让新的读者去读了,所以它干了一件非常坏的事情:
把readerCount变成了1-rwmutexMaxReaders。
这样就能卡住新来的读者了。
接下来,算出r等于1。这意味着有当前有写者存在。
因为有读者,所以写者卡在了信号量writerSem
上。但是它不甘心啊,心想“等完现在的这几个读者,我就要去写!”,它默默地把现在占有读锁的人记在了小本本rw.readerWait上。在本例子中,readerWait被设置为了1。
第三秒,读者2尝试获得读锁,它又来到了第41行,结果发现读者的数量是1-rwmutexMaxReaders,好吧,它只好卡在信号量readerSem
上。
第四秒,读者1调用RUnlock(),它首先把读者数量减一,毕竟自己已经不读了。
Line61:
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem)
}
}
在读者数量减一的时候,它发现读者数量是负数,这回读者1明白了,有一个写者在等待写。估计读者1自己已经在这个写者的小本本readerWait上了,因此它把readerWait减一,表示自己不读了。这时候读者1发现自己就是最后一个读者了,所以赶紧祭出writerSem,让写者可以去写。
读者1释放了writerSem信号量以后,写者很快就收到了这个提醒,兴高采烈地获得了写锁,开始自己的写作生涯。
读者2还卡着呢…
第五秒,写者1写完了一稿便不想写了,调用Unlock()准备释放读锁。
Line114:
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem)
}
只见他重新为readerCount加上rwmutexMaxReaders,使他重新变为了正数。这个正数恰好也是阻塞的读者的数量。
接下来,写者按照这个读者的数量,释放了这么多的readerSem信号量,相当于将所有阻塞的读者一一唤醒。读者2在收到readerSem的那一刻喜极而泣,它终于可以读了。
第六秒,读者1又来了,它把读者数量加1,发现它是正数哎,写者现在又没来,它再次幸运地瞬间获得读锁,与读者2一起读了起来。
第七秒,读者1和读者2都释放了自己的读锁。至此,结束。
名词解释
中文 | 英文 | 解释 |
---|---|---|
信号量 (也称信号灯) | Semaphore | |
条件变量 | Condition | |
互斥量 | Mutex |