声明
下面的分析均基于Golang1.14版本。
M的状态
M只有Running和Stop这2个状态,还有一个spinning中间态,当从Running转为Stop时,会先spinning寻找可运行的G,如果找不到则进入Stop。
主要流程
1.mstart,Go程序初始化时,第一个M是由mstart创建,新的物理线程创建时,调用的函数也是mstart。
2.startm,当有新的G创建或者有G从waiting进入running且还有空闲的P,此时会调用startm,获取一个M和空闲的P绑定执行G。
3.newm,当调用startm时,如果没有空闲的M则会通过newm创建M。
4.stopm,在2种情况下会执行stopm,一是当M绑定的P无可运行的G且无法从其它P窃取可运行的G时,M先进入spinning状态,然后退出。二是当M和G进入系统调用后,长时间未退出,P被retake且M找不到空闲的P绑定,此时M会调用stopm。
5.spinning状态,在findrunnable函数中,会短暂进入spinning状态,如果找不到可运行的G则调用stopm。
PS:上述主要流程解释了函数什么时候由谁触发调用,后面不再赘述。
线程的休眠和唤醒
1.M绑定了一个物理线程,M的running和stop就代表了物理线程的状态,那么物理线程是如何休眠和唤醒的呢?下面以Linux操作系统为例介绍物理线程的变化。
2.Linux线程同步通过futex系统调用实现,futex详细介绍。
#include <linux/futex.h>
#include <sys/time.h>
// 主要关注前3个参数,uaddr表示同步的内存地址。
//futex_op表示操作类型,这里使用了FUTEX_WAIT,FUTEX_WAKE这2种类型。
//val在FUTEX_WAIT时表示当uaddr指向的值等于val时休眠
//val在FUTEX_WAKE时表示唤醒休眠在uaddr上的线程数量(Go中默认是1)
int futex(int *uaddr, int futex_op, int val,
const struct timespec *timeout, /* or: uint32_t val2 */
int *uaddr2, int val3);
3.线程休眠
asmcgoyield是执行cgo_yield函数,具体的不深究。
func notesleep(n *note) {
gp := getg() // 线程休眠只发生在退出系统调用或者schedule函数 因此必然是g0
if gp != gp.m.g0 {
throw("notesleep not on g0")
}
ns := int64(-1) // 通常休眠后不会主动唤醒
if *cgo_yield != nil {
// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
ns = 10e6 // 在cgo情况下 每休眠10ms唤醒一次
}
for atomic.Load(key32(&n.key)) == 0 { // 当note.key==0时 休眠
gp.m.blocked = true
futexsleep(key32(&n.key), 0, ns) // 调用futexsleep进入休眠
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil) // 如果是cgo调用asmcgocall
}
gp.m.blocked = false
}
}
func futexsleep(addr *uint32, val uint32, ns int64) {
if ns < 0 { // 如果ns < 0 则一直休眠不主动唤醒
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
}
var ts timespec
ts.setNsec(ns) // 设置休眠时间
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
4.线程唤醒
func notewakeup(n *note) {
old := atomic.Xchg(key32(&n.key), 1) // 将note.key设置为1 note.key休眠时为0
if old != 0 {
print("notewakeup - double wakeup (", old, ")\n")
throw("notewakeup - double wakeup")
}
futexwakeup(key32(&n.key), 1) // 尝试唤醒note.key
}
func futexwakeup(addr *uint32, cnt uint32) {
ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
if ret >= 0 {
return
}
// 正常情况下不会执行到下面的代码
systemstack(func() {
print("futexwakeup addr=", addr, " returned ", ret, "\n")
})
*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
5.总结 M的休眠和唤醒都是通过m.note.key进行同步,对M的休眠和唤醒操作都是操作m.note.key所在的内存。
mstart
此时初始化的M为m0,是Go进程的第一个M。
func mstart() {
_g_ := getg()
osStack := _g_.stack.lo == 0
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
// Initialize stack guard so that we can start calling regular
// Go code.
_g_.stackguard0 = _g_.stack.lo + _StackGuard
// This is the g0, so we can also call go:systemstack
// functions, which check stackguard1.
_g_.stackguard1 = _g_.stackguard0
mstart1()
// Exit this thread.
switch GOOS {
case "windows", "solaris", "illumos", "plan9", "darwin", "aix":
// Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
// the stack, but put it in _g_.stack before mstart,
// so the logic above hasn't set osStack yet.
osStack = true
}
mexit(osStack)
}
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
// Record the caller for use as the top of stack in mcall and
// for terminating the thread.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
save(getcallerpc(), getcallersp()) // 保存pc sp到g0中 此处的pc和sp是mstart调用mstart1时的pc和sp
asminit() // 针对不同的CPU进行初始化 忽略
minit() // 主要是将阻塞的信号屏蔽 阻塞该信号
// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
if _g_.m == &m0 {
mstartm0() // 初始化信号处理函数
}
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule()
}
func minit() {
// The alternate signal stack is buggy on arm and arm64.
// The signal handler handles it directly.
if GOARCH != "arm" && GOARCH != "arm64" {
minitSignalStack() // 信号回调栈 信号处理函数使用的栈
}
minitSignalMask() // 初始化信号处理 不深究
getg().m.procid = uint64(pthread_self())
}
func mstartm0() {
// Create an extra M for callbacks on threads not created by Go.
// An extra M is also needed on Windows for callbacks created by
// syscall.NewCallback. See issue #6751 for details.
if (iscgo || GOOS == "windows") && !cgoHasExtraM {
cgoHasExtraM = true
newextram()
}
initsig(false) // 注册信号处理函数 不递归深究
}
startm
寻找空闲的M和P,将P绑定到M中的m.nextp,并且尝试通过m.note唤醒M。当M唤醒后,和m.nextp指定的P绑定。
func startm(_p_ *p, spinning bool) {
lock(&sched.lock) // 对sched上锁
if _p_ == nil {
_p_ = pidleget() // 获取空闲的P
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
mp := mget() // 获取空闲的M
unlock(&sched.lock) // 释放sched中的锁
if mp == nil { // 如果没有空闲m 则新建一个空闲的m
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
return
}
// 异常情况抛出异常
if mp.spinning {
throw("startm: m is spinning")
}
if mp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_) // 设置M的nextp 当M唤醒后会和m.nextp中的P绑定
// 通过m.note唤醒M
notewakeup(&mp.park)
}
stopm
调用stopm时,P和M已经解绑,此时将M投入全局的空闲队列并且伴随物理线程一起休眠。
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
lock(&sched.lock)
mput(_g_.m) // m投入全局的空闲m列表中
unlock(&sched.lock)
// 线程m将停在 notesleep 中
notesleep(&_g_.m.park)
noteclear(&_g_.m.park) // 休眠时m.note.key == 0 当m.note.key != 0时退出休眠 此时回复m.note.key = 0
acquirep(_g_.m.nextp.ptr()) // 和m.nextp进行绑定
_g_.m.nextp = 0 // m.nextp设置为0
}
newm
在源码剖析前先分析newm要做什么。
1.创建M对应的结构体。
2.创建和M绑定的g0。
3.创建物理线程进入休眠,并且M和物理线程绑定。
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn) // 根据P和fn创建M
mp.nextp.set(_p_) // 设置M的nextp
mp.sigmask = initSigmask
// 删除部分代码 go 调用C 然后调用 go? 暂不考虑
newm1(mp)
}
func allocm(_p_ *p, fn func()) *m {
_g_ := getg()
acquirem() // disable GC because it can be called from sysmon
if _g_.m.p == 0 { // 如果P空闲则尝试获取P
acquirep(_p_) // temporarily borrow p for mallocs in this function
}
// Release the free M list. We need to do this somewhere and
// this may free up a stack we can use.
//删掉部分代码 释放freem 不太理解为什么要在这里做
mp := new(m)
mp.mstartfn = fn
mcommoninit(mp) // m初始化 不深究
// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
// Windows and Plan 9 will layout sched stack on OS stack.
// 初始化g0 注意 有cgo的情况下 g0不分配栈 而是使用物理线程的栈 为什么呢?
if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}
mp.g0.m = mp
if _p_ == _g_.m.p.ptr() {
releasep()
}
releasem(_g_.m)
return mp
}
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
// round2把数值向上调整为2的幂次
stacksize = round2(_StackSystem + stacksize)
systemstack(func() {
// 主要是栈大小的调整和栈内存的实际分配
newg.stack = stackalloc(uint32(stacksize))
})
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
// Clear the bottom word of the stack. We record g
// there on gsignal stack during VDSO on ARM and ARM64.
*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
}
return newg
}
func newm1(mp *m) {
if iscgo { // 如果是cgo
// cgo情况下 会将M作为参数传入并且最终调用mstart函数 在mstart中具体分析
var ts cgothreadstart
if _cgo_thread_start == nil {
throw("_cgo_thread_start missing")
}
ts.g.set(mp.g0)
ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
ts.fn = unsafe.Pointer(funcPC(mstart)) //
if msanenabled {
msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
}
execLock.rlock() // Prevent process clone.
asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
execLock.runlock()
return
}
execLock.rlock() // Prevent process clone.
newosproc(mp) // 创建物理线程
execLock.runlock()
}
// startm 部分节选
func mstart() {
osStack := _g_.stack.lo == 0 // cgo时 未初始化g0的栈 使用os的栈
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
// g0的栈绑定到物理线程的栈上
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
mstart1()
}
func mstart1() {
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule()
}
// 创建物理线程来执行mstart
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi) // 取g0的栈作为线程的栈
/*
* note: strace gets confused if we use CLONE_PTRACE here.
*/
if false {
print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")
}
// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
// 通过clone创建线程 g0.stack作为栈 mstart作为启动函数
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
if ret < 0 {
print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
if ret == -_EAGAIN {
println("runtime: may need to increase max user processes (ulimit -u)")
}
throw("newosproc")
}
}
总结:
1.g0的栈和物理线程使用的栈是统一的。
2.cgo情况下,使用物理线程分配的栈,原因是cgo调用的C的库,C代码都是运行在物理线程上,如果不使用物理线程大小的栈,cgo代码可能在其它语言调用时是正常的,而在go中调用失败,栈溢出。
3.newm创建的M已经在执行schedule函数了,不需要再度唤醒。
findrunnable--spinning
spinning主要是
1.GC,网络,timer相关的处理。
2.尝试从全局的和其它的P中窃取可运行的G。
3.找不到可运行的G则stopm。
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// local runq 本地的可运行G队列
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq 全局的可运行G队列
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Steal work from other P's. 从其它P中窃取部分G
procs := uint32(gomaxprocs)
ranTimer := false
// 运行中的P有一半是在spinning 则直接stop
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ { // 总共找4次 每次随机一个起始的P进行偷取
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
// wasm only:
// If a callback returned and no other goroutine is awake,
// then pause execution until a callback was triggered.
if beforeIdle(delta) { // 进入stop前回调
// At least one goroutine got woken.
goto top
}
// 再次检查全局的G运行队列
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again 再次尝试从其它P中偷取
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
stopm()
goto top
}
总结
1.M包含物理线程运行所需要的数据,P包含调度G所需要的数据。
有疑问加站长微信联系(非本文作者)