golang 源码学习之GMP (goroutine)

ihornet · · 1922 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

源码

  • 版本
    • 1.14.1
  • 相关目录
    • runtime/asm_amd64.s
    • runtime/proc.go
    • runtime/runtime2.go

关键概念

  • G - 我们代码写的go func(){ }
  • M - 内核线程
  • P - M调度G的上下文, P中存储了很多G,M通过调用P来获取并执行G。为了方便,下文中称它为==局部调度器==
  • schedt - 全局调度器,主要存储了一些空闲的G、M、P
G、M、P、schedt之间的关系
graph TB;
  subgraph schedt
    A(空闲G集合)
    B(runable G集合)
    C(空闲P集合)
    D(空闲M集合)
  end
graph TB;
 subgraph M
    A(running G)
  end
 subgraph P
    B(空闲G集合)
    C(runable G集合)
  end
  A -- M执行完G,放入空闲集合 --> B
  A -- M从P获取可运行G --> C
  
  D[M] -.-> E[P]
  D2[M] -.-> E2[P]
  D3[M] -.-> E3[P]
GMP.png

关键数据结构

G

/// runtime/runtime2.go 关键字段
type g struct {
    stack       stack   // g自己的栈

    m            *m      // 执行当前g的m
    sched        gobuf   // 保存了g的现场,goroutine切换时通过它来恢复
    atomicstatus uint32  // g的状态Gidle,Grunnable,Grunning,Gsyscall,Gwaiting,Gdead
    goid         int64
    schedlink    guintptr // 下一个g, g链表

    preempt       bool //抢占标记

    lockedm        muintptr // 锁定的M,g中断恢复指定M执行
    gopc           uintptr  // 创建该goroutine的指令地址
    startpc        uintptr  // goroutine 函数的指令地址
}

这里先介绍下G的各个状态、

  • Gidle 被创建但没初始换
  • Grunnable 可运行
  • Grunning 正在运行
  • Gsyscall 正在系统调用
  • Gwaiting 正在等待
  • Gdead 运行完成

M

/// runtime/runtime2.go 关键字段
type m struct {
    g0      *g     // g0, 每个M都有自己独有的g0

    curg          *g       // 当前正在运行的g
    p             puintptr // 当前用于的p
    nextp         puintptr // 当m被唤醒时,首先拥有这个p
    id            int64
    spinning      bool // 是否处于自旋

    park          note
    alllink       *m // on allm
    schedlink     muintptr // 下一个m, m链表
    mcache        *mcache  // 内存分配
    lockedg       guintptr // 和 G 的lockedm对应
    freelink      *m // on sched.freem

}

P

/// runtime/runtime2.go 关键字段
type p struct {
    id          int32
    status      uint32 // 状态
    link        puintptr // 下一个P, P链表
    m           muintptr // 拥有这个P的M
    mcache      *mcache  

    // P本地runnable状态的G队列
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    
    runnext guintptr // 一个比runq优先级更高的runnable G

    // 状态为dead的G链表,在获取G时会从这里面获取
    gFree struct {
        gList
        n int32
    }

    gcBgMarkWorker       guintptr // (atomic)
    gcw gcWork

}

这里先介绍下P的各个状态

  • Pidle:没有关联的M
  • Prunning:已和某个M关联
  • Psyscall:当前P中的被运行的那个G正在进行系统调用
  • Pgcstop: 系统正在GC
  • Pdead: 当前P不再使用

schedt

/// runtime/runtime2.go 关键字段
type schedt struct {

    lock mutex

    midle        muintptr // 空闲M链表
    nmidle       int32    // 空闲M数量
    nmidlelocked int32    // 被锁住的M的数量
    mnext        int64    // 已创建M的数量,以及下一个M ID
    maxmcount    int32    // 允许创建最大的M数量
    nmsys        int32    // 不计入死锁的M数量
    nmfreed      int64    // 累计释放M的数量

    pidle      puintptr // 空闲的P链表
    npidle     uint32   // 空闲的P数量

    runq     gQueue // 全局runnable的G队列
    runqsize int32  // 全局runnable的G数量

    // Global cache of dead G's.
    gFree struct {
        lock    mutex
        stack   gList // Gs with stacks
        noStack gList // Gs without stacks
        n       int32
    }

    // freem is the list of m's waiting to be freed when their
    // m.exited is set. Linked through m.freelink.
    freem *m
}

启动

    /// runtime/asm_amd64.s
    
    get_tls(BX)
    LEAQ    runtime·g0(SB), CX
    MOVQ    CX, g(BX)
    LEAQ    runtime·m0(SB), AX

    // save m->g0 = g0
    MOVQ    CX, m_g0(AX)
    // save m0 to g0->m
    MOVQ    AX, g_m(CX)

    CLD             // convention is D is always left cleared
    CALL    runtime·check(SB)

    MOVL    16(SP), AX      // copy argc
    MOVL    AX, 0(SP)
    MOVQ    24(SP), AX      // copy argv
    MOVQ    AX, 8(SP)
    CALL    runtime·args(SB) // 命令行初始化,获取参数
    CALL    runtime·osinit(SB) // OS初始化
    CALL    runtime·schedinit(SB)

    // create a new goroutine to start program
    MOVQ    $runtime·mainPC(SB), AX     // entry 对应runtime·main
    PUSHQ   AX
    PUSHQ   $0          // arg size
    CALL    runtime·newproc(SB)
    POPQ    AX
    POPQ    AX

    // start this M
    CALL    runtime·mstart(SB)

    CALL    runtime·abort(SB)   // mstart should never return
    
    ....
    
    DATA    runtime·mainPC+0(SB)/8,$runtime·main(SB)

汇编语言我也看不懂,看字面意思可知道启动流程:
1.创建g0
2.创建m0
3.m.g0 = g0
4.g0.m = m0
5.命令行初始化,OS初始化
6.schedinit 调度器初始化
7.newproc 将runtime.main作为参数创建goroutine
8.mstart
所以g0和m0是通过汇编指令创建的,并将m0赋值给g0.m

schedinit 调度器初始化
///runtime/proc.go
func schedinit() {

    // 获取g0
    _g_ := getg() 

    ...

    // 设置最大 M 数量
    sched.maxmcount = 10000
    
    ...
    
    // 栈和内存初始化
    stackinit()
    mallocinit()

    ...

    // 初始化当前 M
    mcommoninit(_g_.m)
    
    ...
    
    //参数和环境初始化
    goargs()
    goenvs()
    
    ...

    // 设置 P 的数量
    procs := ncpu
    // 通过环境变量设置P的数量
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    
    ...
}

调度器初始化的主要工作: 空间申请、M的最大数量设置、P的数量设置、初始化参数和环境

newproc 创建goroutine
func newproc(siz int32, fn *funcval) {

    // sys.PtrSize = 8, 表示跳过函数指针, 获取第一个参数的地址
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg() // 获取当前g
    pc := getcallerpc() // 获取下一条要执行的指令地址

    // 用 g0 的栈创建 G
    // systemstack 会切换当前的 g 到 g0, 并且使用g0的栈空间
    systemstack(func() {
        newproc1(fn, argp, siz, gp, pc)
    })
}

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {

    ...

    _p_ := _g_.m.p.ptr() // 获取P
    newg := gfget(_p_)  // 在P或者sched中获取空闲的G, 在这里也就是主goroutine
    if newg == nil {  // 获取失败就创建一个新的
        newg = malg(_StackMin)  
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }            
    ...
    
     // 将runtime.main地址存储在主goroutine的sched中
     gostartcallfn(&newg.sched, fn)    
    ...
    
    runqput(_p_, newg, true) // 将runnable的newg放入P中
    
    ...

}

这里先不分析具体实现,在启动阶段将runtime.main作为入参创建G,也就是创建一个G来运行runtime.main。

runtime.main

func main() {

    ...

    //  64位系统 栈的最大空间为 1G, 32为系统 为 250M
    if sys.PtrSize == 8 {
        maxstacksize = 1000000000
    } else {
        maxstacksize = 250000000
    }

    ...
    
    fn := main_main // 这就是我们代码main包的main函数
    fn() // 运行我们的main函数
    
    ...
    
}

这里就可以调用我们main.main了,所以上面的newproc是为了给我们的main.main创建一个主goroutine。
现在有了主goroutine,那怎么启动这个goroutine呢?-- mstart

mstart
func mstart() {

    ...
    
    mstart1()

    ...
}

func mstart1() {
    ...
    schedule()
}

func schedule() {
    _g_ := getg()

    ...

top:
    pp := _g_.m.p.ptr()
    
    ...

    var gp *g
    
    ...
    
    // 从sched或者P或获取G,启动阶段至此一个产生了两个G:g0和main的G。g0不会存在sched和P中,所以这里获取的是main的G
    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }


    execute(gp, inheritTime)
}


func execute(gp *g, inheritTime bool) {
    ...
    
    // 在上面我们已经将runtime.main的地址存在gp.sched中。这里就调用runtime.main。
    gogo(&gp.sched)
}

mstart经过一系列的调用,最终通过gogo(&gp.sched)调用了runtime.main。在runtime.main中又调用了mian.main,至此启动结束。

启动小结
    graph TB;
      A["创建G0、M0, g0.m = m0"] --> C
      C[初始化命令行和OS]-->D
      D["schedinit:设置M最大数量、P个数、栈和内存初始化"] --> E
      E["newproc:为main.main创建一个主goroutine"] --> F
      F["mstart:运行主goroutine --> 运行main.main"]
      
启动.png

==下面开始分析G、M、P==

G 创建
func newproc(siz int32, fn *funcval) {

    // sys.PtrSize = 8, 表示跳过函数指针, 获取第一个参数的地址
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg() // 获取当前g
    pc := getcallerpc() // 获取下一条要执行的指令地址

    // 用 g0 的栈创建 G
    // systemstack 会切换当前的 g 到 g0, 并且使用g0的栈空间
    systemstack(func() {
        newproc1(fn, argp, siz, gp, pc)
    })
}

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
    _g_ := getg() //获取当前g,也就是g0。因为上面的systemstack会切换到g0

    if fn == nil { // fn空报错
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    acquirem() // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // 参数大小不能大于  2048 - 4 * 8 - 8   = 2000
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
        throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_) // 在P中获取G
    
    // 如果没获取到则创建一个新的G,并设置成dead状态,加入全局的allgs
    if newg == nil {
        newg = malg(_StackMin) 
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    if newg.stack.hi == 0 {
        throw("newproc1: newg missing stack")
    }

    if readgstatus(newg) != _Gdead {
        throw("newproc1: new g is not Gdead")
    }

    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
    sp := newg.stack.hi - totalSize  // 栈顶地址
    spArg := sp
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        // 将参数压入G的栈
        memmove(unsafe.Pointer(spArg), argp, uintptr(narg))

        if writeBarrier.needed && !_g_.m.curg.gcscandone {
            f := findfunc(fn.fn)
            stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
            if stkmap.nbit > 0 {
                // We're in the prologue, so it's always stack map index 0.
                bv := stackmapdata(stkmap, 0)
                bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
            }
        }
    }

    // 清除G的运行现场,因为G有可能是从P中获取的,清除原有的数据
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    // 重新对现场复制
    newg.sched.sp = sp
    newg.stktopsp = sp
    // 将pc指向goexit。这个很重要,G运行完时会执行它,实际将调用goexit1
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum 
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // 其实在这里面真正pc指向的是fn, 而上面的goexit被用于sp,当RET的时候pop出goexit
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg, false) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    
    // 状态设为runnable
    casgstatus(newg, _Gdead, _Grunnable)

    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    // 设置id,
    newg.goid = int64(_p_.goidcache) 
    _p_.goidcache++
    if raceenabled {
        newg.racectx = racegostart(callerpc)
    }
    if trace.enabled {
        traceGoCreate(newg, newg.startpc)
    }
    
    // 加入P的runable数组
    runqput(_p_, newg, true)

    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
    releasem(_g_.m)
}

从P获取空闲的G,如果没有则生成新的G。把参数复制到G的栈,pc: fn, sp:goexit,把G设为runable,并加入P的runable数组

如何从P获取G: gfget(p)
func gfget(_p_ *p) *g {
retry:
    // 如果P的本地空闲链表为空&全局空闲链表不为空
    if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
        lock(&sched.gFree.lock)
        // 从全局移一批到本地空闲链表
        for _p_.gFree.n < 32 {
            // Prefer Gs with stacks.
            gp := sched.gFree.stack.pop()
            if gp == nil {
                gp = sched.gFree.noStack.pop()
                if gp == nil {
                    break
                }
            }
            sched.gFree.n--
            _p_.gFree.push(gp)
            _p_.gFree.n++
        }
        unlock(&sched.gFree.lock)
        goto retry
    }
    
    // 从本地空闲链表pop一个G
    gp := _p_.gFree.pop()
    if gp == nil {
        return nil
    }
    _p_.gFree.n--
    if gp.stack.lo == 0 {
        // Stack was deallocated in gfput. Allocate a new one.
        systemstack(func() {
            gp.stack = stackalloc(_FixedStack)
        })
        gp.stackguard0 = gp.stack.lo + _StackGuard
    } else {
        if raceenabled {
            racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
        }
        if msanenabled {
            msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
        }
    }
    return gp
}

从P的本地空闲链表获取G。如果本地空闲链表为空,则从全局空闲链表移一批到本地。

加入本地runabled列表,runqput(p, newg, true)
func runqput(_p_ *p, gp *g, next bool) {
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }

    if next {
    retryNext:
        // 把g设为runnext, 之前的runnext加入runable队列
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // 本地runable队列未满,加入本地
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 本地runable队列已满,移一半到全局runable队列
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // 移了一部分到全局,所以本地队列未满,再次尝试加入本地队列
    goto retry
}

把本地runable队列一半到全局 runqputslow
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    var batch [len(_p_.runq)/2 + 1]*g

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    // 如果head - tail 不等于 1/2,说明出问题了
    if n != uint32(len(_p_.runq)/2) {
        throw("runqputslow: queue is not full")
    }
    
    // 从head开始遍历 1/2 加入batch中
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    
    // 并没有从队列中删除,只是修改了head
    if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp

    if randomizeScheduler {
        // 重新排序
        for i := uint32(1); i <= n; i++ {
            j := fastrandn(i + 1)
            batch[i], batch[j] = batch[j], batch[i]
        }
    }

    // Link the goroutines.
    for i := uint32(0); i < n; i++ {
        // 各个G连在一起
        batch[i].schedlink.set(batch[i+1])
    }
    var q gQueue
    q.head.set(batch[0])
    q.tail.set(batch[n])

    // 全局队列可能其他P也会操作,所以加锁
    lock(&sched.lock)
    // 加入全局runable队列
    globrunqputbatch(&q, int32(n+1))
    unlock(&sched.lock)
    return true
}

runable入队列:

  • 替换runnext
  • 本地队列未满:
    • 加入本地队列
  • 本地队列已满:
    • 本地队列移一半到全局队列
    • 再次尝试加入本地队列
G小结
graph TB;
    A["go func() {}  我们写的goroutine"] --> B
    B["newproc 获取func和参数"] -- "切换到g0,使用g0栈空间" --> C
    C[newproc1] --> D[gfget 从当前P获取空闲G]
    D --> E{"P空&全局不空"}
    E -- Y --> F[全局移32个到P本地]
    E -- N --> G
    F --> G[本地取出空闲G,初始化栈空间]
    G --> H{"获取空闲G成功"}
    H -- N --> I[创建G,初始化栈空间, 加入全局G数组]
    H -- Y --> J
    I --> J["参数复制到栈,清除堆,pc:func,sp:goexit1"]
    J --> K["状态设为runable,设置goid"]
    K -- runqput 加入当前P的runable队列 --> L[用g替换runnext]
    L --> M{ 本地runable队列满 }
    M -- Y --> N[本地runbale队列移一半到全局] 
    M -- N --> O[加入本地runable队列]
    N --> M
    O --> P{有空闲P&没有自旋的M }
    P -- Y --> Q["wakep()"]
G.png
现在有了G,那存放G的P又是怎么来的呢?procresize

在启动的时候有一个环节是schedinit

func schedinit() {

    ...
    // 修改P的个数
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
    
    ...
}

func procresize(nprocs int32) *p {
    old := gomaxprocs
    
    // P个数必须大于0
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
    if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // 截断或扩容allp, allp全局存了所有的P
    if nprocs > int32(len(allp)) {
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // 创建新增的P
    for i := old; i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
        }
        // 初始化,状态设为Pgcstop
        pp.init(i)
        // 存入allp
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }

    _g_ := getg()
    // 如果当前的P.id < nprocs,说明还在allp里面,继续运行
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else {
        if _g_.m.p != 0 {
            if trace.enabled {
                traceGoSched()
                traceProcStop(_g_.m.p.ptr())
            }
            _g_.m.p.ptr().m = 0
        }
        
        //将当前P和M取消关联
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 再将当前M和allp的第一个P关联,并设置为Prunning
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }

    // 释放多余的P
    for i := nprocs; i < old; i++ {
        p := allp[i]
        // 这里的释放工作不是直接删除回收p,而是主要把p中的可运行和空闲的G移到全局去
        p.destroy()
    }

    // 如果长度不相等,裁剪allp
    if int32(len(allp)) != nprocs {
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }

    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        // 当前P继续
        if _g_.m.p.ptr() == p {
            continue
        }
        
        // 将P设为Pidle,也就是不和M关联
        p.status = _Pidle
        
        // 如果P没有可运行的G, 那么将P加入全局空闲P链表
        if runqempty(p) {
            pidleput(p)
        } else {
            // 重新设置M
            p.m.set(mget())
            // 将allp的各个P连起来
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs 
    // 更新gomaxprocs, P个数
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}


func (pp *p) destroy() {
    // 将p的runable移到全局去
    for pp.runqhead != pp.runqtail {
        // Pop from tail of local queue
        pp.runqtail--
        gp := pp.runq[pp.runqtail%uint32(len(pp.runq))].ptr()
        // Push onto head of global queue
        globrunqputhead(gp)
    }
    
    // runnext移到全局
    if pp.runnext != 0 {
        globrunqputhead(pp.runnext.ptr())
        pp.runnext = 0
    }
    
    ...
    
    // If there's a background worker, make it runnable and put
    // it on the global queue so it can clean itself up.
    if gp := pp.gcBgMarkWorker.ptr(); gp != nil {
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        globrunqput(gp)
        // This assignment doesn't race because the
        // world is stopped.
        pp.gcBgMarkWorker.set(nil)
    }
    
    ...
    
    // 释放p的内存
    freemcache(pp.mcache)
    pp.mcache = nil
    
    // P的gFree链表移到全局
    gfpurge(pp)
    
    ...
    
    pp.status = _Pdead
}


P小结

通过启动时候的schedinit调用procresize生成对应个数的P。因为可以通过runtime.GOMAXPROCS来动态修改P的个数,所以在procresize中会对P数组进行调整,或新增P或减少P。被减少的P会将自身的runable、runnext、gfee移到全局去。

  • 如果当前P不在多余的P中,则状态为running
  • 如果当前P在多余的P中,则将当前M和P解绑,再将M和P数组的第一P绑定,并设为running
  • 除了当前P外;所有P都设为idle,如果P中没有runnable,则将P加入全局空闲P,否则获取全局空闲M和P绑定。
M从何而来?

在创建G的时候会根据情况是否创新M


func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {

...
    // 全局存在空闲P且没有自旋的M
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }

...

}

func wakep() {
    // be conservative about spinning threads
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil {
        _p_ = pidleget()
        
        // M与P绑定,没有了P 当然也就不需要生成M了
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    
    // 全局获取空闲M
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        // 没有空闲M 就重新生成一个
        newm(fn, _p_)
        return
    }
    
    ...
    
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning
    mp.nextp.set(_p_) // nextP指向P
    notewakeup(&mp.park)  // 唤醒M
}

创建G时,如果全局存在空闲P且没有自旋的M,则获取M和空闲P绑定。

  • 如果全局存在空闲M:获取空闲M,绑定P,唤醒M
  • 如果全局不存在空闲M: 新生成M
func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn) // 新生成M, 并创建M的g0
    mp.nextp.set(_p_) // nextp指向p
    mp.sigmask = initSigmask
    
    ...
    
    newm1(mp)
}


func newm1(mp *m) {

    ...
    
    execLock.rlock() // Prevent process clone.
    newosproc(mp)
    execLock.runlock()
}


func newosproc(mp *m) {

    ...
    
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    
    // 创建线程和M绑定,最终会调用mstart
    err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp))
    sigprocmask(_SIG_SETMASK, &oset, nil)
    
    ...
}

创建M和P绑定,创建线程和M绑定,最终将调用mstart


func mstart() {
    ...
    
    mstart1()

    ...
    
}


func mstart1() {
    ...
    schedule()
    ...
}


func schedule() {

    ...
    
top:
    pp := _g_.m.p.ptr()
    pp.preempt = false
    
    ...
    
    // 下面就是在各个地方获取runable的G
    
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        tryWakeP = tryWakeP || gp != nil
    }
    if gp == nil {
    
        // 以一定频率从全局获取runable G。 平衡本地和全局
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        // 从本地获取runable G
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    if gp == nil {
        // 阻塞直到获取runable。从本地、全局、网络、其他P中获取。
        // 如果没有可取的runable,则M进入休眠,
        gp, inheritTime = findrunnable() 
    }

    if _g_.m.spinning {
        resetspinning()
    }

    ...

    execute(gp, inheritTime)
}


func execute(gp *g, inheritTime bool) {
    _g_ := getg()

    // g和m相互绑定,并设为running
    _g_.m.curg = gp
    gp.m = _g_.m
    casgstatus(gp, _Grunnable, _Grunning)
    ...
    
    // 运作g,gp.sched的pc是我们写的func, sp是goexit1。
    // 执行完func,RET的时候弹出的是goexit1。
    gogo(&gp.sched)
}


func goexit1() {
    ...
    // goexit1 调用的是goexit0
    mcall(goexit0)
}


func goexit0(gp *g) {
    _g_ := getg()

    // g执行完了状态改为dead
    casgstatus(gp, _Grunning, _Gdead)
    
    ...
    
    // 清除g的各种信息
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.preemptStop = false
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    ...
    
    // g 和 m相互解绑
    dropg()

    ...
    
    // 将g放入本地空闲链表。 如果本地空闲个数大于64个,则移一半到全局去
    gfput(_g_.m.p.ptr(), gp)
    
    ...
    
    // 这里又开始调用上面的schedule了。所以M是在不断获取G,执行G
    schedule()
}

M小结
graph TB;
A["有空闲P&没有自旋的M"] --> B["wakep()"]
B -- startm --> B2[全局获取空闲P] 
B2--> C[全局获取空闲M]
C --> D{获取成功}
D -- Y --> E[M和P绑定]
D -- N --> F["创建M"]
F --> E
E --> G[唤醒M]
G --> H[mstart / mstart1]
H --> I[schedule]
I --> J[在P本地或全局获取runbale G]
J --> K{获取成功}
K -- Y --> L
K -- N --> M["反复在本地、全局、网络、其他P中获取runable G"]
M --> N{获取成功}
N -- Y --> L["G和M相互绑定,G设为running"]
L --> O["汇编执行G的pc:func。执行完RET弹出sp:goexit1"]
O --> P[goexit1 / goexit0]
P --> Q["将G状态设为dead,清除G的各种信息,G和M相互解绑"]
Q --> R["G放入本地空闲链表。如果本地空闲个数大于64个,则移一半到全局去"]
R --> I
N -- N --> S["MP解绑,P加入全局空闲P,M加入全局空闲M"]
S --> T[M进入睡眠]
M.png
就这么完了? 不,还有一个独立的M

在启动阶段创建了一个M执行sysmon

func main() {

    ...

    if GOARCH != "wasm" { 
        systemstack(func() {
            newm(sysmon, nil)
        })
    }
    
    ...
    
}

func sysmon() {
    lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)  // 间隔时间 20us ~ 10ms
    for {
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        now := nanotime()
        next, _ := timeSleepUntil() //所有P中timer最先到时的时间
        
        // 正在STW或者所有P都处于空闲时,sysmon休眠一会
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
            lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
                if next > now {
                
                    atomic.Store(&sched.sysmonwait, 1)
                    
                    ...
                    
                    // sysmon休眠一会
                    notetsleep(&sched.sysmonnote, sleep)
                    
                    ...
                    
                    atomic.Store(&sched.sysmonwait, 0)
                    noteclear(&sched.sysmonnote)
                }
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        
        // 如果超过10ms
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            list := netpoll(0) // 获取网络事件的Gs
            if !list.empty() {
                incidlelocked(-1)
                // 将这些G设为runable,加入全局runable
                // 如果存在空闲P,则调用startm运行他们
                injectglist(&list)
                incidlelocked(1)
            }
        }
        
        // 有timer到时了,启动M去执行
        if next < now {
            startm(nil, false)
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        // 2分钟或GC标记堆达到一定大小,触发垃圾回收
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            var list gList
            list.push(forcegc.g)
            injectglist(&list)
            unlock(&forcegc.lock)
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // G运行时间超过10ms,进行抢占。
            // 其实这只针对于running,因为syscall的P没有M
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_)
                sysretake = true
            }
        }
        if s == _Psyscall {
            ...
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            ...
            
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
            
                ...
                
                n++
                _p_.syscalltick++
                // 将P交由其他M
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}


sysmon小结
graph TB;
    A["启动阶段创建独立M"] -- sysmon --> B{"STW || 所有P空闲"}
    B --Y--> C[睡眠一会儿]
    B --N--> D{超过10ms}
    C --> D
    D --Y-->E["从netpoll获取G,加入全局runable。如有空闲P,startM运行他们"]
    D --N--> F["如有timer到时,startM运行"]
    E --> F
    F --retake--> G[循环所有P]
    
    G -.running:超过10ms.-> H[preempton抢占]
    
    H -.syscall:有work或小于10ms .-> I[handoffp将P交由其他M]
    
    I -.-> G
    
    I --> J["2分钟或GC标记堆达到一定大小,触发垃圾回收"]
    
    J -- "20us~10ms" --> B

sysmon.png

夜已深,实在写不动了!待修改,待续。。。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:ihornet

查看原文:golang 源码学习之GMP (goroutine)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1922 次点击  ∙  1 赞  
加入收藏 微博
1 回复  |  直到 2021-07-07 17:15:36
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传