5.2 select

很多 C 语言或者 Unix 开发者听到 select 想到的都是系统调用,而谈到 I/O 模型时最终大都会提到基于 selectpollepoll 等函数构建的 IO 多路复用模型。Go 语言的 select 与 C 语言中的 select 有着比较相似的功能。本节会介绍 Go 语言 select 常见的现象、数据结构以及四种不同情况下的实现原理。

C 语言中的 select 关键字可以同时监听多个文件描述符的可读或者可写的状态,Go 语言中的 select 关键字也能够让 Goroutine 同时等待多个 Channel 的可读或者可写,在多个文件或者 Channel 发生状态改变之前,select 会一直阻塞当前线程或者 Goroutine。

Golang-Select-Channels

图 5-5 Select 和 Channels

select 是一种与 switch 相似的控制结构,与 switch 不同的是,select 中虽然也有多个 case,但是这些 case 中的表达式必须都是 Channel 的收发操作。下面的代码就展示了一个包含 Channel 收发操作的 select 结构:

  1. func fibonacci(c, quit chan int) {
  2. x, y := 0, 1
  3. for {
  4. select {
  5. case c <- x:
  6. x, y = y, x+y
  7. case <-quit:
  8. fmt.Println("quit")
  9. return
  10. }
  11. }
  12. }

上述控制结构会等待 c <- x 或者 <-quit 两个表达式中任意一个的返回。无论哪一个表达式返回都会立刻执行 case 中的代码,当 select 中的两个 case 同时被触发时,就会随机选择一个 case 执行。

5.2.1 现象

当我们在 Go 语言中使用 select 控制结构时,会遇到两个有趣的现象:

  • select 能在 Channel 上进行非阻塞的收发操作;
  • select 在遇到多个 Channel 同时响应时会随机挑选 case 执行;这两个现象是学习 select 时经常会遇到的,我们来深入了解具体的场景并分析这两个现象背后的设计原理。

非阻塞的收发

在通常情况下,select 语句会阻塞当前 Goroutine 并等待多个 Channel 中的一个达到可以收发的状态。但是如果 select 控制结构中包含 default 语句,那么这个 select 语句在执行时会遇到以下两种情况:

  • 当存在可以收发的 Channel 时,直接处理该 Channel 对应的 case
  • 当不存在可以收发的 Channel 是,执行 default 中的语句;当我们运行下面的代码时就不会阻塞当前的 Goroutine,它会直接执行 default 中的代码并返回。
  1. func main() {
  2. ch := make(chan int)
  3. select {
  4. case i := <-ch:
  5. println(i)
  6. default:
  7. println("default")
  8. }
  9. }
  10. $ go run main.go
  11. default

只要我们稍微想一下,就会发现 Go 语言设计的这个现象就非常合理。select 的作用就是同时监听多个 case 是否可以执行,如果多个 Channel 都不能执行,那么运行 default 中的代码也是理所当然的。

非阻塞的 Channel 发送和接收操作还是很有必要的,在很多场景下我们不希望向 Channel 发送消息或者从 Channel 中接收消息会阻塞当前 Goroutine,我们只是向看看 Channel 的可读或者可写状态。下面就是一个常见的例子:

  1. errCh := make(chan error, len(tasks)
  2. wg := sync.WaitGroup{}
  3. wg.Add(len(tasks))
  4. for i := range tasks {
  5. go func() {
  6. defer wg.Done()
  7. if err := tasks[i].Run(); err != nil {
  8. errCh <- err
  9. }
  10. }()
  11. }
  12. wg.Wait()
  13. select {
  14. case err := <-errCh:
  15. return err
  16. default:
  17. return nil
  18. }

在上面这段代码中,我们不关心到底多少个任务执行失败了,只关心是否存在返回错误的任务,最后的 select 语句就能很好地完成这个任务。然而使用 select 的语法不是最原始的设计,它在最初版本使用 x, ok := <-c 的语法实现非阻塞的收发,以下是与非阻塞收发的相关提交:

随机执行

另一个使用 select 遇到的情况是同时有多个 case 就绪时,select 会选择那个 case 执行的问题,我们通过下面的代码可以简单了解一下:

  1. func main() {
  2. ch := make(chan int)
  3. go func() {
  4. for range time.Tick(1 * time.Second) {
  5. ch <- 0
  6. }
  7. }()
  8. for {
  9. select {
  10. case <-ch:
  11. println("case1")
  12. case <-ch:
  13. println("case2")
  14. }
  15. }
  16. }
  17. $ go run main.go
  18. case1
  19. case2
  20. case1
  21. ...

从上述代码输出的结果中我们可以看到,select 在遇到多个 <-ch 同时满足可读或者可写条件时会随机选择一个 case 执行其中的代码。

这个设计是在十多年前被 select 提交5引入并一直保留到现在的,虽然中间经历过一些修改6,但是语义一直都没有改变。在上面的代码中,两个 case 都是同时满足执行条件的,如果我们按照顺序依次判断,那么后面的条件永远都会得不到执行,而随机的引入就是为了避免饥饿问题的发生。

5.2.2 数据结构

select 在 Go 语言的源代码中不存在对应的结构体,但是 select 控制结构中的 case 却使用 runtime.scase 结构体来表示:

  1. type scase struct {
  2. c *hchan
  3. elem unsafe.Pointer
  4. kind uint16
  5. pc uintptr
  6. releasetime int64
  7. }

因为非默认的 case 中都与 Channel 的发送和接收有关,所以 runtime.scase 结构体中也包含一个 runtime.hchan 类型的字段存储 case 中使用的 Channel;除此之外,elem 是接收或者发送数据的变量地址、kind 表示 runtime.scase 的种类,总共包含以下四种:

  1. const (
  2. caseNil = iota
  3. caseRecv
  4. caseSend
  5. caseDefault
  6. )

这四种常量分别表示不同类型的 case,相信它们的命名已经能够充分帮助我们理解它们的作用了,所以这里也不一一介绍了。

5.2.3 实现原理

select 语句在编译期间会被转换成 OSELECT 节点。每一个 OSELECT 节点都会持有一组 OCASE 节点,如果 OCASE 的执行条件是空,那就意味着这是一个 default 节点:

golang-oselect-and-ocases

图 5-7 OSELECT 和多个 OCASE

上图展示的就是 select 语句在编译期间的结构,每一个 OCASE 既包含执行条件也包含满足条件后执行的代码。

编译器在中间代码生成期间会根据 selectcase 的不同对控制语句进行优化,这一过程都发生在 cmd/compile/internal/gc.walkselectcases 函数中,我们在这里会分四种情况介绍处理的过程和结果:

  • select 不存在任何的 case
  • select 只存在一个 case
  • select 存在量个 case,其中一个 casedefault
  • select 存在多个 case;上述的四种情况不仅会涉及编译器的重写和优化,还会涉及 Go 语言的运行时机制,我们会从编译期间和运行时两方面分析上述情况。

直接阻塞

首先介绍的是最简单的情况,也就是当 select 结构中不包含任何 case 时编译器是如何进行处理的,我们截取 cmd/compile/internal/gc.walkselectcases 函数的前几行代码:

  1. func walkselectcases(cases *Nodes) []*Node {
  2. n := cases.Len()
  3. if n == 0 {
  4. return []*Node{mkcall("block", nil, nil)}
  5. }
  6. ...
  7. }

这段代码非常简单并且容易理解,它直接将类似 select {} 的空语句转换成调用 runtime.block 函数:

  1. func block() {
  2. gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
  3. }

runtime.block 函数的实现非常简单,它会调用 runtime.gopark 让出当前 Goroutine 对处理器的使用权,传入的等待原因是 waitReasonSelectNoCases

简单总结一下,空的 select 语句会直接阻塞当前的 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。

单一管道

如果当前的 select 条件只包含一个 case,那么就会将 select 改写成 if 条件语句。下面展示了原始的 select 语句和被改写、优化后的代码:

  1. // 改写前
  2. select {
  3. case v, ok <-ch: // case ch <- v
  4. ...
  5. }
  6. // 改写后
  7. if ch == nil {
  8. block()
  9. }
  10. v, ok := <-ch // case ch <- v
  11. ...

cmd/compile/internal/gc.walkselectcases 在处理单操作 select 语句时,会根据 Channel 的收发情况生成不同的语句。当 case 中的 Channel 是空指针时,就会直接挂起当前 Goroutine 并永久休眠。

非阻塞操作

select 中仅包含两个 case,并且其中一个是 default 时,Go 语言的编译器就会认为这是一次非阻塞的收发操作。cmd/compile/internal/gc.walkselectcases 函数会对这种情况单独处理,不过在正式优化之前,该函数会将 case 中的所有 Channel 都转换成指向 Channel 的地址。我们会分别介绍非阻塞发送和非阻塞接收时,编译器进行的不同优化。

发送

首先是 Channel 的发送过程,当 case 中表达式的类型是 OSEND 时,编译器会使用 if/else 语句和 runtime.selectnbsend 函数改写代码:

  1. select {
  2. case ch <- i:
  3. ...
  4. default:
  5. ...
  6. }
  7. if selectnbsend(ch, i) {
  8. ...
  9. } else {
  10. ...
  11. }

这段代码中最重要的就是 runtime.selectnbsend 函数,它为我们提供了向 Channel 非阻塞地发送数据的能力。我们在 Channel 一节介绍了向 Channel 发送数据的 runtime.chansend 函数包含一个 block 参数,该参数会决定这一次的发送是不是阻塞的:

  1. func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  2. return chansend(c, elem, false, getcallerpc())
  3. }

由于我们向 runtime.chansend 函数传入了 false,所以哪怕是不存在接收方或者缓冲区空间不足都不会阻塞当前 Goroutine 而是会直接返回。

接收

由于从 Channel 中接收数据可能会返回一个或者两个值,所以接受数据的情况会比发送稍显复杂,不过改写的套路是差不多的:

  1. // 改写前
  2. select {
  3. case v <- ch: // case v, ok <- ch:
  4. ......
  5. default:
  6. ......
  7. }
  8. // 改写后
  9. if selectnbrecv(&v, ch) { // if selectnbrecv2(&v, &ok, ch) {
  10. ...
  11. } else {
  12. ...
  13. }

返回值数量不同会导致使用函数的不同,两个用于非阻塞接收消息的函数 runtime.selectnbrecvruntime.selectnbrecv2 只是对 runtime.chanrecv 返回值的处理稍有不同:

  1. func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
  2. selected, _ = chanrecv(c, elem, false)
  3. return
  4. }
  5. func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
  6. selected, *received = chanrecv(c, elem, false)
  7. return
  8. }

因为接收方不需要,所以 runtime.selectnbrecv 会直接忽略返回的布尔值,而 runtime.selectnbrecv2 会将布尔值回传给调用方。与 runtime.chansend 一样,runtime.chanrecv 也提供了一个 block 参数用于控制这一次接收是否阻塞。

常见流程

在默认的情况下,编译器会使用如下的流程处理 select 语句:

  • 将所有的 case 转换成包含 Channel 以及类型等信息的 runtime.scase 结构体;
  • 调用运行时函数 runtime.selectgo 从多个准备就绪的 Channel 中选择一个可执行的 runtime.scase 结构体;
  • 通过 for 循环生成一组 if 语句,在语句中判断自己是不是被选中的 case一个包含三个 case 的正常 select 语句其实会被展开成如下所示的逻辑,我们可以看到其中处理的三个部分:
  1. selv := [3]scase{}
  2. order := [6]uint16
  3. for i, cas := range cases {
  4. c := scase{}
  5. c.kind = ...
  6. c.elem = ...
  7. c.c = ...
  8. }
  9. chosen, revcOK := selectgo(selv, order, 3)
  10. if chosen == 0 {
  11. ...
  12. break
  13. }
  14. if chosen == 1 {
  15. ...
  16. break
  17. }
  18. if chosen == 2 {
  19. ...
  20. break
  21. }

展开后的代码片段中最重要的就是用于选择待执行 case 的运行时函数 runtime.selectgo,这也是我们要关注的重点。因为这个函数的实现比较复杂, 所以这里分两部分分析它的执行过程:

  • 执行一些必要的初始化操作并确定 case 的处理顺序;
  • 在循环中根据 case 的类型做出不同的处理;

初始化

runtime.selectgo 函数首先会进行执行必要的初始化操作并决定处理 case 的两个顺序 — 轮训顺序 pollOrder 和加锁顺序 lockOrder

  1. func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
  2. cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
  3. order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
  4. scases := cas1[:ncases:ncases]
  5. pollorder := order1[:ncases:ncases]
  6. lockorder := order1[ncases:][:ncases:ncases]
  7. for i := range scases {
  8. cas := &scases[i]
  9. }
  10. for i := 1; i < ncases; i++ {
  11. j := fastrandn(uint32(i + 1))
  12. pollorder[i] = pollorder[j]
  13. pollorder[j] = uint16(i)
  14. }
  15. // 根据 Channel 的地址排序确定加锁顺序
  16. ...
  17. sellock(scases, lockorder)
  18. ...
  19. }

轮训顺序 pollOrder 和加锁顺序 lockOrder 分别是通过以下的方式确认的:

  • 轮训顺序:通过 runtime.fastrandn 函数引入随机性;
  • 加锁顺序:按照 Channel 的地址排序后确定加锁顺序;

随机的轮训顺序可以避免 Channel 的饥饿问题,保证公平性;而根据 Channel 的地址顺序确定加锁顺序能够避免死锁的发生。这段代码最后调用的 runtime.sellock 函数会按照之前生成的加锁顺序锁定 select 语句中包含所有的 Channel。

循环

当我们为 select 语句锁定了所有 Channel 之后就会进入 runtime.selectgo 函数的主循环,它会分三个阶段查找或者等待某个 Channel 准备就绪:

  • 查找是否已经存在准备就绪的 Channel,即可以执行收发操作;
  • 将当前 Goroutine 加入 Channel 对应的收发队列上并等待其他 Goroutine 的唤醒;
  • 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理;runtime.selectgo 函数会根据不同情况通过 goto 跳转到函数内部的不同标签执行相应的逻辑,其中包括:
  • bufrecv:可以从缓冲区读取数据;
  • bufsend:可以向缓冲区写入数据;
  • recv:可以从休眠的发送方获取数据;
  • send:可以向休眠的接收方发送数据;
  • rclose:可以从关闭的 Channel 读取 EOF;
  • sclose:向关闭的 Channel 发送数据;
  • retc:结束调用并返回;

我们先来分析循环执行的第一个阶段,查找已经准备就绪的 Channel。循环会遍历所有的 case 并找到需要被唤起的 runtime.sudog 结构,在这个阶段,我们会根据 case 的四种类型分别处理:

  • caseNil:当前 case 不包含 Channel;
    • 这种 case 会被跳过;
  • caseRecv:当前 case 会从 Channel 中接收数据;
    • 如果当前 Channel 的 sendq 上有等待的 Goroutine,就会跳到 recv 标签从 Goroutine 中读取数据;
    • 如果当前 Channel 的缓冲区不为空就会跳到 bufrecv 标签处从缓冲区获取数据;
    • 如果当前 Channel 已经被关闭就会跳到 rclose 做一些清除的收尾工作;
  • caseSend:当前 case 会向 Channel 发送数据;
    • 如果当前 Channel 已经被关闭就会直接跳到 sclose 标签,触发 panic 尝试中止程序;
    • 如果当前 Channel 的 recvq 上有等待的 Goroutine 就会跳到 send 标签向 Channel 发送数据;
  • caseDefault:当前 casedefault 语句;
    • 表示前面的所有 case 都没有被执行,这里会解锁所有 Channel 并返回,意味着当前 select 结构中的收发都是非阻塞的;golang-runtime-selectgo

图 5-8 运行时 selectgo 函数

第一阶段的主要职责是查找所有 case 中 Channel 是否有可以立刻被处理的情况。无论是在包含等待的 Goroutine 还是缓冲区中存在数据,只要满足条件就会立刻处理,如果不能立刻找到活跃的 Channel 就会进入循环的下一阶段,按照需要将当前的 Goroutine 加入到 Channel 的 sendq 或者 recvq 队列中:

  1. func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
  2. ...
  3. gp = getg()
  4. nextp = &gp.waiting
  5. for _, casei := range lockorder {
  6. casi = int(casei)
  7. cas = &scases[casi]
  8. c = cas.c
  9. sg := acquireSudog()
  10. sg.g = gp
  11. sg.c = c
  12. switch cas.kind {
  13. case caseRecv:
  14. c.recvq.enqueue(sg)
  15. case caseSend:
  16. c.sendq.enqueue(sg)
  17. }
  18. }
  19. gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
  20. ...
  21. }

除了将当前 Goroutine 对应的 runtime.sudog 结构体加入队列之外,这些 runtime.sudog 结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 runtime.gopark 函数挂起当前 Goroutine 等待调度器的唤醒。

Golang-Select-Waiting

图 5-9 Goroutine 上等待收发的 sudog 链表

等到 select 中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo 函数的第三阶段,从 runtime.sudog 结构体中获取数据:

  1. func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
  2. ...
  3. sg = (*sudog)(gp.param)
  4. gp.param = nil
  5. casi = -1
  6. cas = nil
  7. sglist = gp.waiting
  8. for _, casei := range lockorder {
  9. k = &scases[casei]
  10. if sg == sglist {
  11. casi = int(casei)
  12. cas = k
  13. } else {
  14. if k.kind == caseSend {
  15. c.sendq.dequeueSudoG(sglist)
  16. } else {
  17. c.recvq.dequeueSudoG(sglist)
  18. }
  19. }
  20. sgnext = sglist.waitlink
  21. sglist.waitlink = nil
  22. releaseSudog(sglist)
  23. sglist = sgnext
  24. }
  25. c = cas.c
  26. goto retc
  27. ...
  28. }

第三次遍历全部 case 时,我们会先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

由于当前的 select 结构找到了一个 case 执行,那么剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

当我们在循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecvbufsend 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 中发送数据或者从缓冲区中获取新数据:

  1. bufrecv:
  2. recvOK = true
  3. qp = chanbuf(c, c.recvx)
  4. if cas.elem != nil {
  5. typedmemmove(c.elemtype, cas.elem, qp)
  6. }
  7. typedmemclr(c.elemtype, qp)
  8. c.recvx++
  9. if c.recvx == c.dataqsiz {
  10. c.recvx = 0
  11. }
  12. c.qcount--
  13. selunlock(scases, lockorder)
  14. goto retc
  15. bufsend:
  16. typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  17. c.sendx++
  18. if c.sendx == c.dataqsiz {
  19. c.sendx = 0
  20. }
  21. c.qcount++
  22. selunlock(scases, lockorder)
  23. goto retc

这里在缓冲区进行的操作和直接调用 runtime.chansendruntime.chanrecv 函数差不多,上述两个过程在执行结束之后都会直接跳到 retc 字段。

两个直接对 Channel 收发的情况会调用 Channel 运行时函数 runtime.sendruntime.recv,这两个函数会直接与处于休眠状态的 Goroutine 打交道:

  1. recv:
  2. recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  3. recvOK = true
  4. goto retc
  5. send:
  6. send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  7. goto retc

不过如果向关闭的 Channel 发送数据或者从关闭的 Channel 中接收数据,情况就稍微有一点复杂了:

  • 从一个关闭 Channel 中接收数据会直接清除 Channel 中的相关内容;
  • 向一个关闭的 Channel 发送数据就会直接 panic 造成程序崩溃:
  1. rclose:
  2. selunlock(scases, lockorder)
  3. recvOK = false
  4. if cas.elem != nil {
  5. typedmemclr(c.elemtype, cas.elem)
  6. }
  7. goto retc
  8. sclose:
  9. selunlock(scases, lockorder)
  10. panic(plainError("send on closed channel"))

总体来看,select 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 select 多出了 default 关键字所以会支持非阻塞的收发。

5.2.4 小结

我们简单总结一下 select 结构的执行过程与实现原理,首先在编译期间,Go 语言会对 select 语句进行优化,它会根据 selectcase 的不同选择不同的优化路径:

  • 空的 select 语句会被转换成 runtime.block 函数的调用,直接挂起当前 Goroutine;
  • 如果 select 语句中只包含一个 case,就会被转换成 if ch == nil { block }; n; 表达式;
    • 首先判断操作的 Channel 是不是空的;
    • 然后执行 case 结构中的内容;
  • 如果 select 语句中只包含两个 case 并且其中一个是 default,那么会使用 runtime.selectnbrecvruntime.selectnbsend 非阻塞地执行收发操作;
  • 在默认情况下会通过 runtime.selectgo 函数获取执行 case 的索引,并通过多个 if 语句执行对应 case 中的代码;在编译器已经对 select 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 runtime.selectgo 函数,该函数会按照以下的流程执行:

  • 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder

  • 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel;
    • 如果存在就直接获取 case 对应的索引并返回;
    • 如果不存在就会创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
  • 当调度器唤醒当前 Goroutine 时就会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 结构对应的索引;select 关键字是 Go 语言特有的控制结构,它的实现原理比较复杂,需要编译器和运行时函数的通力合作。

wechat-account-qrcode

本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。