# 协程调度时机三：抢占式调度

## 实现

``````static uint32 retake(int64 now)
{
// Walks every P and, for each one in Prunning, requests preemption of the
// goroutine it is running once the same schedtick has been observed for
// at least 10ms. `now` is compared against schedwhen + 10*1000*1000, so it
// is presumably a nanosecond timestamp (TODO confirm with the caller).
// The Psyscall branch is elided ("......") in this excerpt.
uint32 i, s, n;
int64 t;
P *p;

Pdesc *pd;
// n is the return value. Nothing in the visible code increments it, so any
// counting presumably happens in the elided Psyscall branch.
n = 0;

for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p==nil)
continue;
// pd is the bookkeeping slot for this P: the last schedtick seen and when it was seen.
pd = &pdesc[i];
s = p->status;
if(s == Psyscall) {
......
} else if(s == Prunning) {
// Preempt G if it's running for more than 10ms.
t = p->schedtick;
if(pd->schedtick != t) {
// The tick changed since the previous pass: remember it and restart the clock.
pd->schedtick = t;
pd->schedwhen = now;
continue;
}
// Same tick as last time, but it was recorded less than 10ms ago: leave it alone.
if(pd->schedwhen + 10*1000*1000 > now)
continue;
// At least 10ms have passed with no change in schedtick: request preemption.
preemptone(p);
}
}
return n;
}

// preemptone asks the goroutine currently running on p to yield.
// It is only a request: it sets the goroutine's preempt flag and overwrites
// its stack guard. The request is noticed when that goroutine next makes a
// function call and fails the stack check, which may then lead to a
// reschedule, so this is a weak form of preemption.
// Returns true if a request was posted, false if there was nothing to preempt.
static bool preemptone(P *p)
{
    M *owner = p->m;
    G *target;

    // No M is attached to this P.
    if(owner == nil)
        return false;
    // The M is the caller's own (g->m).
    if(owner == g->m)
        return false;

    target = owner->curg;
    // The M has no current goroutine.
    if(target == nil)
        return false;
    // The current goroutine is the M's g0.
    if(target == owner->g0)
        return false;

    target->preempt = true;

    // Every call in a goroutine checks for stack overflow by comparing the
    // current stack pointer to gp->stackguard0. Storing StackPreempt there
    // folds preemption into that normal stack overflow check.
    target->stackguard0 = StackPreempt;
    return true;
}
``````

``````TEXT runtime·morestack(SB),NOSPLIT,$0-0
	// Entered from a function f ("Called from f" below), presumably when
	// f's stack check against stackguard0 fails. Records f's caller in
	// m->morebuf and f's own context in g->sched, then switches to m->g0
	// and calls newstack, which is not expected to return.

	// Cannot grow scheduler stack (m->g0).
	get_tls(CX)
	MOVQ	g(CX), BX
	MOVQ	g_m(BX), BX
	MOVQ	m_g0(BX), SI
	CMPQ	g(CX), SI
	JNE	2(PC)	// current g is not g0: skip the trap
	INT	$3

	// Cannot grow signal stack (m->gsignal).
	MOVQ	m_gsignal(BX), SI
	CMPQ	g(CX), SI
	JNE	2(PC)	// current g is not gsignal: skip the trap
	INT	$3

	// Called from f.
	// Set m->morebuf to f's caller.
	MOVQ	8(SP), AX	// f's caller's PC
	MOVQ	AX, (m_morebuf+gobuf_pc)(BX)
	LEAQ	16(SP), AX	// f's caller's SP
	MOVQ	AX, (m_morebuf+gobuf_sp)(BX)
	get_tls(CX)
	MOVQ	g(CX), SI
	MOVQ	SI, (m_morebuf+gobuf_g)(BX)

	// Set g->sched to context in f.
	MOVQ	0(SP), AX	// f's PC
	MOVQ	AX, (g_sched+gobuf_pc)(SI)
	MOVQ	SI, (g_sched+gobuf_g)(SI)
	LEAQ	8(SP), AX	// f's SP
	MOVQ	AX, (g_sched+gobuf_sp)(SI)
	MOVQ	DX, (g_sched+gobuf_ctxt)(SI)	// DX is saved as the context word
	MOVQ	BP, (g_sched+gobuf_bp)(SI)

	// Call newstack on m->g0's stack.
	MOVQ	m_g0(BX), BX
	MOVQ	BX, g(CX)	// make g0 the current g
	MOVQ	(g_sched+gobuf_sp)(BX), SP	// continue on g0's saved stack pointer
	CALL	runtime·newstack(SB)

	MOVQ	$0, 0x1003	// crash if newstack returns
	RET
``````

``````func newstack() {
	// Handles a failed stack check for m.curg. Only the preemption path is
	// shown here; the rest of the function is elided ("......"). When the
	// check failed because stackguard0 held stackPreempt, the goroutine is
	// either resumed unchanged (preemption is not allowed right now),
	// scanned on behalf of the GC and resumed, or handed to gopreempt_m.
	thisg := getg()
	// TODO: double check all gp. shouldn't be getg().
	if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
		throw("stack growth after fork")
	}
	if thisg.m.morebuf.g.ptr() != thisg.m.curg {
		print("runtime: newstack called from g=", thisg.m.morebuf.g, "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
		morebuf := thisg.m.morebuf
		traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
		throw("runtime: wrong goroutine in newstack")
	}

	// gp is the goroutine whose stack check failed; thisg is the g that is
	// executing newstack.
	gp := thisg.m.curg
	morebuf := thisg.m.morebuf
	thisg.m.morebuf.pc = 0
	thisg.m.morebuf.lr = 0
	thisg.m.morebuf.sp = 0
	thisg.m.morebuf.g = 0
	rewindmorestack(&gp.sched)

	// NOTE: stackguard0 may change underfoot, if another thread
	// is about to try to preempt gp. Read it just once and use that same
	// value now and below.
	preempt := atomicloaduintptr(&gp.stackguard0) == stackPreempt

	if preempt {
		if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}
	......
	// Reschedule.
	// NOTE(review): the _Gwaiting -> _Grunning transitions below imply that
	// gp was moved to _Gwaiting in the elided section; confirm against upstream.
	if preempt {
		if gp == thisg.m.g0 {
			throw("runtime: preempt g0")
		}
		if thisg.m.p == 0 && thisg.m.locks == 0 {
			throw("runtime: g is running but p is not")
		}

		if gp.preemptscan {
			for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
				// Likely to be racing with the GC as
				// it sees a _Gwaiting and does the
				// stack scan. If so, gcworkdone will
				// be set and gcphasework will simply
				// return.
			}
			if !gp.gcscandone {
				scanstack(gp)
				gp.gcscandone = true
			}
			gp.preemptscan = false
			gp.preempt = false
			casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
			casgstatus(gp, _Gwaiting, _Grunning)
			// Restore the normal stack guard before resuming gp.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}

		// Act like goroutine called runtime.Gosched.
		casgstatus(gp, _Gwaiting, _Grunning)
		// Give up the current goroutine and schedule another one to run.
		gopreempt_m(gp) // never return
	}
}
``````

• thisg := getg()：thisg 是当前正在执行 newstack() 的 g。由于 morestack 在调用 newstack 之前已经把当前 g 和 SP 切换到了 m->g0，所以 thisg 就是当前线程的 g0，newstack 运行在 g0 的栈上；
• gp := thisg.m.curg：这个代表的是申请栈扩容的协程，与上面的thisg不是一个东西。

0 回复

• 请尽量让自己的回复能够对别人有帮助
• 支持 Markdown 格式, **粗体**、~~删除线~~、``单行代码``
• 支持 @ 本站用户；支持表情（输入 : 提示），见 Emoji cheat sheet
• 图片支持拖拽、截图粘贴等方式上传

## 实现

``````static uint32 retake(int64 now)
{
// Walks every P and, for each one in Prunning, requests preemption of the
// goroutine it is running once the same schedtick has been observed for
// at least 10ms. `now` is compared against schedwhen + 10*1000*1000, so it
// is presumably a nanosecond timestamp (TODO confirm with the caller).
// The Psyscall branch is elided ("......") in this excerpt.
uint32 i, s, n;
int64 t;
P *p;

Pdesc *pd;
// n is the return value. Nothing in the visible code increments it, so any
// counting presumably happens in the elided Psyscall branch.
n = 0;

for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p==nil)
continue;
// pd is the bookkeeping slot for this P: the last schedtick seen and when it was seen.
pd = &pdesc[i];
s = p->status;
if(s == Psyscall) {
......
} else if(s == Prunning) {
// Preempt G if it's running for more than 10ms.
t = p->schedtick;
if(pd->schedtick != t) {
// The tick changed since the previous pass: remember it and restart the clock.
pd->schedtick = t;
pd->schedwhen = now;
continue;
}
// Same tick as last time, but it was recorded less than 10ms ago: leave it alone.
if(pd->schedwhen + 10*1000*1000 > now)
continue;
// At least 10ms have passed with no change in schedtick: request preemption.
preemptone(p);
}
}
return n;
}

// preemptone asks the goroutine currently running on p to yield.
// It is only a request: it sets the goroutine's preempt flag and overwrites
// its stack guard. The request is noticed when that goroutine next makes a
// function call and fails the stack check, which may then lead to a
// reschedule, so this is a weak form of preemption.
// Returns true if a request was posted, false if there was nothing to preempt.
static bool preemptone(P *p)
{
    M *owner = p->m;
    G *target;

    // No M is attached to this P.
    if(owner == nil)
        return false;
    // The M is the caller's own (g->m).
    if(owner == g->m)
        return false;

    target = owner->curg;
    // The M has no current goroutine.
    if(target == nil)
        return false;
    // The current goroutine is the M's g0.
    if(target == owner->g0)
        return false;

    target->preempt = true;

    // Every call in a goroutine checks for stack overflow by comparing the
    // current stack pointer to gp->stackguard0. Storing StackPreempt there
    // folds preemption into that normal stack overflow check.
    target->stackguard0 = StackPreempt;
    return true;
}
``````

``````TEXT runtime·morestack(SB),NOSPLIT,$0-0
	// Entered from a function f ("Called from f" below), presumably when
	// f's stack check against stackguard0 fails. Records f's caller in
	// m->morebuf and f's own context in g->sched, then switches to m->g0
	// and calls newstack, which is not expected to return.

	// Cannot grow scheduler stack (m->g0).
	get_tls(CX)
	MOVQ	g(CX), BX
	MOVQ	g_m(BX), BX
	MOVQ	m_g0(BX), SI
	CMPQ	g(CX), SI
	JNE	2(PC)	// current g is not g0: skip the trap
	INT	$3

	// Cannot grow signal stack (m->gsignal).
	MOVQ	m_gsignal(BX), SI
	CMPQ	g(CX), SI
	JNE	2(PC)	// current g is not gsignal: skip the trap
	INT	$3

	// Called from f.
	// Set m->morebuf to f's caller.
	MOVQ	8(SP), AX	// f's caller's PC
	MOVQ	AX, (m_morebuf+gobuf_pc)(BX)
	LEAQ	16(SP), AX	// f's caller's SP
	MOVQ	AX, (m_morebuf+gobuf_sp)(BX)
	get_tls(CX)
	MOVQ	g(CX), SI
	MOVQ	SI, (m_morebuf+gobuf_g)(BX)

	// Set g->sched to context in f.
	MOVQ	0(SP), AX	// f's PC
	MOVQ	AX, (g_sched+gobuf_pc)(SI)
	MOVQ	SI, (g_sched+gobuf_g)(SI)
	LEAQ	8(SP), AX	// f's SP
	MOVQ	AX, (g_sched+gobuf_sp)(SI)
	MOVQ	DX, (g_sched+gobuf_ctxt)(SI)	// DX is saved as the context word
	MOVQ	BP, (g_sched+gobuf_bp)(SI)

	// Call newstack on m->g0's stack.
	MOVQ	m_g0(BX), BX
	MOVQ	BX, g(CX)	// make g0 the current g
	MOVQ	(g_sched+gobuf_sp)(BX), SP	// continue on g0's saved stack pointer
	CALL	runtime·newstack(SB)

	MOVQ	$0, 0x1003	// crash if newstack returns
	RET
``````

``````func newstack() {
	// Handles a failed stack check for m.curg. Only the preemption path is
	// shown here; the rest of the function is elided ("......"). When the
	// check failed because stackguard0 held stackPreempt, the goroutine is
	// either resumed unchanged (preemption is not allowed right now),
	// scanned on behalf of the GC and resumed, or handed to gopreempt_m.
	thisg := getg()
	// TODO: double check all gp. shouldn't be getg().
	if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
		throw("stack growth after fork")
	}
	if thisg.m.morebuf.g.ptr() != thisg.m.curg {
		print("runtime: newstack called from g=", thisg.m.morebuf.g, "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
		morebuf := thisg.m.morebuf
		traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
		throw("runtime: wrong goroutine in newstack")
	}

	// gp is the goroutine whose stack check failed; thisg is the g that is
	// executing newstack.
	gp := thisg.m.curg
	morebuf := thisg.m.morebuf
	thisg.m.morebuf.pc = 0
	thisg.m.morebuf.lr = 0
	thisg.m.morebuf.sp = 0
	thisg.m.morebuf.g = 0
	rewindmorestack(&gp.sched)

	// NOTE: stackguard0 may change underfoot, if another thread
	// is about to try to preempt gp. Read it just once and use that same
	// value now and below.
	preempt := atomicloaduintptr(&gp.stackguard0) == stackPreempt

	if preempt {
		if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}
	......
	// Reschedule.
	// NOTE(review): the _Gwaiting -> _Grunning transitions below imply that
	// gp was moved to _Gwaiting in the elided section; confirm against upstream.
	if preempt {
		if gp == thisg.m.g0 {
			throw("runtime: preempt g0")
		}
		if thisg.m.p == 0 && thisg.m.locks == 0 {
			throw("runtime: g is running but p is not")
		}

		if gp.preemptscan {
			for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
				// Likely to be racing with the GC as
				// it sees a _Gwaiting and does the
				// stack scan. If so, gcworkdone will
				// be set and gcphasework will simply
				// return.
			}
			if !gp.gcscandone {
				scanstack(gp)
				gp.gcscandone = true
			}
			gp.preemptscan = false
			gp.preempt = false
			casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
			casgstatus(gp, _Gwaiting, _Grunning)
			// Restore the normal stack guard before resuming gp.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}

		// Act like goroutine called runtime.Gosched.
		casgstatus(gp, _Gwaiting, _Grunning)
		// Give up the current goroutine and schedule another one to run.
		gopreempt_m(gp) // never return
	}
}
``````

• thisg := getg()：thisg 是当前正在执行 newstack() 的 g。由于 morestack 在调用 newstack 之前已经把当前 g 和 SP 切换到了 m->g0，所以 thisg 就是当前线程的 g0，newstack 运行在 g0 的栈上；
• gp := thisg.m.curg：这个代表的是申请栈扩容的协程，与上面的thisg不是一个东西。