Goroutine Scheduling


G state transition diagram

Note

The analysis below is based on Go 1.14.

Gosched -- voluntary yield

When user code calls runtime.Gosched(), the following code runs.

// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
    checkTimeouts() // does nothing here (no-op)
    mcall(gosched_m) // mcall (covered in an earlier scheduling post) switches to the g0 stack, runs the function there, and never returns
}

func checkTimeouts() {}

// Gosched continuation on g0.
func gosched_m(gp *g) {
    if trace.enabled { // emit a trace event
        traceGoSched()
    }
    goschedImpl(gp)
}

func goschedImpl(gp *g) { // gp is the g that called mcall
    status := readgstatus(gp)
    if status&^_Gscan != _Grunning {
        dumpgstatus(gp)
        throw("bad g status")
    }
    casgstatus(gp, _Grunning, _Grunnable) // switch to the runnable state
    dropg() // unbind m and g
    lock(&sched.lock)
    globrunqput(gp) // sched is global scheduler state, so access it under the lock
    unlock(&sched.lock)

    schedule() // reschedule: find the next runnable g to execute
}

func dropg() {
    _g_ := getg()  // at this point _g_ should be g0

    setMNoWB(&_g_.m.curg.m, nil) // clear curg's pointer to its m (set it to nil)
    setGNoWB(&_g_.m.curg, nil) // clear the m's curg pointer (set it to nil)
}

After Gosched completes, the G is placed on the global runnable queue and will be picked up again by schedule.
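A minimal user-level illustration (my own sketch, not from the original post): with a single P, a goroutine that calls runtime.Gosched() in a loop repeatedly moves itself back to the global run queue, giving the main goroutine a chance to run in between.

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1) // a single P makes the yield visible

    done := make(chan struct{})
    go func() {
        for i := 0; i < 3; i++ {
            fmt.Println("worker", i)
            runtime.Gosched() // _Grunning -> _Grunnable, back onto the global run queue
        }
        close(done)
    }()

    runtime.Gosched() // yield so the worker gets a chance to run first
    fmt.Println("main runs between (or after) worker iterations")
    <-done
}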

park_m -- blocking and waiting

When user code blocks on a channel read or write, it ultimately goes through park_m, which puts the G into the waiting state (time.Sleep likewise ends up parking the goroutine via gopark, through the runtime's timer machinery; Go's timer implementation will be analyzed when there is time).

// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
// Reason explains why the goroutine has been parked.
// It is displayed in stack traces and heap dumps.
// Reasons should be unique and descriptive.
// Do not re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem() // pin the current m (disables preemption)
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    // record why the g is parking and the associated context
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp) // release the m
    // can't do anything that might move the G between Ms here.
    mcall(park_m) // switch to g0 and run park_m
}

// park continuation on g0.
func park_m(gp *g) {
    _g_ := getg()

    if trace.enabled { // emit a trace event
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    // switch the status to waiting
    casgstatus(gp, _Grunning, _Gwaiting)
    dropg() // unbind m and g

    // run the pre-wait callback set by the caller, if any
    if fn := _g_.m.waitunlockf; fn != nil {
        ok := fn(gp, _g_.m.waitlock) // the function registered by the caller before parking
        _g_.m.waitunlockf = nil // clear the callback once it has run
        _g_.m.waitlock = nil // drop the wait lock
        if !ok { // if the callback returns false, do not park; keep running this G
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

After park_m completes, the G is in the waiting state and is not on any run queue; it only goes back onto a run queue once ready moves it to the runnable state.
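The wait reason recorded in gopark is what shows up in stack traces. A small illustration of my own (not from the original post): the parked goroutine appears with the reason "chan receive" when all goroutine stacks are dumped. The goready path that performs the actual wake-up is shown next.

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    ch := make(chan int)
    go func() {
        <-ch // this goroutine parks via gopark with a "chan receive" wait reason
    }()
    time.Sleep(100 * time.Millisecond) // give it time to park

    buf := make([]byte, 1<<16)
    n := runtime.Stack(buf, true) // dump all goroutine stacks
    fmt.Printf("%s", buf[:n])     // the blocked goroutine is listed as "[chan receive]"
}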

func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()
    mp := acquirem() // disable preemption because it can be holding p in a local var; pin the current m
    if status&^_Gscan != _Gwaiting { // a g passed to ready must be in the waiting state
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next) // put g on the runnable queue
    // if there is an idle P and no M is currently spinning, wake one up
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        wakep()
    }
    releasem(mp) // unpin the current m
}

Note that ready is not run via mcall but via systemstack; as we saw earlier, after systemstack finishes this code it switches back to the original g and keeps executing. Why? Consider when goready is called: typically some other G was blocked, for example on a channel. Take a read as the example: G1 is blocked reading from a channel, and G2 writes to that channel, which triggers goready and marks G1 runnable. At that point G2 should keep running its own code, while G1, now on a run queue, will naturally be picked up by some M and resume execution.
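A user-level sketch of that scenario (my own example; the runtime internals are not callable from user code): G1 blocks receiving from a channel, and the send from the main goroutine wakes it via goready while the sender keeps running on its own P.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    go func() { // G1
        v := <-ch // parks via gopark until a sender shows up
        fmt.Println("G1 received", v)
    }()

    time.Sleep(10 * time.Millisecond) // let G1 park first

    ch <- 42 // the send marks G1 runnable (goready) and the sender keeps running
    fmt.Println("sender continues right after the send")

    time.Sleep(10 * time.Millisecond) // give G1 a chance to print before main exits
}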

Gpreempt -- signal-based preemption

Go 1.14 triggers preemptive scheduling via signals. The signal registration code and the signal handler are shown below.

// Initialize signals. Most of the code has been trimmed; as the loop below shows, every signal's handler is sighandler.
func initsig(preinit bool) {
    for i := uint32(0); i < _NSIG; i++ {
        setsig(i, funcPC(sighandler))
    }
}

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    c := &sigctxt{info, ctxt}
    if sig == sigPreempt {
        doSigPreempt(gp, c) // when the signal is the preemption signal, call doSigPreempt
    }
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
    if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
        // Inject a call to asyncPreempt.
        ctxt.pushCall(funcPC(asyncPreempt))  // push a call to asyncPreempt onto the signal context, so asyncPreempt runs when the handler returns
    }
}

// asyncPreempt saves all user registers and calls asyncPreempt2.
// asyncPreempt2 is what ultimately runs; preemptPark and gopreempt_m are examined below
func asyncPreempt()
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    if gp.preemptStop {
        mcall(preemptPark) // called while a GC is in progress
    } else {
        mcall(gopreempt_m) // the normal case
    }
    gp.asyncSafePoint = false
}

// In the normal case gopreempt_m is called; it ends up in goschedImpl, the same function Gosched uses, so the G becomes runnable and is put on the global queue
func gopreempt_m(gp *g) {
    if trace.enabled {
        traceGoPreempt()
    }
    goschedImpl(gp)
}

func preemptPark(gp *g) {
    if trace.enabled {
        traceGoPark(traceEvGoBlock, 0)
    }
    status := readgstatus(gp)
    if status&^_Gscan != _Grunning {
        dumpgstatus(gp)
        throw("bad g status")
    }
    gp.waitreason = waitReasonPreempted
    casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
    dropg() // unbind g and m
    casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // the G ends up in the _Gpreempted state
    schedule()
}

After preemption, if no GC is in progress, goschedImpl is called and the G becomes runnable, waiting for its next turn. If a GC is in progress, the G enters the _Gpreempted state and is later moved to waiting via suspendG once the GC work is done; since this is closely tied to the GC, I'll defer the analysis until memory management and the GC have been covered, and fill in that hole then.
One question: is anything missing from the steps above? When a running G is preempted by a signal, its current context should be saved so that the next time it is scheduled, the context can be restored and execution resumes where it left off.
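A quick way to observe signal-based preemption (my own example, assuming Go 1.14+ on a platform where asynchronous preemption is enabled): a tight loop with no function calls contains no compiler-inserted preemption points, yet the main goroutine still gets to run because sysmon preempts the busy goroutine with a signal.

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1) // one P: without preemption the busy goroutine would monopolize it

    go func() {
        for {
            // tight loop, no function calls, no cooperative preemption points
        }
    }()

    time.Sleep(100 * time.Millisecond)
    // On Go versions before 1.14 this line may never be reached; with signal-based
    // preemption the busy goroutine is descheduled and main runs again.
    fmt.Println("main is still running")
}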

entersyscall -- system calls

Both system calls and cgo calls put the G into the _Gsyscall state, and both end up in reentersyscall. entersyscall mainly sets the states of the g and the p and detaches the m from its p.

func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    _g_.stackguard0 = stackPreempt
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    casgstatus(_g_, _Grunning, _Gsyscall) // switch the G's status
    if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
        systemstack(func() {
            print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
            throw("entersyscall")
        })
    }

    // record the call in the trace if needed
    if trace.enabled {
        systemstack(traceGoSysCall)
        // systemstack itself clobbers g.sched.{pc,sp} and we might
        // need them later when the G is genuinely blocked in a
        // syscall
        save(pc, sp)
    }

    if atomic.Load(&sched.sysmonwait) != 0 { // if the sysmon thread is waiting, run the function below; not examined in depth
        systemstack(entersyscall_sysmon)
        save(pc, sp)
    }

    if _g_.m.p.ptr().runSafePointFn != 0 { // if runSafePointFn is non-zero, run it; not examined in depth
        // runSafePointFn may stack split if run on this stack
        systemstack(runSafePointFn)
        save(pc, sp)
    }

    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    // detach m from p
    pp := _g_.m.p.ptr()
    pp.m = 0
    _g_.m.oldp.set(pp)
    _g_.m.p = 0
    atomic.Store(&pp.status, _Psyscall) // switch the P's status
    if sched.gcwaiting != 0 { // if the world is being stopped for GC, run the code below; not examined in detail
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    _g_.m.locks--
}

When the system call finishes, exitsyscall is called. Its flow is: first check whether a fast exit is possible, i.e. the m can immediately get a P back (either its old P was not retaken, or an idle P is available); if so, keep running, otherwise wait until a runnable P can be bound to the current m. This also shows how the scheduler treats system calls and cgo: if the call returns quickly, the G just keeps running; otherwise the P is taken away and handed to another m, so the Gs in that P's run queue are not blocked. A small demonstration follows after exitsyscall0 below.

func exitsyscall() {
    _g_ := getg()

    _g_.m.locks++ // see comment in entersyscall
    if getcallersp() > _g_.syscallsp {
        throw("exitsyscall: syscall frame is no longer valid")
    }

    _g_.waitsince = 0
    oldp := _g_.m.oldp.ptr()
    _g_.m.oldp = 0
    // if a fast exit is possible, keep running
    if exitsyscallfast(oldp) {
        if trace.enabled {
            if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick {
                systemstack(traceGoStart)
            }
        }
        // There's a cpu for us, so we can run.
        _g_.m.p.ptr().syscalltick++
        // We need to cas the status and scan before resuming...
        casgstatus(_g_, _Gsyscall, _Grunning)

        // Garbage collector isn't running (since we are),
        // so okay to clear syscallsp.
        _g_.syscallsp = 0
        _g_.m.locks--
        if _g_.preempt {
            // restore the preemption request in case we've cleared it in newstack
            _g_.stackguard0 = stackPreempt
        } else {
            // otherwise restore the real _StackGuard, we've spoiled it in entersyscall/entersyscallblock
            _g_.stackguard0 = _g_.stack.lo + _StackGuard
        }
        _g_.throwsplit = false

        // if this g is not allowed to keep running, yield via the scheduler
        if sched.disable.user && !schedEnabled(_g_) {
            // Scheduling of this goroutine is disabled.
            Gosched()
        }

        return
    }

    _g_.sysexitticks = 0
    if trace.enabled {
        // Wait till traceGoSysBlock event is emitted.
        // This ensures consistency of the trace (the goroutine is started after it is blocked).
        for oldp != nil && oldp.syscalltick == _g_.m.syscalltick {
            osyield()
        }
        // We can't trace syscall exit right now because we don't have a P.
        // Tracing code can invoke write barriers that cannot run without a P.
        // So instead we remember the syscall exit time and emit the event
        // in execute when we have a P.
        _g_.sysexitticks = cputicks()
    }

    _g_.m.locks--

    // no idle P is available to run on
    // Call the scheduler.
    mcall(exitsyscall0)

    // Scheduler returned, so we're allowed to run now.
    // Delete the syscallsp information that we left for
    // the garbage collector during the system call.
    // Must wait until now because until gosched returns
    // we don't know for sure that the garbage collector
    // is not running.
    _g_.syscallsp = 0
    _g_.m.p.ptr().syscalltick++
    _g_.throwsplit = false
}

func exitsyscallfast(oldp *p) bool {
    _g_ := getg()

    // Freezetheworld sets stopwait but does not retake P's.
    if sched.stopwait == freezeStopWait {
        return false
    }

    // Try to re-acquire the last P (sysmon has not retaken it).
    if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
        // There's a cpu for us, so we can run.
        // switch the P's status and re-bind m and p
        wirep(oldp)
        exitsyscallfast_reacquired()
        return true
    }

    // Try to get any other idle P (there is at least one idle P).
    if sched.pidle != 0 {
        var ok bool
        systemstack(func() {
            ok = exitsyscallfast_pidle() // grab an idle P and bind m and p
            if ok && trace.enabled {
                if oldp != nil {
                    // Wait till traceGoSysBlock event is emitted.
                    // This ensures consistency of the trace (the goroutine is started after it is blocked).
                    for oldp.syscalltick == _g_.m.syscalltick {
                        osyield()
                    }
                }
                traceGoSysExit(0)
            }
        })
        if ok {
            return true
        }
    }
    return false
}

func exitsyscall0(gp *g) {
    _g_ := getg()

    // unbind g from m and make g runnable
    casgstatus(gp, _Gsyscall, _Grunnable)
    dropg()
    lock(&sched.lock)

    // try to get a P
    var _p_ *p
    if schedEnabled(_g_) {
        _p_ = pidleget()
    }
    if _p_ == nil {
        globrunqput(gp) // put the g on the global runnable queue
    } else if atomic.Load(&sched.sysmonwait) != 0 {
        atomic.Store(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)

    // if a P was acquired, keep running the g
    if _p_ != nil {
        acquirep(_p_)
        execute(gp, false) // Never returns.
    }
    // if a g is locked to this m, stop and wait until a P is available to continue running
    if _g_.m.lockedg != 0 {
        // Wait until another thread schedules gp and so m again.
        stoplockedm()
        execute(gp, false) // Never returns.
    }

    // stop the m and keep waiting until there is a runnable P
    stopm()
    schedule() // Never returns.
}
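A small illustration of the P hand-off mentioned above (my own sketch; Linux-style, since it uses syscall.Pipe and a raw syscall.Read to bypass the netpoller): one goroutine blocks inside read(2), its m sits in the kernel in _Gsyscall, and even with a single P the main goroutine keeps making progress because the P is taken away from the blocked m.

package main

import (
    "fmt"
    "runtime"
    "syscall"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1)

    var fds [2]int
    if err := syscall.Pipe(fds[:]); err != nil { // blocking fds, not registered with the netpoller
        panic(err)
    }

    go func() {
        buf := make([]byte, 1)
        syscall.Read(fds[0], buf) // enters _Gsyscall and blocks in the kernel; nothing ever writes
    }()

    time.Sleep(50 * time.Millisecond)
    fmt.Println("main still runs: the P was handed off while the other m is stuck in read(2)")
}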

newstack -- stack growth

newstack mainly grows the stack, which touches memory management. Roughly: the G's status is set to _Gcopystack, the stack is grown, and once the growth is done the G is set back to _Grunning. The code is not examined in detail here.
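A trivial way to exercise this path (my own example): each recursive frame below holds roughly 1KB of locals, so the recursion quickly outgrows the small initial goroutine stack and newstack copies it to a larger one; the program simply runs to completion.

package main

import "fmt"

func grow(depth int) int {
    var pad [1024]byte // roughly 1KB per frame forces the stack to grow
    pad[0] = byte(depth)
    if depth == 0 {
        return int(pad[0])
    }
    return grow(depth-1) + int(pad[0])
}

func main() {
    fmt.Println(grow(1000)) // about 1MB of frames, far beyond the initial stack
}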

schedule -- finding the next runnable g

schedule first looks for a runnable g in the P's local queue (checking the global queue every 61 schedule ticks for fairness); if nothing is found locally it falls back to the global queue, the netpoller, and finally tries to steal from other Ps; if nothing turns up at all, the m calls stopm and sleeps until it is woken.

func schedule() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    if _g_.m.lockedg != 0 { // if a g is locked to this m, run that g directly
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    pp := _g_.m.p.ptr()
    pp.preempt = false

    if sched.gcwaiting != 0 { // if a GC is stopping the world, wait for it to finish before continuing
        gcstopm()
        goto top
    }
    if pp.runSafePointFn != 0 { // if runSafePointFn is set, run it
        runSafePointFn()
    }

    // Sanity check: if we are spinning, the run queue should be empty.
    // Check this before calling checkTimers, as that might call
    // goready to put a ready goroutine on the local run queue.
    if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
        throw("schedule: spinning with local work")
    }

    checkTimers(pp, 0) // timer-related; not examined for now

    var gp *g
    var inheritTime bool

    // Normal goroutines will check for need to wakeP in ready,
    // but GCworkers and tracereaders will not, so the check must
    // be done here instead.
    tryWakeP := false
    if trace.enabled || trace.shutdown { // trace/debug information
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            tryWakeP = true
        }
    }
    if gp == nil && gcBlackenEnabled != 0 { // GC-related; not examined here
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        tryWakeP = tryWakeP || gp != nil
    }
    if gp == nil { // every 61st schedtick, try to take a g from the global runnable queue
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr()) // take a g from the P's local queue
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }
    if gp == nil {
        // try to steal some Gs from other Ps; if stealing fails, stopm until woken
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {  // spinning means the m was looking for a runnable g; we have one now, so stop spinning
        resetspinning()
    }

    if sched.disable.user && !schedEnabled(gp) { // if this g cannot be scheduled right now, give it up
        // Scheduling of this goroutine is disabled. Put it on
        // the list of pending runnable goroutines for when we
        // re-enable user scheduling and look again.
        lock(&sched.lock)
        if schedEnabled(gp) {
            // Something re-enabled scheduling while we
            // were acquiring the lock.
            unlock(&sched.lock)
        } else {
            sched.disable.runnable.pushBack(gp)
            sched.disable.n++
            unlock(&sched.lock)
            goto top
        }
    }

    // If about to schedule a not-normal goroutine (a GCworker or tracereader),
    // wake a P if there is one.
    if tryWakeP {
        if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
            wakep()
        }
    }
    if gp.lockedm != 0 { // if the g is locked to another m, hand it off instead of running it here
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        startlockedm(gp)
        goto top
    }

    execute(gp, inheritTime)
}

// execute mainly switches the g's status and binds g and m to each other
func execute(gp *g, inheritTime bool) {
    _g_ := getg()

    // Assign gp.m before entering _Grunning so running Gs have an
    // M.
    _g_.m.curg = gp
    gp.m = _g_.m
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }

    // Check whether the profiler needs to be turned on or off.
    hz := sched.profilehz
    if _g_.m.profilehz != hz {
        setThreadCPUProfiler(hz)
    }

    if trace.enabled {
        // GoSysExit has to happen when we have a P, but before GoStart.
        // So we emit it here.
        if gp.syscallsp != 0 && gp.sysblocktraced {
            traceGoSysExit(gp.sysexitticks)
        }
        traceGoStart()
    }

    gogo(&gp.sched)
}

// gogo is implemented in assembly: it restores the g's saved context and jumps to the g's pc. See asm_arm64.s.
TEXT runtime·gogo(SB), NOSPLIT, $24-8
    MOVD    buf+0(FP), R5 // R5 = g.sched
    MOVD    gobuf_g(R5), g // g = g.sched.g
    BL  runtime·save_g(SB) // store the current g into thread-local storage

    MOVD    0(g), R4    // make sure g is not nil
    MOVD    gobuf_sp(R5), R0 // R0 = g.sched.sp
    MOVD    R0, RSP // RSP = R0 = g.sched.sp
    MOVD    gobuf_bp(R5), R29 // R29 = g.sched.bp
    MOVD    gobuf_lr(R5), LR  // LR = g.sched.lr
    MOVD    gobuf_ret(R5), R0 // R0 = g.sched.ret
    MOVD    gobuf_ctxt(R5), R26 // R26 = g.sched.ctxt
    // zero out the saved context in g.sched
    MOVD    $0, gobuf_sp(R5)
    MOVD    $0, gobuf_bp(R5)
    MOVD    $0, gobuf_ret(R5)
    MOVD    $0, gobuf_lr(R5)
    MOVD    $0, gobuf_ctxt(R5)
    CMP ZR, ZR // set condition codes for == test, needed by stack split
    MOVD    gobuf_pc(R5), R6 // R6 = g.sched.pc
    B   (R6)

Holes still to fill:
1. Several important functions called from schedule are not expanded.
2. The implementation of save_g is not expanded.

One loop

(figure source: 阿波张's blog)

G state transitions

Looking at the two figures together: Gosched, park_m, asyncPreempt and exitsyscall all go through mcall, and each of them ends by calling schedule to start the next iteration of the loop.
newstack does not go through the scheduler and simply resumes execution, and the exitsyscallfast path likewise keeps running.

Summary

1. There are still holes to fill; they need to be filled.
2. A G can be thought of as a schedulable task carrying its own context (pc, sp, and so on) and its own private stack.
3. To be continued.


Source: 简书. Author: 不争_900c.