【2-3 Golang】Go并发编程—调度器schedule

tomato01 · · 1595 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

&emsp;&emsp;我们一直提到,每一个线程都有一个线程栈,也称为系统栈;协程g0就运行在这个栈上,而且协程g0执行的就是调度逻辑schedule。Go语言调度器是如何管理以及调度这些成千上万个协程呢?和操作系统一样,维护着可运行队列和阻塞队列吗,有没有所谓的按照时间片或者是优先级或者是抢占式调度呢? ## 调度器schedule &emsp;&emsp;我们已经知道每一个P都有一个协程队列runq,该队列存储的都是处于可运行状态的协程,调度器一般情况下只需要从当前p.runq获取协程即可;另外,Go语言为了避免多个P负载分配不均衡,还有一个全局队列sched.runq,如果当前p.runq队列为空,也会从全局队列sched.runq尝试获取协程;如果还为获取不到可执行协程,甚至会从其他P的队列去偷。 &emsp;&emsp;当然,无论是p.runq,还是全局的sched.runq,存储的都是处于可运行状态的协程;那处于阻塞状态的协程呢,这些协程在调度器执行的时候,还处于阻塞状态码?不知道,所以在获取不到可执行协程时,还会尝试去看一下有没有协程解除阻塞了,如果有则还可以调度执行这些协程。 &emsp;&emsp;调度器schedule看着比较简单,获取可运行协程,通过execute调度执行该协程: ``` func schedule() { // schedtick调度计数器,没调度以此加1 // 调度器周期61次,首先从全局队列获取可运行协程 if gp == nil { if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) } if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } //调度执行 execute(gp, inheritTime) } ``` &emsp;&emsp;按照之前我们说的,先查找当前P的协程队列,再查找全局队列,但是这样可能会导致全局队列的协程长时间得不到调度,所以Go语言调度器每执行61次,都会优先从全局队列获取可运行协程。注意,在查找全局队列的时候,存在多线程并发问题,所以是需要先加锁的。findrunnable是一个比较复杂的函数,看注释"blocks until work is available",获取不到协程时,甚至会block(当前线程M暂停)。execute当然就是切换栈,执行当前协程了。 ``` func execute(gp *g, inheritTime bool) { //设置协程g与M的互相引用关系 _g_ := getg() _g_.m.curg = gp gp.m = _g_.m //协程状态:运行中 casgstatus(gp, _Grunnable, _Grunning) //协程切换 gogo(&gp.sched) } ``` &emsp;&emsp;gogo我们上一篇文章已经介绍过了,纯汇编代码写的,完成了栈桢的切换,以及代码的跳转。不知道你有没有注意到第二个参数inheritTime,这是什么含义呢?表示这次协程执行是否继承上一个协程的时间片。假如时间片为10ms,上一个协程已经执行了5ms,如果继承,则标明这一个协程最多只能执行5ms,时间片就会结束,从而再次调度其他协程。这么说Go语言调度器是有时间片的概念了?我们先保留一个疑问。 &emsp;&emsp;Go语言什么时候执行调度schedule呢?程序刚启动肯定会执行,而协程因为某些原因阻塞了(chan的读写,socket的读写等等),或者是协程执行结束了,这时候也是需要重新调度其他协程的;协程阻塞通常是通过runtime.gopark函数完成的: ``` func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { // 切换到系统栈,执行park_m mcall(park_m) } func park_m(gp *g) { //协程状态:阻塞 casgstatus(gp, _Grunning, _Gwaiting) //重新调度 schedule() } ``` &emsp;&emsp;协程阻塞之后,想恢复协程的调度呢?与gopark对应的,runtime.goready函数用于恢复协程的调度: ``` func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) } func ready(gp *g, traceskip int, next bool) { //更改状态为可运行;添加到P的协程队列 casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) wakep() releasem(mp) } ``` ## 协作式抢占调度 &emsp;&emsp;Go语言调度器到底有时间片的概念吗?其实我们可以通过一个小程序测试一下: ``` package main import ( "fmt" "runtime" ) func main() { //设置P的数目为1 runtime.GOMAXPROCS(1) go func() { fmt.Println("hello world") for { //死循环 } }() //main协程主动让出 runtime.Gosched() fmt.Println("main end") } ``` &emsp;&emsp;上一篇文章讲解协程创建的时候提到,go关键字创建协程时,只是将该协程添加到当前P的队列,并没有调度执行;所以,为了避免主协程执行打印语句结束后程序退出,我们可以通过runtime.Gosched函数使得main协程主动让出CPU,这样Go调度器就能先调度执行其他协程了。另外,我们通过runtime.GOMAXPROCS设置P的数目为1,即最多只能有一个线程M绑定P,即最多只能有一个调度器运行。这样,如果Go语言调度器没有时间片的概念,则一旦子协程执行到循环,就会一直执行死循环,导致调度器再也没有机会调度其他协程了;最终的现象就是main协程的打印语句无法执行。 &emsp;&emsp;执行结果怎么样呢?如果你是在Go1.18环境运行该程序,你会发现正常输出了"main end";但是如果你是在Go1.13版本及以下运行该程序,你将发现程序一直执行,没有输出"main end"。你可以下载两个版本的Go试一试,看看结果是不是这样的。Go1.13版本及以下不会输出,是否说明Go1.13版本及以下,没有时间片的概念?其实也不然。你可以再试试下面这个程序: ``` package main import ( "fmt" "runtime" ) func main() { runtime.GOMAXPROCS(1) go func() { fmt.Println("hello world") var arr []int for i := 0; i < 100; i ++ { arr = append(arr, i) } for { test(arr) } }() runtime.Gosched() fmt.Println("main end") } func test(arr []int) []int { diff := make([]int, len(arr), len(arr)) diff[0] = arr[0] for i := 1; i < len(arr); i ++ { diff[i] = arr[i] - arr[i - 1] } return diff } ``` &emsp;&emsp;这一次我们的死循环不是简单的空语句,而是函数调用,而且test函数也有一些稍微复杂的语句。Go1.13版本及以下再执行这个程序试试呢?你会发现,神奇的是,主协程又输出了"main end"。为什么呢?唯一不同的是第一个程序的死循环只是简单的空语句,第二个程序的循环是函数调用! &emsp;&emsp;怎么,函数调用就特殊?是的,函数调用就是不同于普通语句,就是特殊。Go语言在编译函数的时候,还添加了一些自己的代码。还记得上一篇文章,在介绍协程栈溢出时候提到,Go语言编译阶段,在所有用户函数,都加了一点代码逻辑,判断栈顶指针SP小于某个位置时,说明栈空间不足,需要扩容了。需要扩容的时候,执行的是函数runtime.morestack_noctxt,而该函数(其实是runtime.newstack)不仅仅是判断是否需要扩容,还会判断当前协程是否应该让出CPU。 &emsp;&emsp;注意,Go语言并没有严格限制协程执行时间片,而是通过一种协作式抢占调度(1.13版本及以下)的方式,实现伪时间片功能。这就需要一个帮手了,Go程序启动时不止创建普通的调度线程,还存在辅助线程,辅助线程的主函数是runtime.sysmon,每10ms轮询一次,检测是否有协程执行时间过长,如果有,则通知该协程让出CPU。 ``` //创建新线程,主函数sysmon newm(sysmon, nil) func sysmon() { delay = 10 * 1000 // up to 10ms usleep(delay) for { //preempt long running G's retake(nanotime()) } } ``` &emsp;&emsp;preempt的意思是抢占。我们先思考两个问题: &emsp;&emsp;1)sysmon线程如何判断哪些协程执行时间过长?遍历协程吗?肯定不是这样。想想线程M调度协程流程,要求必须先绑定P,而且M正在调度执行的协程只有一个,所以呢?只需要遍历P,通过p.m.curg就能获取到正在执行的协程。接下来就是检测协程执行时间了,每个协程记录调度时间吗?貌似也行,Go语言为每一个P维护了p.schedtick,M每调度一次协程,该值加1,而且还有一个变量p.schedwhen记录了上次调度的时间。这就好办了,每10毫秒检测的时候,如果p.schedtick没法发生改变,说明这10ms内没有发生调度,则应该通知当前协程p.m.curg让出CPU了。 &emsp;&emsp;2)sysmon线程如何跨线程通知该协程让出CPU呢?还记得协程栈扩容是怎么判断的吗?stackguard0!通知协程让出CPU也是通过在协程栈stackguard0位置设置特殊标识实现的。 &emsp;&emsp;似乎整个流程通顺了,第一步,Go语言编译阶段在test函数添加一些自己的代码,如下: ``` "".test STEXT 0x0000 00000 (test.go:26) CMPQ SP, 16(R14) 0x0004 00004 (test.go:26) PCDATA $0, $-2 0x0004 00004 (test.go:26) JLS 404 0x0194 00404 (test.go:26) MOVQ AX, 8(SP) 0x0199 00409 (test.go:26) MOVQ BX, 16(SP) 0x019e 00414 (test.go:26) MOVQ CX, 24(SP) 0x01a3 00419 (test.go:26) CALL runtime.morestack_noctxt(SB) 0x01b7 00439 (test.go:26) JMP 0 ``` &emsp;&emsp;R14寄存器就是当前协程g,想想结构体g的第一个字段stack占16字节,第二个字段就是stackguard0,所以这里比较栈顶SP寄存器与16(R14)地址大小。那明白了,stackguard0位置处设置的特殊标识肯定是一个非常大的值,任何栈位置都小于该值。 &emsp;&emsp;第二步,sysmon线程10ms周期执行retake函数抢占长时间执行的G: ``` func retake(now int64) uint32 { //遍历所有的P for i := 0; i < len(allp); i++ { _p_ := allp[i] s := _p_.status if s == _Prunning { t := int64(_p_.schedtick) //不等于,说明在这10ms期间重新调度协程了; if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now continue } //G长时间运行 if pd.schedwhen+forcePreemptNS > now { continue } preemptone(_p_) } } } func preemptone(_p_ *p) bool { mp := _p_.m.ptr() gp := mp.curg if gp == nil || gp == mp.g0 { return false } //抢占标识 gp.preempt = true gp.stackguard0 = stackPreempt return true } const forcePreemptNS = 10 * 1000 * 1000 // 10ms stackPreempt = (1<<(8*sys.PtrSize) - 1) & -1314 //非常大 ``` &emsp;&emsp;一般情况下,每一个P都有可能有正在执行的协程p.m.curg,所以这里需要遍历所有的P,如果P的状态为_Prunning,说明该P已经被M绑定且正在调度协程。p.schedtick每调度一次协程值加1,所以检测时如果这个值与上次记录不一样,则说明这10ms期间肯定重新调度协程了,跳过即可。否则,如果距上次调度时间已经过去很久了,则通过preemptone抢占,看吧,抢占标识就是通过设置gp.stackguard0实现的。 &emsp;&emsp;第三步,子协程执行10ms之后,进入到函数test,检测stackguard0标识,发现栈顶指针SP小于stackguard0,这时候跳转到了runtime.morestack_noctxt函数。函数morestack_noctxt也是汇编写的,一系列判断之后,最终调用了函数runtime.newstack,就是在这里,判断是否被抢占了,如果是,则让出CPU。 ``` func newstack() { gp := getg().m.curg preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt if preempt { gopreempt_m(gp) // never return;抢占 } } func gopreempt_m(gp *g) { //修改协程状态 casgstatus(gp, _Grunning, _Grunnable) //添加到全局队列 lock(&sched.lock) globrunqput(gp) unlock(&sched.lock) //重新调度 schedule() } ``` &emsp;&emsp;newstack就是通过gp.stackguard0判断是否被抢占了。另外,我们发现协程被抢占时,被添加到了全局队列,这样相当于优先级降低了。 &emsp;&emsp;这就是Go1.13版本及以下实现的协作式抢占调度,协程只有在进入函数时,才有可能检测是否被抢占了,所以死循环中只是简单的语句是无法抢占的。而且函数如果非常简单,还有可能被优化掉,所以你测试的时候,可能发现循环中调用了函数,但是运行结果却显示无法被抢占。 ## 基于信号的抢占式调度 &emsp;&emsp;Go1.13版本及以下是基于协作式的抢占调度,所以死循环中是简单的语句,还是复杂的函数调用,最终结果是不一样的。那Go1.14版本以上呢?好像无论哪一种情况,都能被抢占,是做了哪些优化吗?是的,Go1.14版本以上实现的是基于信号的抢占。 &emsp;&emsp;信号?kill -signal pid发送的就是信号,比如我们常用SIGTERM信号终止进程,而Linux总共有64种信号可供选择。当然,程序想要接收并处理某种信号,还需要设置信号处理器: ``` struct sigaction{ void (*sa_handler)(int); sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); } ``` &emsp;&emsp;sa_hander就是我们的信号处理器函数指针。Go语言设置的信号处理函数为runtime.sighandler。 &emsp;&emsp;下面看一下Go1.18实现的抢占逻辑,同样是函数preemptone: ``` func preemptone(_p_ *p) bool { //协作式抢占标记 gp.stackguard0 = stackPreempt //如果支持信号抢占,发送信号 if preemptMSupported { preemptM(mp) } return true } func preemptM(mp *m) { pthread_kill(pthread(mp.procid), sigPreempt) } const sigPreempt = _SIGURG ``` &emsp;&emsp;函数pthread_kill可用于向指定线程发送信号,选择第几种信号也是有要求的,有很多信号有特殊含义,是不能随便使用的,Go语言选择的协程抢占信号是SIGURG。sysmon线程发送抢占信号,调度线程M就会收到信号,判断收到的是抢占信号,则换出当前协程,重新调度。 ``` func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) { //抢占信号 if sig == sigPreempt { doSigPreempt(gp, c) } } ``` &emsp;&emsp;最终其实和协作式抢占一样,都是将当前协程添加到全局队列,触发调度,这里就不在赘述。 ## 总结 &emsp;&emsp;本篇文章主要介绍了Go语言调度器,调度算法由runtime.schedule函数实现,程因为某些原因阻塞了(chan的读写,socket的读写等等),或者是协程执行结束了,都会触发重新调度。另外Go语言还支持抢占调度,辅助协程sysmon检测长时间执行协程,设置抢占标识或者发送抢占信号。Go1.13版本及以下实现的是协作式调度,普通死循环语句没有办法被抢占,只有执行函数调用时(Go编译阶段添加了一些代码),才有可能实现抢占,而Go1.14版本及以上,通过信号实现的抢占调度则没有这个问题。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1595 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传