从之前的文章《Golang 中 GPM 之 G 从哪里来》
,我们知道了 Golang 对于使用 G 是有一定的复用机制,这样减少了资源的浪费,接下来我们要看下 G 生成后会怎么样
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
...
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead) //这里将新建的 G 中对应的状态是从 Gidle -> Gdead 中
...
}
// 以上我们得到了这次新产生的 G
...
newg.startpc = fn.fn
...
casgstatus(newg, _Gdead, _Grunnable) // 当新的 G 初始化,并将其状态变更 Gdead -> Grunnable
// 并将新的 G 放到当前 G 运行的 M 对应的 P 中
runqput(_p_, newg, true)
...
}
那接下来的重点就在 runqput(_p_ *p, gp *g, next bool)
了
// runqput 优先将 G 放在本地的可运行队列
// 如果 next 是 false ,则加入可运行队列的队列的尾部
// 如果 next 是 true ,则 runqput 将 G 放到当前 P 下一个执行的位置
// 如果当前 P 队列已经满了,则会将当前队列的 “一半+1” 放到 G 的全局队列中
func runqput(_p_ *p, gp *g, next bool) {
// 这里表示如果是随机的,则不将新产生的 G 放到当前 P 的 runnext
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
// 判断是否将新增的 G 放到当前 P 的 runnext,
// 后续将被替换出来的 G 放到本地 G 队列的尾部
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
gp = oldnext.ptr()
}
retry:
// atomic.LoadAcq 获取上次 atomic.CaSRel 的 _p_.runqhead 的值
h := atomic.LoadAcq(&_p_.runqhead)
t := _p_.runqtail
// 这里的 _p_.runq 的数据类型是[256]guintptr,说明总计一个 P 最多
// 有 256 个 G 放在本地可运行队列中和 1 个接下来运行的 G(runnext)
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)//通过 % 说明这是一个循环队列
atomic.StoreRel(&_p_.runqtail, t+1) //将末尾的位置 +1
return
}
//如果队列满了,将当前 P 本地可运行队列的一半以及 oldnext 放到全局队列中
if runqputslow(_p_, gp, h, t) {
return
}
goto retry
}
对 runqputslow(_p_ *p, gp *g, h, t uint32)
的实现细节可以看下面
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
//这里的长度取的是 p 本地可运行队列总长的一半 +1,即 256/2+1
var batch [len(_p_.runq)/2 + 1]*g
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
// 这个 for 循环次数就是可运行队列总量的一半
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
// 这里根据 _p_.runqhead 判断是否还是老位置,
// 如果是的话,则将起始位置增加 n ,如果失败则说明起始位置有变更过
if !atomic.CasRel(&_p_.runqhead, h, h+n) {
return false
}
batch[n] = gp
if randomizeScheduler { // 这里只是将 batch 里面的 G 来进行随机排序
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// 这里的意思是将排序后的 batch 用链表的形式绑定起来
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
var q gQueue
q.head.set(batch[0])
q.tail.set(batch[n])
lock(&sched.lock)
// 这里是将这个 batch 放到全局可运行队列(这使用了批量转移),
// 且将 batch 放在全局可运行队列的尾部
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
}
总结:
第一步:生成一个状态为 _Grunnable 的新 G
第二步:将新 G 放到当前 G 相关的 P 的下一个运行位(即直接插队成下一个要执行的G,替换出来的 G 就成为 oldnext)
第三步:根据 oldnext 的情况
(1)如果 oldnext 本来就没有,则结束
(2)如果有,则接下去后面的步骤
第四步:根据当前 P 的本地可运行队列是否满了
(1)如果未满,则将 oldnext 放到本地可运行队列的后面
(2)如果满了,则将当前 P 本地可运行队列队列的一半以及当前的 oldnext 放到一起放到全局可运行队列的尾部
有疑问加站长微信联系(非本文作者)