6.5 Goroutine

Go 语言在并发编程方面有着非常强大的能力,这也离不开语言层面对并发编程的支持,我们会在 Go 语言中使用 Goroutine 并行执行任务并将 Channel 作为 Goroutine 之间的通信方式,虽然使用互斥锁和共享内存在 Go 语言中也可以完成 Goroutine 间的通信,但是使用 Channel 才是更推荐的做法 — 不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

golang-goroutine-and-channe

我们在这一节中不仅会介绍 Go 语言中的协程 Goroutine 的数据结构和实现原理,还会介绍调度器是如何对 Goroutine 进行调度的,其中包括调度的时机和过程。

概述

谈到 Go 语言的调度器,我们不得不提的就是操作系统、进程与线程这些概念,操作系统其实为我们提供的 POSIX API 中就包含对 线程) 的相关操作,线程其实也是操作系统在做调度时的最基本单元,线程和进程的实现在不同操作系统上也有所不同,但是在大多数的实现中线程都是进程的一个组件:

process-and-threads

多个线程可以存在于同一个进程中并共享了同一片内存空间,由于不需要创建新的虚拟内存空间,所以它们也不需要内存管理单元处理上下文的切换,线程之前的通信也正是基于共享的内存进行的,相比于重量级的进程,线程显得比较轻量,所以我们可以在一个进程中创建出多个线程。

虽然线程相对进程比较轻量,但是线程仍然会占用较多的资源并且调度时也会造成比较大的额外开销,每个线程会都占用 1M 以上的内存空间,在对线程进行切换时不止会消耗较多的内存空间,对寄存器中的内容进行恢复还需要向操作系统申请或者销毁对应的资源,每一次线程上下文的切换都需要消耗 ~1us 左右的时间,但是 Go 调度器对 Goroutine 的上下文切换 ~0.2us,减少了 80% 的额外开销。除了减少上下文切换带来的开销,Golang 的调度器还能够更有效地利用 CPU 的缓存。

goroutines-on-thread

Go 语言的调度器其实就是通过使用数量合适的线程并在每一个线程上执行更多的工作来降低操作系统和硬件的负载。

数据结构

相信各位读者已经对 Go 语言调度相关的数据结构已经非常熟悉了,但是我们在这里还是要简单回顾一下其中的三个组成部分线程 M、协程 G 和处理器 P:

golang-schedule

  • M — 表示操作系统的线程,它是被操作系统管理的线程,与 POSIX 中的标准线程非常类似;
  • G — 表示 Goroutine,每一个 Goroutine 都包含堆栈、指令指针和其他用于调度的重要信息;
  • P — 表示调度的上下文,它可以被看做一个运行于线程 M 上的本地调度器;我们会在这一节中分别介绍不同的组成部分,详细介绍它们的基本作用、数据结构以及在运行期间可能处于的状态。

G

Go 语言中并发的执行单元其实就是 Goroutine,它很像操作系统中的线程,但是占用了更小的内存空间并降低了 Goroutine 切换的开销。

Goroutine 只存在于 Go 语言的运行时,它是 Go 语言在用户态为我们提供的『线程』,如果一个 Goroutine 由于 IO 操作而陷入阻塞,操作系统并不会对上下文进行切换,但是 Go 语言的调度器会将陷入阻塞 Goroutine 『切换』下去等待系统调用结束并让出计算资源,作为一种粒度更细的资源调度单元,如果使用得当能够在高并发的场景下更高效地利用机器的 CPU。

结构体

Goroutine 在 Go 语言运行时使用一个名为 g 的私有结构体表示,这个私有结构体非常复杂,总共有 40 多个用于表示各种状态的成员变量,我们在这里也不能介绍全部的属性,而是会挑选其中的一部分重点进行介绍,我们可以在 runtime2.go#L387-L450 文件中查看 g 结构体的全部属性:

  1. type g struct {
  2. m *m // current m; offset known to arm liblink
  3. sched gobuf
  4. syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
  5. syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
  6. param unsafe.Pointer // passed parameter on wakeup
  7. atomicstatus uint32
  8. goid int64
  9. schedlink guintptr
  10. waitsince int64 // approx time when the g become blocked
  11. waitreason waitReason // if status==Gwaiting
  12. preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
  13. lockedm muintptr
  14. writebuf []byte
  15. sigcode0 uintptr
  16. sigcode1 uintptr
  17. sigpc uintptr
  18. gopc uintptr // pc of go statement that created this goroutine
  19. startpc uintptr // pc of goroutine function
  20. waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
  21. }

为了减少无关的干扰项,我们在这里删除了跟堆栈以及追踪相关的字段,剩下的都是 g 结构体中比较重要的字段。

状态

结构体 g 的字段 atomicstatus 就存储了当前 Goroutine 的状态,runtime2.go 文件中定义了 Goroutine 全部可能存在的状态,除了几个已经不被使用的以及与 GC 相关的状态之外,全部常见的状态都展示在这里:

状态描述
_Gidle刚刚被分配并且还没有被初始化
_Grunnable没有执行代码、没有栈的所有权、存储在运行队列中
_Grunning可以执行代码、拥有栈的所有权,被赋予了内核线程 M 和处理器 P
_Gsyscall正在执行系统调用、拥有栈的所有权、没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
_Gwaiting由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
_Gdead没有被使用,没有执行代码,可能有分配的栈
_Gcopystack栈正在被拷贝、没有执行代码、不在运行队列上

上述状态中比较常见是 _Grunnable_Grunning_Gsyscall_Gwaiting 四个状态,我们在这里也会重点介绍这几个状态,Goroutine 中所有状态的迁移是一个非常复杂的过程,会触发 Goroutine 状态迁移的方法也非常多,在这里我们也没有办法介绍全部的迁移线路,我们会从其中选择一些进行介绍。

goroutine-status

虽然 Goroutine 在运行时中定义的状态非常多而且复杂,但是我们可以将这些不同的状态聚合成最终的三种:等待中、可运行、运行中,在运行期间我们会在这三种不同的状态来回切换:

  • 等待中:表示当前 Goroutine 等待某些条件满足后才会继续执行,例如当前 Goroutine 正在执行系统调用或者同步操作;
  • 可运行:表示当前 Goroutine 等待在某个 M 执行 Goroutine 的指令,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间;
  • 运行中:表示当前 Goroutine 正在某个 M 上执行指令;

M

Go 语言并发模型中的 M 其实表示的是操作系统线程,在默认情况下调度器能够允许创建 10000 个线程,但是其中绝大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS 个线程 M 能够正常运行。

所有 Golang 程序中的最大『可运行』线程数其实就等于 GOMAXPROCS 这个变量的值;在默认情况下,它会被设置成当前应用的核数,我们也可以使用 runtime.GOMAXPROCS 方法来改变当前程序中最大的线程数。

scheduler-m-and-thread

在默认情况下,一个四核机器上会创建四个操作系统线程,每一个线程其实都是一个 m 结构体,我们也可以通过 runtime.GOMAXPROCS 改变最大可运行线程的数量,我们可以使用 runtime.GOMAXPROCS(3) 将 Go 程序中的线程数改变成 3 个。

在大多数情况下,我们都会使用 Go 的默认设置,也就是 #thread == #CPU,在这种情况下不会触发操作系统级别的线程调度和上下文切换,所有的调度都会发生在用户态,由 Go 语言调度器触发,能够减少非常多的额外开销。

结构体

操作系统线程在 Go 语言中就会使用私有结构体 m 来表示,这个结构体中也包含了几十个私有的字段,我们这里还是对其进行了简单的删减,感兴趣的读者可以查看 runtime2.go#L452-L521 了解更多的内容:

  1. type m struct {
  2. g0 *g // goroutine with scheduling stack
  3. curg *g // current running goroutine
  4. ...
  5. }

其中 g0 是持有调度堆栈的 Goroutine,curg 是在当前线程上运行的 Goroutine,这也是作为操作系统线程唯一关心的两个 Goroutine 了。

P

Go 语言调度器中的最后一个重要结构就是处理器 P,其实就是线程需要的上下文环境,也是用于处理代码逻辑的处理器,通过处理器 P 的调度,每一个内核线程 M 都能够执行多个 G,这样就能在 G 进行一些 IO 操作时及时对它们进行切换,提高 CPU 的利用率。

每一个 Go 语言程序中所以处理器的数量一定会等于 GOMAXPROCS,这是因为调度器在启动时就会创建 GOMAXPROCS 个处理器 P,这些处理器会绑定到不同的线程 M 上并为它们调度 Goroutine。

结构体

处理器在 Go 语言运行时中同样使用私有结构体 p 表示,作为调度器的内部实现,它包含的字段也非常多,我们在这里就简单展示一下结构体中的大致内容,感兴趣的读者可以查看 runtime2.go#L523-L602)

  1. type p struct {
  2. id int32
  3. status uint32 // one of pidle/prunning/...
  4. link puintptr
  5. schedtick uint32 // incremented on every scheduler call
  6. syscalltick uint32 // incremented on every system call
  7. sysmontick sysmontick // last tick observed by sysmon
  8. m muintptr // back-link to associated m (nil if idle)
  9. mcache *mcache
  10. runqhead uint32
  11. runqtail uint32
  12. runq [256]guintptr
  13. runnext guintptr
  14. sudogcache []*sudog
  15. sudogbuf [128]*sudog
  16. ...
  17. }

我们将结构体中 GC 以及用于追踪调试的字段全部删除以简化这里需要展示的属性,在上述字段中,status 表示了当前处理器的状态,runheadrunqtailrunq 以及 runnext 等字段表示处理器持有的运行队列,运行队列中就包含待执行的 Goroutine 列表。

状态

p 结构体中的状态 status 其实就会是以下五种状态其中的一种,我们能在 runtime2.go#L99-L147 文件中找到处理器 P 的全部状态:

状态描述
_Pidle处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空
_Prunning被线程 M 持有,并且正在执行用户代码或者调度器
_Psyscall没有执行用户代码,当前线程陷入系统调用
_Pgcstop被线程 M 持有,当前处理器由于垃圾回收被停止
_Pdead当前处理器已经不被使用

通过分析处理器 P 的这些状态,我们其实能够对处理器的工作过程有一些简单的理解,例如处理器在执行用户代码时会处于 _Prunning 状态,在当前线程执行 IO 操作时会陷入 _Psyscall 状态。

小结

我们在这一小节中简单介绍了 Go 语言调度器中常见的数据结构,包括线程 M、处理器 P 和 Goroutine G,它们在 Go 语言运行时中分别使用不同的私有结构体表示,我们在下面的小节中就会深入介绍 Go 语言调度器的实现原理。

实现原理

这里会以 Go 语言中 Goroutine 的相关操作为入口,详细介绍 Goroutine 的不同状态、它是如何被创建和销毁的以及调度器的启动过程。

创建 Goroutine

想要启动一个新的 Goroutine 来执行任务时,我们需要使用 Go 语言中的 go 关键字,这个关键字会在编译期间通过以下方法 stmtcall 两个方法将该关键字转换成 newproc 函数调用,代码的路径和原理与 defer 关键字几乎完全相同,两者的区别也只是 defer 被转化成 deferprocgo 被转换成 newproc 方法:

  1. func (s *state) stmt(n *Node) {
  2. switch n.Op {
  3. case OGO:
  4. s.call(n.Left, callGo)
  5. }
  6. }
  7. func (s *state) call(n *Node, k callKind) *ssa.Value {
  8. // ...
  9. if k == callDeferStack {
  10. // ...
  11. } else {
  12. switch {
  13. case k == callGo:
  14. call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())
  15. default:
  16. }
  17. }
  18. // ...
  19. return ...
  20. }

经过了 Go 语言的 中间代码生成 的过程,所有的 go 关键字都会被转换成 newproc 函数调用,我们向 newproc 中传入一个表示函数的指针 funcval,在这个函数中我们还会获取当前调用 newproc 函数的 Goroutine 以及调用方的程序计数器 PC,然后调用 newproc1 函数:

  1. func newproc(siz int32, fn *funcval) {
  2. argp := add(unsafe.Pointer(&fn), sys.PtrSize)
  3. gp := getg()
  4. pc := getcallerpc()
  5. newproc1(fn, (*uint8)(argp), siz, gp, pc)
  6. }

newproc1 函数的主要作用就是创建一个运行传入参数 fng 结构体,在这个方法中我们也会拷贝当前方法的全部参数,argpnarg 共同表示函数 fn 的入参,我们在该方法中其实也会直接将所有参数对应的内存空间整片的拷贝到新 Goroutine 的栈上。

  1. func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
  2. _g_ := getg()
  3. siz := narg
  4. siz = (siz + 7) &^ 7
  5. _p_ := _g_.m.p.ptr()
  6. newg := gfget(_p_)
  7. if newg == nil {
  8. newg = malg(_StackMin)
  9. casgstatus(newg, _Gidle, _Gdead)
  10. allgadd(newg)
  11. }
  12. totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
  13. totalSize += -totalSize & (sys.SpAlign - 1)
  14. sp := newg.stack.hi - totalSize
  15. spArg := sp
  16. if narg > 0 {
  17. memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
  18. }
  19. memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
  20. newg.sched.sp = sp
  21. newg.stktopsp = sp
  22. newg.sched.pc = funcPC(goexit) + sys.PCQuantum
  23. newg.sched.g = guintptr(unsafe.Pointer(newg))
  24. gostartcallfn(&newg.sched, fn)
  25. newg.gopc = callerpc
  26. newg.startpc = fn.fn
  27. if isSystemGoroutine(newg, false) {
  28. atomic.Xadd(&sched.ngsys, +1)
  29. }
  30. casgstatus(newg, _Gdead, _Grunnable)
  31. newg.goid = int64(_p_.goidcache)
  32. _p_.goidcache++
  33. runqput(_p_, newg, true)
  34. if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
  35. wakep()
  36. }
  37. }

newproc1 函数的执行过程其实可以分成以下的几个步骤:

  • 获取当前 Goroutine 对应的处理器 P 并从它的列表中取出一个空闲的 Goroutine,如果当前不存在空闲的 Goroutine,就会通过 malg 方法重新分配一个 g 结构体并将它的状态从 _Gidle 转换成 _Gdead
  • 获取新创建 Goroutine 的堆栈并直接通过 memmove 将函数 fn 需要的参数全部拷贝到栈中;
  • 初始化新 Goroutine 的栈指针、程序计数器、调用方程序计数器等属性;
  • 将新 Goroutine 的状态从 _Gdead 切换成 _Grunnable 并设置 Goroutine 的标识符(goid);
  • runqput 函数会将新的 Goroutine 添加到处理器 P 的结构体中;
  • 如果符合条件,当前函数会通过 wakep 来添加一个新的 p 结构体来执行 Goroutine;

获取结构体

在这个过程中我们会有两种不同的方法获取一个新的 g 结构体,一种情况是直接从当前 Goroutine 所在处理器的 p.gFree 列表或者调度器的 sched.gFree 列表中获取 g 结构体,另一种方式就是通过 malg 方法生成一个新的 g 结构体并将当前结构体追加到全局的 Goroutine 列表 allgs 中。

golang-newproc-get-goroutine

gfget 函数中包含两部分逻辑,当处理器结构体中的空闲 Goroutine 列表已经为空时就会从就会将调度器持有的空闲 Goroutine 转移到当前处理器上,当 Goroutine 数量充足时它会将当前处理器的空闲 Goroutine 数量『装载』到 32 个,随后 gfget 函数就会从当前非空的 gFree 列表中获取空闲的 Goroutine 了:

  1. func gfget(_p_ *p) *g {
  2. retry:
  3. if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
  4. lock(&sched.gFree.lock)
  5. for _p_.gFree.n < 32 {
  6. gp := sched.gFree.stack.pop()
  7. if gp == nil {
  8. gp = sched.gFree.noStack.pop()
  9. if gp == nil {
  10. break
  11. }
  12. }
  13. sched.gFree.n--
  14. _p_.gFree.push(gp)
  15. _p_.gFree.n++
  16. }
  17. unlock(&sched.gFree.lock)
  18. goto retry
  19. }
  20. gp := _p_.gFree.pop()
  21. if gp == nil {
  22. return nil
  23. }
  24. _p_.gFree.n--
  25. if gp.stack.lo == 0 {
  26. gp.stack = stackalloc(_FixedStack)
  27. gp.stackguard0 = gp.stack.lo + _StackGuard
  28. }
  29. return gp
  30. }

当然调度器的 gFree 和当前处理器的 gFree 列表都为空时,我们就会调用 malg 方法初始化一个新的 g 结构体,如果申请的堆栈大小大于 0,在这里我们就会通过 stackalloc 初始化一片栈空间,栈空间的大小一般是 1KB:

  1. func malg(stacksize int32) *g {
  2. newg := new(g)
  3. if stacksize >= 0 {
  4. stacksize = round2(_StackSystem + stacksize)
  5. newg.stack = stackalloc(uint32(stacksize))
  6. newg.stackguard0 = newg.stack.lo + _StackGuard
  7. newg.stackguard1 = ^uintptr(0)
  8. }
  9. return newg
  10. }

这也就是 newproc1 获取 g 结构体的两种不同路径,通过 malg 获取的结构体会被存储到全局变量 allgs 中。

运行队列

新创建的 Goroutine 在大多数情况下都可以通过调用 runqput 函数将当前 Goroutine 添加到处理器 P 的运行队列上,该运行队列是一个使用数组构成的环形链表,其中最多能够存储 256 个指向 Goroutine 的指针,除了 runq 中能够存储待执行的 Goroutine 之外,runnext 指针中也可以存储 Goroutine,runnext 指向的 Goroutine 会成为下一个被运行的 Goroutine:

  1. func runqput(_p_ *p, gp *g, next bool) {
  2. if next {
  3. retryNext:
  4. oldnext := _p_.runnext
  5. if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
  6. goto retryNext
  7. }
  8. if oldnext == 0 {
  9. return
  10. }
  11. gp = oldnext.ptr()
  12. }
  13. retry:
  14. h := atomic.LoadAcq(&_p_.runqhead)
  15. t := _p_.runqtail
  16. if t-h < uint32(len(_p_.runq)) {
  17. _p_.runq[t%uint32(len(_p_.runq))].set(gp)
  18. atomic.StoreRel(&_p_.runqtail, t+1)
  19. return
  20. }
  21. if runqputslow(_p_, gp, h, t) {
  22. return
  23. }
  24. goto retry
  25. }
  • next=true 时将 Goroutine 设置到处理器的 runnext 上作为下一个被当前处理器执行的 Goroutine;
  • next=false 并且运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列;
  • 当处理器的本地运行队列已经没有剩余空间时就会把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过 runqputslow 添加到调度器持有的全局运行队列上;golang-runnable-queue

简单总结一下,Go 语言中有两个运行队列,其中一个是处理器本地的运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局队列存储 Goroutine。

Goroutine 调度

在 Go 语言程序的运行期间,所有触发 Goroutine 调度的方式最终都会调用 gopark 函数让出当前处理器 P 的控制权,gopark 函数中会更新当前处理器的状态并在处理器上设置该 Goroutine 的等待原因:

  1. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
  2. mp := acquirem()
  3. gp := mp.curg
  4. mp.waitlock = lock
  5. mp.waitunlockf = unlockf
  6. gp.waitreason = reason
  7. mp.waittraceev = traceEv
  8. mp.waittraceskip = traceskip
  9. releasem(mp)
  10. mcall(park_m)
  11. }

上述函数中调用的 park_m 函数会将当前 Goroutine 的状态从 _Grunning 切换至 _Gwaiting 并调用 waitunlockf 函数进行解锁,如果解锁失败就会将该 Goroutine 的状态切换回 _Grunning 并重新执行:

  1. func park_m(gp *g) {
  2. _g_ := getg()
  3. casgstatus(gp, _Grunning, _Gwaiting)
  4. dropg()
  5. if fn := _g_.m.waitunlockf; fn != nil {
  6. ok := fn(gp, _g_.m.waitlock)
  7. _g_.m.waitunlockf = nil
  8. _g_.m.waitlock = nil
  9. if !ok {
  10. casgstatus(gp, _Gwaiting, _Grunnable)
  11. execute(gp, true) // Schedule it back, never returns.
  12. }
  13. }
  14. schedule()
  15. }

在大多数情况下都会调用 schedule 触发一次 Goroutine 调度,这个函数的主要作用就是从不同的地方查找待执行的 Goroutine:

  1. func schedule() {
  2. _g_ := getg()
  3. top:
  4. var gp *g
  5. var inheritTime bool
  6. if gp == nil {
  7. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
  8. lock(&sched.lock)
  9. gp = globrunqget(_g_.m.p.ptr(), 1)
  10. unlock(&sched.lock)
  11. }
  12. }
  13. if gp == nil {
  14. gp, inheritTime = runqget(_g_.m.p.ptr())
  15. if gp != nil && _g_.m.spinning {
  16. throw("schedule: spinning with local work")
  17. }
  18. }
  19. if gp == nil {
  20. gp, inheritTime = findrunnable() // blocks until work is available
  21. }
  22. execute(gp, inheritTime)
  23. }
  • 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,有一定几率会从全局的运行队列中查找对应的 Goroutine;
  • 从当前处理器本地的运行队列中查找待执行的 Goroutine;
  • 如果前两种方法都没有找到 Goroutine,就会通过 findrunnable 进行查找,这个函数的实现相对比较复杂,它会尝试从其他处理器上取出一部分 Goroutine,如果没有可执行的任务就会阻塞直到条件满足;findrunnable 函数会再次从本地运行队列、全局运行队列、网络轮训器和其他的处理器中获取待执行的任务,该方法一定会返回待执行的 Goroutine,否则就会一直阻塞。

获取可以执行的任务之后就会调用 execute 函数执行该 Goroutine,执行的过程中会先将其状态修改成 _Grunning、与线程 M 建立起双向的关系并调用 gogo 触发调度。

  1. func execute(gp *g, inheritTime bool) {
  2. _g_ := getg()
  3. casgstatus(gp, _Grunnable, _Grunning)
  4. gp.waitsince = 0
  5. gp.preempt = false
  6. gp.stackguard0 = gp.stack.lo + _StackGuard
  7. if !inheritTime {
  8. _g_.m.p.ptr().schedtick++
  9. }
  10. _g_.m.curg = gp
  11. gp.m = _g_.m
  12. gogo(&gp.sched)
  13. }

gogo 在不同处理器架构上的实现都不相同,但是不同的实现其实也大同小异,下面是该函数在 386 架构上的实现:

  1. TEXT runtime·gogo(SB), NOSPLIT, $8-4
  2. MOVL buf+0(FP), BX // gobuf
  3. MOVL gobuf_g(BX), DX
  4. MOVL 0(DX), CX // make sure g != nil
  5. get_tls(CX)
  6. MOVL DX, g(CX)
  7. MOVL gobuf_sp(BX), SP // restore SP
  8. MOVL gobuf_ret(BX), AX
  9. MOVL gobuf_ctxt(BX), DX
  10. MOVL $0, gobuf_sp(BX) // clear to help garbage collector
  11. MOVL $0, gobuf_ret(BX)
  12. MOVL $0, gobuf_ctxt(BX)
  13. MOVL gobuf_pc(BX), BX
  14. JMP BX

这个函数会从 gobuf 中取出 Goroutine 指针、栈指针、返回值、上下文以及程序计数器并将通过 JMP 指令跳转至 Goroutine 应该继续执行代码的位置。

系统调用

系统调用对于 Go 语言调度器的调度也有比较大的影响,为了处理这些特殊的系统调用,我们甚至专门在 Goroutine 中加入了 _Gsyscall 这一状态,Go 语言通过 SyscallRawsyscall 等使用汇编语言编写的方法封装了操作系统提供的所有系统调用,其中 Syscall 在 Linux 386 上的实现如下:

  1. TEXT ·Syscall(SB),NOSPLIT,$0-28
  2. CALL runtime·entersyscall(SB)
  3. MOVL trap+0(FP), AX // syscall entry
  4. MOVL a1+4(FP), BX
  5. MOVL a2+8(FP), CX
  6. MOVL a3+12(FP), DX
  7. MOVL $0, SI
  8. MOVL $0, DI
  9. INVOKE_SYSCALL
  10. CMPL AX, $0xfffff001
  11. JLS ok
  12. MOVL $-1, r1+16(FP)
  13. MOVL $0, r2+20(FP)
  14. NEGL AX
  15. MOVL AX, err+24(FP)
  16. CALL runtime·exitsyscall(SB)
  17. RET
  18. ok:
  19. MOVL AX, r1+16(FP)
  20. MOVL DX, r2+20(FP)
  21. MOVL $0, err+24(FP)
  22. CALL runtime·exitsyscall(SB)
  23. RET

在真正通过汇编指令 INVOKE_SYSCALL 执行系统调用前后,都会调用运行时的 entersyscallexitsyscall 进行一些处理,正是这一层包装能够让我们在陷入系统调用之前触发调度器的一些操作,但是另外的 RawSyscall 等方法就会省略调用运行时方法的过程。

golang-syscall-and-rawsyscal

进入系统调用

entersyscall 函数会在获取当前 PC 程序计数器和 SP 栈指针之后调用 reentersyscall,这个函数会完成绝大部分 Goroutine 进入系统调用之前的准备工作:

  1. func reentersyscall(pc, sp uintptr) {
  2. _g_ := getg()
  3. _g_.m.locks++
  4. _g_.stackguard0 = stackPreempt
  5. _g_.throwsplit = true
  6. save(pc, sp)
  7. _g_.syscallsp = sp
  8. _g_.syscallpc = pc
  9. casgstatus(_g_, _Grunning, _Gsyscall)
  10. _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
  11. _g_.sysblocktraced = true
  12. _g_.m.mcache = nil
  13. pp := _g_.m.p.ptr()
  14. pp.m = 0
  15. _g_.m.oldp.set(pp)
  16. _g_.m.p = 0
  17. atomic.Store(&pp.status, _Psyscall)
  18. if sched.gcwaiting != 0 {
  19. systemstack(entersyscall_gcwait)
  20. save(pc, sp)
  21. }
  22. _g_.m.locks--
  23. }
  • 禁止当前线程 M 上发生的抢占,防止出现内存不一致的问题;
  • 保证当前行函数不会调用任何会导致当前栈分裂或者增长的函数;
  • 保存当前的程序计数器 PC 和栈指针 SP 中的内容;
  • 将 Goroutine 的状态更新至 _Gsyscall
  • 将 Goroutine 对应的处理器 P 和线程 M 暂时分离并更新处理器 P 的状态到 _Psyscall
  • 释放当前线程 M 上的锁;需要注意的是 reentersyscall 方法会导致处理器 P 和线程 M 的分离,当前线程 M 会陷入系统调用等待返回,处理器 P 上其他的 Goroutine 在这时就可能被其他处理器『取走』并执行,避免饥饿问题的发生,这也是 Go 语言程序创建的线程数可能会多于 GOMAXPROCS 的原因。

退出系统调用

当系统调用结束之后,就会调用退出系统调用的函数 exitsyscall 为当前 Goroutine 重新分配一个新的 CPU,这个函数有两个不同的执行路径,其中第一条是快速执行路径,也就是调用 exitsyscallfast 函数,另一条路径就是较慢的路径,它会切换至调度器的 Goroutine 并调用 exitsyscall0 函数:

  1. func exitsyscall() {
  2. _g_ := getg()
  3. _g_.m.locks++
  4. _g_.waitsince = 0
  5. oldp := _g_.m.oldp.ptr()
  6. _g_.m.oldp = 0
  7. if exitsyscallfast(oldp) {
  8. _g_.m.p.ptr().syscalltick++
  9. casgstatus(_g_, _Gsyscall, _Grunning)
  10. _g_.syscallsp = 0
  11. _g_.m.locks--
  12. if _g_.preempt {
  13. _g_.stackguard0 = stackPreempt
  14. } else {
  15. _g_.stackguard0 = _g_.stack.lo + _StackGuard
  16. }
  17. _g_.throwsplit = false
  18. return
  19. }
  20. _g_.sysexitticks = 0
  21. _g_.m.locks--
  22. mcall(exitsyscall0)
  23. _g_.syscallsp = 0
  24. _g_.m.p.ptr().syscalltick++
  25. _g_.throwsplit = false
  26. }

这两种不同的路径会分别通过不同的方法查找一个用于执行当前 Goroutine 处理器 P,快速路径 exitsyscallfast 中包含两个不同的分支:

  • 如果 Goroutine 的原处理器处于 _Psyscall 状态,就会直接调用 wirep 将 Goroutine 与处理器进行关联;
  • 如果调度器中存在闲置的处理器,就会调用 acquirep 函数使用闲置的处理器处理当前 Goroutine;另一个相对较慢的路径 exitsyscall0 就会将当前 Goroutine 切换至 _Grunnable 状态,并移除线程 M 和当前 Goroutine 的关联:

  • 当我们通过 pidleget 获取到闲置的处理器时就会在该处理器上执行 Goroutine;

  • 在其他情况下,我们会将当前 Goroutine 放到全局的运行队列中,等待调度器的调度;无论哪种情况,我们在这个函数中都会调用 schedule 函数触发调度器的调度,我们在上一节中已经介绍过调度器的调度过程,所以在这里就不展开介绍了。

运行时

我们需要注意的是,不是所有的系统调用都会调用 entersyscallexitsyscall 这两个运行时函数,出于性能的考虑,一些系统调用是不会调用这两个方法的,你可以在这个 列表 中查询到 Go 语言对 Linux 386 架构上不同系统调用的分类,我们在这里只简单展示其中的一部分内容:

SyscallType
SYS_TIMERawSyscall
SYS_GETTIMEOFDAYRawSyscall
SYS_SETRLIMITRawSyscall
SYS_GETRLIMITRawSyscall
SYS_EPOLL_WAITSyscall

由于直接进行系统调用会阻塞当前的线程,所以只有可以立刻返回的系统调用才可能会被『设置』成 RawSyscall 不被 Go 语言的调度器控制,例如:SYS_EPOLL_CREATESYS_EPOLL_WAIT(超时时间为 0)、SYS_TIME 等。

调度器启动

上面的这几个场景其实都是我们在使用 Goroutine 期间会遇到,在我们创建 Goroutine 用于执行并发任务或者执行 IO 操作时都会触发调度器的调度,对于一个有经验的 Go 语言使用者,对于触发调度的方式和时机其实会有一些比较靠谱的推测的,但是调度器的启动却是我们平时比较难以接触的部分,我们在这里就介绍一下 Golang 调度器的启动过程:

  1. func schedinit() {
  2. _g_ := getg()
  3. ...
  4. sched.maxmcount = 10000
  5. ...
  6. sched.lastpoll = uint64(nanotime())
  7. procs := ncpu
  8. if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
  9. procs = n
  10. }
  11. if procresize(procs) != nil {
  12. throw("unknown runnable goroutine during bootstrap")
  13. }
  14. }

在调度器初始函数执行的过程中会将 maxmcount 设置成 10000,这也就是一个 Go 语言程序中能够创建的最大线程数,虽然最多可以创建 10000 个线程,但是可以同时运行的线程还是由 GOMAXPROCS 这个环境变量控制。

我们从环境变量 GOMAXPROCS 获取了程序能够同时运行的最多处理器 P 之后就会调用 procresize 更新程序中处理器的数量,在这时『整个世界』都会停止(不会有任何用户协程被执行)并且调度器也会进入锁定状态,procresize 的执行过程如下:

  1. import "unsafe"
  2. func procresize(nprocs int32) *p {
  3. old := gomaxprocs
  4. // grow allp if necessary.
  5. // ...
  6. // initialize new P's
  7. // ...
  8. _g_ := getg()
  9. if _g_.m.p != 0 {
  10. _g_.m.p.ptr().m = 0
  11. }
  12. _g_.m.p = 0
  13. _g_.m.mcache = nil
  14. p := allp[0]
  15. p.m = 0
  16. p.status = _Pidle
  17. acquirep(p)
  18. // release resources from unused P's
  19. // ...
  20. // trim allp.
  21. // ...
  22. var runnablePs *p
  23. for i := nprocs - 1; i >= 0; i-- {
  24. p := allp[i]
  25. if _g_.m.p.ptr() == p {
  26. continue
  27. }
  28. p.status = _Pidle
  29. if runqempty(p) {
  30. pidleput(p)
  31. } else {
  32. p.m.set(mget())
  33. p.link.set(runnablePs)
  34. runnablePs = p
  35. }
  36. }
  37. stealOrder.reset(uint32(nprocs))
  38. var int32p *int32 = &gomaxprocs
  39. atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
  40. return runnablePs
  41. }
  • 如果全局变量 allp 切片中的处理器数量少于期望数量就会对切片进行扩容;
  • 使用 new 创建新的处理器结构体并调用 init 方法初始化刚刚扩容的处理器;
  • 通过指针将线程 m0 和处理器 allp[0] 绑定到一起;
  • 调用 destroy 方法释放不再使用的处理器结构;
  • 通过截断改变全局变量 allp 的长度保证与期望处理器数量相等;
  • 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局空闲队列中;调用 procresize 就是调度器启动的最后一步,在这一步过后调度器会完成相应数量处理器的启动,等待用户创建运行新的 Goroutine 并为 Goroutine 调度处理器资源。

总结

Goroutine 和调度器是 Go 语言能够高效地处理任务并且最大化利用资源的最主要原因,我们在这一节中介绍了 Golang 用于处理并发任务的 M - G - P 模型,包括它们各自的数据结构以及状态,除此之外我们还通过一些常见的场景介绍调度器的工作原理以及不同数据结构之间的协作关系,相信能够对各位读者理解调度器有一定的帮助。

Reference

wechat-account-qrcode

本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。