企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 6.4 调度循环 所有的初始化工作都已经完成了,是时候启动运行时调度器了。 我们已经知道,当所有准备工作都完成后, 最后一个开始执行的引导调用就是`runtime.mstart`了。现在我们来研究一下它在干什么。 ``` TEXT runtime·rt0_go(SB),NOSPLIT,$0 (...) CALL runtime·newproc(SB) // G 初始化 POPQ AX POPQ AX // 启动 M CALL runtime·mstart(SB) // 开始执行 RET DATA runtime·mainPC+0(SB)/8,$runtime·main(SB) GLOBL runtime·mainPC(SB),RODATA,$8 ``` ## 6.4.1 执行前的准备 ### `mstart`与`mstart1` 在启动前,在[6.2 初始化](https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/init)中我们已经了解到 G 的栈边界是还没有初始化的。 因此我们必须在开始前计算栈边界,因此在`mstart1`之前,就是一些确定执行栈边界的工作。 当`mstart1`结束后,会执行`mexit`退出 M。`mstart`也是所有新创建的 M 的起点。 ``` //go:nosplit //go:nowritebarrierrec func mstart() { _g_ := getg() // 终于开始确定执行栈的边界了 // 通过检查 g 执行占的边界来确定是否为系统栈 osStack := _g_.stack.lo == 0 if osStack { // 根据系统栈初始化执行栈的边界 // cgo 可能会离开 stack.hi // minit 可能会更新栈的边界 size := _g_.stack.hi if size == 0 { size = 8192 * sys.StackGuardMultiplier } _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size))) _g_.stack.lo = _g_.stack.hi - size + 1024 } // 初始化栈 guard,进而可以同时调用 Go 或 C 函数。 _g_.stackguard0 = _g_.stack.lo + _StackGuard _g_.stackguard1 = _g_.stackguard0 // 启动! mstart1() // 退出线程 if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" { // 由于 windows, solaris, darwin, aix 和 plan9 总是系统分配的栈,在在 mstart 之前放进 _g_.stack 的 // 因此上面的逻辑还没有设置 osStack。 osStack = true } // 退出线程 mexit(osStack) } ``` 再来看`mstart1`。 ``` func mstart1() { _g_ := getg() (...) // 为了在 mcall 的栈顶使用调用方来结束当前线程,做记录 // 当进入 schedule 之后,我们再也不会回到 mstart1,所以其他调用可以复用当前帧。 save(getcallerpc(), getcallersp()) (...) minit() // 设置信号 handler;在 minit 之后,因为 minit 可以准备处理信号的的线程 if _g_.m == &m0 { mstartm0() } // 执行启动函数 if fn := _g_.m.mstartfn; fn != nil { fn() } // 如果当前 m 并非 m0,则要求绑定 p if _g_.m != &m0 { // 绑定 p acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // 彻底准备好,开始调度,永不返回 schedule() } ``` 几个需要注意的细节: 1. `mstart`除了在程序引导阶段会被运行之外,也可能在每个 m 被创建时运行(本节稍后讨论); 2. `mstart`进入`mstart1`之后,会初始化自身用于信号处理的 g,在`mstartfn`指定时将其执行; 3. 调度循环`schedule`无法返回,因此最后一个`mexit`目前还不会被执行,因此当下所有的 Go 程序创建的线程都无法被释放 (只有一个特例,当使用`runtime.LockOSThread`锁住的 G 退出时会使用`gogo`退出 M,在本节稍后讨论)。 关于运行时信号处理,以及 note 同步机制,我们分别在[6.5 信号处理机制](https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/signal)和[6.8 同步原语](https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync)详细分析。 ### M 与 P 的绑定 M 与 P 的绑定过程只是简单的将 P 链表中的 P ,保存到 M 中的 P 指针上。 绑定前,P 的状态一定是`_Pidle`,绑定后 P 的状态一定为`_Prunning`。 ``` //go:yeswritebarrierrec func acquirep(_p_ *p) { // 此处不允许 write barrier wirep(_p_) (...) } //go:nowritebarrierrec //go:nosplit func wirep(_p_ *p) { _g_ := getg() (...) // 检查 m 是否正常,并检查要获取的 p 的状态 if _p_.m != 0 || _p_.status != _Pidle { (...) throw("wirep: invalid p state") } // 将 p 绑定到 m,p 和 m 互相引用 _g_.m.p.set(_p_) // *_g_.m.p = _p_ _p_.m.set(_g_.m) // *_p_.m = _g_.m // 修改 p 的状态 _p_.status = _Prunning } ``` ### M 的暂止和复始 无论出于什么原因,当 M 需要被暂止时,可能(因为还有其他暂止 M 的方法)会执行该调用。 此调用会将 M 进行暂止,并阻塞到它被复始时。这一过程就是工作线程的暂止和复始。 ``` // 停止当前 m 的执行,直到新的 work 有效 // 在包含要求的 P 下返回 func stopm() { _g_ := getg() (...) // 将 m 放回到 空闲列表中,因为我们马上就要暂止了 lock(&sched.lock) mput(_g_.m) unlock(&sched.lock) // 暂止当前的 M,在此阻塞,直到被唤醒 notesleep(&_g_.m.park) // 清除暂止的 note noteclear(&_g_.m.park) // 此时已经被复始,说明有任务要执行 // 立即 acquire P acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } ``` 它的流程也非常简单,将 m 放回至空闲列表中,而后使用 note 注册一个暂止通知, 阻塞到它重新被复始。 ## 6.4.2 核心调度 千辛万苦,我们终于来到了核心的调度逻辑。 ``` // 调度器的一轮:找到 runnable Goroutine 并进行执行且永不返回 func schedule() { _g_ := getg() (...) // m.lockedg 会在 LockOSThread 下变为非零 if _g_.m.lockedg != 0 { stoplockedm() execute(_g_.m.lockedg.ptr(), false) // 永不返回 } (...) top: if sched.gcwaiting != 0 { // 如果需要 GC,不再进行调度 gcstopm() goto top } if _g_.m.p.ptr().runSafePointFn != 0 { runSafePointFn() } var gp *g var inheritTime bool (...) // 正在 GC,去找 GC 的 g if gp == nil && gcBlackenEnabled != 0 { gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) } if gp == nil { // 说明不在 GC // // 每调度 61 次,就检查一次全局队列,保证公平性 // 否则两个 Goroutine 可以通过互相 respawn 一直占领本地的 runqueue if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) // 从全局队列中偷 g gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { // 说明不在 gc // 两种情况: // 1. 普通取 // 2. 全局队列中偷不到的取 // 从本地队列中取 gp, inheritTime = runqget(_g_.m.p.ptr()) (...) } if gp == nil { // 如果偷都偷不到,则休眠,在此阻塞 gp, inheritTime = findrunnable() } // 这个时候一定取到 g 了 if _g_.m.spinning { // 如果 m 是自旋状态,则 // 1. 从自旋到非自旋 // 2. 在没有自旋状态的 m 的情况下,再多创建一个新的自旋状态的 m resetspinning() } if sched.disable.user && !schedEnabled(gp) { // Scheduling of this goroutine is disabled. Put it on // the list of pending runnable goroutines for when we // re-enable user scheduling and look again. lock(&sched.lock) if schedEnabled(gp) { // Something re-enabled scheduling while we // were acquiring the lock. unlock(&sched.lock) } else { sched.disable.runnable.pushBack(gp) sched.disable.n++ unlock(&sched.lock) goto top } } if gp.lockedm != 0 { // 如果 g 需要 lock 到 m 上,则会将当前的 p // 给这个要 lock 的 g // 然后阻塞等待一个新的 p startlockedm(gp) goto top } // 开始执行 execute(gp, inheritTime) } ``` 先不管上面究竟做了什么,我们直接看最后一句的`execute`。 ``` // 在当前 M 上调度 gp。 // 如果 inheritTime 为 true,则 gp 继承剩余的时间片。否则从一个新的时间片开始 // 永不返回。 // //go:yeswritebarrierrec func execute(gp *g, inheritTime bool) { _g_ := getg() // 将 g 正式切换为 _Grunning 状态 casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 // 抢占信号 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard if !inheritTime { _g_.m.p.ptr().schedtick++ } _g_.m.curg = gp gp.m = _g_.m // profiling 相关 hz := sched.profilehz if _g_.m.profilehz != hz { setThreadCPUProfiler(hz) } (...) // 终于开始执行了 gogo(&gp.sched) } ``` 当开始执行`execute`后,g 会被切换到`_Grunning`状态。 设置自身的抢占信号,将 m 和 g 进行绑定。 最终调用`gogo`开始执行。 在 amd64 平台下的实现: ``` // void gogo(Gobuf*) // 从 Gobuf 恢复状态; longjmp TEXT runtime·gogo(SB), NOSPLIT, $16-8 MOVQ buf+0(FP), BX // 运行现场 MOVQ gobuf_g(BX), DX MOVQ 0(DX), CX // 确认 g != nil get_tls(CX) MOVQ DX, g(CX) MOVQ gobuf_sp(BX), SP // 恢复 SP MOVQ gobuf_ret(BX), AX MOVQ gobuf_ctxt(BX), DX MOVQ gobuf_bp(BX), BP MOVQ $0, gobuf_sp(BX) // 清理,辅助 GC MOVQ $0, gobuf_ret(BX) MOVQ $0, gobuf_ctxt(BX) MOVQ $0, gobuf_bp(BX) MOVQ gobuf_pc(BX), BX // 获取 g 要执行的函数的入口地址 JMP BX // 开始执行 ``` 这个`gogo`的实现真是非常巧妙。初次阅读时,看到`JMP BX`开始执行 Goroutine 函数体 后就没了,简直一脸疑惑,就这么没了?后续调用怎么回到调度器呢? 事实上我们已经在[6.2 初始化](https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/init)一节中看到过相关操作了: ``` func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) { siz := narg siz = (siz + 7) &^ 7 (...) totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize totalSize += -totalSize & (sys.SpAlign - 1) sp := newg.stack.hi - totalSize spArg := sp (...) memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = funcPC(goexit) + sys.PCQuantum newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) (...) } ``` 在执行`gostartcallfn`之前,栈帧状态为: ~~~ +--------+ | | --- --- newg.stack.hi +--------+ | | | | | | +--------+ | | | | | | siz +--------+ | | | | | | +--------+ | | | | | --- +--------+ | | | | +--------+ | totalSize = 4*sys.PtrSize + siz | | | +--------+ | | | | +--------+ | | | --- +--------+ 高地址 SP | | 假想的调用方栈帧 +--------+ --------------------------------------------- | | fn 栈帧 +--------+ | | 低地址 .... +--------+ PC | goexit | +--------+ ~~~ 当执行`gostartcallfn`后: ``` func gostartcallfn(gobuf *gobuf, fv *funcval) { var fn unsafe.Pointer if fv != nil { fn = unsafe.Pointer(fv.fn) } else { fn = unsafe.Pointer(funcPC(nilfunc)) } gostartcall(gobuf, fn, unsafe.Pointer(fv)) } func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) { sp := buf.sp if sys.RegSize > sys.PtrSize { sp -= sys.PtrSize *(*uintptr)(unsafe.Pointer(sp)) = 0 } sp -= sys.PtrSize *(*uintptr)(unsafe.Pointer(sp)) = buf.pc buf.sp = sp buf.pc = uintptr(fn) buf.ctxt = ctxt } ``` 此时保存的堆栈的情况如下: ~~~ +--------+ | | --- --- newg.stack.hi +--------+ | | | | | | +--------+ | | | | | | siz +--------+ | | | | | | +--------+ | | | | | --- +--------+ | | | | +--------+ | totalSize = 4*sys.PtrSize + siz | | | +--------+ | | | | +--------+ | | | --- +--------+ 高地址 | goexit | 假想的调用方栈帧 +--------+ --------------------------------------------- SP | | fn 栈帧 +--------+ | | 低地址 .... +--------+ PC | fn | +--------+ ~~~ 可以看到,在执行现场`sched.sp`保存的其实是`goexit`的地址。 那么也就是`JMP`跳转到 PC 寄存器处,开始执行`fn`。当`fn`执行完毕后,会将(假想的) 调用方`goexit`的地址恢复到 PC,从而达到执行`goexit`的目的: ``` // 在 Goroutine 返回 goexit + PCQuantum 时运行的最顶层函数。 TEXT runtime·goexit(SB),NOSPLIT,$0-0 BYTE $0x90 // NOP CALL runtime·goexit1(SB) // 不会返回 // traceback from goexit1 must hit code range of goexit BYTE $0x90 // NOP ``` 那么接下来就是去执行`goexit1`了: ``` // 完成当前 Goroutine 的执行 func goexit1() { (...) // 开始收尾工作 mcall(goexit0) } ``` 通过`mcall`完成`goexit0`的调用: ``` // func mcall(fn func(*g)) // 切换到 m->g0 栈, 并调用 fn(g). // Fn 必须永不返回. 它应该使用 gogo(&g->sched) 来持续运行 g TEXT runtime·mcall(SB), NOSPLIT, $0-4 MOVL fn+0(FP), DI get_tls(DX) MOVL g(DX), AX // 在 g->sched 中保存状态 MOVL 0(SP), BX // 调用方 PC MOVL BX, (g_sched+gobuf_pc)(AX) LEAL fn+0(FP), BX // 调用方 SP MOVL BX, (g_sched+gobuf_sp)(AX) MOVL AX, (g_sched+gobuf_g)(AX) // 切换到 m->g0 及其栈,调用 fn MOVL g(DX), BX MOVL g_m(BX), BX MOVL m_g0(BX), SI CMPL SI, AX // 如果 g == m->g0 要调用 badmcall JNE 3(PC) MOVL $runtime·badmcall(SB), AX JMP AX MOVL SI, g(DX) // g = m->g0 MOVL (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp PUSHL AX MOVL DI, DX MOVL 0(DI), DI CALL DI // 好了,开始调用 fn POPL AX MOVL $runtime·badmcall2(SB), AX JMP AX RET ``` 于是最终开始`goexit0`: ``` // goexit 继续在 g0 上执行 func goexit0(gp *g) { _g_ := getg() // 切换当前的 g 为 _Gdead casgstatus(gp, _Grunning, _Gdead) if isSystemGoroutine(gp, false) { atomic.Xadd(&sched.ngsys, -1) } // 清理 gp.m = nil locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.paniconfault = false gp._defer = nil // 应该已经为 true,但以防万一 gp._panic = nil // Goexit 中 panic 则不为 nil, 指向栈分配的数据 gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 { // 刷新 assist credit 到全局池。 // 如果应用在快速创建 Goroutine,这可以为 pacing 提供更好的信息。 scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes)) atomic.Xaddint64(&gcController.bgScanCredit, scanCredit) gp.gcAssistBytes = 0 } // 注意 gp 的栈 scan 目前开始变为 valid,因为它没有栈了 gp.gcscanvalid = true // 解绑 m 和 g dropg() if GOARCH == "wasm" { // wasm 目前还没有线程支持 // 将 g 扔进 gfree 链表中等待复用 gfput(_g_.m.p.ptr(), gp) // 再次进行调度 schedule() // 永不返回 } (...) // 将 g 扔进 gfree 链表中等待复用 gfput(_g_.m.p.ptr(), gp) if locked { // 该 Goroutine 可能在当前线程上锁住,因为它可能导致了不正常的内核状态 // 这时候 kill 该线程,而非将 m 放回到线程池。 // 此举会返回到 mstart,从而释放当前的 P 并退出该线程 if GOOS != "plan9" { // See golang.org/issue/22227. gogo(&_g_.m.g0.sched) } else { // 因为我们可能已重用此线程结束,在 plan9 上清除 lockedExt _g_.m.lockedExt = 0 } } // 再次进行调度 schedule() } ``` 退出的善后工作也相对简单,无非就是复位 g 的状态、解绑 m 和 g,将其 放入 gfree 链表中等待其他的 go 语句创建新的 g。 如果 Goroutine 将自身锁在同一个 OS 线程中且没有自行解绑则 m 会退出,而不会被放回到线程池中。 相反,会再次调用 gogo 切换到 g0 执行现场中,这也是目前唯一的退出 m 的机会,在本节最后解释。 ### 偷取工作 全局 g 链式队列中取 max 个 g ,其中第一个用于执行,max-1 个放入本地队列。 如果放不下,则只在本地队列中放下能放的。过程比较简单: ``` // 从全局队列中偷取,调用时必须锁住调度器 func globrunqget(_p_ *p, max int32) *g { // 如果全局队列中没有 g 直接返回 if sched.runqsize == 0 { return nil } // per-P 的部分,如果只有一个 P 的全部取 n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } // 不能超过取的最大个数 if max > 0 && n > max { n = max } // 计算能不能在本地队列中放下 n 个 if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } // 修改本地队列的剩余空间 sched.runqsize -= n // 拿到全局队列队头 g gp := sched.runq.pop() // 计数 n-- // 继续取剩下的 n-1 个全局队列放入本地队列 for ; n > 0; n-- { gp1 := sched.runq.pop() runqput(_p_, gp1, false) } return gp } ``` 从本地队列中取,首先看 next 是否有已经安排要运行的 g ,如果有,则返回下一个要运行的 g 否则,以 cas 的方式从本地队列中取一个 g。 如果是已经安排要运行的 g,则继承剩余的可运行时间片进行运行; 否则以一个新的时间片来运行。 ``` // 从本地可运行队列中获取 g // 如果 inheritTime 为 true,则 g 继承剩余的时间片 // 否则开始一个新的时间片。在所有者 P 上执行 func runqget(_p_ *p) (gp *g, inheritTime bool) { // 如果有 runnext,则为下一个要运行的 g for { // 下一个 g next := _p_.runnext // 没有,break if next == 0 { break } // 如果 cas 成功,则 g 继承剩余时间片执行 if _p_.runnext.cas(next, 0) { return next.ptr(), true } } // 没有 next for { // 本地队列是空,返回 nil h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, 与其他消费者同步 t := _p_.runqtail if t == h { return nil, false } // 从本地队列中以 cas 方式拿一个 gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, 提交消费 return gp, false } } } ``` 偷取(steal)的实现是一个非常复杂的过程。这个过程来源于我们 需要仔细的思考什么时候对调度器进行加锁、什么时候对 m 进行暂止、 什么时候将 m 从自旋向非自旋切换等等。 ``` // 寻找一个可运行的 Goroutine 来执行。 // 尝试从其他的 P 偷取、从全局队列中获取、poll 网络 func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() // 这里的条件与 handoffp 中的条件必须一致: // 如果 findrunnable 将返回 G 运行,handoffp 必须启动 M. top: _p_ := _g_.m.p.ptr() // 如果在 gc,则暂止当前 m,直到复始后回到 top if sched.gcwaiting != 0 { gcstopm() goto top } if _p_.runSafePointFn != 0 { runSafePointFn() } if fingwait && fingwake { if gp := wakefing(); gp != nil { ready(gp, 0, true) } } // cgo 调用被终止,继续进入 if *cgo_yield != nil { asmcgocall(*cgo_yield, nil) } // 取本地队列 local runq,如果已经拿到,立刻返回 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // 全局队列 global runq,如果已经拿到,立刻返回 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // Poll 网络,优先级比从其他 P 中偷要高。 // 在我们尝试去其他 P 偷之前,这个 netpoll 只是一个优化。 // 如果没有 waiter 或 netpoll 中的线程已被阻塞,则可以安全地跳过它。 // 如果有任何类型的逻辑竞争与被阻塞的线程(例如它已经从 netpoll 返回,但尚未设置 lastpoll) // 该线程无论如何都将阻塞 netpoll。 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(false); !list.empty() { // 无阻塞 gp := list.pop() injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) (...) return gp, false } } // 从其他 P 中偷 work procs := uint32(gomaxprocs) // 获得 p 的数量 if atomic.Load(&sched.npidle) == procs-1 { // GOMAXPROCS = 1 或除了我们之外的所有人都已经 idle 了。 // 新的 work 可能出现在 syscall/cgocall/网络/timer返回时 // 它们均没有提交到本地运行队列,因此偷取没有任何意义。 goto stop } // 如果自旋状态下 m 的数量 >= busy 状态下 p 的数量,直接进入阻塞 // 该步骤是有必要的,它用于当 GOMAXPROCS>>1 时但程序的并行机制很慢时 // 昂贵的 CPU 消耗。 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } // 如果 m 是非自旋状态,切换为自旋 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } for i := 0; i < 4; i++ { // 随机偷 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // 已经进入了 GC? 回到顶部,暂止当前的 m if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 // 如果偷了两轮都偷不到,便优先查找 ready 队列 if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { // 总算偷到了,立即返回 return gp, false } } } stop: // 没有任何 work 可做。 // 如果我们在 GC mark 阶段,则可以安全的扫描并 blacken 对象 // 然后便有 work 可做,运行 idle-time 标记而非直接放弃当前的 P。 if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) (...) return gp, false } // 仅限于 wasm // 如果一个回调返回后没有其他 Goroutine 是苏醒的 // 则暂停执行直到回调被触发。 if beforeIdle() { // 至少一个 Goroutine 被唤醒 goto top } // 放弃当前的 P 之前,对 allp 做一个快照 // 一旦我们不再阻塞在 safe-point 时候,可以立刻在下面进行修改 allpSnapshot := allp // 准备归还 p,对调度器加锁 lock(&sched.lock) // 进入了 gc,回到顶部暂止 m if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } // 全局队列中又发现了任务 if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) // 赶紧偷掉返回 return gp, false } // 归还当前的 p if releasep() != _p_ { throw("findrunnable: wrong p") } // 将 p 放入 idle 链表 pidleput(_p_) // 完成归还,解锁 unlock(&sched.lock) // 这里要非常小心: // 线程从自旋到非自旋状态的转换,可能与新 Goroutine 的提交同时发生。 // 我们必须首先丢弃 nmspinning,然后再次检查所有的 per-P 队列(并在期间伴随 #StoreLoad 内存屏障) // 如果反过来,其他线程可以在我们检查了所有的队列、然后提交一个 Goroutine、再丢弃了 nmspinning // 进而导致无法复始一个线程来运行那个 Goroutine 了。 // 如果我们发现下面的新 work,我们需要恢复 m.spinning 作为重置的信号, // 以取消暂止新的工作线程(因为可能有多个 starving 的 Goroutine)。 // 但是,如果在发现新 work 后我们也观察到没有空闲 P,可以暂停当前线程 // 因为系统已满载,因此不需要自旋线程。 wasSpinning := _g_.m.spinning if _g_.m.spinning { _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // 再次检查所有的 runqueue for _, _p_ := range allpSnapshot { // 如果这时本地队列不空 if !runqempty(_p_) { // 重新获取 p lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) // 如果能获取到 p if _p_ != nil { // 绑定 p acquirep(_p_) // 如果此前已经被切换为自旋 if wasSpinning { // 重新切换回非自旋 _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 这时候是有 work 的,回到顶部重新 find g goto top } // 看来没有 idle 的 p,不需要重新 find g 了 break } } // 再次检查 idle-priority GC work // 和上面重新找 runqueue 的逻辑类似 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) { lock(&sched.lock) _p_ = pidleget() if _p_ != nil && _p_.gcBgMarkWorker == 0 { pidleput(_p_) _p_ = nil } unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // Go back to idle GC check. goto stop } } // poll 网络 // 和上面重新找 runqueue 的逻辑类似 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { (...) gp := netpoll(true) // 阻塞到新的 work 有效为止 atomic.Store64(&sched.lastpoll, uint64(nanotime())) if gp != nil { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) (...) return gp, false } injectglist(gp) } } // 真的什么都没找到 // 暂止当前的 m stopm() goto top } ``` 在`findrunnable`这个过程中,我们: * 首先检查是是否正在进行 GC,如果是则暂止当前的 m 并阻塞休眠; * 尝试从本地队列中取 g,如果取到,则直接返回,否则继续从全局队列中找 g,如果找到则直接返回; * 检查是否存在 poll 网络的 g,如果有,则直接返回; * 如果此时仍然无法找到 g,则从其他 P 的本地队列中偷取; * 从其他 P 本地队列偷取的工作会执行四轮,在前两轮中只会查找 runnable 队列,后两轮则会优先查找 ready 队列,如果找到,则直接返回; * 所有的可能性都尝试过了,在准备暂止 m 之前,还要进行额外的检查; * 首先检查此时是否是 GC mark 阶段,如果是,则直接返回 mark 阶段的 g; * 如果仍然没有,则对当前的 p 进行快照,准备对调度器进行加锁; * 当调度器被锁住后,我们仍然还需再次检查这段时间里是否有进入 GC,如果已经进入了 GC,则回到第一步,阻塞 m 并休眠; * 当调度器被锁住后,如果我们又在全局队列中发现了 g,则直接返回; * 当调度器被锁住后,我们彻底找不到任务了,则归还释放当前的 P,将其放入 idle 链表中,并解锁调度器; * 当 M/P 已经解绑后,我们需要将 m 的状态切换出自旋状态,并减少 nmspinning; * 此时我们仍然需要重新检查所有的队列; * 如果此时我们发现有一个 P 队列不空,则立刻尝试获取一个 P,如果获取到,则回到第一步,重新执行偷取工作,如果取不到,则说明系统已经满载,无需继续进行调度; * 同样,我们还需要再检查是否有 GC mark 的 g 出现,如果有,获取 P 并回到第一步,重新执行偷取工作; * 同样,我们还需要再检查是否存在 poll 网络的 g,如果有,则直接返回; * 终于,我们什么也没找到,暂止当前的 m 并阻塞休眠。 ### M 的唤醒 我们已经看到了 M 的暂止和复始的过程,那么 M 的自旋到非自旋的过程如何发生? ``` func resetspinning() { _g_ := getg() (...) _g_.m.spinning = false nmspinning := atomic.Xadd(&sched.nmspinning, -1) (...) // M wakeup policy is deliberately somewhat conservative, so check if we // need to wakeup another P here. See "Worker thread parking/unparking" // comment at the top of the file for details. if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 { wakep() } } // 尝试将一个或多个 P 唤醒来执行 G // 当 G 可能运行时(newproc, ready)时调用该函数 func wakep() { // 对自旋线程保守一些,必要时只增加一个 // 如果失败,则立即返回 if !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) } // Schedules some M to run the p (creates an M if necessary). // If p==nil, tries to get an idle P, if no idle P's does nothing. // May run with m.p==nil, so write barriers are not allowed. // If spinning is set, the caller has incremented nmspinning and startm will // either decrement nmspinning or set m.spinning in the newly started M. //go:nowritebarrierrec func startm(_p_ *p, spinning bool) { lock(&sched.lock) if _p_ == nil { _p_ = pidleget() if _p_ == nil { unlock(&sched.lock) if spinning { // The caller incremented nmspinning, but there are no idle Ps, // so it's okay to just undo the increment and give up. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } return } } mp := mget() unlock(&sched.lock) if mp == nil { var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } newm(fn, _p_) return } (...) if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning mp.nextp.set(_p_) notewakeup(&mp.park) } // 尝试从 midel 列表中获取一个 M // 调度器必须锁住 // 可能在 STW 期间运行,故不允许 write barrier //go:nowritebarrierrec func mget() *m { mp := sched.midle.ptr() if mp != nil { sched.midle = mp.schedlink sched.nmidle-- } return mp } ``` ### M 的创生 M 是通过`newm`来创生的,一般情况下,能够非常简单的创建, 某些特殊情况(线程状态被污染),M 的创建需要一个叫做模板线程的功能加以配合, 我们在[6.4 线程管理](https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/thread)中详细讨论: ``` // 创建一个新的 m. 它会启动并调用 fn 或调度器 // fn 必须是静态、非堆上分配的闭包 // 它可能在 m.p==nil 时运行,因此不允许 write barrier //go:nowritebarrierrec func newm(fn func(), _p_ *p) { // 分配一个 m mp := allocm(_p_, fn) // 设置 p 用于后续绑定 mp.nextp.set(_p_) // 设置 signal mask mp.sigmask = initSigmask if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" { // 我们处于一个锁定的 M 或可能由 C 启动的线程。这个线程的内核状态可能 // 很奇怪(用户可能已将其锁定)。我们不想将其克隆到另一个线程。 // 相反,请求一个已知状态良好的线程来创建给我们的线程。 // // 在 plan9 上禁用,见 golang.org/issue/22227 lock(&newmHandoff.lock) if newmHandoff.haveTemplateThread == 0 { throw("on a locked thread with no template thread") } mp.schedlink = newmHandoff.newm newmHandoff.newm.set(mp) if newmHandoff.waiting { newmHandoff.waiting = false // 唤醒 m, 自旋到非自旋 notewakeup(&newmHandoff.wake) } unlock(&newmHandoff.lock) return } newm1(mp) } ``` ``` // Allocate a new m unassociated with any thread. // Can use p for allocation context if needed. // fn is recorded as the new m's m.mstartfn. // // This function is allowed to have write barriers even if the caller // isn't because it borrows _p_. // //go:yeswritebarrierrec func allocm(_p_ *p, fn func()) *m { _g_ := getg() _g_.m.locks++ // disable GC because it can be called from sysmon if _g_.m.p == 0 { acquirep(_p_) // temporarily borrow p for mallocs in this function } // Release the free M list. We need to do this somewhere and // this may free up a stack we can use. if sched.freem != nil { lock(&sched.lock) var newList *m for freem := sched.freem; freem != nil; { if freem.freeWait != 0 { next := freem.freelink freem.freelink = newList newList = freem freem = next continue } stackfree(freem.g0.stack) freem = freem.freelink } sched.freem = newList unlock(&sched.lock) } mp := new(m) mp.mstartfn = fn mcommoninit(mp) // In case of cgo or Solaris or Darwin, pthread_create will make us a stack. // Windows and Plan 9 will layout sched stack on OS stack. if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" { mp.g0 = malg(-1) } else { mp.g0 = malg(8192 * sys.StackGuardMultiplier) } mp.g0.m = mp if _p_ == _g_.m.p.ptr() { releasep() } _g_.m.locks-- if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack _g_.stackguard0 = stackPreempt } return mp } ``` ``` func newm1(mp *m) { if iscgo { var ts cgothreadstart if _cgo_thread_start == nil { throw("_cgo_thread_start missing") } ts.g.set(mp.g0) ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0])) ts.fn = unsafe.Pointer(funcPC(mstart)) if msanenabled { msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts)) } execLock.rlock() // Prevent process clone. asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts)) execLock.runlock() return } execLock.rlock() // Prevent process clone. newosproc(mp) execLock.runlock() } ``` 当 m 被创建时,会转去运行`mstart`: * 如果当前程序为 cgo 程序,则会通过`asmcgocall`来创建线程并调用`mstart`(在[10.4 cgo](https://golang.design/under-the-hood/zh-cn/part2runtime/ch10abi/cgo)中讨论) * 否则会调用`newosproc`来创建线程,从而调用`mstart`。 既然是`newosproc`,我们此刻仍在 Go 的空间中,那么实现就是操作系统特定的了, ##### 系统线程的创建 (darwin) ``` // 可能在 m.p==nil 情况下运行,因此不允许 write barrier //go:nowritebarrierrec func newosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) (...) // 初始化 attribute 对象 var attr pthreadattr var err int32 err = pthread_attr_init(&attr) if err != 0 { write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate))) exit(1) } // 设置想要使用的栈大小。目前为 64KB const stackSize = 1 << 16 if pthread_attr_setstacksize(&attr, stackSize) != 0 { write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate))) exit(1) } // 通知 pthread 库不会 join 这个线程。 if pthread_attr_setdetachstate(&attr, _PTHREAD_CREATE_DETACHED) != 0 { write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate))) exit(1) } // 最后创建线程,在 mstart_stub 开始,进行底层设置并调用 mstart var oset sigset sigprocmask(_SIG_SETMASK, &sigset_all, &oset) err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp)) sigprocmask(_SIG_SETMASK, &oset, nil) if err != 0 { write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate))) exit(1) } } ``` `pthread_create`在[10.1 参与运行时的系统调用](https://golang.design/under-the-hood/zh-cn/part2runtime/ch10abi/syscallrt)中讨论。 ##### 系统线程的创建 (linux) 而 linux 上的情况就乐观的多了: ``` // May run with m.p==nil, so write barriers are not allowed. //go:nowritebarrier func newosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) (...) // 在 clone 期间禁用信号,以便新线程启动时信号被禁止。 // 他们会在 minit 中重新启用。 var oset sigset sigprocmask(_SIG_SETMASK, &sigset_all, &oset) ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) sigprocmask(_SIG_SETMASK, &oset, nil) (...) } ``` `clone`是系统调用,我们在[10.1 参与运行时的系统调用](https://golang.design/under-the-hood/zh-cn/part2runtime/ch10abi/syscallrt)中讨论 这些系统调用在 Go 中的实现方式。 ### M/G 解绑 `dropg`听起来很玄乎,但实际上就是指将当前 g 的 m 置空、将当前 m 的 g 置空,从而完成解绑: ``` // dropg 移除 m 与当前 Goroutine m->curg(简称 gp )之间的关联。 // 通常,调用方将 gp 的状态设置为非 _Grunning 后立即调用 dropg 完成工作。 // 调用方也有责任在 gp 将使用 ready 时重新启动时进行相关安排。 // 在调用 dropg 并安排 gp ready 好后,调用者可以做其他工作,但最终应该 // 调用 schedule 来重新启动此 m 上的 Goroutine 的调度。 func dropg() { _g_ := getg() setMNoWB(&_g_.m.curg.m, nil) setGNoWB(&_g_.m.curg, nil) } // setMNoWB 当使用 muintptr 不可行时,在没有 write barrier 下执行 *mp = new //go:nosplit //go:nowritebarrier func setMNoWB(mp **m, new *m) { (*muintptr)(unsafe.Pointer(mp)).set(new) } // setGNoWB 当使用 guintptr 不可行时,在没有 write barrier 下执行 *gp = new //go:nosplit //go:nowritebarrier func setGNoWB(gp **g, new *g) { (*guintptr)(unsafe.Pointer(gp)).set(new) } ``` ### M 的死亡 我们已经多次提到过 m 当且仅当它所运行的 Goroutine 被锁定在该 m 且 Goroutine 退出后, m 才会退出。我们来看一看它的原因。 首先,我们已经知道调度循环会一直进行下去永远不会返回了: ``` func mstart() { (...) mstart1() // 永不返回 (...) mexit(osStack) } ``` 那`mexit`究竟什么时候会被执行? 事实上,在`mstart1`中: ``` func mstart1() { (...) // 为了在 mcall 的栈顶使用调用方来结束当前线程,做记录 // 当进入 schedule 之后,我们再也不会回到 mstart1,所以其他调用可以复用当前帧。 save(getcallerpc(), getcallersp()) (...) } ``` `save`记录了调用方的 pc 和 sp,而对于`save`: ``` // getcallerpc 返回它调用方的调用方程序计数器 PC program conter // getcallersp 返回它调用方的调用方的栈指针 SP stack pointer // 实现由编译器内建,在任何平台上都没有实现它的代码 // // 例如: // // func f(arg1, arg2, arg3 int) { // pc := getcallerpc() // sp := getcallersp() // } // // 这两行会寻找调用 f 的 PC 和 SP // // 调用 getcallerpc 和 getcallersp 必须被询问的帧中完成 // // getcallersp 的结果在返回时是正确的,但是它可能会被任何随后调用的函数无效, // 因为它可能会重新定位堆栈,以使其增长或缩小。一般规则是,getcallersp 的结果 // 应该立即使用,并且只能传递给 nosplit 函数。 //go:noescape func getcallerpc() uintptr //go:noescape func getcallersp() uintptr // implemented as an intrinsic on all platforms // save 更新了 getg().sched 的 pc 和 sp 的指向,并允许 gogo 能够恢复到 pc 和 sp // // save 不允许 write barrier 因为 write barrier 会破坏 getg().sched // //go:nosplit //go:nowritebarrierrec func save(pc, sp uintptr) { _g_ := getg() // 保存当前运行现场 _g_.sched.pc = pc _g_.sched.sp = sp _g_.sched.lr = 0 _g_.sched.ret = 0 // 保存 g _g_.sched.g = guintptr(unsafe.Pointer(_g_)) // 我们必须确保 ctxt 为零,但这里不允许 write barrier。 // 所以这里只是做一个断言 if _g_.sched.ctxt != nil { badctxt() } } ``` 由于`mstart/mstart1`是运行在 g0 上的,因此`save`将保存`mstart`的运行现场保存到`g0.sched`中。 当调度循环执行到`goexit0`时,会检查 m 与 g 之间是否被锁住: ``` func goexit0(gp *g) { (...) gfput(_g_.m.p.ptr(), gp) if locked { if GOOS != "plan9" { gogo(&_g_.m.g0.sched) } } schedule() } ``` 如果 g 锁在当前 m 上,则调用`gogo`恢复到`g0.sched`的执行现场,从而恢复到`mexit`调用。 最后来看`mexit`: ``` // mexit 销毁并退出当前线程 // // 请不要直接调用来退出线程,因为它必须在线程栈顶上运行。 // 相反,请使用 gogo(&_g_.m.g0.sched) 来解除栈并退出线程。 // // 当调用时,m.p != nil。因此可以使用 write barrier。 // 在退出前它会释放当前绑定的 P。 // //go:yeswritebarrierrec func mexit(osStack bool) { g := getg() m := g.m if m == &m0 { // 主线程 // // 在 linux 中,退出主线程会导致进程变为僵尸进程。 // 在 plan 9 中,退出主线程将取消阻塞等待,即使其他线程仍在运行。 // 在 Solaris 中我们既不能 exitThread 也不能返回到 mstart 中。 // 其他系统上可能发生别的糟糕的事情。 // // 我们可以尝试退出之前清理当前 M ,但信号处理非常复杂 handoffp(releasep()) // 让出 P lock(&sched.lock) // 锁住调度器 sched.nmfreed++ checkdead() unlock(&sched.lock) notesleep(&m.park) // 暂止主线程,在此阻塞 throw("locked m0 woke up") } sigblock() unminit() // 释放 gsignal 栈 if m.gsignal != nil { stackfree(m.gsignal.stack) } // 将 m 从 allm 中移除 lock(&sched.lock) for pprev := &allm; *pprev != nil; pprev = &(*pprev).alllink { if *pprev == m { *pprev = m.alllink goto found } } // 如果没找到则是异常状态,说明 allm 管理出错 throw("m not found in allm") found: if !osStack { // Delay reaping m until it's done with the stack. // // If this is using an OS stack, the OS will free it // so there's no need for reaping. atomic.Store(&m.freeWait, 1) // Put m on the free list, though it will not be reaped until // freeWait is 0. Note that the free list must not be linked // through alllink because some functions walk allm without // locking, so may be using alllink. m.freelink = sched.freem sched.freem = m } unlock(&sched.lock) // Release the P. handoffp(releasep()) // After this point we must not have write barriers. // Invoke the deadlock detector. This must happen after // handoffp because it may have started a new M to take our // P's work. lock(&sched.lock) sched.nmfreed++ checkdead() unlock(&sched.lock) if osStack { // Return from mstart and let the system thread // library free the g0 stack and terminate the thread. return } // mstart is the thread's entry point, so there's nothing to // return to. Exit the thread directly. exitThread will clear // m.freeWait when it's done with the stack and the m can be // reaped. exitThread(&m.freeWait) } ``` 可惜`exitThread`在 darwin 上还是没有定义: ``` // 未在 darwin 上使用,但必须定义 func exitThread(wait *uint32) { } ``` 在 Linux amd64 上: ``` // func exitThread(wait *uint32) TEXT runtime·exitThread(SB),NOSPLIT,$0-8 MOVQ wait+0(FP), AX // 栈使用完毕 MOVL $0, (AX) MOVL $0, DI // exit code MOVL $SYS_exit, AX SYSCALL // 甚至连栈都没有了 INT $3 JMP 0(PC) ``` 从实现上可以看出,只有 linux 中才可能正常的退出一个栈,而 darwin 只能保持暂止了。 而如果是主线程,则会始终保持 park。 ## 小结 我们已经看过了整个调度器的设计,图 1 纵观了整个调度循环: ![](https://golang.design/under-the-hood/assets/schedule.png)**图 1:调度器调度循环纵览** 那么,很自然的能够想到这个流程中存在两个问题: 1. `findRunnableGCWorker`在干什么? 2. 调度循环看似合理,但如果 G 执行时间过长,难道要等到 G 执行完后再调度其他的 G?显然不符合实际情况,那么到底会发生什么事情?