深入理解 sync.RWMutex:解决读者-写者问题

Alex-liutao · · 8612 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

在某个数据需要被多个线程共享访问的时候,会出现读者-写者问题(这里的「问题」是复数形式的,因为读者-写者问题有多个变种)。访问共享数据的线程有两种类型:读者和写者。读者只会读取数据,而写者则是修改它。当写者拥有了访问数据的权限后,其它的线程(不管是读者还是写者)都不能访问这个数据。这种约束的需求在现实中是存在的,比如:当写者不能原子性地修改某个数据(例如数据库)时,在修改完成之前,要读取这个数据的读者要被阻塞,以免读者获取到损坏的数据(脏数据)。对于读者-写者问题的核心,还有很多修订的限制,比如: - 写者不能饿死(无限地等待执行的机会) - 读者不能饿死 - 所有线程都不能饿死 像 [sync.RWMutex](https://golang.org/pkg/sync/#RWMutex) 这种多读者-单写者互斥锁的实现,可以解决其中的一些读者-写者问题。我们现在来看看要怎么用 Go 解决这些问题,并了解这种解决方案能给到怎样的保证。 作为彩蛋,我们还会了解到怎么对互斥体(mutex)进行性能分析。 ## 用法 在我们深入到代码实现细节之前,我们先来实战演练一下 `sync.RWMutex` 的用法。下面的程序使用读写锁来保护临界区 —— `sleep()`。为了展示整个过程,临界区还添加了一些代码来记录当前有多少读者和写者正在临界区里面。([完整源代码](https://play.golang.org/p/xoiqW0RQQE9)) ```go package main import ( "fmt" "math/rand" "strings" "sync" "time" ) func init() { rand.Seed(time.Now().Unix()) } func sleep() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) { sleep() m.RLock() c <- 1 sleep() c <- -1 m.RUnlock() wg.Done() } func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) { sleep() m.Lock() c <- 1 sleep() c <- -1 m.Unlock() wg.Done() } func main() { var m sync.RWMutex var rs, ws int rsCh := make(chan int) wsCh := make(chan int) go func() { for { select { case n := <-rsCh: rs += n case n := <-wsCh: ws += n } fmt.Printf("%s%s\n", strings.Repeat("R", rs), strings.Repeat("W", ws)) } }() wg := sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) go reader(rsCh, &m, &wg) } for i := 0; i < 3; i++ { wg.Add(1) go writer(wsCh, &m, &wg) } wg.Wait() } ``` > 在 play.golang.org 网站上运行的程序,它们的执行环境是固定的,(比如时间的初始值,time.Now().Unix() 每次都会返回一个固定的值),所以 `rand.Seed(time.Now().Unix())` 会产生相同的随机数种子,从而导致程序输出每次都一样。你可以每次都手动给定一个不一样的随机数种子,或者可以在你的机器上运行上述代码。 示例输出: ``` W R RR RRR RRRR RRRRR RRRR RRR RRRR RRR RR R W R RR RRR RRRR RRR RR R W ``` 当在临界区里面的 go routine 数量发生变化时,程序就会换行。这些输出可以体现 `RWMutex` “要么允许多个读者访问,要么允许一个写者访问”的特性。 还有一个重点是,一旦有一个写者调用 `Lock()` 后 ,后续试图访问临界区的读者将会被阻塞,如果当前已经有读者正在临界区内,则写者会等待这些读者离开临界区后,再锁定临界区。这个特性体现在程序的输出上,就是每次 W 出现的前几行, R 的数量都是逐个减少的。 ``` ... RRRRR RRRR RRR RR R W ... ``` 当写者执行完毕后,前面被阻塞的读者就可以恢复运行,并且新的写者也可以开始尝试进入临界区了。值得一提的是,如果一个写者刚刚执行完毕,而现在同时有读者和写者等待进入临界区的话,那么永远都是在等待的读者们先等到进入临界区的机会,下一个才到写者。这样一来,读者和写者都不会被饿死了。 ## 代码实现 > 注意作者编写本文时分析的代码版本([718d6c58](https://github.com/golang/go/blob/718d6c5880fe3507b1d224789b29bc2410fc9da5/src/sync/rwmutex.go))与最新的版本相比可能会有差异。 `RWMutex` 为读者暴露了两个方法(`RLock` 和 `RUnlock`),同时专门为写者也暴露了两个方法( `Lock` 和 `Unlock` )。 ### RLock 为了代码简洁起见,我们跳过那些跟竞态检测器相关的代码(用 `...` 表示)。 ```go func (rw *RWMutex) RLock() { ... if atomic.AddInt32(&rw.readerCount, 1) < 0 { runtime_SemacquireMutex(&rw.readerSem, false) } ... } ``` `readerCount` 字段的类型是 `int32` ,它表示当前启用的读者——包括了所有正在临界区里面的读者或者被写者阻塞的等待进入临界区读者的数量。相当于是当前调用了 `RLock` 函数并且还没调用 `RUnLock` 函数的读者的数量。 [atomic.AddInt32](https://golang.org/pkg/sync/atomic/#AddInt32) 是下面代码的原子版本: ```go *addr += delta return *addr ``` 其中 `addr` 是 `*int32` 类型的而 `delta` 是 `int32` 类型的。由于这是个原子性的操作,所以增加 `delta` 不会有干扰到其它线程的风险。(更多关于 Fetch-and-add 的资料 [详见这里](https://en.wikipedia.org/wiki/Fetch-and-add)) > 如果我们完全没有用到写者的话,`readerCount` 会一直大于或等于 0 (译注:后面会讲到,一旦有写者调用 `Lock` ,`Lock`函数就会把 `readerCount` 设置为负数),并且读者获取锁的过程会走较快的非阻塞的分支,因为这时候读者获取锁的过程只涉及到 `atomic.AddInt32` 的调用。 ### 信号量(semaphore) 这是一个由 Edsger Dijkstra 提出的数据结构,解决很多关于同步的问题时,它都很好用。它是一个提供了两种操作的整数: - 获取(acquire,又称 wait、decrement 或者 P) - 释放(release,又称 signal、increment 或者 V) 获取操作把信号量减一,如果减一的结果是非负数,那么线程可以继续执行。如果结果是负数,那么线程将会被阻塞,除非有其它线程把信号量增加回非负数,该线程才有可能恢复运行)。 释放操作把信号量加一,如果当前有被阻塞的线程,那么它们其中一个会被唤醒,恢复执行。 Go 语言的运行时提供了 `runtime_SemacquireMutex` 和 `runtime_Semrelease` 函数,像 `sync.RWMutex` 这些对象的实现会用到这两个函数。 ### Lock 方法 ```go func (rw *RWMutex) Lock() { ... rw.w.Lock() // 通过把 rw.readerCount 设置成负数,来告知读者当前有写者正在等待进入临界区 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false) } ... } ``` `Lock` 方法让写者可以获得对共享数据的**独占**访问权: 首先它会获取一个叫 `w` 的互斥量(mutex),这会使得其它的写者无法访问这个共享数据,这个`w` 只有在 `Unlock` 函数快结束的时候,才会被解锁,**从而保证一次最多只能有一个写者进入临界区。** 然后 `Lock` 方法会把 `readerCount` 的值设置成负数,(通过把`readerCount` 减掉 `rwmutexMaxReaders`(即`1 << 30`))。然后接下来任何读者调用 `RLock` 函数时,都会被阻塞掉了: ```go if atomic.AddInt32(&rw.readerCount, 1) < 0 { // rw.readerCount 是负数,说明有写者正在等待进入临界区或者正在临界区内,等待写者执行完成 runtime_SemacquireMutex(&rw.readerSem, false) } ``` 后续来到临界区的读者们将会被阻塞,那正在运行的读者们会怎样呢?`readerWait` 字段就是用来记录当前有多少读者正在运行。写者阻塞在信号量 `rw.writerSem` 里,直到最后一个正在运行的读者执行完毕,它调用的 `RUnlock` 方法会把 `rw.writerSem` 信号量加一(我后面会讲到),这时写者才能被唤醒、进入临界区。 如果没有正在运行的读者,那么写者就可以直接进入临界区了。 ### rwmutexMaxReaders (译注:原文大量使用的 **pending** 这个词常常被翻译为「挂起」(有暂停的语义),但是在本文中,pending 表示的是「等待进入临界区(这时是线程是暂停的)或者正在临界区里面(这时是线程正在运行的)」这个状态。「挂起」不能很好的表达该语义,所以 pending 保留原文不翻译,但读者要注意 pending 在本文的语义,**例如:「一个 pending 的读者」可以理解为是一个调用了 `RLock` 函数但是还没调用 `RUnlock` 函数的读者。「一个 pending 的写者」则相应地表示一个调用了`Lock` 函数但是还没调用 `Unlock` 函数的写者**) 在 [rwmutex.go](https://github.com/golang/go/blob/718d6c5880fe3507b1d224789b29bc2410fc9da5/src/sync/rwmutex.go) 里面有一个常量: ```go const rwmutexMaxReaders = 1 << 30 ``` 这个 `1 << 30` 是什么意思、做什么用的呢? `readerCount` 字段是 [int32](https://golang.org/pkg/builtin/#int32) 类型的,它的有效范围是: ``` [-1 << 31, (1 << 31) - 1] 或者说 [-2147483648, 2147483647] ``` RWMutex 使用这个字段来记录当前 pending 的读者数,并且这个字段还标记着当前是否有写者在 pending 状态。在 `Lock` 方法里面: ```go r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders ``` `readerCount` 字段被减掉了 `1<<30`。当 `readerCount` 的值为负数时,说明当前存在 pending 状态的写者。而 `readerCount` 再加回 `1<<30`,又能表示当前 pending 的读者的数量。最后,`rwmutexMaxReaders` 还限制了 pending 读者的数量。如果我们的当前 pending 的读者数量比 `rwmutexMaxReaders` 还要多的话,那么 `readerCount` 减去 `rwmutexMaxReaders` 就不是负数了,这样整个机制都会被破坏掉。从中我们可以知道,pending 的读者数量不能大于 `rwmutexMaxReaders - 1` ,它的值超过了 10 亿——1073741823。 ### RUnlock ```go func (rw *RWMutex) RUnlock() { ... if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false) } } ... } ``` 这个方法会把 `readerCount` 减一 (之前是 `RLock` 方法把这个值增加了的),如果 `readerCount` 是负数,意味着当前存在 pending 状态的写者,因为正如上面所说的,在写者调用 `Lock` 方法的时候,`readerCount` 的值会减掉 `rwmutexMaxReaders`,从而使 `readerCount` 变成负数。 然后这个方法会检查当前正在临界区里面的读者数是不是已经是 0 了,如果是的话,意味着等待进入临界区的写者可以获取到 `rw.writerSem` 信号量、进入临界区了。 ### Unlock ```go func (rw *RWMutex) Unlock() { ... r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false) } rw.w.Unlock() ... } ``` 要解锁写者拥有的写锁,首先 `readerCount` 的值要增加 `rwmutexMaxReaders`,这个操作会使得 `readerCount` 恢复成非负数,如果这时候 `readerCount` 大于 0,这意味着当前有读者在等待着写者离开临界区。最后写者释放掉它拥有的 `w` 这个互斥量(译注:上文说过,这个互斥量是写者用来防止**其它写者**进入临界区的),这使得其它写者能够有机会再次锁定 `w` 这个互斥量。 如果读者或写者尝试在一个已经解锁的 RWMutex 上调用`Unlock` 和 `RUnlock` 方法会抛出错误([代码](https://play.golang.org/p/YMdFET74olU)): ```go m := sync.RWMutex{} m.Unlock() ``` 输出: ``` fatal error: sync: Unlock of unlocked RWMutex ... ``` ### 递归地读锁定 文档里面写道: 如果一个 goroutine 拥有一个读锁,而另外一个 goroutine 又调用了 `Lock` 函数,那么在第一个读锁被释放之前,没有读者可以获得读锁。这尤其限制了我们不能递归地获取读锁,因为只有这样才能确保锁都能变得可用,一个 `Lock` 的调用会阻止新的读者获取到读锁。(上文已经多次提到这一点了) 因为 RWMutex 就是这么实现的:如果当前有一个 pending 的写者,那么所有尝试调用 `RLock` 的读者都会被阻塞,即使在这之前已经有读者获取到了读锁([源代码](https://play.golang.org/p/XNndlaZ6Ema)): ```go package main import ( "fmt" "sync" "time" ) var m sync.RWMutex func f(n int) int { if n < 1 { return 0 } fmt.Println("RLock") m.RLock() defer func() { fmt.Println("RUnlock") m.RUnlock() }() time.Sleep(100 * time.Millisecond) return f(n-1) + n } func main() { done := make(chan int) go func() { time.Sleep(200 * time.Millisecond) fmt.Println("Lock") m.Lock() fmt.Println("Unlock") m.Unlock() done <- 1 }() f(4) <-done } ``` 输出: ```go RLock RLock RLock Lock RLock fatal error: all goroutines are asleep - deadlock! ``` (译注:上面的代码有两个 goroutine,一个是写者 routine,一个是主 goroutine(也是读者),通过程序的输出可以知道:前三行都是输出 RLock,表示这时候已经有 3 个读者获取到了读锁。后面接着输出了 Lock, 表示这时候写者开始请求写锁,后面接着输出一个 RLock,表示这时又多了一个读者请求读锁。因为 pending 的写者会阻塞掉后续调用 `RLock` 的读者,所以最后一个 RLock 的调用堵塞了主 routine,而写者的 routine 也在堵塞等待前面三个读者释放它们的读锁,所以两个 goroutine 都堵塞了,因此程序报错:`fatal error: all goroutines are asleep - deadlock!`) ### 锁的拷贝 `go tool vet` 可以检测到是否有锁被按值拷贝了,因为这种情况会导致死锁,具体的情况可以看之前的一篇文章:[Detect locks passed by value in Go](https://medium.com/golangspec/detect-locks-passed-by-value-in-go-efb4ac9a3f2b) (译注:[GCTT 译文:检测 Go 程序中按值传递的 locks](https://studygolang.com/articles/14479) ### 性能 之前有人提出:随着 CPU 核心数量的增加,RWMutex 的性能会降低,详见:https://github.com/golang/go/issues/17973 ### 锁的争用 Go 1.8 版本开始支持分析 mutex 的争用情况(译注:原文 Contention,参考[维基百科](https://en.wikipedia.org/wiki/Lock_(computer_science)#Granularity))([patch 补丁](https://github.com/golang/go/commit/ca922b6d363b6ca47822188dcbc5b92d912c7a4b)),我们来看看它是怎么用的: ```go import ( "net/http" _ "net/http/pprof" "runtime" "sync" "time" ) func main() { var mu sync.Mutex runtime.SetMutexProfileFraction(5) for i := 0; i < 10; i++ { go func() { for { mu.Lock() time.Sleep(100 * time.Millisecond) mu.Unlock() } }() } http.ListenAndServe(":8888", nil) } ``` ```bash > go build mutexcontention.go > ./mutexcontention ``` 当程序 mutexcontention 运行时: ``` > go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1 Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1 Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz File: mutexcontention Type: delay Entering interactive mode (type "help" for commands, "o" for options) (pprof) list main Total: 57.28s ROUTINE ======================== main.main.func1 in /Users/mlowicki/projects/golang/src/github.com/mlowicki/mutexcontention/mutexcontention.go 0 57.28s (flat, cum) 100% of Total . . 14: for i := 0; i < 10; i++ { . . 15: go func() { . . 16: for { . . 17: mu.Lock() . . 18: time.Sleep(100 * time.Millisecond) . 57.28s 19: mu.Unlock() . . 20: } . . 21: }() . . 22: } . . 23: . . 24: http.ListenAndServe(":8888", nil) ``` 上面的 57.28s 是什么,它为什么挨着 `mu.Unlock()` 呢? 当 goroutine 因为调用 `Lock` 方法而被阻塞的时候,这个时间点会被记录下来——aquiretime(获取时间)。当其他 goroutine 解锁了这个锁,并且起码有一个 goroutine 在等待获取这个锁的时候。其中一个 goroutine 可以获取到这个锁,这时他会自动调用 `mutexevent` 函数。函数 `mutexevent` 根据 `SetMutexProfileFraction` 函数设定的比率,来确定是否应该保存或忽略掉该事件。这种事件都包含了等待时间(当前时间 - 获取时间)。上述的代码中,所有阻塞在这个锁的 goroutine 的总等待时间会被收集和显示出来, 对于读锁(Rlock 和 `RUnlock`)争用的分析功能,将会在 Go 1.11 版本加入 ([patch 补丁](https://github.com/golang/go/commit/88ba64582703cea0d66a098730215554537572de))

via: https://medium.com/golangspec/sync-rwmutex-ca6c6c3208a0

作者:Michał Łowicki  译者:Alex-liutao  校对:polaris1119

本文由 GCTT 原创编译,Go语言中文网 荣誉推出


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

8612 次点击  ∙  2 赞  
加入收藏 微博
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传