Goroutine源码记录

nino's blog · · 1213 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

上一篇博客总结了Go调度器的设计以及go调度器解决如何解决了用户态线程典型问题,这一篇就跟踪下Goroutine的源码实现。Go1.5源码剖析 已经写的非常详细了,我只把我觉得重要的地方集中总结一下。

Go程序初始化过程

C程序的入口地址通常是C运行库的_start函数,它完成了初始化堆栈、命令行参数设置、环境变量初始化、IO初始化、线程相关初始化或者还有全局构造。Go的入口函数整个初始化过程也完成了类似的工作。


runtime.args   // 整理命令行参数 设置环境变量
runtime.osinit  // 确定CPU的core数目 调整procs的值
runtime.schedinit  // 初始化栈 内存分配器 垃圾回收器和并发调度器

初始化过程之后进入了runtime.main。这时启动了后台监控线程sysmon。执行了runtime包和用户包所有初始化init函数之后进入用户的main函数。


func main() {
    // new os thread for sysmon
    systemstack(func() {
        newm(sysmon, nil)
    })
    runtime_init() // must be before defer
    gcenable()
    main_init()
    main_main()
}

需要注意的是runtime.main是通过newprocmstart创建的。也就是说main对应的是goroutine而不是线程,所以它的地位还没有sysmon高啊


runtime.newproc
runtime.mstart

P与G的创建

schedinit

schedinit中与调度器相关的操作包括,设置最大的M数量10000;初始化当前的m;初始化P的数目默认为CPU核数,可以通过环境变量GOMAXPROCS设置;最后调整P的大小。


func schedinit() {
 ...
    sched.maxmcount = 10000
    mcommoninit(_g_.m)
    procs := int(ncpu)
    if n := atoi(gogetenv("GOMAXPROCS")); n > 0 {
        if n > _MaxGomaxprocs {
            n = _MaxGomaxprocs
        }
        procs = n
    }
    if procresize(int32(procs)) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
  ...
}

调整P大小是因为P保存在全局数组allp中,它在.data段就分配了空间 [_MaxGomaxprocs + 1]*p,这对应着256+1个指针空间。在schedinit中通过procresize将这个空间里的nprocs个指针初始化,其余的删除。

  • freeUnused P时要处理P里面原始的G队列,将他们放到全局schedt中。当然schedinit时不存在这个操作,这个逻辑是startTheWorld修改P数目准备的
  • 如果当前的P是被释放的那一拨,则将当前P与M分离,将M与allp[0]绑定。
  • 处理allp[0-nprocs],将没有本地G的P放入schedt的全局idleP链表,将有本地G队列的作为runnalblePs链表返回。

func procresize(nprocs int32) *p {
...
    // initialize new P's
    for i := int32(0); i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
            pp.id = i
            pp.status = _Pgcstop     ...
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        if pp.mcache == nil {
           pp.mcache = allocmcache()         
        }
    }
    // free unused P's
    for i := nprocs; i < old; i++ {
        p := allp[i]
        // move all runnable goroutines to the global queue
        for p.runqhead != p.runqtail {
            p.runqtail--
            gp := p.runq[p.runqtail%uint32(len(p.runq))]
            globrunqputhead(gp)
        }
     
        freemcache(p.mcache)
        p.mcache = nil
        p.status = _Pdead
        // can't free P itself because it can be 
        // referenced by an M in syscall
    }
    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
    } else {
        // release the current P and acquire allp[0]
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        acquirep(p)
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle
        if runqempty(p) {
            pidleput(p)
        } else {
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    atomicstore((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}

newproc

go编译器将go func() 翻译成runtime.newproc()。为了go func的执行,从右到左入栈了调用方的PC寄存器,返回值数目,参数数目,第一个参数的地址以及函数地址。


func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), ptrSize)
	pc := getcallerpc(unsafe.Pointer(&siz))
	systemstack(func() {
		newproc1(fn, (*uint8)(argp), siz, 0, pc)
	})
}

newproc1创建了G实例。从gfget()获取空闲的G对象,若获取失败则malg新建G对象。设置栈空间和保存现场的sched域以及初始状态Grunnable,runqput放入待运行队列,如果有空闲的P则尝试唤醒它来执行。

  • gfget是从p的gfree列表或全局sched的gfree链表中获取可以复用的G对象。当goroutine结束时调用goexit0时会将当前的G对象gfput到p本地的gfree队列中。
  • malg用默认的2KB栈空间来将new(g)创建的新G对象初始化。主要是通过stackalloc初始化newg.stack。
  • go func指定的执行参数会被拷贝到G的栈空间,因为它跟main所在的栈不再有任何关系,各自使用独立的栈空间。
  • 创建好的G优先放入P.runnext,或者放入数组实现的循环队列P.runq,若本地队列runq [256]*g已满则加锁放入全局队列Sched.runq。
  • 如果本地队列满会通过runqputslow 将P本地一半的任务G放到全局队列中。使得别的P可以去执行,这也是最后wakeP去唤醒其他M/P执行任务的原因。

func newproc1(fn *funcval, argp *uint8, narg int32,
              nret int32, callerpc uintptr) *g {
    _g_ := getg()
    _p_ := _g_.m.p.ptr()
  
    newg := gfget(_p_) // 获取空闲的可复用的G对象
    if newg == nil {
        newg = malg(_StackMin)  // 新建栈空间为2KB的G对象
        casgstatus(newg, _Gidle, _Gdead) // 设置G状态位Gdead
        allgadd(newg)
    }
    
  ...
    // 确定栈顶位置 并且入栈参数列表和参数个数
    memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
    // 初始化用于保存执行现场的sched域
    memclr(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    // 指定了G任务函数的返回地址 goexit
    newg.sched.pc = funcPC(goexit) + _PCQuantum 
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
  
    //设置status和id域
    newg.gopc = callerpc
    newg.startpc = fn.fn
    casgstatus(newg, _Gdead, _Grunnable)
...
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    // 将G放入待运行队列
    runqput(_p_, newg, true)
    // 如果有全局空闲的P 则尝试唤醒waitP来执行
    // 如果有M处于自旋等待P或G的状态 放弃
    // 如果当前创建的是 runtime.main 放弃
    if atomicload(&sched.npidle) != 0 &&
  atomicload(&sched.nmspinning) == 0 && 
  unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) {
        wakep()
    }
...
    return newg
}

M的创建和G的执行

从上一节可见runtime.newproc只是创建了G并放入当前P的G队列或全局G队列。如果是main goroutine,则显示调用mstart;其他goroutine则尝试wakeP去启动M的创建和G的执行。

wakeP+startm

首先G创建后会尝试通过pidleget去Sched.pidle链表获取空闲的P来执行,若没有的话就继续排队等待现有的P执行。获取到P后需要绑定M来执行,这时可以从shec.midle中获取可复用的m,通过notewakeup唤醒M;若没有空闲的M则重建newm


func wakep() {
    // be conservative about spinning threads
    if !cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    // 如果startm没有指定P则尝试获取空闲的P
    if _p_ == nil {
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                xadd(&sched.nmspinning, -1)
            }
            return
        }
    }
    mp := mget() //获取闲置的M若无则新建newM
    unlock(&sched.lock)
    if mp == nil {
        var fn func()
        if spinning {
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    mp.spinning = spinning
    mp.nextp.set(_p_)
    notewakeup(&mp.park)
}

newM

allocm主要就是初始化了m自带的名为g0的栈,默认8KB栈内存。它的栈内存地址会被传给newosproc,作为系统线程默认的栈空间。mcommoninit检查M数目是否超过默认的10000,然后将m添加到allm链表且不会释放。newosproc 表示创建OS线程,Linux调用的是clone,并指定了以下flags表示哪些进程资源可以共享,最后CLONE_THREAD表示clone出来的是线程,与当前进程显示同一个pid。同时指定了OS线程对应的启动函数是mstart

CLONE_VM| CLONE_FS | CLONE_FILES| CLONE_SIGHAND | CLONE_THREAD


func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn)
    // 设置p设置到m的nextp域
    mp.nextp.set(_p_)
    msigsave(mp)
...
    newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
}
func allocm(_p_ *p, fn func()) *m {
...
    mp := new(m)
    mp.mstartfn = fn
    mcommoninit(mp)
    mp.g0 = malg(8192 * stackGuardMultiplier)
    mp.g0.m = mp
...
    return mp
}
func newosproc(mp *m, stk unsafe.Pointer) {
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), 
      unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
}

mstart

无论是main goroutine还是其他goroutine,最终G执行的起点都是mstart。mstart主要设置了G的stack空间边界以及将m与它的nextp进行绑定。绑定过程acquirep,即m获取p的mcache并设置P的状态为prunning。


func mstart() {
    _g_ := getg()
    // Initialize stack guards so that we can start calling
    // both Go and C functions with stack growth prologues.
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    _g_.stackguard1 = _g_.stackguard0
    mstart1()
}
func mstart1() {
    _g_ := getg()
    // 初始化g0执行现场
    gosave(&_g_.m.g0.sched)
    _g_.m.g0.sched.pc = ^uintptr(0) // make sure it is never used
    asminit()
    minit()
    // 执行启动函数 通常是mspinning() sched.nmspinning--
    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }
    // 将m与它的nextp绑定
    if _g_.m.helpgc != 0 {
        _g_.m.helpgc = 0
        stopm()
    } else if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    // 进入任务调度循环 不再返回
    schedule()
}

schedule

总结G的执行过程:从各种渠道获取G任务+执行execute这个G任务。执行G时需要从当前g0栈切换到G的栈执行,返回时执行goexit清理现场,然后重新进入schedule。

  • 获取G任务优先从本地P队列中runqget获取,另外每处理n个任务就要去全局获取G任务,如果本地G和全局G,甚至网络任务netpoll都没有,则从其它的P队列steal。
  • execute任务是最终调用的是gogo函数。它完成了g0栈道G栈的切换,JMP到G任务函数代码执行。
  • G任务返回时执行的是goexit,因为在newproc1初始化G时,它的栈空间入栈的返回地址是goexit。goexit完成了G状态的清理,将G放回复用链表重新进入调度循环。

func schedule() {
    _g_ := getg()
top:
    var gp *g
    var inheritTime bool
    if gp == nil {
        // 处理n个任务后就去全局队列中获取G任务,以确保公平
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
            if gp != nil {
                resetspinning()
            }
        }
    }
    // 从P本地队列获取G任务
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
  
    // 从其它可能的地方获取G 若获取失败则block M进入休眠状态
    if gp == nil {
        gp, inheritTime = findrunnable() 
        resetspinning()
    }
    // 执行G
    execute(gp, inheritTime)
}
func execute(gp *g, inheritTime bool) {
    _g_ := getg()
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }
    _g_.m.curg = gp
    gp.m = _g_.m
    gogo(&gp.sched)
}

func goexit0(gp *g) {
    _g_ := getg()
    casgstatus(gp, _Grunning, _Gdead)
    gp.m = nil
...
    gfput(_g_.m.p.ptr(), gp)
    schedule()
}

状态变迁

P与G的状态变迁

P创建于schedinit程序初始化时,除了当前对应main goroutine的P,其他npcrocs-1个P都放进空闲P链表中等待使用,状态为Pidle。当m与p绑定时调用acquirep会将P状态设置为Prunning。Psyscall只有进入系统调用时发生。Pdead只有调整prosize大小时用到。


const (
    _Pidle    = iota
    _Prunning // Only this P is allowed to change from _Prunning.
    _Psyscall
    _Pgcstop
    _Pdead
)

G创建于newproc即通过go关键字调用函数时初始为Gidle。在给G分配栈空间之前G为Gdead,初始化后放进队列之前状态改为Grunnable。m真正执行到G后状态才是Grunning。


const (
    _Gidle            = iota // 0
    _Grunnable               // 1 runnable and on a run queue
    _Grunning                // 2
    _Gsyscall                // 3
    _Gwaiting                // 4
    _Gmoribund_unused        // 5 currently unused, but hardcoded in gdb scripts
    _Gdead                   // 6
    _Genqueue                // 7 Only the Gscanenqueue is used.
    _Gcopystack              // 8 in this state when newstack is moving the stack
)

gopark+goready

Gwaiting只有park_m才会出现,这时除非发生runtime.ready否则G永远不会执行。因为Gwaiting并不出现在待运行队列中。channel操作 定时器 网络poll都有可能park goroutine。


func park_m(gp *g) {
    _g_ := getg()
    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()
    if _g_.m.waitunlockf != nil {
        fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

midle与gsyscall

当陷入系统调用的G返回时,首先要dropg与原始的M分开,因为原始的M已经没有P给它提供内存了。之后G要重新pidleget找到一个空闲的P入队,若没有则入队全局队列。最后stopm停止当前m并继续schedule。


func exitsyscall0(gp *g) {
    _g_ := getg()
    casgstatus(gp, _Gsyscall, _Grunnable)
    dropg()
    lock(&sched.lock)
    _p_ := pidleget()
    if _p_ == nil {
        globrunqput(gp)
    } else if atomicload(&sched.sysmonwait) != 0 {
        atomicstore(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    if _p_ != nil {
        acquirep(_p_)
        execute(gp, false) // Never returns.
    }
    if _g_.m.lockedg != nil {
        // Wait until another thread schedules gp and so m again.
        stoplockedm()
        execute(gp, false) // Never returns.
    }
    stopm()
    schedule() // Never returns.
}

当M从系统调用退出时exitsyscall0会调用stopm把m放进空闲m链表,陷入休眠等待被唤醒。startm时所谓空闲的M的来源就是从系统调用中恢复的M。startm发生在两个时候:有新的G加入wakeP时,handOffP时P还有别的G任务。这时都会触发空闲M重用,对应notewakeup


func stopm() {
    _g_ := getg()
retry:
    lock(&sched.lock)
    mput(_g_.m)
    unlock(&sched.lock)
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

有疑问加站长微信联系(非本文作者)

本文来自:nino's blog

感谢作者:nino's blog

查看原文:Goroutine源码记录

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1213 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传