深入分析go调度(四)

lucasgao · · 848 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect

n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
    _p_ := allp[i]
    if _p_ == nil {
        // This can happen if procresize has grown
        // allp but not yet created new Ps.
        continue
    }
    pd := &_p_.sysmontick // last tick observed by sysmon
    s := _p_.status
    sysretake := false
    if s == _Prunning || s == _Psyscall {
        // Preempt G if it's running for too long.
        t := int64(_p_.schedtick) // schedtick 调度循环次数,每次调度循环完成+1
        if int64(pd.schedtick) != t { // 如果已经切换,那就更新上次调度的数
            pd.schedtick = uint32(t)
            pd.schedwhen = now
        } else if pd.schedwhen+forcePreemptNS <= now { // 如果超过规定时间
            // 抢占调度
            preemptone(_p_)
            // In case of syscall, preemptone() doesn't
            // work, because there is no M wired to P.
            sysretake = true
        }
    }
    if s == _Psyscall {
    .... // 系统调用抢占代码
}
}
unlock(&allpLock)
return uint32(n)

}


由上可见,如果当前G运行时间过长(通过p上的pd判断),那么就发起抢占 `preemptone(_p_)`

而神奇的preemptone宛如一个彬彬有礼的绅士,没坐任何暴力行为

```go
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    // 设置抢占调度位 为true 
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    gp.stackguard0 = stackPreempt // 设置stackguard0为一个特别大的数

    ... // 信号抢占,暂时忽略

    return true
}

到目前为止,负责抢占调度的retake已经完事收工,但是我们没有看到任何暴力行为,只是告诉g你需要让位了。(ps.这也就是我们称为协作式调度的原因)

响应抢占

这个是由newstack完成的

具体调度流morestack_noctxt()->morestack()->newstack() ,

// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the GC is trying to stop this g then it will set preemptscan to true.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
    thisg := getg() // thisg = g0
    ......
    // 这行代码获取g0.m.curg,也就是需要扩栈或响应抢占的goroutine
    // 对于我们这个例子gp = main goroutine
    gp := thisg.m.curg
    ......
    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below.
    //检查g.stackguard0是否被设置为stackPreempt
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    // Be conservative about where we preempt.
    // We are interested in preempting user Go code, not runtime code.
    // If we're holding locks, mallocing, or preemption is disabled, don't
    // preempt.
    // This check is very early in newstack so that even the status change
    // from Grunning to Gwaiting and back doesn't happen in this case.
    // That status change by itself can be viewed as a small preemption,
    // because the GC might change Gwaiting to Gscanwaiting, and then
    // this goroutine has to wait for the GC to finish before continuing.
    // If the GC is in some way dependent on this goroutine (for example,
    // it needs a lock held by the goroutine), that small preemption turns
    // into a real deadlock.
    if preempt {
        //检查被抢占goroutine的状态
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" ||  thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            //还原stackguard0为正常值,表示我们已经处理过抢占请求了
            gp.stackguard0 = gp.stack.lo + _StackGuard
           
            //不抢占,调用gogo继续运行当前这个g,不需要调用schedule函数去挑选另一个goroutine
            gogo(&gp.sched) // never return
        }
    }

    //省略的代码做了些其它检查所以这里才有两个同样的判断

    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }
        ......
        //下面开始响应抢占请求
        // Act like goroutine called runtime.Gosched.
        //设置gp的状态,省略的代码在处理gc时把gp的状态修改成了_Gwaiting
        casgstatus(gp, _Gwaiting, _Grunning)
       
        //调用gopreempt_m把gp切换出去
        gopreempt_m(gp) // never return
    }
    ......
}


func gopreempt_m(gp *g) {
    if trace.enabled {
        traceGoPreempt()
    }
    goschedImpl(gp)
}


可见这里我们判断如果需要抢占,那么就切换状态为调用 goschedimpl完成调度。

响应抢占的细节之栈扩容

我们上面看到,响应抢占是在检查栈的时候才判断调度,那么我们就会有个疑问,什么时候检查栈

go设计:在函数调用的时候会插入栈扩容的检查代码

以以下代码为例

package main

import "fmt"

func sum(a, b int) int {
    a2 := a * a
    b2 := b * b
    c := a2 + b2

    fmt.Println(c)

    return c
}

func main() {
    sum(1, 2)
}

运行以下命令进入gdb,结果如下

go build -o q3 q3.go
gdb q3
b q3.go:15
r
disass

<img src="/Users/gaoke/Library/Application Support/typora-user-images/image-20200607190207907.png" alt="image-20200607190207907" style="zoom:90%;" />

可见函数尾部对morestack_noctxt进行了调用,而跳转到该函数的逻辑是

=> 0x0000000000490eb0 <+0>: mov    %fs:0xfffffffffffffff8,%rcx #main函数第一条指令,rcx = g
   0x0000000000490eb9 <+9>: cmp    0x10(%rcx),%rsp  #0x10(%rcx) 为 g.stackguard0
   0x0000000000490ebd <+13>:    jbe    0x490eed <main.main+61>

我们知道g结构便宜16个字节为stackguard0。那么第二行的意思就是在比较栈顶寄存器rsp的值是否比stackguard0的值小,如果rsp的值更小,说明当前g的栈要用完了,有溢出风险,需要扩栈。

回忆下栈结构

<img src="http://picgo.vipkk.work/20200607190850.png" alt="image-20200607190850026" style="zoom:50%;" />

其中正常情况下 stackguard0 = lo+stackguard,而lo有所栈底,所以这里如果rsp比 stackguard0小的话,那就需要进行扩容。在设置了抢占调度的时候,那么stackguard0 是一个特别大的值,rsp必小,所以就会进行调用morestack_noctxt。

当执行完之后,那么就继续调用main恢复执行!!!

那么在协作式调度的时候,如果一个函数里没有函数调用,那么它也就永远不会被抢占,因为我们上面分析了,morestack_noctxt是插在函数序言里的,如果没有的话,那就永远不会检查抢占了。

如果没有函数调用,那就永远阻塞了

对系统调用过长发起抢占

回到retake

// 抢占调度检测
func retake(now int64) uint32 {
        ...
  
 if s == _Psyscall { // 如果是系统调用的话
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                /**
                进入条件
                1. 当前为空
                2. 有空闲的p (有人工作 抢占调度没必要)
                3. 未超过10ms
                 */
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_) // 寻找下一个新的m来让p操作
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    
  ...
}

根据retake函数的代码,只要满足下面三个条件中的任意一个就需要对处于_Psyscall 状态的p进行抢占:

  1. p的运行队列里面有等待运行的goroutine。这用来保证当前p的本地运行队列中的goroutine得到及时的调度,因为该p对应的工作线程正处于系统调用之中,无法调度队列中goroutine,所以需要寻找另外一个工作线程来接管这个p从而达到调度这些goroutine的目的;
  2. 没有空闲的p。表示其它所有的p都已经与工作线程绑定且正忙于执行go代码,这说明系统比较繁忙,所以需要抢占当前正处于系统调用之中而实际上系统调用并不需要的这个p并把它分配给其它工作线程去调度其它goroutine。
  3. 从上一次监控线程观察到p对应的m处于系统调用之中到现在已经超过10了毫秒。这表示只要系统调用超时,就对其抢占,而不管是否真的有goroutine需要调度,这样保证sysmon线程不至于觉得无事可做(sysmon线程会判断retake函数的返回值,如果为0,表示retake并未做任何抢占,所以会觉得没啥事情做)而休眠太长时间最终会降低sysmon监控的实时性。

综上,系统调用因为深陷在os中,go runtime鞭长莫及,所以在设置了抢占位之后,p还做了另外一件事,就是寻找下一个新的m来和p合作,脱离之前的m。

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    // 如果当前p运行队列不为空,那么它还有任务要做,所以启动一个m来工作
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
    // 如果当前正在进行gc,也启动(TODO 不解)
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // 这里的意思是说,如果其他人(m or p)都在干活,证明现在比较忙。那么这个p也就不能歇着。
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock) // 接下来的操作涉及全局
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    // 全局队列不为空
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    // 也不能都歇着,需要人等网络,其他人不干 那就我把。
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    if when := nobarrierWakeTime(_p_); when != 0 {
        wakeNetPoller(when)
    }
    pidleput(_p_)
    unlock(&sched.lock)
}

handoffp函数流程比较简单,它的主要任务是通过各种条件判断是否需要启动工作线程来接管p,如果不需要则把p放入P的全局空闲队列。

从handoffp的代码可以看出,在如下几种情况下则需要调用我们已经分析过的startm函数启动新的工作线程出来接管p

  1. p的本地运行队列或全局运行队列里面有待运行的goroutine;
  2. 需要帮助gc完成标记工作;
  3. 系统比较忙,所有其它p都在运行goroutine,需要帮忙;
  4. 所有其它P都已经处于空闲状态,如果需要监控网络连接读写事件,则需要启动新的m来poll网络连接。

到此,sysmon监控线程对处于系统调用之中的p的抢占就已经完成。

系统调用

TODO

简单说

  1. 解绑
  2. 恢复的时候尝试重新绑定,绑定不上就去找新的。

系统调用汇编实现

// func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, err uintptr)
TEXT ·Syscall6(SB),NOSPLIT,$0-80
  // 预处理
    CALL    runtime·entersyscall(SB)
    // 系统调用
    MOVQ    a1+8(FP), DI
    MOVQ    a2+16(FP), SI
    MOVQ    a3+24(FP), DX
    MOVQ    a4+32(FP), R10
    MOVQ    a5+40(FP), R8
    MOVQ    a6+48(FP), R9
    MOVQ    trap+0(FP), AX  // syscall entry
    SYSCALL
    CMPQ    AX, $0xfffffffffffff001
    JLS ok6
    MOVQ    $-1, r1+56(FP)
    MOVQ    $0, r2+64(FP)
    NEGQ    AX
    MOVQ    AX, err+72(FP)
    // 退出处理
    CALL    runtime·exitsyscall(SB)
    RET
ok6:
    MOVQ    AX, r1+56(FP)
    MOVQ    DX, r2+64(FP)
    MOVQ    $0, err+72(FP)
    // 退出处理
    CALL    runtime·exitsyscall(SB)
    RET

系统调用做了三件事

  1. 预处理: entersyscall
  2. 处理 syscall
  3. 退出处理: exitsyscall

entersyscall

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    // 需要禁止g被抢占
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    // entersyscall中不能调用任何导致栈增长或者分裂的函数
    _g_.stackguard0 = stackPreempt
    // 设置 throwsplit,在newstack中,如果发现throwsplit是true,挥之即crash
    /**
    if thisg.m.curg.throwsplit{
        throw("runtime: stack split at bad time")
    }
    */
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    // 保存现场,在syscall之后 依据这些数据恢复现场
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
  // 修改g的状态
    casgstatus(_g_, _Grunning, _Gsyscall)
    
  ...
    
  // 解绑处理
    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    _g_.m.mcache = nil
    pp := _g_.m.p.ptr()
    pp.m = 0
    _g_.m.oldp.set(pp) // 优先绑定
    _g_.m.p = 0
    // 修改p的状态
    atomic.Store(&pp.status, _Psyscall)
    if sched.gcwaiting != 0 {
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    _g_.m.locks--
}

exitsyscall

源码先不补充,这块还需要再复习

大概工作如下

  1. 尝试绑定之前的p
  2. 失败之后,从全局找一个p来执行
  3. 如果找不到,那就把g放到全局队列,自身stop

信号抢占

go在1.14中引入了信号抢占,这里也分析下

发起抢占

func preemptone(_p_ *p) bool {
  ...
  
  // Request an async preemption of this P.
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        // 抢占m,信号调度
        preemptM(mp)
    }
  
  ...
}


func preemptM(mp *m) {
    signalM(mp, sigPreempt)
}

// signalM sends a signal to mp.
func signalM(mp *m, sig int) {
    tgkill(getpid(), int(mp.procid), sig)
}

go在设置抢占标志位的时候,同时发送了一个抢占信号。

信号接收

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    if sig == sigPreempt { // 如果是抢占信息 ,go1.14加入
        // Might be a preemption signal.
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we
        // still let it through to the application.
    }
}

信号处理

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
        // Inject a call to asyncPreempt.
        // 执行抢占的关键方法
        ctxt.pushCall(funcPC(asyncPreempt))
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
}
        

pushcall 直接修改了g的pc,从而恢复的时候执行的地方不一样。

func asyncPreempt() // 编译器实现

//go:nosplit
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    // 真正的抢占
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
    gp.asyncSafePoint = false
}

https://changkun.de/golang/zh-cn/part2runtime/ch06sched/preemption/

  1. https://go-review.googlesource.com/c/go/+/201762

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:lucasgao

查看原文:深入分析go调度(四)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

848 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传