G state transition diagram
Note
The analysis below is based on Go 1.14.
Gosched -- voluntarily yielding
When user code calls runtime.Gosched(), the following code runs.
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
checkTimeouts() // a no-op (see below)
mcall(gosched_m) // called via mcall (covered in other scheduling articles): switch to the g0 stack, run gosched_m, never return
}
func checkTimeouts() {}
// Gosched continuation on g0.
func gosched_m(gp *g) {
if trace.enabled { // emit a trace event
traceGoSched()
}
goschedImpl(gp)
}
func goschedImpl(gp *g) { // gp is the g that called mcall
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable) // switch to _Grunnable
dropg() // unbind m and g
lock(&sched.lock)
globrunqput(gp) // sched is global scheduler state, so it is accessed under sched.lock
unlock(&sched.lock)
schedule() // find the next runnable g and run it
}
func dropg() {
_g_ := getg() // _g_ should be g0 at this point
setMNoWB(&_g_.m.curg.m, nil) // clear curg's m pointer (set to nil, no write barrier)
setGNoWB(&_g_.m.curg, nil) // clear m's curg pointer (set to nil, no write barrier)
}
After Gosched completes, the G is placed on the global runnable queue and will be run again once schedule picks it up.
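As a quick user-level illustration (my own sketch, not runtime code): with GOMAXPROCS(1) a busy goroutine would otherwise monopolize the only P, and runtime.Gosched() pushes it onto the global run queue so another goroutine gets a turn.
package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1) // one P, so goroutines must take turns
	done := make(chan struct{})
	go func() {
		for i := 0; i < 3; i++ {
			fmt.Println("worker", i)
			runtime.Gosched() // yield: this G goes to the global run queue
		}
		close(done)
	}()
	runtime.Gosched() // let the worker run first
	fmt.Println("main is running again")
	<-done
}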
park_m -- going to sleep
When user code blocks on a channel read or write, it ultimately reaches park_m, which puts the G into the waiting state (time.Sleep() also parks the goroutine through gopark, via the runtime timers; I'll analyze Go's timer implementation another time).
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
// Reason explains why the goroutine has been parked.
// It is displayed in stack traces and heap dumps.
// Reasons should be unique and descriptive.
// Do not re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem() // pin the m (disable preemption)
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
// record why and in what context the g is parking
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp) // unpin the m
// can't do anything that might move the G between Ms here.
mcall(park_m) // switch to g0 and run park_m
}
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()
if trace.enabled { // emit a trace event
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
// switch the state to waiting
casgstatus(gp, _Grunning, _Gwaiting)
dropg() // unbind m and g
// run the unlock callback before committing to the wait
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock) // run the callback the caller set for entering the wait state
_g_.m.waitunlockf = nil // clear the callback once it has run
_g_.m.waitlock = nil // clear the wait lock
if !ok { // if it returns false, do not park the g; keep running it
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
After park_m finishes, the G is in the waiting state. At that point it is not on any run queue; it must be moved back to runnable via ready before it can be queued to run again.
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting { // a g handed to ready must be in the waiting state
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next) // put g on the current P's run queue
// if there is an idle P and no M is spinning, wake one up
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
releasem(mp) // re-enable preemption on the current m
}
Note that ready is not run via mcall but via systemstack; as we saw earlier, after systemstack finishes this code it switches back to the original g and keeps executing. Why? Think about when goready is called: usually another G is blocked, for example on a channel. Take a read as the example: when G1 is blocked reading from a channel and another goroutine G2 writes to that channel, the write triggers goready, marking G1 runnable. G2 should then simply continue running its own code, while G1, now on a run queue, will naturally be picked up by some M and run.
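Here is a user-level sketch of that exact scenario (my own example): G1 blocks receiving from a channel (gopark, _Gwaiting); when G2 sends, the runtime calls goready on G1, marking it _Grunnable and putting it on a run queue, while G2 simply continues.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int) // unbuffered: a receiver must wait for a sender

	// G1: parks in _Gwaiting inside the receive until a value arrives.
	go func() {
		v := <-ch
		fmt.Println("G1 got", v)
	}()

	// G2 (main here): the send wakes G1 via goready, then G2 keeps going.
	time.Sleep(10 * time.Millisecond) // let G1 park first (illustrative only)
	ch <- 42
	fmt.Println("G2 continues right after the send")
	time.Sleep(10 * time.Millisecond) // give G1 a chance to print before exit
}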
Gpreempt -- signal-based preemption
Go 1.14 triggers preemptive scheduling via a signal. The signal registration code and the signal handler are shown below.
// Initialize signals. Most of the code is elided; as shown below, every signal's handler is sighandler.
func initsig(preinit bool) {
for i := uint32(0); i < _NSIG; i++ {
setsig(i, funcPC(sighandler))
}
}
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
c := &sigctxt{info, ctxt}
if sig == sigPreempt {
doSigPreempt(gp, c) // when the signal is the preemption signal, call doSigPreempt
}
}
func doSigPreempt(gp *g, ctxt *sigctxt) {
if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
// Inject a call to asyncPreempt.
ctxt.pushCall(funcPC(asyncPreempt)) // push asyncPreempt's address into the signal context, so asyncPreempt runs when the handler returns
}
}
// asyncPreempt saves all user registers and calls asyncPreempt2.
// what ultimately runs is asyncPreempt2; the implementations of preemptPark and gopreempt_m are examined below
func asyncPreempt()
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark) // taken when the G must stop for GC (preemptStop is set)
} else {
mcall(gopreempt_m) // taken in the normal case
}
gp.asyncSafePoint = false
}
// In the normal case gopreempt_m runs; it ends up in goschedImpl, the same function Gosched calls above, so the G becomes runnable and is put on the global queue
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
func preemptPark(gp *g) {
if trace.enabled {
traceGoPark(traceEvGoBlock, 0)
}
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
gp.waitreason = waitReasonPreempted
casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
dropg() // unbind g and m from each other
casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // the G ends up in _Gpreempted
schedule()
}
After preemption, if no GC is in progress, goschedImpl is called and the G becomes runnable, waiting for its next turn. If a GC is in progress, the G enters _Gpreempted and is later moved into waiting via suspendG. Since this path is tightly coupled to the GC, I'll leave it for now and come back to it after covering memory management and GC.
One question: is anything missing from the steps above? When a running G is preempted by a signal, its current context must be saved so that it can be restored and execution resumed the next time it is scheduled. (The asyncPreempt assembly stub takes care of this: it spills all user registers to the goroutine's own stack before calling asyncPreempt2, and restores them afterwards.)
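A small sketch (my own example) of what signal-based preemption buys us: before Go 1.14 a tight loop with no function calls could not be preempted, so under GOMAXPROCS(1) a program like this could hang; in 1.14 sysmon sends the preemption signal (SIGURG), the busy goroutine is forced off the P, and main runs again.
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		for {
			// no function calls, no stack checks: only an async signal can stop this loop
		}
	}()
	time.Sleep(100 * time.Millisecond) // relies on the busy goroutine being preempted
	fmt.Println("main was scheduled again thanks to async preemption")
}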
entersyscall -- system calls
Both system calls and cgo calls move the G into the _Gsyscall state, and both end up in reentersyscall. entersyscall mainly sets the g's and p's states and unbinds the m from the p.
func entersyscall() {
reentersyscall(getcallerpc(), getcallersp())
}
func reentersyscall(pc, sp uintptr) {
_g_ := getg()
// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
_g_.m.locks++
// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
_g_.stackguard0 = stackPreempt
_g_.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
casgstatus(_g_, _Grunning, _Gsyscall) // switch the G's state
if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
systemstack(func() {
print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
throw("entersyscall")
})
}
// record trace info if tracing is enabled
if trace.enabled {
systemstack(traceGoSysCall)
// systemstack itself clobbers g.sched.{pc,sp} and we might
// need them later when the G is genuinely blocked in a
// syscall
save(pc, sp)
}
if atomic.Load(&sched.sysmonwait) != 0 { // if sysmon is waiting, run the function below (not covered in detail here)
systemstack(entersyscall_sysmon)
save(pc, sp)
}
if _g_.m.p.ptr().runSafePointFn != 0 { // if a safe-point function is pending, run it (not covered in detail here)
// runSafePointFn may stack split if run on this stack
systemstack(runSafePointFn)
save(pc, sp)
}
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
// unbind m and p
pp := _g_.m.p.ptr()
pp.m = 0
_g_.m.oldp.set(pp)
_g_.m.p = 0
atomic.Store(&pp.status, _Psyscall) // switch the P's state
if sched.gcwaiting != 0 { // if a GC is waiting for a stop-the-world, run the code below (not covered in detail)
systemstack(entersyscall_gcwait)
save(pc, sp)
}
_g_.m.locks--
}
When the system call returns, exitsyscall is called. Its flow is: first try the fast path, which succeeds when the old P can still be reacquired (it has not been retaken) or some idle P is available; in that case execution simply continues. Otherwise the M waits until it can be bound to a runnable P again. This also shows how the scheduler treats system calls and cgo: if the call returns quickly, the goroutine keeps running; otherwise the bound P is taken away and handed to another M so the Gs on that P's run queue are not blocked. (A user-level sketch follows the exitsyscall0 listing below.)
func exitsyscall() {
_g_ := getg()
_g_.m.locks++ // see comment in entersyscall
if getcallersp() > _g_.syscallsp {
throw("exitsyscall: syscall frame is no longer valid")
}
_g_.waitsince = 0
oldp := _g_.m.oldp.ptr()
_g_.m.oldp = 0
// if the conditions for continuing are met, keep running
if exitsyscallfast(oldp) {
if trace.enabled {
if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick {
systemstack(traceGoStart)
}
}
// There's a cpu for us, so we can run.
_g_.m.p.ptr().syscalltick++
// We need to cas the status and scan before resuming...
casgstatus(_g_, _Gsyscall, _Grunning)
// Garbage collector isn't running (since we are),
// so okay to clear syscallsp.
_g_.syscallsp = 0
_g_.m.locks--
if _g_.preempt {
// restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
} else {
// otherwise restore the real _StackGuard, we've spoiled it in entersyscall/entersyscallblock
_g_.stackguard0 = _g_.stack.lo + _StackGuard
}
_g_.throwsplit = false
// if scheduling of user goroutines is disabled, yield instead of continuing
if sched.disable.user && !schedEnabled(_g_) {
// Scheduling of this goroutine is disabled.
Gosched()
}
return
}
_g_.sysexitticks = 0
if trace.enabled {
// Wait till traceGoSysBlock event is emitted.
// This ensures consistency of the trace (the goroutine is started after it is blocked).
for oldp != nil && oldp.syscalltick == _g_.m.syscalltick {
osyield()
}
// We can't trace syscall exit right now because we don't have a P.
// Tracing code can invoke write barriers that cannot run without a P.
// So instead we remember the syscall exit time and emit the event
// in execute when we have a P.
_g_.sysexitticks = cputicks()
}
_g_.m.locks--
// no free P to run on; take the slow path
// Call the scheduler.
mcall(exitsyscall0)
// Scheduler returned, so we're allowed to run now.
// Delete the syscallsp information that we left for
// the garbage collector during the system call.
// Must wait until now because until gosched returns
// we don't know for sure that the garbage collector
// is not running.
_g_.syscallsp = 0
_g_.m.p.ptr().syscalltick++
_g_.throwsplit = false
}
func exitsyscallfast(oldp *p) bool {
_g_ := getg()
// Freezetheworld sets stopwait but does not retake P's.
if sched.stopwait == freezeStopWait {
return false
}
// Try to re-acquire the last P (sysmon has not retaken it).
if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
// There's a cpu for us, so we can run.
// switch the p's status and re-bind m and p
wirep(oldp)
exitsyscallfast_reacquired()
return true
}
// Try to get any other idle P.
if sched.pidle != 0 {
var ok bool
systemstack(func() {
ok = exitsyscallfast_pidle() // grab an idle P and bind it to this m
if ok && trace.enabled {
if oldp != nil {
// Wait till traceGoSysBlock event is emitted.
// This ensures consistency of the trace (the goroutine is started after it is blocked).
for oldp.syscalltick == _g_.m.syscalltick {
osyield()
}
}
traceGoSysExit(0)
}
})
if ok {
return true
}
}
return false
}
func exitsyscall0(gp *g) {
_g_ := getg()
// unbind g from m and mark g runnable
casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
// try to get a P
var _p_ *p
if schedEnabled(_g_) {
_p_ = pidleget()
}
if _p_ == nil {
globrunqput(gp) // put gp on the global run queue
} else if atomic.Load(&sched.sysmonwait) != 0 {
atomic.Store(&sched.sysmonwait, 0)
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
// if we got a P, keep running gp
if _p_ != nil {
acquirep(_p_)
execute(gp, false) // Never returns.
}
// if this m has a locked g, stop and wait until it can run again
if _g_.m.lockedg != 0 {
// Wait until another thread schedules gp and so m again.
stoplockedm()
execute(gp, false) // Never returns.
}
// stop this m and wait until there is a P to run on
stopm()
schedule() // Never returns.
}
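To see the hand-off from the outside, here is a user-level sketch (my own example, unix-only since it uses syscall.Pipe): the reader goroutine sits in a genuine blocking read syscall (_Gsyscall, its P in _Psyscall), sysmon eventually retakes that P and hands it to another M, so the loop in main keeps making progress even with GOMAXPROCS(1); when the read finally returns, exitsyscall tries the fast path first and falls back to exitsyscall0 only if no P is available.
package main

import (
	"fmt"
	"runtime"
	"syscall"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)

	// A raw blocking pipe, so the Read below really blocks in a syscall
	// instead of being parked by the netpoller.
	var p [2]int
	if err := syscall.Pipe(p[:]); err != nil {
		panic(err)
	}

	go func() {
		buf := make([]byte, 1)
		syscall.Read(p[0], buf) // this G enters _Gsyscall; its P becomes _Psyscall
		fmt.Println("reader returned from the blocking syscall")
	}()

	// The only P gets retaken by sysmon, so this loop still runs.
	for i := 0; i < 3; i++ {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("other work still runs", i)
	}
	syscall.Write(p[1], []byte{1}) // unblock the reader
	time.Sleep(50 * time.Millisecond)
}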
newstack -- stack growth
newstack mainly grows the stack, which touches memory management. The rough flow is: set the G's state to _Gcopystack, grow and copy the stack, and once the copy is done set the G back to _Grunning. I won't dig into the code here.
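A tiny sketch (my own example) showing when newstack is triggered: a goroutine starts with a small stack (2 KB in Go 1.14), and deep recursion trips the stackguard0 check in the function prologue, which calls morestack and eventually newstack to copy the stack to a larger one.
package main

import "fmt"

//go:noinline
func grow(n int) int {
	var buf [128]byte // consume some stack space in every frame
	buf[0] = byte(n)
	if n == 0 {
		return int(buf[0])
	}
	return grow(n-1) + int(buf[0])
}

func main() {
	// Roughly 100000 * 128 bytes of frames forces several stack copies.
	fmt.Println(grow(100000))
}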
schedule -- finding the next runnable g
schedule first looks for a runnable g in the local queue; if none is found it checks the global queue, and failing that it tries to steal from other Ps. If nothing can be found at all, stopm is called and the M sleeps until it is woken.
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != 0 { // if a g is locked to this m, wait for it and run it directly
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
pp := _g_.m.p.ptr()
pp.preempt = false
if sched.gcwaiting != 0 { // if a GC stop-the-world is pending, stop until it finishes, then retry
gcstopm()
goto top
}
if pp.runSafePointFn != 0 { // if the P has a pending safe-point function, run it
runSafePointFn()
}
// Sanity check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
checkTimers(pp, 0) // timer related; not covered here
var gp *g
var inheritTime bool
// Normal goroutines will check for need to wakeP in ready,
// but GCworkers and tracereaders will not, so the check must
// be done here instead.
tryWakeP := false
if trace.enabled || trace.shutdown { // trace related
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 { // GC related; not covered here
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
if gp == nil { // once every 61 schedticks, try to take a g from the global run queue first
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr()) // take from the P's local run queue
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
if gp == nil {
// try to steal gs from other Ps; if nothing is found, stopm until this M is woken
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning { // spinning means this m was searching for work; it is about to run a g now, so stop spinning
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) { // if this g cannot be scheduled right now, set it aside
// Scheduling of this goroutine is disabled. Put it on
// the list of pending runnable goroutines for when we
// re-enable user scheduling and look again.
lock(&sched.lock)
if schedEnabled(gp) {
// Something re-enabled scheduling while we
// were acquiring the lock.
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}
// If about to schedule a not-normal goroutine (a GCworker or tracereader),
// wake a P if there is one.
if tryWakeP {
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
}
if gp.lockedm != 0 { // if the g is locked to a specific m, hand it off to that m instead of running it here
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
execute(gp, inheritTime)
}
// execute mainly switches the g's state and binds g and m to each other
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}
if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
gogo(&gp.sched)
}
// gogo is implemented in assembly; it restores the saved context and jumps to the g's pc. See asm_arm64.s (the arm64 version is shown here).
TEXT runtime·gogo(SB), NOSPLIT, $24-8
MOVD buf+0(FP), R5 // R5 = g.sched
MOVD gobuf_g(R5), g // g = g.sched.g
BL runtime·save_g(SB) // store the current g into thread-local storage
MOVD 0(g), R4 // make sure g is not nil
MOVD gobuf_sp(R5), R0 // R0 = g.sched.sp
MOVD R0, RSP // RSP = R0 = g.sched.sp
MOVD gobuf_bp(R5), R29 // R29 = g.sched.bp
MOVD gobuf_lr(R5), LR // LR = g.sched.lr
MOVD gobuf_ret(R5), R0 // R0 = g.sched.ret
MOVD gobuf_ctxt(R5), R26 // R26 = g.sched.ctxt
// zero out the saved context in gobuf
MOVD $0, gobuf_sp(R5)
MOVD $0, gobuf_bp(R5)
MOVD $0, gobuf_ret(R5)
MOVD $0, gobuf_lr(R5)
MOVD $0, gobuf_ctxt(R5)
CMP ZR, ZR // set condition codes for == test, needed by stack split
MOVD gobuf_pc(R5), R6 // R6 = g.sched.pc
B (R6)
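For reference, the gobuf being restored above is defined in runtime2.go. Trimmed to the fields used by gogo (Go 1.14; comments abridged):
type gobuf struct {
	sp   uintptr        // saved stack pointer
	pc   uintptr        // saved program counter
	g    guintptr       // the g this context belongs to
	ctxt unsafe.Pointer // saved closure context register
	ret  sys.Uintreg    // saved return value register
	lr   uintptr        // saved link register (LR machines such as arm64)
	bp   uintptr        // saved frame pointer (framepointer-enabled architectures)
}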
Gaps still to fill:
1. Several important functions called from schedule have not been expanded.
2. The implementation of save_g has not been expanded.
One loop
Taking the two diagrams together: Gosched, park_m, asyncPreempt, and exitsyscall all reach the scheduler through mcall, and each of these paths ends by calling schedule to start the next round of the loop.
newstack does not go through the scheduler; the goroutine simply keeps running, and the exitsyscallfast path likewise continues execution.
Summary
1. There are still gaps to fill.
2. A G can be thought of as a queued task that carries its own context (pc, sp, and so on) and its own private stack (see the trimmed struct below).
3. To be continued.
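To make point 2 concrete, here is the g struct from Go 1.14's runtime2.go, trimmed (by me) to the fields that matter in this article: the private stack, the saved context used by gogo/mcall, and the status that the transitions above move between.
type g struct {
	stack        stack      // the goroutine's own stack: [stack.lo, stack.hi)
	stackguard0  uintptr    // stack-growth / preemption check word (stackPreempt forces a check)
	m            *m         // current m; nil when the g is not running
	sched        gobuf      // saved sp/pc/... used to park and resume the g
	atomicstatus uint32     // _Grunnable, _Grunning, _Gwaiting, _Gsyscall, _Gpreempted, ...
	goid         int64
	waitreason   waitReason // why the g is parked, valid when atomicstatus == _Gwaiting
	preempt      bool       // preemption request flag
}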