说明
虽然我们一直强调golang调度器是非抢占式。非抢占式的一个最大坏处是无法保证公平性,如果一个g处于死循环状态,那么其他协程可能就会被饿死。 所幸的是,Golang在1.4版本中加入了抢占式调度的逻辑,抢占式调度必然可能在g执行的某个时刻被剥夺cpu,让给其他协程。
实现
还记得我们之前说过Golang的sysmon协程么,该协程会定期唤醒作系统状态检查,我们前面说过了它如何检查处于Psyscall状态的p,以便让处于系统调用状态的P可以被继续执行,不至于饿死。 除了检查这个意外,sysmon还检查处于Prunning状态的P,检查它的目的就是避免这里的某个g占用了过多的cpu时间,并在某个时刻剥夺其cpu运行时间。
static uint32 retake(int64 now)
{
uint32 i, s, n;
int64 t;
P *p;
Pdesc *pd;
n = 0;
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p==nil)
continue;
pd = &pdesc[i];
s = p->status;
if(s == Psyscall) {
......
} else if(s == Prunning) {
// Preempt G if it's running for more than 10ms.
t = p->schedtick;
if(pd->schedtick != t) {
pd->schedtick = t;
pd->schedwhen = now;
continue;
}
if(pd->schedwhen + 10*1000*1000 > now)
continue;
// 如果自从上次发生调度时间已经超过了10ms
preemptone(p);
}
}
return n;
}
// 这里的抢占只是将g的preempt设置为true
// 只有在g进行函数调用时才会检查该标志位
// 并进而可能发生调度,非常弱
static bool preemptone(P *p)
{
M *mp;
G *gp;
mp = p->m;
if(mp == nil || mp == g->m)
return false;
gp = mp->curg;
if(gp == nil || gp == mp->g0)
return false;
gp->preempt = true;
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp->stackguard0 = StackPreempt;
return true;
}
之前我们说过在函数调用时会进行堆栈检测,现在将gp->stackGuard0设置为StackPreempt(-1314,非常小的值),肯定会调用一次runtime.morestack,逻辑如下:
TEXT runtime·morestack(SB),NOSPLIT,$0-0
// Cannot grow scheduler stack (m->g0).
get_tls(CX)
MOVQ g(CX), BX
MOVQ g_m(BX), BX
MOVQ m_g0(BX), SI
CMPQ g(CX), SI
JNE 2(PC)
INT $3
// Cannot grow signal stack (m->gsignal).
MOVQ m_gsignal(BX), SI
CMPQ g(CX), SI
JNE 2(PC)
INT $3
// Called from f.
// Set m->morebuf to f's caller.
MOVQ 8(SP), AX // f's caller's PC
MOVQ AX, (m_morebuf+gobuf_pc)(BX)
LEAQ 16(SP), AX // f's caller's SP
MOVQ AX, (m_morebuf+gobuf_sp)(BX)
get_tls(CX)
MOVQ g(CX), SI
MOVQ SI, (m_morebuf+gobuf_g)(BX)
// Set g->sched to context in f.
MOVQ 0(SP), AX // f's PC
MOVQ AX, (g_sched+gobuf_pc)(SI)
MOVQ SI, (g_sched+gobuf_g)(SI)
LEAQ 8(SP), AX // f's SP
MOVQ AX, (g_sched+gobuf_sp)(SI)
MOVQ DX, (g_sched+gobuf_ctxt)(SI)
MOVQ BP, (g_sched+gobuf_bp)(SI)
// Call newstack on m->g0's stack.
MOVQ m_g0(BX), BX
MOVQ BX, g(CX)
MOVQ (g_sched+gobuf_sp)(BX), SP
CALL runtime·newstack(SB)
MOVQ $0, 0x1003 // crash if newstack returns
RET
最终调用newstack来进行堆栈扩容:
func newstack() {
thisg := getg()
// TODO: double check all gp. shouldn't be getg().
if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
throw("stack growth after fork")
}
if thisg.m.morebuf.g.ptr() != thisg.m.curg {
print("runtime: newstack called from g=", thisg.m.morebuf.g, "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
morebuf := thisg.m.morebuf
traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
throw("runtime: wrong goroutine in newstack")
}
gp := thisg.m.curg
morebuf := thisg.m.morebuf
thisg.m.morebuf.pc = 0
thisg.m.morebuf.lr = 0
thisg.m.morebuf.sp = 0
thisg.m.morebuf.g = 0
rewindmorestack(&gp.sched)
// NOTE: stackguard0 may change underfoot, if another thread
// is about to try to preempt gp. Read it just once and use that same
// value now and below.
preempt := atomicloaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
......
// 进行重新调度
if preempt {
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}
if gp.preemptscan {
for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
// Likely to be racing with the GC as
// it sees a _Gwaiting and does the
// stack scan. If so, gcworkdone will
// be set and gcphasework will simply
// return.
}
if !gp.gcscandone {
scanstack(gp)
gp.gcscandone = true
}
gp.preemptscan = false
gp.preempt = false
casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
casgstatus(gp, _Gwaiting, _Grunning)
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
// Act like goroutine called runtime.Gosched.
casgstatus(gp, _Gwaiting, _Grunning)
// 放弃当前协程,调度新协程执行
gopreempt_m(gp) // never return
}
}
这里需要注意两个东西:
- thisg := getg():这个代表当前执行newstack()函数的堆栈,也是当前线程的g0的stack;
- gp := thisg.m.curg:这个代表的是申请栈扩容的协程,与上面的thisg不是一个东西。
因为虽然调用了newstack,但是对于stackguard0==stackPreempt的协程来说,它的目的压根不是堆栈扩容,而是发起一次调度,所以直接进入了gopreempt_m,这里将当前协程挂起,并发起一次schedule().
有疑问加站长微信联系(非本文作者)