概述
cond是go语言sync提供的条件变量,通过cond可以让一系列的goroutine在触发某个条件时才被唤醒。每一个cond结构体都包含一个锁L。cond提供了三个方法:
- Signal:调用Signal之后可以唤醒单个goroutine。
- Broadcast:唤醒等待队列中所有的goroutine。
- Wait:会把当前goroutine放入到队列中等待获取通知,调用此方法必须先Lock,不然方法里会调用Unlock()报错。
简单使用
创建40个goroutine都wait阻塞住。调用Signal则唤醒第一个goroutine。调用Broadcast则唤醒所有等待的goroutine。
package main
import (
"fmt"
"sync"
"time"
)
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
func test(x int) {
cond.L.Lock() //获取锁
cond.Wait() //等待通知 暂时阻塞
fmt.Println(x)
time.Sleep(time.Second * 1)
cond.L.Unlock() //释放锁
}
func main() {
for i := 0; i < 40; i++ {
go test(i)
}
fmt.Println("start all")
time.Sleep(time.Second * 3)
fmt.Println("broadcast")
cond.Signal() // 下发一个通知给已经获取锁的goroutine
time.Sleep(time.Second * 3)
cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
time.Sleep(time.Second * 3)
cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
time.Sleep(time.Second * 60)
}
源码分析
Cond
type Cond struct {
noCopy noCopy
// 锁的具体实现,通常为 mutex 或者rwmutex
L Locker
// notifyList对象,维护等待唤醒的goroutine队列,使用链表实现
notify notifyList
checker copyChecker
}
// 新建cond初始化cond对象
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
type notifyList struct {
// 等待数量
wait uint32
// 通知数量
notify uint32
// 锁对象
lock mutex
// 链表头
head *sudog
// 链表尾
tail *sudog
}
Wait
// 等待函数
func (c *Cond) Wait() {
c.checker.check()
// 等待计数器加1 看下面具体实现
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
//
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// 此函数在sema.go中控制计数器加1
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
// 此函数在sema.go中
// 获取当前goroutine 添加到链表末端,然后goparkunlock函数休眠阻塞当前goroutine
// goparkunlock函数会让出当前处理器的使用权并等待调度器的唤醒
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}
// Enqueue itself.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
Broadcast
唤醒链表中所有的阻塞中的goroutine,还是使用readyWithTime来实现这个功能
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// 源代码在sema.go中
func notifyListNotifyAll(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// Pull the list out into a local variable, waiters will be readied
// outside the lock.
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// Update the next ticket to be notified. We can set it to the current
// value of wait because any previous waiters are already in the list
// or will notice that they have already been notified when trying to
// add themselves to the list.
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// Go through the local list and ready all waiters.
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Signal
// 调用runtime_notifyListNotifyOne方法唤醒链表头的goroutine
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// runtime_notifyListNotifyOne具体实现 获取链表头部的G,然后调用readyWithTime唤醒goroutine
// 源代码在sema.go中
func notifyListNotifyOne(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lock(&l.lock)
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
有疑问加站长微信联系(非本文作者)