有群友说面试的时候被问到:单核CPU,开两个goroutine,其中一个死循环,会怎么样?相信很多小伙伴乍一看一脸懵,我就在群里回了一下go1.14版本实现了基于信号的抢占式调度,可以在goroutine执行时间过长的时候强制让出调度,执行其他的goroutine。接下来看看具体怎么实现的,话不多说直接上代码。基于go1.15 linux amd64。
先看一个常规的栗子
func f1() {
fmt.Println("This is f1")
}
func f2() {
fmt.Println("This is f2")
}
func main() {
runtime.GOMAXPROCS(1)
go f1()
go f2()
time.Sleep(100 * time.Millisecond)
fmt.Println("success")
}
// always print f2 f1
以上代码就是模拟单核CPU下go语言的执行情况,无论运行多少次,输出的结果总是 f2 f1,解释如下
func main() {
// 只有一个 P 了,目前运行主 goroutine
runtime.GOMAXPROCS(1)
// 创建一个 G1 , call $runtime.newproc -> runqput(_p_, newg, true)放入到本地队列,注意这里的 next 参数为 true,代表放在队列头部
go f1()
// 因为只有一个 P 主 goroutine继续运行
// 创建一个 G2 , 同上面,也是加入到队列头部,这时候本地队列的顺序就是 G2 G1
go f2()
// 等待 f1 f2的执行 gopark 主 goroutine GMP调度可运行的 G
// 按顺序调用 G2 G1
// 所以不管执行多少次,结果都是
// This is f2
// This is f1
// success
time.Sleep(100 * time.Millisecond)
fmt.Println("success")
}
如果将runtime.GOMAXPROCS(1)改成runtime.GOMAXPROCS(4)即多核CPU,你就会发现 f1 和 f2 交替执行,没有明确的先后,这种事件A和B完全无序执行,即为并发。利用多核CPU同时执行A和B的情况,即为并行。为什么会这样涉及到go语言的GMP调度模型,这里不赘述,有兴趣的小伙伴可以自行学习。
既然每次都是 f2 先执行,那在 f2 中加入一个死循环会怎么样呢?
func f1() {
fmt.Println("This is f1")
}
func f2() {
// 死循环
for {
}
fmt.Println("This is f2")
}
func main() {
runtime.GOMAXPROCS(1)
go f1()
go f2()
time.Sleep(100 * time.Millisecond)
fmt.Println("success")
}
// This is f1
// success
你会发现虽然 f2 block住了没有输出,但是完全没影响f1和主goroutine的运行,这其实就是抢占式调度。golang在之前的版本中已经实现了抢占调度,但有些场景是无法抢占成功的。比如轮询计算 for { i++ } 等,这类操作无法进行newstack、morestack、syscall,无法检测stackguard0 = stackpreempt的场景。通俗点说之前版本实现的抢占调度发生在goroutine虽然执行了很长时间,但还得继续调用函数等操作,才能检查是不是需要抢占。
以下是src/runtime/stack.go/newstack()中抢占调度的内容:
func newstack() {
...
// NOTE: stackguard0 may change underfoot, if another thread
// is about to try to preempt gp. Read it just once and use that same
// value now and below.
// 如果是发起的抢占请求而非真正的栈分段
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
// 如果正持有锁、分配内存或抢占被禁用,则不发生抢占
if preempt {
if !canPreemptM(thisg.m) {
// Let the goroutine keep running for now. 不发生抢占,继续调度
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return 重新进入调度循环
}
}
...
// 如果需要抢占
if preempt {
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}
if gp.preemptShrink {
// We're at a synchronous safe point now, so
// do the pending stack shrink.
gp.preemptShrink = false
shrinkstack(gp)
}
if gp.preemptStop {
preemptPark(gp) // never returns 进入循环调度
}
// Act like goroutine called runtime.Gosched. 表现得像是调用了 runtime.Gosched,主动让权,进入循环调度
gopreempt_m(gp) // never return
}
...
}
显然上面的抢占调度还是存在一些问题的,GO团队在go1.14版本中实现了基于信号协程调度抢占。下面看下如何实现的。
首先是信号的发送方:Go Runtime 在启动程序的时候,会创建一个独立的 M 作为监控线程,称为 sysmon,它是一个系统级的 daemon 线程。这个sysmon 独立于 GPM 之外,也就是说不需要P就可以运行,也是作为抢占信号的发送方一直运行。
src/runtime/proc.go/main()
// The main goroutine.
func main() {
...
// 启动系统后台监控(定期垃圾回收、并发任务调度)
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
// 系统监控在一个独立的 m 上运行
newm(sysmon, nil, -1)
})
}
...
}
src/runtime/proc.go/sysmon(),重点是里面的retake()
func sysmon() {
lock(&sched.lock)
// 不计入死锁的系统 m 的数量
sched.nmsys++
// 死锁检查
checkdead()
unlock(&sched.lock)
lasttrace := int64(0)
idle := 0 // how many cycles in succession we had not wokeup somebody 没有 wokeup 的周期数
delay := uint32(0)
//死循环一直执行
for {
if idle == 0 { // start with 20us sleep... 每次启动先休眠 20us
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms... 1ms 后就翻倍休眠时间
delay *= 2
}
if delay > 10*1000 { // up to 10ms 增加到 10ms
delay = 10 * 1000
}
// 休眠
usleep(delay)
now := nanotime()
// timer定时器检查
next, _ := timeSleepUntil()
...
// retake P's blocked in syscalls
// and preempt long running G's 抢夺在 syscall 中阻塞的 P、运行时间过长的 G
if retake(now) != 0 {
idle = 0
} else {
idle++
}
...
}
}
src/runtime/proc.go/retake()
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world. 防止 allp 数组发生变化,除非我们已经 STW,此锁将完全没有人竞争
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long. 如果 G 运行时时间太长则进行抢占
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// 运行超过10ms,就在这里了
preemptone(_p_)
// In case of syscall, preemptone() doesn't 对于 syscall 的情况,因为 M 没有与 P 绑定,
// work, because there is no M wired to P. preemptone() 不工作
sysretake = true
}
}
// 对阻塞在系统调用上的 P 进行抢占
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
// 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,一方面,在没有其他 work 的情况下,我们不希望抢夺 P
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.另一方面,因为它可能阻止 sysmon 线程从深度睡眠中唤醒,所以最终我们仍希望抢夺 P
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock. 解除 allpLock,从而可以获取 sched.lock
unlock(&allpLock)
// Need to decrement number of idle locked M's 在 CAS 之前需要减少空闲 M 的数量(假装某个还在运行)
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock. 否则发生抢夺的 M 可能退出 syscall 然后再增加 nmidle ,进而发生死锁
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
// 转移 P 有其他任务就创建 M 去执行 一圈找下来都没有就放入空闲的 P 列表
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
preemptone -> preemptM(mp)->signalM(mp, sigPreempt)直至发起系统调用,完成抢占信号的发送。
信号的接收方:
Go 运行时初始化是会调用runtime.mstart完成信号处理的初始化行为,调用mstart1()
src/runtime/proc.go/mstart1()
func mstart1() {
...
// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
// 设置信号 handler;在 minit 之后,以便 minit 可以准备处理信号的的线程
if _g_.m == &m0 {
// 只在当前 m 是 m0 的时候执行, mstartm0主要就是初始化信号处理 initsig
mstartm0()
}
...
}
src/runtime/proc.go/mstartm0()
func mstartm0() {
...
// 信号处理初始化
initsig(false)
}
src/runtime/signal_unix.go/initsig()
func initsig(preinit bool) {
...
// 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
setsig(i, funcPC(sighandler))
...
}
src/runtime/signal_unix.go/sighandler()
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
...
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
// Might be a preemption signal. 可能是一个抢占信号
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we 即便这是一个抢占信号,它也可能与其他信号进行混合,因此我们
// still let it through to the application. 继续进行处理。
}
}
doSigPreempt->asyncPreempt->asyncPreempt2
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
// 异步抢占过程结束
gp.asyncSafePoint = false
}
不管是preemptPark还是gopreempt_m,最终都是进入调度循环schedule(),去执行其他的 G。
终于写完了,撒花,菜鸟一枚,有什么不对的欢迎评论留言指正,谢谢。
有疑问加站长微信联系(非本文作者)