并发

你可以在这里找到本章的所有代码

这是我们的计划:同事已经写了一个 CheckWebsites 的函数检查 URL 列表的状态。

  1. package concurrency
  2. type WebsiteChecker func(string) bool
  3. func CheckWebsites(wc WebsiteChecker, urls []string) map[string]bool {
  4. results := make(map[string]bool)
  5. for _, url := range urls {
  6. results[url] = wc(url)
  7. }
  8. return results
  9. }

它返回一个 map,由每个 url 检查后的得到的布尔值组成,成功响应的值为 true,错误响应的值为 false

你还必须传入一个 WebsiteChecker 处理单个 URL 并返回一个布尔值。它会被函数调用以检查所有的网站。

使用 依赖注入,允许在不发起真实 HTTP 请求的情况下测试函数,这使测试变得可靠和快速。

这是他们写的测试:

  1. package concurrency
  2. import (
  3. "reflect"
  4. "testing"
  5. )
  6. func mockWebsiteChecker(url string) bool {
  7. if url == "waat://furhurterwe.geds" {
  8. return false
  9. }
  10. return true
  11. }
  12. func TestCheckWebsites(t *testing.T) {
  13. websites := []string{
  14. "http://google.com",
  15. "http://blog.gypsydave5.com",
  16. "waat://furhurterwe.geds",
  17. }
  18. actualResults := CheckWebsites(mockWebsiteChecker, websites)
  19. want := len(websites)
  20. got := len(actualResults)
  21. if want != got {
  22. t.Fatalf("Wanted %v, got %v", want, got)
  23. }
  24. expectedResults := map[string]bool{
  25. "http://google.com": true,
  26. "http://blog.gypsydave5.com": true,
  27. "waat://furhurterwe.geds": false,
  28. }
  29. if !reflect.DeepEqual(expectedResults, actualResults) {
  30. t.Fatalf("Wanted %v, got %v", expectedResults, actualResults)
  31. }
  32. }

该功能在生产环境中被用于检查数百个网站。但是你的同事开始抱怨它速度很慢,所以他们请你帮忙为程序提速。

写一个测试

首先我们对 CheckWebsites 做一个基准测试,这样就能看到我们修改的影响。

  1. package concurrency
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func slowStubWebsiteChecker(_ string) bool {
  7. time.Sleep(20 * time.Millisecond)
  8. return true
  9. }
  10. func BenchmarkCheckWebsites(b *testing.B) {
  11. urls := make([]string, 100)
  12. for i := 0; i < len(urls); i++ {
  13. urls[i] = "a url"
  14. }
  15. for i := 0; i < b.N; i++ {
  16. CheckWebsites(slowStubWebsiteChecker, urls)
  17. }
  18. }

基准测试使用一百个网址的 slice 对 CheckWebsites 进行测试,并使用 WebsiteChecker 的伪造实现。slowStubWebsiteChecker 故意放慢速度。它使用 time.Sleep 明确等待 20 毫秒,然后返回 true。

当我们运行基准测试时使用 go test -bench=. 命令 (如果在 Windows Powershell 环境下使用 go test -bench="."):

  1. pkg: github.com/gypsydave5/learn-go-with-tests/concurrency/v0
  2. BenchmarkCheckWebsites-4 1 2249228637 ns/op
  3. PASS
  4. ok github.com/gypsydave5/learn-go-with-tests/concurrency/v0 2.268s

CheckWebsite 经过基准测试的时间为 2249228637 纳秒,大约 2.25 秒。

让我们尝试去让它运行得更快。

编写足够的代码让它通过

现在我们终于可以谈论并发了,以下内容是为了说明「不止一件事情正在进行中」。这是我们每天很自然在做的事情。

比如,今天早上我泡了一杯茶。我放上水壶,然后在等待它煮沸时,从冰箱里取出了牛奶,把茶从柜子里拿出来,找到我最喜欢的杯子,把茶袋放进杯子里,然后等水壶沸了,把水倒进杯子里。

没有 做的事情是放上水壶,然后呆呆地盯着水壶等水煮沸,然后在煮沸后再做其他事情。

如果你能理解为什么第一种方式泡茶更快,那你就可以理解我们如何让 CheckWebsites 变得更快。与其等待网站响应之后再发送下一个网站的请求,不如告诉计算机在等待时就发起下一个请求。

通常在 Go 中,当调用函数 doSomething() 时,我们等待它返回(即使它没有值返回,我们仍然等待它完成)。我们说这个操作是 阻塞 的 —— 它让我们等待它完成。Go 中不会阻塞的操作将在称为 goroutine 的单独 进程 中运行。将程序想象成从上到下读 Go 的 代码,当函数被调用执行读取操作时,进入每个函数「内部」。当一个单独的进程开始时,就像开启另一个 reader(阅读程序)在函数内部执行读取操作,原来的 reader 继续向下读取 Go 代码。

要告诉 Go 开始一个新的 goroutine,我们把一个函数调用变成 go 声明,通过把关键字 go 放在它前面:go doSomething()

  1. package concurrency
  2. type WebsiteChecker func(string) bool
  3. func CheckWebsites(wc WebsiteChecker, urls []string) map[string]bool {
  4. results := make(map[string]bool)
  5. for _, url := range urls {
  6. go func() {
  7. results[url] = wc(url)
  8. }()
  9. }
  10. return results
  11. }

因为开启 goroutine 的唯一方法就是将 go 放在函数调用前面,所以当我们想要启动 goroutine 时,我们经常使用 匿名函数(anonymous functions)。一个匿名函数文字看起来和正常函数声明一样,但没有名字(意料之中)。你可以在 上面的 for 循环体中看到一个。

匿名函数有许多有用的特性,其中两个上面正在使用。首先,它们可以在声明的同时执行 —— 这就是匿名函数末尾的 () 实现的。其次,它们维护对其所定义的词汇作用域的访问权 —— 在声明匿名函数时所有可用的变量也可在函数体内使用。

上面匿名函数的主体和之前循环体中的完全一样。唯一的区别是循环的每次迭代都会启动一个新的 goroutine,与当前进程(WebsiteChecker 函数)同时发生,每个循环都会将结果添加到 results map 中。

但是当我们执行 go test

  1. -------- FAIL: TestCheckWebsites (0.00s)
  2. CheckWebsites_test.go:31: Wanted map[http://google.com:true http://blog.gypsydave5.com:true waat://furhurterwe.geds:false], got map[]
  3. FAIL
  4. exit status 1
  5. FAIL github.com/gypsydave5/learn-go-with-tests/concurrency/v1 0.010s

快速进入平行宇宙……

你可能不会得到这个结果。你可能会得到一个 panic 信息,这个稍后再谈。如果你得到的是那些结果,不要担心,只要继续运行测试,直到你得到上述结果。或假装你得到了,这取决于你。欢迎来到并发编程的世界:如果处理不正确,很难预测会发生什么。别担心 —— 这就是我们编写测试的原因,当处理并发时,测试帮助我们预测可能发生的情况。

… 重新回到这些问题。

让我们困惑的是,原来的测试 WebsiteChecker 现在返回空的 map。哪里出问题了?

我们 for 循环开始的 goroutines 没有足够的时间将结果添加结果到 results map 中;WebsiteChecker 函数对于它们来说太快了,以至于它返回时仍为空的 map。

为了解决这个问题,我们可以等待所有的 goroutine 完成他们的工作,然后返回。两秒钟应该能完成了,对吧?

  1. package concurrency
  2. import "time"
  3. type WebsiteChecker func(string) bool
  4. func CheckWebsites(wc WebsiteChecker, urls []string) map[string]bool {
  5. results := make(map[string]bool)
  6. for _, url := range urls {
  7. go func() {
  8. results[url] = wc(url)
  9. }()
  10. }
  11. time.Sleep(2 * time.Second)
  12. return results
  13. }

现在当我们运行测试时获得的结果(如果没有得到 —— 参考上面的做法):

  1. -------- FAIL: TestCheckWebsites (0.00s)
  2. CheckWebsites_test.go:31: Wanted map[http://google.com:true http://blog.gypsydave5.com:true waat://furhurterwe.geds:false], got map[waat://furhurterwe.geds:false]
  3. FAIL
  4. exit status 1
  5. FAIL github.com/gypsydave5/learn-go-with-tests/concurrency/v1 0.010s

这不是很好 - 为什么只有一个结果?我们可以尝试通过增加等待的时间来解决这个问题 —— 如果你愿意,可以试试。但没什么作用。这里的问题是变量 url 被重复用于 for 循环的每次迭代 —— 每次都会从 urls 获取新值。但是我们的每个 goroutine 都是 url 变量的引用 —— 它们没有自己的独立副本。所以他们 会写入在迭代结束时的 url —— 最后一个 url。这就是为什么我们得到的结果是最后一个 url。

解决这个问题:

  1. import (
  2. "time"
  3. )
  4. type WebsiteChecker func(string) bool
  5. func CheckWebsites(wc WebsiteChecker, urls []string) map[string]bool {
  6. results := make(map[string]bool)
  7. for _, url := range urls {
  8. go func(u string) {
  9. results[u] = wc(u)
  10. }(url)
  11. }
  12. time.Sleep(2 * time.Second)
  13. return results
  14. }

通过给每个匿名函数一个参数 url(u),然后用 url 作为参数调用匿名函数,我们确保 u 的值固定为循环迭代的 url 值,重新启动 goroutineuurl 值的副本,因此无法更改。

现在,如果你幸运的话,你会得到:

  1. PASS
  2. ok github.com/gypsydave5/learn-go-with-tests/concurrency/v1 2.012s

但是,如果你不走运(如果你运行基准测试,这很可能会发生,因为你将发起多次的尝试)。

  1. fatal error: concurrent map writes
  2. goroutine 8 [running]:
  3. runtime.throw(0x12c5895, 0x15)
  4. /usr/local/Cellar/go/1.9.3/libexec/src/runtime/panic.go:605 +0x95 fp=0xc420037700 sp=0xc4200376e0 pc=0x102d395
  5. runtime.mapassign_faststr(0x1271d80, 0xc42007acf0, 0x12c6634, 0x17, 0x0)
  6. /usr/local/Cellar/go/1.9.3/libexec/src/runtime/hashmap_fast.go:783 +0x4f5 fp=0xc420037780 sp=0xc420037700 pc=0x100eb65
  7. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.WebsiteChecker.func1(0xc42007acf0, 0x12d3938, 0x12c6634, 0x17)
  8. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:12 +0x71 fp=0xc4200377c0 sp=0xc420037780 pc=0x12308f1
  9. runtime.goexit()
  10. /usr/local/Cellar/go/1.9.3/libexec/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc4200377c8 sp=0xc4200377c0 pc=0x105cf01
  11. created by github.com/gypsydave5/learn-go-with-tests/concurrency/v3.WebsiteChecker
  12. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:11 +0xa1
  13. ... many more scary lines of text ...

这看上去冗长、可怕,我们需要深呼吸并阅读错误:fatal error: concurrent map writes。有时候,当我们运行我们的测试时,两个 goroutines 完全同时写入 results map。Go 的 Maps 不喜欢多个事物试图一次性写入,所以就导致了 fatal error

这是一种 race condition(竞争条件),当软件的输出取决于事件发生的时间和顺序时,因为我们无法控制,bug 就会出现。因为我们无法准确控制每个 goroutine 写入结果 map 的时间,两个 goroutines 同一时间写入时程序将非常脆弱。

Go 可以帮助我们通过其内置的 race detector 来发现竞争条件。要启用此功能,请使用 race 标志运行测试:go test -race

你应该得到一些如下所示的输出:

  1. ==================
  2. WARNING: DATA RACE
  3. Write at 0x00c420084d20 by goroutine 8:
  4. runtime.mapassign_faststr()
  5. /usr/local/Cellar/go/1.9.3/libexec/src/runtime/hashmap_fast.go:774 +0x0
  6. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.WebsiteChecker.func1()
  7. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:12 +0x82
  8. Previous write at 0x00c420084d20 by goroutine 7:
  9. runtime.mapassign_faststr()
  10. /usr/local/Cellar/go/1.9.3/libexec/src/runtime/hashmap_fast.go:774 +0x0
  11. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.WebsiteChecker.func1()
  12. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:12 +0x82
  13. Goroutine 8 (running) created at:
  14. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.WebsiteChecker()
  15. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:11 +0xc4
  16. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.TestWebsiteChecker()
  17. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker_test.go:27 +0xad
  18. testing.tRunner()
  19. /usr/local/Cellar/go/1.9.3/libexec/src/testing/testing.go:746 +0x16c
  20. Goroutine 7 (finished) created at:
  21. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.WebsiteChecker()
  22. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:11 +0xc4
  23. github.com/gypsydave5/learn-go-with-tests/concurrency/v3.TestWebsiteChecker()
  24. /Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker_test.go:27 +0xad
  25. testing.tRunner()
  26. /usr/local/Cellar/go/1.9.3/libexec/src/testing/testing.go:746 +0x16c
  27. ==================

细节还是难以阅读 - 但 WARNING: DATA RACE 相当明确。阅读错误的内容,我们可以看到两个不同的 goroutines 在 map 上执行写入操作:

Write at 0x00c420084d20 by goroutine 8:

正在写入相同的内存块

Previous write at 0x00c420084d20 by goroutine 7:

最重要的是,我们可以看到发生写入的代码行:

/Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:12

和 goroutines 7 和 8 开始的代码行号:

/Users/gypsydave5/go/src/github.com/gypsydave5/learn-go-with-tests/concurrency/v3/websiteChecker.go:11

你需要知道的所有内容都会打印到你的终端上 - 你只需耐心阅读就可以了。

Channels

我们可以通过使用 channels 协调我们的 goroutines 来解决这个数据竞争。channels 是一个 Go 数据结构,可以同时接收和发送值。这些操作以及细节允许不同进程之间的通信。

在这种情况下,我们想要考虑父进程和每个 goroutine 之间的通信,goroutine 使用 url 来执行 WebsiteChecker 函数。

  1. package concurrency
  2. type WebsiteChecker func(string) bool
  3. type result struct {
  4. string
  5. bool
  6. }
  7. func CheckWebsites(wc WebsiteChecker, urls []string) map[string]bool {
  8. results := make(map[string]bool)
  9. resultChannel := make(chan result)
  10. for _, url := range urls {
  11. go func(u string) {
  12. resultChannel <- result{u, wc(u)}
  13. }(url)
  14. }
  15. for i := 0; i < len(urls); i++ {
  16. result := <-resultChannel
  17. results[result.string] = result.bool
  18. }
  19. return results
  20. }

除了 results map 之外,我们现在还有一个 resultChannel 的变量,同样使用 make 方法创建。chan result 是 channel 类型的 —— result 的 channel。新类型的 result 是将 WebsiteChecker 的返回值与正在检查的 url 相关联 —— 它是一个 stringbool 的结构。因为我们不需要任何一个要命名的值,它们中的每一个在结构中都是匿名的;这在很难知道用什么命名值的时候可能很有用。

现在,当我们迭代 urls 时,不是直接写入 map,而是使用 send statement 将每个调用 wcresult 结构体发送到 resultChannel。这使用 <- 操作符,channel 放在左边,值放在右边:

  1. // send statement
  2. resultChannel <- result{u, wc(u)}

下一个 for 循环为每个 url 迭代一次。 我们在内部使用 receive expression,它将从通道接收到的值分配给变量。这也使用 <- 操作符,但现在两个操作数颠倒过来:现在 channel 在右边,我们指定的变量在左边:

  1. // receive expression
  2. result := <-resultChannel

然后我们使用接收到的 result 更新 map。

通过将结果发送到通道,我们可以控制每次写入 results map 的时间,确保每次写入一个结果。虽然 wc 的每个调用都发送给结果通道,但是它们在其自己的进程内并行发生,因为我们将结果通道中的值与接收表达式一起逐个处理一个结果。

我们已经将想要加快速度的那部分代码并行化,同时确保不能并发的部分仍然是线性处理。我们使用 channel 在多个进程间通信。

当我们运行基准时:

  1. pkg: github.com/gypsydave5/learn-go-with-tests/concurrency/v2
  2. BenchmarkCheckWebsites-8 100 23406615 ns/op
  3. PASS
  4. ok github.com/gypsydave5/learn-go-with-tests/concurrency/v2 2.377s

23406615 纳秒 —— 0.023 秒,速度大约是最初函数的一百倍,这是非常成功的。

总结

这个比在 TDD 上的寻常练习轻松一些。某种程度说,我们已经参与了 CheckWebsites 函数的一个长期重构;输入和输出从未改变,它只是变得更快了。但是我们所做的测试以及我们编写的基准测试允许我们重构 CheckWebsites,让我们有信心保证软件仍然可以工作,同时也证明它确实变得更快了。

在使它更快的过程中,我们明白了

  • goroutines 是 Go 的基本并发单元,它让我们可以同时检查多个网站。
  • anonymous functions(匿名函数),我们用它来启动每个检查网站的并发进程。
  • channels,用来组织和控制不同进程之间的交流,使我们能够避免 race condition(竞争条件) 的问题。
  • the race detector(竞争探测器) 帮助我们调试并发代码的问题。

使程序加快

一种构建软件的敏捷方法,常常被错误地归属于 Kent Beck,即:

让它运作,使它正确,使它快速

「运作」是通过测试,「正确」是重构代码,而「快速」是优化代码以使其快速运行。一旦我们使程序可以正确运行,我们能做的就只有使它快速。很幸运,我们得到的代码已经被证明是可以运作的,并且不需要重构。在另外两个步骤执行之前,我们绝不应该试图「使它快速」,因为

过早的优化是万恶之源—— Donald Knuth


作者:David Wickes译者:Donng校对:polaris1119

本文由 GCTT 原创编译,Go 中文网 荣誉推出