golang的sync.Cond的用法

zzzyyy111 · · 7389 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本篇来学习一下sync.Cond的用法
使用场景:
我需要完成一项任务,但是这项任务需要满足一定条件才可以执行,否则我就等着。
那我可以怎么获取这个条件呢?一种是循环去获取,一种是条件满足的时候通知我就可以了。显然第二种效率高很多。
通知的方式的话,golang里面通知可以用channel的方式

    var mail = make(chan string)
    go func() {
        <- mail
        fmt.Println("get chance to do something")
    }()
    time.Sleep(5*time.Second)
    mail <- "moximoxi"

但是channel的方式的话还是比较适用于一对一的方式,一对多并不是很适合。下面就来介绍一下另一种方式:sync.Cond
sync.Cond就是用于实现条件变量的,是基于sync.Mutext的基础上,增加了一个通知队列,通知的线程会从通知队列中唤醒一个或多个被通知的线程。
主要有以下几个方法:

sync.NewCond(&mutex):生成一个cond,需要传入一个mutex,因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于sync.Mutex来实现的。
sync.Wait():用于等待通知
sync.Signal():用于发送单个通知
sync.Broadcat():用于广播

看到上面几个方法的源码

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)      // 加入到通知队列里面,由signal方法或者broadcast方法发起通知。
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)  // 随机挑选一个进行通知,wait阻塞解除
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)       // 通知所有等待的协程
} 

先找到一个sync.Cond的基本用法

var locker sync.Mutex
var cond = sync.NewCond(&locker)
// NewCond(l Locker)里面定义的是一个接口,拥有lock和unlock方法。
// 看到sync.Mutex的方法,func (m *Mutex) Lock(),可以看到是指针有这两个方法,所以应该传递的是指针
func main() {
    locker.Lock()
    for i := 0; i < 10; i++ {
        go func(x int) {
            cond.L.Lock()         // 获取锁
            defer cond.L.Unlock() // 释放锁
            cond.Wait()           // 等待通知,阻塞当前 goroutine
            // 通知到来的时候, cond.Wait()就会结束阻塞, do something. 这里仅打印
            fmt.Println(x)
        }(i)
    }
    time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 进入 Wait 阻塞状态
    fmt.Println("Signal...")
    cond.Signal()               // 1 秒后下发一个通知给已经获取锁的 goroutine
    time.Sleep(time.Second * 1)
    fmt.Println("Signal...")
    cond.Signal()               // 1 秒后下发下一个通知给已经获取锁的 goroutine
    time.Sleep(time.Second * 1)
    cond.Broadcast()            // 1 秒后下发广播给所有等待的goroutine
    fmt.Println("Broadcast...")
    time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 执行完毕

}

上述代码实现了主线程对多个goroutine的通知的功能。
抛出一个问题:
主线程执行的时候,如果并不想触发所有的协程,想让不同的协程可以有自己的触发条件,应该怎么使用?
下面就是一个具体的需求:
有四个worker和一个master,worker等待master去分配指令,master一直在计数,计数到5的时候通知第一个worker,计数到10的时候通知第二个和第三个worker。
首先列出几种解决方式
1、所有worker循环去查看master的计数值,计数值满足自己条件的时候,触发操作 >>>>>>>>>弊端:无谓的消耗资源
2、用channel来实现,几个worker几个channel,eg:worker1的协程里<-channel(worker1)进行阻塞,计数值到5的时候,给worker1的channel放入值,
阻塞解除,worker1开始工作。 >>>>>>>弊端:channel还是比较适用于一对一的场景,一对多的时候,需要起很多的channel,不是很美观
3、用条件变量sync.Cond,针对多个worker的话,用broadcast,就会通知到所有的worker。
下面就来实现一下:

func main()  {

    mutex := sync.Mutex{}
    var cond = sync.NewCond(&mutex)
    mail := 1
    go func() {
        for count := 0; count <= 15; count ++{
            time.Sleep(time.Second)
            mail = count
            cond.Broadcast()
        }
    }()
    // worker1
    go func() {
        for mail != 5 {          // 触发的条件,如果不等于5,就会进入cond.Wait()等待,此时cond.Broadcast()通知进来的时候,wait阻塞解除,进入下一个循环,此时发现mail != 5,跳出循环,开始工作。
            cond.L.Lock()
            cond.Wait()
            cond.L.Unlock()
        }
        fmt.Println("worker1 started to work")
        time.Sleep(3*time.Second)
        fmt.Println("worker1 work end")
    }()
    // worker2
    go func() {
        for mail != 10 {
            cond.L.Lock()
            cond.Wait()
            cond.L.Unlock()
        }
        fmt.Println("worker2 started to work")
        time.Sleep(3*time.Second)
        fmt.Println("worker2 work end")
    }()
    // worker3
    go func() {
        for mail != 10 {
            cond.L.Lock()
            cond.Wait()
            cond.L.Unlock()
        }
        fmt.Println("worker3 started to work")
        time.Sleep(3*time.Second)
        fmt.Println("worker3 work end")
    }()
    select {

    }
}

(qa1):为什么每个worker里要使用for循环?而不是用if?
首先broadcast的时候,会通知到所有的worker,此时wait都会解除,但并不是所有的worker都满足通知条件的,所以加一个for循环,不满足通知条件的会再次wait。

以上部分是对sync.Cond的初步使用的分析,至于具体的实现源码,有时间再细看。
参考文章:
https://cloud.tencent.com/developer/article/1478016
https://blog.51cto.com/qiangmzsx/2106461


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:zzzyyy111

查看原文:golang的sync.Cond的用法

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

7389 次点击  
加入收藏 微博
1 回复  |  直到 2022-02-22 14:28:12
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传