问题
有以下三段代码,内容大体相同,只有很小的差别。
- 第一段
func main() {
runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
print(n, " ")
wg.Done()
}(i)
}
wg.Wait()
}
- 第二段
func main() {
runtime.GOMAXPROCS(1)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
print(n, " ")
wg.Done()
}(i)
}
wg.Wait()
}
- 第三段
func main() {
runtime.GOMAXPROCS(1)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
time.Sleep(time.Second)
print(n, " ")
wg.Done()
}(i)
}
wg.Wait()
}
其中前两段代码是无意中看到的一道go面试题中的,虽然也看到了一些对于二者输出结果的解释,但感觉还是不够清晰,于是自己做了一些尝试,并无意中发现了第三段代码。好,下面先给出这三段代码相应的输出:
第一段:0到4的乱序输出
第二段:4 0 1 2 3
第三段:3 4 0 1 2
探索
第一段的乱序输出是很好理解的,CPU是多核的,创建了多个goroutine之后,每个goroutine得到执行的顺序无法保证,所以会出现0到4的乱序输出。
第二段输出的循序也是很好理解的,由于设置了单核执行,goroutine根据创建顺序依次执行,但问题在于最后创建的goroutine最先执行了。
一顿搜索查阅之后,知道了大概原因,但总觉得不够清晰,最终找到了以下从P中获取G(go源码中表示goroutine的数据结构,关于G、P、M网上有很多大佬的详细介绍,但并不是本文重点,在这里不再赘述,有兴趣可以自行百度)的源码,由于本文场景中G的数量较少,所以添加时会全部添加到P的本地队列中(超出数量256后会转移一半到全局队列中去,还是扯到了这一块,不太了解的还是建议先去了解一下),所以以下函数就是实际运行时获取G时调用的函数:
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
for {
next := _p_.runnext
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
细节上先不去关注,我们可以很清楚的看到该函数会首先检查P的runnext,如果符合条件,就直接返回runnext,那么下面继续去看一下runnext的赋值,以下是向P的本地队列中添加G的源码:
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
可以看到在next为true的情况下,新创建的G就会被放到P的runnext中,然后runnext的旧值会被添加到runq队列中去,而继续阅读源码发现,在新建一个G的时候next固定为true。next为false的情况只有两种,一是从全局队列中获取G到本地时,二是 gc的goroutine被创建的时候(一般由go运行时自主创建)。
那么现在答案就很清晰了,当最后一个goroutine创建出来后,P的runnext的输出是4,而P的本地队列中G的输出依次是0,1,2,3,执行G的时候先执行runnext,再依次执行队列,输出结果就是4,0,1,2,3。
- 第三段代码看起来十分奇怪,我们一步一步来探索。首先其与第二段代码的唯一不同之处在于在每个go func中添加了一行time.Sleep(time.Second),首先我找到了time.Sleep函数的源码:
func timeSleep(ns int64) {
if ns <= 0 {
return
}
gp := getg()
t := gp.timer
if t == nil {
t = new(timer)
gp.timer = t
}
*t = timer{}
t.when = nanotime() + ns
t.f = goroutineReady
t.arg = gp
tb := t.assignBucket()
lock(&tb.lock)
if !tb.addtimerLocked(t) {
unlock(&tb.lock)
badTimer()
}
goparkunlock(&tb.lock, waitReasonSleep, traceEvGoSleep, 2)
}
大概就是先调用了goparkunlock,然后在sleep时间结束后调用goroutineReady,这两个函数对应的在runtime中的调用函数如下
// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
goparkunlock会使当前G进行等待状态,使其在P绑定G时忽略该G,而goroutineReady函数调用了runtime的goready函数,goready中又继续调用了ready函数,ready函数源码如下:
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
releasem(mp)
}
可以看到最终调用的还是上面给出的runqput函数,把唤醒的G加到P本地队列中去,而且next参数传的时true,也就是说添加时会优先添加到P的runnext。那么答案就很明显了,我们先接着第二段输出4 0 1 2 3来说,第三段代码德执行顺序依然是4 0 1 2 3(这里只代表执行相应输出的goroutine,不代表实际输出),但是执行每一个的时候都会执行到time.Sleep进行休眠,然后休眠时间结束之后会调用ready去重新添加到本地队列,添加操作的顺序也是4 0 1 2 3(与休眠的顺序相对应),但是此时由于传入的next值为true,所以会依次对P的runnext进行更改,最终唤醒操作执行完之后,P的runnext为3,runq为4 0 1 2,所以最终的输出结果就是3 4 0 1 2。为了加以验证,我还继续尝试了以下代码
func main() {
runtime.GOMAXPROCS(1)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
time.Sleep(time.Second)
time.Sleep(time.Second)
print(n, " ")
wg.Done()
}(i)
}
wg.Wait()
}
输出结果是2 3 4 0 1,验证了本思路的正确。
总结
本人是一个golang的初学者,在本次探索过程中,阅读了一部分的go源码,对go语言有了更深的理解。虽然得出的结论在实际使用中应该用不到,但是在此过程学到了很多东西,也锻炼了阅读源码解决问题的能力。当然毕竟是初学者,如果有什么描述不当之处还请各位不吝赐教。
参考资料
《GO语言学习笔记》——雨痕
一道经常考的面试题 https://studygolang.com/articles/23333
有疑问加站长微信联系(非本文作者)