记一次关于goroutine调度的探索

overstep111 · · 516 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

问题

有以下三段代码,内容大体相同,只有很小的差别。

  • 第一段

func main() {

runtime.GOMAXPROCS(0)

var wg sync.WaitGroup

for i := 0; i < 5; i++ {

wg.Add(1)

go func(n int) {

print(n, " ")

wg.Done()

}(i)

}

wg.Wait()

}

  • 第二段

func main() {

runtime.GOMAXPROCS(1)

var wg sync.WaitGroup

for i := 0; i < 5; i++ {

wg.Add(1)

go func(n int) {

print(n, " ")

wg.Done()

}(i)

}

wg.Wait()

}

  • 第三段

func main() {

runtime.GOMAXPROCS(1)

var wg sync.WaitGroup

for i := 0; i < 5; i++ {

wg.Add(1)

go func(n int) {

time.Sleep(time.Second)

print(n, " ")

wg.Done()

}(i)

}

wg.Wait()

}

其中前两段代码是无意中看到的一道go面试题中的,虽然也看到了一些对于二者输出结果的解释,但感觉还是不够清晰,于是自己做了一些尝试,并无意中发现了第三段代码。好,下面先给出这三段代码相应的输出:

  • 第一段:0到4的乱序输出

  • 第二段:4 0 1 2 3

  • 第三段:3 4 0 1 2

探索

  • 第一段的乱序输出是很好理解的,CPU是多核的,创建了多个goroutine之后,每个goroutine得到执行的顺序无法保证,所以会出现0到4的乱序输出。

  • 第二段输出的循序也是很好理解的,由于设置了单核执行,goroutine根据创建顺序依次执行,但问题在于最后创建的goroutine最先执行了。

一顿搜索查阅之后,知道了大概原因,但总觉得不够清晰,最终找到了以下从P中获取G(go源码中表示goroutine的数据结构,关于G、P、M网上有很多大佬的详细介绍,但并不是本文重点,在这里不再赘述,有兴趣可以自行百度)的源码,由于本文场景中G的数量较少,所以添加时会全部添加到P的本地队列中(超出数量256后会转移一半到全局队列中去,还是扯到了这一块,不太了解的还是建议先去了解一下),所以以下函数就是实际运行时获取G时调用的函数:


// Get g from local runnable queue.

// If inheritTime is true, gp should inherit the remaining time in the

// current time slice. Otherwise, it should start a new time slice.

// Executed only by the owner P.

func runqget(_p_ *p) (gp *g, inheritTime bool) {

// If there's a runnext, it's the next G to run.

for {

next := _p_.runnext

if next == 0 {

break

}

if _p_.runnext.cas(next, 0) {

return next.ptr(), true

}

}

for {

h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers

t := _p_.runqtail

if t == h {

return nil, false

}

gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()

if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume

return gp, false

}

}

}

细节上先不去关注,我们可以很清楚的看到该函数会首先检查P的runnext,如果符合条件,就直接返回runnext,那么下面继续去看一下runnext的赋值,以下是向P的本地队列中添加G的源码:


func runqput(_p_ *p, gp *g, next bool) {

if randomizeScheduler && next && fastrand()%2 == 0 {

next = false

}

if next {

retryNext:

oldnext := _p_.runnext

if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {

goto retryNext

}

if oldnext == 0 {

return

}

// Kick the old runnext out to the regular run queue.

gp = oldnext.ptr()

}

retry:

h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers

t := _p_.runqtail

if t-h < uint32(len(_p_.runq)) {

_p_.runq[t%uint32(len(_p_.runq))].set(gp)

atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption

return

}

if runqputslow(_p_, gp, h, t) {

return

}

// the queue is not full, now the put above must succeed

goto retry

}

可以看到在next为true的情况下,新创建的G就会被放到P的runnext中,然后runnext的旧值会被添加到runq队列中去,而继续阅读源码发现,在新建一个G的时候next固定为true。next为false的情况只有两种,一是从全局队列中获取G到本地时,二是 gc的goroutine被创建的时候(一般由go运行时自主创建)。

那么现在答案就很清晰了,当最后一个goroutine创建出来后,P的runnext的输出是4,而P的本地队列中G的输出依次是0,1,2,3,执行G的时候先执行runnext,再依次执行队列,输出结果就是4,0,1,2,3。

  • 第三段代码看起来十分奇怪,我们一步一步来探索。首先其与第二段代码的唯一不同之处在于在每个go func中添加了一行time.Sleep(time.Second),首先我找到了time.Sleep函数的源码:

func timeSleep(ns int64) {

if ns <= 0 {

return

}

gp := getg()

t := gp.timer

if t == nil {

t = new(timer)

gp.timer = t

}

*t = timer{}

t.when = nanotime() + ns

t.f = goroutineReady

t.arg = gp

tb := t.assignBucket()

lock(&tb.lock)

if !tb.addtimerLocked(t) {

unlock(&tb.lock)

badTimer()

}

goparkunlock(&tb.lock, waitReasonSleep, traceEvGoSleep, 2)

}

大概就是先调用了goparkunlock,然后在sleep时间结束后调用goroutineReady,这两个函数对应的在runtime中的调用函数如下


// Puts the current goroutine into a waiting state and unlocks the lock.

// The goroutine can be made runnable again by calling goready(gp).

func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {

gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)

}

func goready(gp *g, traceskip int) {

systemstack(func() {

ready(gp, traceskip, true)

})

}

goparkunlock会使当前G进行等待状态,使其在P绑定G时忽略该G,而goroutineReady函数调用了runtime的goready函数,goready中又继续调用了ready函数,ready函数源码如下:


// Mark gp ready to run.

func ready(gp *g, traceskip int, next bool) {

if trace.enabled {

traceGoUnpark(gp, traceskip)

}

status := readgstatus(gp)

// Mark runnable.

_g_ := getg()

mp := acquirem() // disable preemption because it can be holding p in a local var

if status&^_Gscan != _Gwaiting {

dumpgstatus(gp)

throw("bad g->status in ready")

}

// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq

casgstatus(gp, _Gwaiting, _Grunnable)

runqput(_g_.m.p.ptr(), gp, next)

if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {

wakep()

}

releasem(mp)

}

可以看到最终调用的还是上面给出的runqput函数,把唤醒的G加到P本地队列中去,而且next参数传的时true,也就是说添加时会优先添加到P的runnext。那么答案就很明显了,我们先接着第二段输出4 0 1 2 3来说,第三段代码德执行顺序依然是4 0 1 2 3(这里只代表执行相应输出的goroutine,不代表实际输出),但是执行每一个的时候都会执行到time.Sleep进行休眠,然后休眠时间结束之后会调用ready去重新添加到本地队列,添加操作的顺序也是4 0 1 2 3(与休眠的顺序相对应),但是此时由于传入的next值为true,所以会依次对P的runnext进行更改,最终唤醒操作执行完之后,P的runnext为3,runq为4 0 1 2,所以最终的输出结果就是3 4 0 1 2。为了加以验证,我还继续尝试了以下代码


func main() {

runtime.GOMAXPROCS(1)

var wg sync.WaitGroup

for i := 0; i < 5; i++ {

wg.Add(1)

go func(n int) {

time.Sleep(time.Second)

time.Sleep(time.Second)

print(n, " ")

wg.Done()

}(i)

}

wg.Wait()

}

输出结果是2 3 4 0 1,验证了本思路的正确。

总结

本人是一个golang的初学者,在本次探索过程中,阅读了一部分的go源码,对go语言有了更深的理解。虽然得出的结论在实际使用中应该用不到,但是在此过程学到了很多东西,也锻炼了阅读源码解决问题的能力。当然毕竟是初学者,如果有什么描述不当之处还请各位不吝赐教。

参考资料

《GO语言学习笔记》——雨痕

一道经常考的面试题 https://studygolang.com/articles/23333


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:overstep111

查看原文:记一次关于goroutine调度的探索

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

516 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传