调度器的职责就是将goroutine分配到工作线程上运行。
涉及3种类型的对象:
G - goroutine
M - 工作线程,即OS线程
P - 处理器,一种用来运行go代码的抽象资源,最大数目不能超过GOMAXPROCS,在运行go代码时需要关联一个M
全局的运行队列:
G *runtime·sched.runqhead;
G *runtime·sched.runqtail;
int runtime·sched.runqsize;
P结构的主要成员:包含一个本地运行队列
// Excerpt of the runtime's P (processor) structure; only the main fields
// discussed here are shown.
struct P
{
uint32 status; // one of Pidle/Prunning/...
// schedtick is read by schedule() for its "every 61st tick" fairness check
// of the global run queue.
uint32 schedtick; // incremented on every scheduler call
M* m; // back-link to associated M (nil if idle)
// Queue of runnable goroutines.
// Local run queue: a fixed 256-slot ring buffer. runqhead indexes the next
// G to consume (runqget reads runq[runqhead % nelem(runq)]); runqtail is
// presumably the next free slot for producers (not shown here — confirm).
// The queue is empty when runqhead == runqtail.
uint32 runqhead;
uint32 runqtail;
G* runq[256];
};
schedule函数的主要代码(`......`处为省略的部分):
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
//
// Lookup order visible in this excerpt:
//   1. On every 61st schedtick of the current P, if the global run queue is
//      non-empty, take a G from it (fairness).
//   2. Otherwise take a G from the current P's local run queue.
//   3. Otherwise call findrunnable(), which blocks until work is available.
// The chosen G is then handed to execute().
// NOTE: the "......" lines are code elided from this excerpt.
static void
schedule(void)
{
G *gp;
uint32 tick;
......
top:
......
gp = nil;
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
tick = g->m->p->schedtick;
// This is a fancy way to say tick%61==0,
// it uses 2 MUL instructions instead of a single DIV and so is faster on modern processors.
// (0x4325c53f is roughly 2^36/61, so the shifted product equals tick/61.)
if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
// The global queue is shared, so access it under sched.lock.
// NOTE(review): the second argument (1 here, 0 in findrunnable) presumably
// limits how many Gs are taken — confirm against globrunqget.
runtime·lock(&runtime·sched.lock);
gp = globrunqget(g->m->p, 1);
runtime·unlock(&runtime·sched.lock);
if(gp)
resetspinning();
}
if(gp == nil) {
gp = runqget(g->m->p);
// Invariant: an M that still finds work in its own local queue must not
// be in the spinning state.
if(gp && g->m->spinning)
runtime·throw("schedule: spinning with local work");
}
if(gp == nil) {
gp = findrunnable(); // blocks until work is available
resetspinning();
}
execute(gp);
}
schedule按以下顺序查找可运行的G:
1. 为了公平,当前P每调度61次(schedtick % 61 == 0)且全局运行队列非空时,首先从全局运行队列获取G;
2. 否则从P的本地运行队列中获取G;
3. 仍然没有,则调用findrunnable阻塞等待,直到有新的G可以运行。
globrunqget从全局运行队列获取G,同时它还会将一定数量的G转移到P的本地运行队列中.
runqget从本地运行队列获取G,本地运行队列的实现是无锁的:
// Get g from local runnable queue.
// Executed only by the owner P.
//
// Lock-free pop from the head of p's ring buffer. Returns nil when the
// queue is empty (head == tail). The head is advanced with a CAS so that if
// another consumer moves runqhead first (the load comment below mentions
// "other consumers" — presumably runqsteal from other P's), the CAS fails
// and the loop retries instead of returning the same G twice.
static G*
runqget(P *p)
{
G *gp;
uint32 t, h;
for(;;) {
h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
// Plain load of the tail: presumably only the owner P writes runqtail,
// and this function runs on the owner P — confirm.
t = p->runqtail;
if(t == h)
return nil;
// Read the slot before committing; it is only "taken" once the CAS succeeds.
gp = p->runq[h%nelem(p->runq)];
if(runtime·cas(&p->runqhead, h, h+1)) // cas-release, commits consume
return gp;
}
}
findrunnable会一直阻塞,直到找到可运行的G,查找顺序为:
- 检查本地运行队列
- 检查全局运行队列
- 以non-blocking的模式poll network
- 检查其它P的本地运行队列
如果以上都无法获取到G,就释放当前的P,再检查一次全局运行队列和所有P的本地运行队列,然后以blocking的模式poll network;若仍然没有可运行的G,则调用stopm让M休眠,被唤醒后重新开始查找。
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
static G
findrunnable(void)
{
G gp;
P *p;
int32 i;top:
if(runtime·sched.gcwaiting) {
gcstopm();
goto top;
}
if(runtime·fingwait && runtime·fingwake && (gp = runtime·wakefing()) != nil)
runtime·ready(gp);
// local runq
gp = runqget(g->m->p);
if(gp)
return gp;
// global runq
if(runtime·sched.runqsize) {
runtime·lock(&runtime·sched.lock);
gp = globrunqget(g->m->p, 0);
runtime·unlock(&runtime·sched.lock);
if(gp)
return gp;
}
// poll network
gp = runtime·netpoll(false); // non-blocking
if(gp) {
injectglist(gp->schedlink);
runtime·casgstatus(gp, Gwaiting, Grunnable);
return gp;
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if(!g->m->spinning && 2 * runtime·atomicload(&runtime·sched.nmspinning) >= runtime·gomaxprocs - runtime·atomicload(&runtime·sched.npidle)) // TODO: fast atomic
goto stop;
if(!g->m->spinning) {
g->m->spinning = true;
runtime·xadd(&runtime·sched.nmspinning, 1);
}
// random steal from other P's
for(i = 0; i < 2*runtime·gomaxprocs; i++) {
if(runtime·sched.gcwaiting)
goto top;
p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs];
if(p == g->m->p)
gp = runqget(p);
else
gp = runqsteal(g->m->p, p);
if(gp)
return gp;
}
stop:
// return P and block
runtime·lock(&runtime·sched.lock);
if(runtime·sched.gcwaiting) {
runtime·unlock(&runtime·sched.lock);
goto top;
}
if(runtime·sched.runqsize) {
gp = globrunqget(g->m->p, 0);
runtime·unlock(&runtime·sched.lock);
return gp;
}
p = releasep();
pidleput(p);
runtime·unlock(&runtime·sched.lock);
if(g->m->spinning) {
g->m->spinning = false;
runtime·xadd(&runtime·sched.nmspinning, -1);
}
// check all runqueues once again
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p && p->runqhead != p->runqtail) {
runtime·lock(&runtime·sched.lock);
p = pidleget();
runtime·unlock(&runtime·sched.lock);
if(p) {
acquirep(p);
goto top;
}
break;
}
}
// poll network
if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
if(g->m->p)
runtime·throw("findrunnable: netpoll with p");
if(g->m->spinning)
runtime·throw("findrunnable: netpoll with spinning");
gp = runtime·netpoll(true); // block until new work is available
runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
if(gp) {
runtime·lock(&runtime·sched.lock);
p = pidleget();
runtime·unlock(&runtime·sched.lock);
if(p) {
acquirep(p);
injectglist(gp->schedlink);
runtime·casgstatus(gp, Gwaiting, Grunnable);
return gp;
}
injectglist(gp);
}
}
stopm();
goto top;
}
有疑问加站长微信联系(非本文作者)