熟悉Go编程的朋友都不会对sync
库感到陌生,这个语言内置库提供了很多常见的处理并发编程的工具,今天就从最为小众,使用最少的sync.Cond
库说起。
Cond是什么?
按照文档的描述:
Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
大概意思就是Cond
实现了一种条件变量,用于描述事件(event)的发生或者作为goroutine的交汇点。
听起来似乎有点抽象,下面我们使用实际代码来说明。
现在引入一个实际问题,如何当一个变量为指定值时做一些特定的行为?比如,当同时登陆用户大于100时,做一些额外的动作。
首先,我们可以在变量产生变化的时候主动做检查,看是否触发了条件(这种方法固然可行,但是不在今天的考虑范畴)。其次,我们可以通过轮询的方式,来检查值是否达到了指定的条件,这种方式的好处在于,处理变量变化的动作与额外触发的行为可以完全分隔开,有利于代码解耦。
一个简单的轮询逻辑很容易实现:
for {
mu.RLock()
func() {
defer mu.RUnlock()
if len(users) >= 100 {
funcDoSomething()
}
}()
}
不出意外的,这段代码在非抢占调度下会占用一整个处理器的资源(因为是一个无限死循环且没有syscall去主动让出处理器控制器)。
一个最容易想到的简单优化是,在for循环中加上一个sleep
时间,这样对于cpu的占用就会低很多。
更理想的行为是,当且仅当条件发生变化的时候(这里的条件是users长度),才做后续的工作。
使用Cond
在代码实现上大概是:
cond.L.Lock()
for len(users) < 100 {
cond.Wait()
}
funcDoSomething()
cond.L.Unlock()
// in some other goroutine:
cond.L.Lock()
users = append(users, newUser)
cond.L.Unlock()
cond.Signal()
不难发现,其实上面的代码可以通过channel
做简单实现:
select{
case <-ch:
if len(users)>=100{
funcDoSomething()
}
default:
}
// in some other goroutine:
mu.Lock()
users = append(users, newUser)
mu.Unlock()
ch<-struct{}{}
上面两段代码在逻辑上其实是等价的。这么看来,Cond
并没有解决什么复杂问题嘛,怪不得大家都不关注它。其实Cond
主要有用的并不是Signal
模式,而是Broadcast
模式。
其实在API的使用上面,Signal
与Broadcast
并无不同。两者主要解决的场景差异在于:Signal
会唤醒任意一个处于Wait
状态的goroutine;而Broadcast
则会唤醒所有处于Wait
状态的goroutine。我们在使用channel做类似模式的时候,很容易可以通过一个send操作来模仿Signal
,而Broadcast
就不那么容易(直接close channel是一个方案,问题在于close动作只能发生一次,而Broadcast
则可以发生多次)。
用channel实现Cond
模式(约束在于Broadcast
只能实现一次):
type Cond struct {
L sync.Mutex // used by caller
ch chan struct{}
}
func (c *Cond) Wait() {
ch := c.ch
c.L.Unlock()
<-ch
c.L.Lock()
}
func (c *Cond) Signal() {
select {
case c.ch <- struct{}{}:
default:
}
}
func (c *Cond) Broadcast() {
close(c.ch)
c.ch = make(chan struct{})
}
Cond
的实际实现是:
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
有疑问加站长微信联系(非本文作者)