Go语言提供了channel和sync包两种并发控制的方法,每种方法都有他们适用的场景,并不是所有并发场景都适合应用channel的,有的时候用sync包里提供的同步原语更简单。今天这个话题纯属是为了通过用channel实现同步锁的功能来学习掌握channel拥有的强大能力,并不适合在实际中使用。而且面试中有时候就是会出一些奇奇怪怪的题考应聘者对知识的理解以及灵活运用的应变能力。
大家仔细看看文章里用channel实现几种常用的同步锁的思路,没准儿哪次面试就碰上这样的面试官了呢。
今天,我将深入探讨Go
语言channel
和select
语句的表达能力。为了演示只用这两个原语就可以实现多少功能,我将从头开始用它们重写sync
包。
sync
包提供的同步原语的有哪些以及如何使用我们已经在之前的文章里介绍过了,所以这里不会再去介绍用channel
实现的这些同步原语应该怎么用。如果对用法有疑问请回看之前的文章: Go语言sync包的应用详解。
Once
once是一个简单而强大的原语,可确保在并行程序中一个函数仅执行一次。
channel
版的Once
我们使用带有一个缓冲的通道来实现
第一次调用Do(func ())
的goroutine
从通道中接收到值后,后续的goroutine
将会被阻塞中,直到Do
的参数函数执行完成后关闭通道为止。其他goroutine
判断通道已关闭后将不执行任何操作并立即返回。
type Once chan struct{}
func NewOnce() Once {
o := make(Once, 1)
// 只允许一个goroutine接收,其他goroutine会被阻塞住
o <- struct{}{}
return o
}
func (o Once) Do(f func()) {
_, ok := <-o
if !ok {
// Channel已经被关闭
// 证明f已经被执行过了,直接return.
return
}
// 调用f, 因为channel中只有一个值
// 所以只有一个goroutine会到达这里
f()
// 关闭通道,这将释放所有在等待的
// 以及未来会调用Do方法的goroutine
close(o)
}
复制代码
Mutex
大小为N的信号量最多允许N个goroutine
在任何给定时间保持其锁。互斥锁是大小为1的信号量的特例。
信号量(英语:semaphore)又称为信号标,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程直至该semaphore对象变成signaled状态才能等待成功。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.
我们先用channel
实现信号量的功能
type Semaphore chan struct{}
func NewSemaphore(size int) Semaphore {
return make(Semaphore, size)
}
func (s Semaphore) Lock() {
// 只有在s还有空间的时候才能发送成功
s <- struct{}{}
}
func (s Semaphore) Unlock() {
// 为其他信号量腾出空间
<-s
}
复制代码
上面也说了互斥锁是大小为1的信号量的特例。那么在刚才实现的信号量的基础上实现互斥锁只需要:
type Mutex Semaphore
func NewMutex() Mutex {
return Mutex(NewSemaphore(1))
}
复制代码
RWMutex
RWMutex
是一个稍微复杂的原语:它允许任意数量的并发读锁,但在任何给定时间仅允许一个写锁。还可以保证,如果有线程持有写锁,则任何线程都不能持有或获得读锁。
sync
标准库里的RWMutex
还允许如果有线程尝试获取写锁,则其他读锁将排队等待,以避免饿死尝试获取写锁的线程。为了简洁起见,在用channel
实现的RWMutex
里我们忽略了这部分逻辑。
RWMutex
具有三种状态:空闲,存在写锁和存在读锁。这意味着我们需要两个通道分别标记RWMutex
上的读锁和写锁:空闲时,两个通道都为空;当获取到写锁时,标记写锁的通道里将被写入一下空结构体;当获取到读锁时,我们向两个通道中都写入一个值(避免写锁能够向标记写锁的通道发送值),其中标记读锁的通道里的值代表当前RWMutex
拥有的读锁的数量,读锁释放的时候除了更新通道里存的读锁数量值,也会抽空写锁通道。
type RWMutex struct {
write chan struct{}
readers chan int
}
func NewLock() RWMutex {
return RWMutex{
// 用来做一个普通的互斥锁
write: make(chan struct{}, 1),
// 用来保护读锁的数量,获取读锁时通过接受通道里的值确保
// 其他goroutine不会在同一时间更改读锁的数量。
readers: make(chan int, 1),
}
}
func (l RWMutex) Lock() { l.write <- struct{}{} }
func (l RWMutex) Unlock() { <-l.write }
func (l RWMutex) RLock() {
// 统计当前读锁的数量,默认为0
var rs int
select {
case l.write <- struct{}{}:
// 如果write通道能发送成功,证明现在没有读锁
// 向write通道发送一个值,防止出现并发的读-写
case rs = <-l.readers:
// 能从通道里接收到值,证明RWMutex上已经有读锁了,下面会更新读锁数量
}
// 如果执行了l.write <- struct{}{}, rs的值会是0
rs++
// 更新RWMutex读锁数量
l.readers <- rs
}
func (l RWMutex) RUnlock() {
// 读出读锁数量然后减一
rs := <-l.readers
rs--
// 如果释放后读锁的数量变为0了,抽空write通道,让write通道变为可用
if rs == 0 {
<-l.write
return
}
// 如果释放后读锁的数量减一后不是0,把新的读锁数量发送给readers通道
l.readers <- rs
}
复制代码
WaitGroup
WaitGroup
最常见的用途是创建一个组,向其计数器中设置一个计数,生成与该计数一样多的goroutine
,然后等待它们完成。每次goroutine
运行完毕后,它将在组上调用Done
表示已完成工作。可以通过调用WaitGroup
的Done
方法或以负数调用Add
方法减少计数器的计数。当计数器达到0时,被Wait
方法阻塞住的主线程会恢复执行。
WaitGroup
一个鲜为人知的功能是在计数器达到0后,如果调用Add
方法让计数器变为正数,这将使WaitGroup
重回阻塞状态。 这意味着对于每个给定的WaitGroup
,都有一点"世代"的意味:
- 当计数器从0移到正数时开始"世代"。
- 当计数器重回到0时,
WaitGroup
的一个世代结束。 - 当一个世代结束时,被该世代的所阻塞住的线程将恢复执行。
下面是用channel
实现的WaitGroup
同步原语,真正起到阻塞goroutine
作用的是世代里的wait
通道,然后通过用WaitGroup
通道包装generation
结构体实现WaitGroup
的Wait
和Add
等功能。用文字很难描述清楚还是直接看下面的代码吧,代码里的注释会帮助理解实现原理。
type generation struct {
// 用于让等待者阻塞住的通道
// 这个通道永远不会用于发送,只用于接收和close。
wait chan struct{}
// 计数器,标记需要等待执行完成的job数量
n int
}
func newGeneration() generation {
return generation{ wait: make(chan struct{}) }
}
func (g generation) end() {
// close通道将释放因为接受通道而阻塞住的goroutine
close(g.wait)
}
//这里我们使用一个通道来保护当前的generation。
//它基本上是WaitGroup状态的互斥量。
type WaitGroup chan generation
func NewWaitGroup() WaitGroup {
wg := make(WaitGroup, 1)
g := newGeneration()
// 在一个新的WaitGroup上Wait, 因为计数器是0,会立即返回不会阻塞住线程
// 它表现跟当前世代已经结束了一样, 所以这里先把世代里的wait通道close掉
// 防止刚创建WaitGroup时调用Wait函数会阻塞线程
g.end()
wg <- g
return wg
}
func (wg WaitGroup) Add(delta int) {
// 获取当前的世代
g := <-wg
if g.n == 0 {
// 计数器是0,创建一个新的世代
g = newGeneration()
}
g.n += delta
if g.n < 0 {
// 跟sync库里的WaitGroup一样,不允许计数器为负数
panic("negative WaitGroup count")
}
if g.n == 0 {
// 计数器回到0了,关闭wait通道,被WaitGroup的Wait方法
// 阻塞住的线程会被释放出来继续往下执行
g.end()
}
// 将更新后的世代发送回WaitGroup通道
wg <- g
}
func (wg WaitGroup) Done() { wg.Add(-1) }
func (wg WaitGroup) Wait() {
// 获取当前的世代
g := <-wg
// 保存一个世代里wait通道的引用
wait := g.wait
// 将世代写回WaitGroup通道
wg <- g
// 接收世代里的wait通道
// 因为wait通道里没有值,会把调用Wait方法的goroutine阻塞住
// 直到WaitGroup的计数器回到0,wait通道被close后才会解除阻塞
<-wait
}
复制代码
总结
今天这篇文章用通道实现了Go语言sync
包里常用的几种同步锁,主要的目的是演示通道和select
语句结合后强大的表达能力,并没有什么实际应用价值,大家也不要在实际开发中使用这里实现的同步锁。
有关通道和同步锁都适合解决什么种类的问题我们后面的文章再细说,今天这篇文章,需要充分理解Go语言通道的行为才能理解文章里的代码,如果有哪里看不懂的可以留言,只要时间允许我都会回答。
如果还不了解sync
包里的同步锁的使用方法,请先看这篇文章 Go语言sync包的应用详解。后面的文章我会介绍并发编程里的数据竞争问题以及解决方法,以及考虑给大家留一道思考题,请大家关注公众号里的动态。
有疑问加站长微信联系(非本文作者)