9.9 协程上下文与调度器

到这里,我们已经看到了下面这些启动协程的方式:

  1. launch(CommonPool) {...}
  2. async(CommonPool) {...}
  3. run(NonCancellable) {...}

这里的CommonPool 和 NonCancellable 是协程上下文(coroutine contexts)。本小节我们简单介绍一下自定义协程上下文。

9.9.1 调度和线程

协程上下文包括一个协程调度程序, 它可以指定由哪个线程来执行协程。调度器可以将协程的执行调度到一个线程池,限制在特定的线程中;也可以不作任何限制,让它无约束地运行。请看下面的示例:

  1. fun testDispatchersAndThreads() = runBlocking {
  2. val jobs = arrayListOf<Job>()
  3. jobs += launch(Unconfined) {
  4. // 未作限制 -- 将会在 main thread 中执行
  5. println("Unconfined: I'm working in thread ${Thread.currentThread()}")
  6. }
  7. jobs += launch(coroutineContext) {
  8. // 父协程的上下文 : runBlocking coroutine
  9. println("coroutineContext: I'm working in thread ${Thread.currentThread()}")
  10. }
  11. jobs += launch(CommonPool) {
  12. // 调度指派给 ForkJoinPool.commonPool
  13. println("CommonPool: I'm working in thread ${Thread.currentThread()}")
  14. }
  15. jobs += launch(newSingleThreadContext("MyOwnThread")) {
  16. // 将会在这个协程自己的新线程中执行
  17. println("newSingleThreadContext: I'm working in thread ${Thread.currentThread()}")
  18. }
  19. jobs.forEach { it.join() }
  20. }

运行上面的代码,我们将得到以下输出 (可能按不同的顺序):

  1. Unconfined: I'm working in thread Thread[main,5,main]
  2. CommonPool: I'm working in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. newSingleThreadContext: I'm working in thread Thread[MyOwnThread,5,main]
  4. context: I'm working in thread Thread[main,5,main]

从上面的结果,我们可以看出:
使用无限制的Unconfined上下文的协程运行在主线程中;
继承了 runBlocking {…} 的context的协程继续在主线程中执行;
而CommonPool在ForkJoinPool.commonPool中;
我们使用newSingleThreadContext函数新建的协程上下文,该协程运行在自己的新线程Thread[MyOwnThread,5,main]中。

另外,我们还可以在使用 runBlocking的时候显式指定上下文, 同时使用 run 函数来更改协程的上下文:

  1. fun log(msg: String) = println("${Thread.currentThread()} $msg")
  2. fun testRunBlockingWithSpecifiedContext() = runBlocking {
  3. log("$context")
  4. log("${context[Job]}")
  5. log("开始")
  6. val ctx1 = newSingleThreadContext("线程A")
  7. val ctx2 = newSingleThreadContext("线程B")
  8. runBlocking(ctx1) {
  9. log("Started in Context1")
  10. run(ctx2) {
  11. log("Working in Context2")
  12. }
  13. log("Back to Context1")
  14. }
  15. log("结束")
  16. }

运行输出:

  1. Thread[main,5,main] [BlockingCoroutine{Active}@b1bc7ed, EventLoopImpl@7cd84586]
  2. Thread[main,5,main] BlockingCoroutine{Active}@b1bc7ed
  3. Thread[main,5,main] 开始
  4. Thread[线程A,5,main] Started in Context1
  5. Thread[线程B,5,main] Working in Context2
  6. Thread[线程A,5,main] Back to Context1
  7. Thread[main,5,main] 结束

9.9.2 父子协程

当我们使用协程A的上下文启动另一个协程B时, B将成为A的子协程。当父协程A任务被取消时, B以及它的所有子协程都会被递归地取消。代码示例如下:

  1. fun testChildrenCoroutine()= runBlocking<Unit> {
  2. val request = launch(CommonPool) {
  3. log("ContextA1: ${context}")
  4. val job1 = launch(CommonPool) {
  5. println("job1: 独立的协程上下文!")
  6. delay(1000)
  7. println("job1: 不会受到request.cancel()的影响")
  8. }
  9. // 继承父上下文:request的context
  10. val job2 = launch(context) {
  11. log("ContextA2: ${context}")
  12. println("job2: 是request coroutine的子协程")
  13. delay(1000)
  14. println("job2: 当request.cancel(),job2也会被取消")
  15. }
  16. job1.join()
  17. job2.join()
  18. }
  19. delay(500)
  20. request.cancel()
  21. delay(1000)
  22. println("main: Who has survived request cancellation?")
  23. }

运行输出:

  1. Thread[ForkJoinPool.commonPool-worker-1,5,main] ContextA1: [StandaloneCoroutine{Active}@5b646af2, CommonPool]
  2. job1: 独立的协程上下文!
  3. Thread[ForkJoinPool.commonPool-worker-3,5,main] ContextA2: [StandaloneCoroutine{Active}@75152aa4, CommonPool]
  4. job2: request coroutine的子协程
  5. job1: 不会受到request.cancel()的影响
  6. main: Who has survived request cancellation?