本篇来学习一下sync.Cond的用法
使用场景:
我需要完成一项任务,但是这项任务需要满足一定条件才可以执行,否则我就等着。
那我可以怎么获取这个条件呢?一种是循环去获取,一种是条件满足的时候通知我就可以了。显然第二种效率高很多。
通知的方式的话,golang里面通知可以用channel的方式
var mail = make(chan string)
go func() {
<- mail
fmt.Println("get chance to do something")
}()
time.Sleep(5*time.Second)
mail <- "moximoxi"
但是channel的方式的话还是比较适用于一对一的方式,一对多并不是很适合。下面就来介绍一下另一种方式:sync.Cond
sync.Cond就是用于实现条件变量的,是基于sync.Mutext的基础上,增加了一个通知队列,通知的线程会从通知队列中唤醒一个或多个被通知的线程。
主要有以下几个方法:
sync.NewCond(&mutex):生成一个cond,需要传入一个mutex,因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于sync.Mutex来实现的。
sync.Wait():用于等待通知
sync.Signal():用于发送单个通知
sync.Broadcat():用于广播
看到上面几个方法的源码
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) // 加入到通知队列里面,由signal方法或者broadcast方法发起通知。
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify) // 随机挑选一个进行通知,wait阻塞解除
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify) // 通知所有等待的协程
}
先找到一个sync.Cond的基本用法
var locker sync.Mutex
var cond = sync.NewCond(&locker)
// NewCond(l Locker)里面定义的是一个接口,拥有lock和unlock方法。
// 看到sync.Mutex的方法,func (m *Mutex) Lock(),可以看到是指针有这两个方法,所以应该传递的是指针
func main() {
locker.Lock()
for i := 0; i < 10; i++ {
go func(x int) {
cond.L.Lock() // 获取锁
defer cond.L.Unlock() // 释放锁
cond.Wait() // 等待通知,阻塞当前 goroutine
// 通知到来的时候, cond.Wait()就会结束阻塞, do something. 这里仅打印
fmt.Println(x)
}(i)
}
time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 进入 Wait 阻塞状态
fmt.Println("Signal...")
cond.Signal() // 1 秒后下发一个通知给已经获取锁的 goroutine
time.Sleep(time.Second * 1)
fmt.Println("Signal...")
cond.Signal() // 1 秒后下发下一个通知给已经获取锁的 goroutine
time.Sleep(time.Second * 1)
cond.Broadcast() // 1 秒后下发广播给所有等待的goroutine
fmt.Println("Broadcast...")
time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 执行完毕
}
上述代码实现了主线程对多个goroutine的通知的功能。
抛出一个问题:
主线程执行的时候,如果并不想触发所有的协程,想让不同的协程可以有自己的触发条件,应该怎么使用?
下面就是一个具体的需求:
有四个worker和一个master,worker等待master去分配指令,master一直在计数,计数到5的时候通知第一个worker,计数到10的时候通知第二个和第三个worker。
首先列出几种解决方式
1、所有worker循环去查看master的计数值,计数值满足自己条件的时候,触发操作 >>>>>>>>>弊端:无谓的消耗资源
2、用channel来实现,几个worker几个channel,eg:worker1的协程里<-channel(worker1)进行阻塞,计数值到5的时候,给worker1的channel放入值,
阻塞解除,worker1开始工作。 >>>>>>>弊端:channel还是比较适用于一对一的场景,一对多的时候,需要起很多的channel,不是很美观
3、用条件变量sync.Cond,针对多个worker的话,用broadcast,就会通知到所有的worker。
下面就来实现一下:
func main() {
mutex := sync.Mutex{}
var cond = sync.NewCond(&mutex)
mail := 1
go func() {
for count := 0; count <= 15; count ++{
time.Sleep(time.Second)
mail = count
cond.Broadcast()
}
}()
// worker1
go func() {
for mail != 5 { // 触发的条件,如果不等于5,就会进入cond.Wait()等待,此时cond.Broadcast()通知进来的时候,wait阻塞解除,进入下一个循环,此时发现mail != 5,跳出循环,开始工作。
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
}
fmt.Println("worker1 started to work")
time.Sleep(3*time.Second)
fmt.Println("worker1 work end")
}()
// worker2
go func() {
for mail != 10 {
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
}
fmt.Println("worker2 started to work")
time.Sleep(3*time.Second)
fmt.Println("worker2 work end")
}()
// worker3
go func() {
for mail != 10 {
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
}
fmt.Println("worker3 started to work")
time.Sleep(3*time.Second)
fmt.Println("worker3 work end")
}()
select {
}
}
(qa1):为什么每个worker里要使用for循环?而不是用if?
首先broadcast的时候,会通知到所有的worker,此时wait都会解除,但并不是所有的worker都满足通知条件的,所以加一个for循环,不满足通知条件的会再次wait。
以上部分是对sync.Cond的初步使用的分析,至于具体的实现源码,有时间再细看。
参考文章:
https://cloud.tencent.com/developer/article/1478016
https://blog.51cto.com/qiangmzsx/2106461