声明
下面的分析均基于Golang1.14版本。
状态图
主要流程
1.procresize创建,Go程序初始化时初始CPU数量个P。
2.acquirep进入运行状态。当有新的G创建或waiting状态的G变得可运行,并且有P空闲时,通过acquirep获取p来运行最新的G。
3.entersyscall,进入系统调用状态。当P绑定的M正在运行的G进入系统调用状态,P也随之进入系统调用状态。
4.exitsyscallfast,退出系统调用状态。如果G快速退出系统调用,此时P理应等待G,当G退出系统调用时,继续运行G的代码。
5.retake,如果G长时间处于系统调用,此时P应与对应的G,M解绑。考虑到如果所有G都阻塞在系统调用,则整个Go程序都阻塞在系统调用,无法执行用户代码,因此需要一个将长时间陷入系统调用的P解绑M,重新绑定空闲的M执行用户代码。
6.releasep,让P进入空闲状态。如果P上可运行的G的队列空闲,并且无法从其它P中偷取G,此时应让P进入空闲状态。
procresize函数
1.谁在什么时候调用?程序初始化时或者修改CPU数量时(这种情况暂不考虑)调用。
2.P的状态变化?从无到有的过程,初始化CPU数量个P,所有P的状态都是_PIdle。
// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
old := gomaxprocs // 在这之前P的数量
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime) // 此处统计的应该是总的P的存在的时间
}
sched.procresizetime = now
// Grow allp if necessary.
if nprocs > int32(len(allp)) { // 根据需要扩展allp来存储P
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
// initialize new P's
for i := old; i < nprocs; i++ { // 将新创建的初始化 然后投入allp中
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i) // 初始化P 主要是分配mcache
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) // 将创建的P放入allp中
}
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 如果当前M绑定的P仍有效
// continue to use the current P
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else {
// release the current P and acquire allp[0].
//
// We must do this before destroying our current P
// because p.destroy itself has write barriers, so we
// need to do that from a valid P.
if _g_.m.p != 0 {
if trace.enabled {
// Pretend that we were descheduled
// and then scheduled again to keep
// the trace sane.
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0 // M绑定的P已无效 解绑
}
// M绑定新的P(allp[0])
_g_.m.p = 0
p := allp[0]
p.m = 0
p.status = _Pidle
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
// g.m.p is now set, so we no longer need mcache0 for bootstrapping.
mcache0 = nil // mcache0是为没有P的情况下给第一个P分配mcache 此时已经有P 则不需要mcache0
// release resources from unused P's
for i := nprocs; i < old; i++ { // 如果allp缩小 则释放多余的P
p := allp[i]
p.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs { // 缩小切片allp
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- { // 将除了当前的P以外的P 进行初始化
p := allp[i]
if _g_.m.p.ptr() == p {
continue
}
p.status = _Pidle
if runqempty(p) { // 如果当前P没有要运行的G 将P放入Idle队列
pidleput(p)
} else {
p.m.set(mget()) // 从空闲的M队列中取出M 将其和P进行绑定
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) // 设置gomaxprocs为当前P的数量
return runnablePs
}
acquirep
1.谁在什么时候调用?当有新的G创建(newproc)或有G从waiting进入Runnable状态(ready)时,如果有空闲的P且没有正在找可运行的G的P,则唤醒一个P来运行这个刚创建的G。
2.P的状态转换?P从原先的Idle状态进入Running状态。
entersyscall
1.谁在什么时候调用?当P绑定的M要进入系统调用或者CGo调用时,触发entersyscall。
2.P的状态转换?P从Running状态进入Syscall状态,并且和当前绑定的M解绑。
func entersyscall() {
reentersyscall(getcallerpc(), getcallersp())
}
func reentersyscall(pc, sp uintptr) {
_g_ := getg()
// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
_g_.m.locks++
// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
_g_.stackguard0 = stackPreempt
_g_.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
casgstatus(_g_, _Grunning, _Gsyscall) //切换G的状态
if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
systemstack(func() {
print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
throw("entersyscall")
})
}
// 按需记录调用信息
if trace.enabled {
systemstack(traceGoSysCall)
// systemstack itself clobbers g.sched.{pc,sp} and we might
// need them later when the G is genuinely blocked in a
// syscall
save(pc, sp)
}
if atomic.Load(&sched.sysmonwait) != 0 { // 如果系统监控线程在等待 则执行下面的函数 具体不深究
systemstack(entersyscall_sysmon)
save(pc, sp)
}
if _g_.m.p.ptr().runSafePointFn != 0 { // 如果runSafePointFn不为0 则执行 具体不深究
// runSafePointFn may stack split if run on this stack
systemstack(runSafePointFn)
save(pc, sp)
}
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
// m p 解绑
pp := _g_.m.p.ptr()
pp.m = 0
_g_.m.oldp.set(pp)
_g_.m.p = 0
atomic.Store(&pp.status, _Psyscall) //切换P的状态
if sched.gcwaiting != 0 { // 正在gc时 则调用以下代码 详细的不深究
systemstack(entersyscall_gcwait)
save(pc, sp)
}
_g_.m.locks--
}
exitsyscallfast
1.谁在什么时候调用?进入系统调用的G,迅速退出系统调用后,此时P尚未和别的M绑定,于是M和P重新绑定,继续运行该G。
2.P的状态转换?P从Syscall状态进入Running状态,M和P重新绑定。
// 删除了部分无关代码
func exitsyscallfast(oldp *p) bool {
_g_ := getg()
// Freezetheworld sets stopwait but does not retake P's.
if sched.stopwait == freezeStopWait {
return false
}
// Try to re-acquire the last P. sysmon线程未剥夺P
if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
// There's a cpu for us, so we can run.
// 切换p的状态 并且重新绑定m p
wirep(oldp)
exitsyscallfast_reacquired()
return true
}
return false
}
retake
1.谁在什么时候调用?sysmon(系统监控线程)在P进入Syscall一段时间后,将P从Syscall状态切为Idle状态。
2.P的状态转换?P从Syscall进入Idle状态。
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock) // 给allp加锁
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick) // schedtick表示p执行execute的次数
if int64(pd.schedtick) != t { // 调整p.sysmontick里的schedtick次数
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now { // 如果P运行时间过长 则抢占P
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true // Syscal状态P未绑定M 无法抢占
}
}
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick) // 当P从Syscall状态退出时++
if !sysretake && int64(pd.syscalltick) != t { // 如果未触发sysretake 尝试调整p.sysmontick.syscalltick次数
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
// 如果P的队列为空 且已有空闲的P或者有P绑定的M处于spinning(寻找可运行的G)且系统调用时间小于10ms
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1) // ???
if atomic.Cas(&_p_.status, s, _Pidle) { // P转为Idle状态
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_) // 尝试把P放入空闲队列中
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
func handoffp(_p_ *p) {
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_.
// if it has local work, start it straight away
if !runqempty(_p_) || sched.runqsize != 0 { // 如果本地队列不为空或者全局运行队列不为空
startm(_p_, false)
return
}
// if it has GC work, start it straight away
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
// 如果没有正在spinning状态的M 且 没有P处于Idle状态 则不退出
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
startm(_p_, true)
return
}
lock(&sched.lock)
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
if sched.runqsize != 0 { // 如果全局G运行队列不为空
unlock(&sched.lock)
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
// 如果是最后一个P 并且...
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
if when := nobarrierWakeTime(_p_); when != 0 {
wakeNetPoller(when)
}
pidleput(_p_) // 将P放入空闲队列中
unlock(&sched.lock)
}
releasep
1.谁在什么时候调用?当P本地的可运行G的队列为空且无法从其它地方窃取可运行的G时,P和M解绑,进入空闲状态。
2.P的状态转换?P和M解绑,从Running进入Idle。
func releasep() *p {
_g_ := getg()
if _g_.m.p == 0 {
throw("releasep: invalid arg")
}
_p_ := _g_.m.p.ptr()
if _p_.m.ptr() != _g_.m || _p_.status != _Prunning {
print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n")
throw("releasep: invalid p state")
}
if trace.enabled {
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p = 0
_p_.m = 0
_p_.status = _Pidle
return _p_
}
总结
1.P和G的状态转换息息相关,理解G的状态转换后,理解P的状态转顺理成章。
有疑问加站长微信联系(非本文作者)