轻量级线程:协程

在常用的并发模型中,多进程、多线程、分布式是最普遍的,不过近些年来逐渐有一些语言以first-class或者library的形式提供对基于协程的并发模型的支持。其中比较典型的有Scheme、Lua、Python、Perl、Go等以first-class的方式提供对协程的支持。

同样地,Kotlin也支持协程。

本章我们主要介绍:

  • 什么是协程
  • 协程的用法实例
  • 挂起函数
  • 通道与管道
  • 协程的实现原理
  • coroutine库等

协程简介

从硬件发展来看,从最初的单核单CPU,到单核多CPU,多核多CPU,似乎已经到了极限了,但是单核CPU性能却还在不断提升。如果将程序分为IO密集型应用和CPU密集型应用,二者的发展历程大致如下:

IO密集型应用: 多进程->多线程->事件驱动->协程

CPU密集型应用:多进程->多线程

如果说多进程对于多CPU,多线程对应多核CPU,那么事件驱动和协程则是在充分挖掘不断提高性能的单核CPU的潜力。

常见的有性能瓶颈的API (例如网络 IO、文件 IO、CPU 或 GPU 密集型任务等),要求调用者阻塞(blocking)直到它们完成才能进行下一步。后来,我们又使用异步回调的方式来实现非阻塞,但是异步回调代码写起来并不简单。

协程提供了一种避免阻塞线程并用更简单、更可控的操作替代线程阻塞的方法:协程挂起。

协程主要是让原来要使用“异步+回调方式”写出来的复杂代码, 简化成可以用看似同步的方式写出来(对线程的操作进一步抽象)。这样我们就可以按串行的思维模型去组织原本分散在不同上下文中的代码逻辑,而不需要去处理复杂的状态同步问题。

协程最早的描述是由Melvin Conway于1958年给出:“subroutines who act as the master program”(与主程序行为类似的子例程)。此后他又在博士论文中给出了如下定义:

  • 数据在后续调用中始终保持( The values of data local to a coroutine persist between successive calls 协程的局部)

  • 当控制流程离开时,协程的执行被挂起,此后控制流程再次进入这个协程时,这个协程只应从上次离开挂起的地方继续 (The execution of a coroutine is suspended as control leaves it, only to carry on where it left off when control re-enters the coroutine at some later stage)。

协程的实现要维护一组局部状态,在重新进入协程前,保证这些状态不被改变,从而能顺利定位到之前的位置。

协程可以用来解决很多问题,比如nodejs的嵌套回调,Erlang以及Golang的并发模型实现等。

实质上,协程(coroutine)是一种用户态的轻量级线程。它由协程构建器(launch coroutine builder)启动。

下面我们通过代码实践来学习协程的相关内容。

搭建协程代码工程

首先,我们来新建一个Kotlin Gradle工程。生成标准gradle工程后,在配置文件build.gradle中,配置kotlinx-coroutines-core依赖:

添加 dependencies :

  1. compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.16'

kotlinx-coroutines还提供了下面的模块:

  1. compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-jdk8', version: '0.16'
  2. compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-nio', version: '0.16'
  3. compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-reactive', version: '0.16'

我们使用Kotlin最新的1.1.3-2 版本:

  1. buildscript {
  2. ext.kotlin_version = '1.1.3-2'
  3. ...
  4. dependencies {
  5. classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
  6. }
  7. }

其中,kotlin-gradle-plugin是Kotlin集成Gradle的插件。

另外,配置一下JCenter 的仓库:

  1. repositories {
  2. jcenter()
  3. }

简单协程示例

下面我们先来看一个简单的协程示例。

运行下面的代码:

  1. fun firstCoroutineDemo0() {
  2. launch(CommonPool) {
  3. delay(3000L, TimeUnit.MILLISECONDS)
  4. println("Hello,")
  5. }
  6. println("World!")
  7. Thread.sleep(5000L)
  8. }

你将会发现输出:

  1. World!
  2. Hello,

上面的这段代码:

  1. launch(CommonPool) {
  2. delay(3000L, TimeUnit.MILLISECONDS)
  3. println("Hello,")
  4. }

等价于:

  1. launch(CommonPool, CoroutineStart.DEFAULT, {
  2. delay(3000L, TimeUnit.MILLISECONDS)
  3. println("Hello, ")
  4. })

launch函数

这个launch函数定义在kotlinx.coroutines.experimental下面。

  1. public fun launch(
  2. context: CoroutineContext,
  3. start: CoroutineStart = CoroutineStart.DEFAULT,
  4. block: suspend CoroutineScope.() -> Unit
  5. ): Job {
  6. val newContext = newCoroutineContext(context)
  7. val coroutine = if (start.isLazy)
  8. LazyStandaloneCoroutine(newContext, block) else
  9. StandaloneCoroutine(newContext, active = true)
  10. coroutine.initParentJob(context[Job])
  11. start(block, coroutine, coroutine)
  12. return coroutine
  13. }

launch函数有3个入参:context、start、block,这些函数参数分别说明如下:

参数 说明
context 协程上下文
start 协程启动选项
block 协程真正要执行的代码块,必须是suspend修饰的挂起函数

这个launch函数返回一个Job类型,Job是协程创建的后台任务的概念,它持有该协程的引用。Job接口实际上继承自CoroutineContext类型。一个Job有如下三种状态:

State isActive isCompleted
New (optional initial state) 新建 (可选的初始状态) false false
Active (default initial state) 活动中(默认初始状态) true false
Completed (final state) 已结束(最终状态) false true

也就是说,launch函数它以非阻塞(non-blocking)当前线程的方式,启动一个新的协程后台任务,并返回一个Job类型的对象作为当前协程的引用。

另外,这里的delay()函数类似Thread.sleep()的功能,但更好的是:它不会阻塞线程,而只是挂起协程本身。当协程在等待时,线程将返回到池中, 当等待完成时, 协程将在池中的空闲线程上恢复。

CommonPool:共享线程池

我们再来看一下launch(CommonPool) {...}这段代码。

首先,这个CommonPool是代表共享线程池,它的主要作用是用来调度计算密集型任务的协程的执行。它的实现使用的是java.util.concurrent包下面的API。它首先尝试创建一个java.util.concurrent.ForkJoinPool (ForkJoinPool是一个可以执行ForkJoinTask的ExcuteService,它采用了work-stealing模式:所有在池中的线程尝试去执行其他线程创建的子任务,这样很少有线程处于空闲状态,更加高效);如果不可用,就使用java.util.concurrent.Executors来创建一个普通的线程池:Executors.newFixedThreadPool。相关代码在kotlinx/coroutines/experimental/CommonPool.kt中:

  1. private fun createPool(): ExecutorService {
  2. val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
  3. ?: return createPlainPool()
  4. if (!usePrivatePool) {
  5. Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
  6. ?.let { return it }
  7. }
  8. Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
  9. ?. let { return it }
  10. return createPlainPool()
  11. }
  12. private fun createPlainPool(): ExecutorService {
  13. val threadId = AtomicInteger()
  14. return Executors.newFixedThreadPool(defaultParallelism()) {
  15. Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
  16. }
  17. }

这个CommonPool对象类是CoroutineContext的子类型。它们的类型集成层次结构如下:

协程

挂起函数

代码块中的delay(3000L, TimeUnit.MILLISECONDS)函数,是一个用suspend关键字修饰的函数,我们称之为挂起函数。挂起函数只能从协程代码内部调用,普通的非协程的代码不能调用。

挂起函数只允许由协程或者另外一个挂起函数里面调用, 例如我们在协程代码中调用一个挂起函数,代码示例如下:

  1. suspend fun runCoroutineDemo() {
  2. run(CommonPool) {
  3. delay(3000L, TimeUnit.MILLISECONDS)
  4. println("suspend,")
  5. }
  6. println("runCoroutineDemo!")
  7. Thread.sleep(5000L)
  8. }
  9. fun callSuspendFun() {
  10. launch(CommonPool) {
  11. runCoroutineDemo()
  12. }
  13. }

如果我们用Java中的Thread类来写类似功能的代码,上面的代码可以写成这样:

  1. fun threadDemo0() {
  2. Thread({
  3. Thread.sleep(3000L)
  4. println("Hello,")
  5. }).start()
  6. println("World!")
  7. Thread.sleep(5000L)
  8. }

输出结果也是:

World! Hello, 另外, 我们不能使用Thread来启动协程代码。例如下面的写法编译器会报错:

  1. /**
  2. * 错误反例:用线程调用协程 error
  3. */
  4. fun threadCoroutineDemo() {
  5. Thread({
  6. delay(3000L, TimeUnit.MILLISECONDS) // error, Suspend functions are only allowed to be called from a coroutine or another suspend function
  7. println("Hello,")
  8. })
  9. println("World!")
  10. Thread.sleep(5000L)
  11. }

概念解释: Continuation 与 suspension point

协程的执行其实是断断续续的: 执行一段, 挂起来, 再执行一段, 再挂起来, … , 每个挂起的地方是一个suspension point, 每一小段执行是一个Continuation.协程的执行流被它的 “suspension point” 分割成了很多个 “Continuation” .我们可以用一条画了很多点的线段来表示:

轻量级线程:协程1 - 图2

其中的Continuation 0比较特殊, 是从起点开始, 到第一个suspension point结束, 由于它的特殊性, 又被称为Initial Continuation.

协程创建后, 并不总是立即执行, 要分是怎么创建的协程, 通过launch方法的第二个参数是一个枚举类型CoroutineStart, 如果不填, 默认值是DEFAULT, 那么协程创建后立即启动, 如果传入LAZY, 创建后就不会立即启动, 直到调用Job的start方法才会启动.

桥接 阻塞和非阻塞

上面的例子中,我们给出的是使用非阻塞的delay函数,同时又使用了阻塞的Thread.sleep函数,这样代码写在一起可读性不是那么地好。让我们来使用纯的Kotlin的协程代码来实现上面的 阻塞+非阻塞 的例子(不用Thread)。

runBlocking函数

Kotlin中提供了runBlocking 函数来实现类似主协程的功能:

  1. fun main(args: Array<String>) = runBlocking<Unit> {
  2. // 主协程
  3. println("${format(Date())}: T0")
  4. // 启动主协程
  5. launch(CommonPool) {
  6. //在common thread pool中创建协程
  7. println("${format(Date())}: T1")
  8. delay(3000L)
  9. println("${format(Date())}: T2 Hello,")
  10. }
  11. println("${format(Date())}: T3 World!") // 当子协程被delay,主协程仍然继续运行
  12. delay(5000L)
  13. println("${format(Date())}: T4")
  14. }

运行结果:

  1. 14:37:59.640: T0
  2. 14:37:59.721: T1
  3. 14:37:59.721: T3 World!
  4. 14:38:02.763: T2 Hello,
  5. 14:38:04.738: T4

可以发现,运行结果跟之前的是一样的,但是我们没有使用Thread.sleep,我们只使用了非阻塞的delay函数。如果main函数不加 = runBlocking<Unit> , 那么我们是不能在main函数体内调用delay(5000L)的。

如果这个阻塞的线程被中断,runBlocking抛出InterruptedException异常。

该runBlocking函数不是用来当做普通协程函数使用的,它的设计主要是用来桥接普通阻塞代码和挂起风格的(suspending style)的非阻塞代码的, 例如用在 main 函数中,或者用于测试用例代码中。

  1. @RunWith(JUnit4::class)
  2. class RunBlockingTest {
  3. @Test fun testRunBlocking() = runBlocking<Unit> {
  4. // 这样我们就可以在这里调用任何suspend fun了
  5. launch(CommonPool) {
  6. delay(3000L)
  7. }
  8. delay(5000L)
  9. }
  10. }

等待一个任务执行完毕

我们先来看一段代码:

  1. fun firstCoroutineDemo() {
  2. launch(CommonPool) {
  3. delay(3000L, TimeUnit.MILLISECONDS)
  4. println("[firstCoroutineDemo] Hello, 1")
  5. }
  6. launch(CommonPool, CoroutineStart.DEFAULT, {
  7. delay(3000L, TimeUnit.MILLISECONDS)
  8. println("[firstCoroutineDemo] Hello, 2")
  9. })
  10. println("[firstCoroutineDemo] World!")
  11. }

运行这段代码,我们会发现只输出:

  1. [firstCoroutineDemo] World!

这是为什么?

为了弄清上面的代码执行的内部过程,我们打印一些日志看下:

  1. fun testJoinCoroutine() = runBlocking<Unit> {
  2. // Start a coroutine
  3. val c1 = launch(CommonPool) {
  4. println("C1 Thread: ${Thread.currentThread()}")
  5. println("C1 Start")
  6. delay(3000L)
  7. println("C1 World! 1")
  8. }
  9. val c2 = launch(CommonPool) {
  10. println("C2 Thread: ${Thread.currentThread()}")
  11. println("C2 Start")
  12. delay(5000L)
  13. println("C2 World! 2")
  14. }
  15. println("Main Thread: ${Thread.currentThread()}")
  16. println("Hello,")
  17. println("Hi,")
  18. println("c1 is active: ${c1.isActive} ${c1.isCompleted}")
  19. println("c2 is active: ${c2.isActive} ${c2.isCompleted}")
  20. }

再次运行:

  1. C1 Thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. C1 Start
  3. C2 Thread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  4. C2 Start
  5. Main Thread: Thread[main,5,main]
  6. Hello,
  7. Hi,
  8. c1 is active: true false
  9. c2 is active: true false

我们可以看到,这里的C1、C2代码也开始执行了,使用的是ForkJoinPool.commonPool-worker线程池中的worker线程。但是,我们在代码执行到最后打印出这两个协程的状态isCompleted都是false,这表明我们的C1、C2的代码,在Main Thread结束的时刻(此时的运行main函数的Java进程也退出了),还没有执行完毕,然后就跟着主线程一起退出结束了。

所以我们可以得出结论:运行 main () 函数的主线程, 必须要等到我们的协程完成之前结束 , 否则我们的程序在 打印Hello, 1和Hello, 2之前就直接结束掉了。

我们怎样让这两个协程参与到主线程的时间顺序里呢?我们可以使用join, 让主线程一直等到当前协程执行完毕再结束, 例如下面的这段代码

  1. fun testJoinCoroutine() = runBlocking<Unit> {
  2. // Start a coroutine
  3. val c1 = launch(CommonPool) {
  4. println("C1 Thread: ${Thread.currentThread()}")
  5. println("C1 Start")
  6. delay(3000L)
  7. println("C1 World! 1")
  8. }
  9. val c2 = launch(CommonPool) {
  10. println("C2 Thread: ${Thread.currentThread()}")
  11. println("C2 Start")
  12. delay(5000L)
  13. println("C2 World! 2")
  14. }
  15. println("Main Thread: ${Thread.currentThread()}")
  16. println("Hello,")
  17. println("c1 is active: ${c1.isActive} isCompleted: ${c1.isCompleted}")
  18. println("c2 is active: ${c2.isActive} isCompleted: ${c2.isCompleted}")
  19. c1.join() // the main thread will wait until child coroutine completes
  20. println("Hi,")
  21. println("c1 is active: ${c1.isActive} isCompleted: ${c1.isCompleted}")
  22. println("c2 is active: ${c2.isActive} isCompleted: ${c2.isCompleted}")
  23. c2.join() // the main thread will wait until child coroutine completes
  24. println("c1 is active: ${c1.isActive} isCompleted: ${c1.isCompleted}")
  25. println("c2 is active: ${c2.isActive} isCompleted: ${c2.isCompleted}")
  26. }

将会输出:

  1. C1 Thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. C1 Start
  3. C2 Thread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  4. C2 Start
  5. Main Thread: Thread[main,5,main]
  6. Hello,
  7. c1 is active: true isCompleted: false
  8. c2 is active: true isCompleted: false
  9. C1 World! 1
  10. Hi,
  11. c1 is active: false isCompleted: true
  12. c2 is active: true isCompleted: false
  13. C2 World! 2
  14. c1 is active: false isCompleted: true
  15. c2 is active: false isCompleted: true

通常,良好的代码风格我们会把一个单独的逻辑放到一个独立的函数中,我们可以重构上面的代码如下:

  1. fun testJoinCoroutine2() = runBlocking<Unit> {
  2. // Start a coroutine
  3. val c1 = launch(CommonPool) {
  4. fc1()
  5. }
  6. val c2 = launch(CommonPool) {
  7. fc2()
  8. }
  9. ...
  10. }
  11. private suspend fun fc2() {
  12. println("C2 Thread: ${Thread.currentThread()}")
  13. println("C2 Start")
  14. delay(5000L)
  15. println("C2 World! 2")
  16. }
  17. private suspend fun fc1() {
  18. println("C1 Thread: ${Thread.currentThread()}")
  19. println("C1 Start")
  20. delay(3000L)
  21. println("C1 World! 1")
  22. }

可以看出,我们这里的fc1, fc2函数是suspend fun。

协程是轻量级的

直接运行下面的代码:

  1. fun testThread() {
  2. val jobs = List(100_1000) {
  3. Thread({
  4. Thread.sleep(1000L)
  5. print(".")
  6. })
  7. }
  8. jobs.forEach { it.start() }
  9. jobs.forEach { it.join() }
  10. }

我们应该会看到输出报错:

  1. Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
  2. at java.lang.Thread.start0(Native Method)
  3. at java.lang.Thread.start(Thread.java:714)
  4. at com.easy.kotlin.LightWeightCoroutinesDemo.testThread(LightWeightCoroutinesDemo.kt:30)
  5. at com.easy.kotlin.LightWeightCoroutinesDemoKt.main(LightWeightCoroutinesDemo.kt:40)
  6. ...........................................................................................

我们这里直接启动了100,000个线程,并join到一起打印”.”, 不出意外的我们收到了 java.lang.OutOfMemoryError

这个异常问题本质原因是我们创建了太多的线程,而能创建的线程数是有限制的,导致了异常的发生。在Java中, 当我们创建一个线程的时候,虚拟机会在JVM内存创建一个Thread对象同时创建一个操作系统线程,而这个系统线程的内存用的不是JVMMemory,而是系统中剩下的内存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。 能创建的线程数的具体计算公式如下:

Number of Threads = (MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize)

其中,参数说明如下:

参数 说明
MaxProcessMemory 指的是一个进程的最大内存
JVMMemory JVM内存
ReservedOsMemory 保留的操作系统内存
ThreadStackSize 线程栈的大小

我们通常在优化这种问题的时候,要么是采用减小thread stack的大小的方法,要么是采用减小heap或permgen初始分配的大小方法等方式来临时解决问题。

在协程中,情况完全就不一样了。我们看一下实现上面的逻辑的协程代码:

  1. fun testLightWeightCoroutine() = runBlocking {
  2. val jobs = List(100_000) {
  3. // create a lot of coroutines and list their jobs
  4. launch(CommonPool) {
  5. delay(1000L)
  6. print(".")
  7. }
  8. }
  9. jobs.forEach { it.join() } // wait for all jobs to complete
  10. }

运行上面的代码,我们将看到输出:

  1. START: 21:22:28.913
  2. .....................
  3. .....................(100000个)
  4. .....END: 21:22:30.956

上面的程序在2s左右的时间内正确执行完毕。

协程 vs 守护线程

在Java中有两类线程:用户线程 (User Thread)、守护线程 (Daemon Thread)。

所谓守护线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。

我们来看一段Thread的守护线程的代码:

  1. fun testDaemon2() {
  2. val t = Thread({
  3. repeat(100) { i ->
  4. println("I'm sleeping $i ...")
  5. Thread.sleep(500L)
  6. }
  7. })
  8. t.isDaemon = true // 必须在启动线程前调用,否则会报错:Exception in thread "main" java.lang.IllegalThreadStateException
  9. t.start()
  10. Thread.sleep(2000L) // just quit after delay
  11. }

这段代码启动一个线程,并设置为守护线程。线程内部是间隔500ms 重复打印100次输出。外部主线程睡眠2s。

运行这段代码,将会输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...

协程跟守护线程很像,用协程来写上面的逻辑,代码如下:

  1. fun testDaemon1() = runBlocking {
  2. launch(CommonPool) {
  3. repeat(100) { i ->
  4. println("I'm sleeping $i ...")
  5. delay(500L)
  6. }
  7. }
  8. delay(2000L) // just quit after delay
  9. }

运行这段代码,我们发现也输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...

我们可以看出,活动的协程不会使进程保持活动状态。它们的行为就像守护程序线程。

协程执行的取消

我们知道,启动函数launch返回一个Job引用当前协程,该Job引用可用于取消正在运行协程:

  1. fun testCancellation() = runBlocking<Unit> {
  2. val job = launch(CommonPool) {
  3. repeat(1000) { i ->
  4. println("I'm sleeping $i ... CurrentThread: ${Thread.currentThread()}")
  5. delay(500L)
  6. }
  7. }
  8. delay(1300L)
  9. println("CurrentThread: ${Thread.currentThread()}")
  10. println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  11. val b1 = job.cancel() // cancels the job
  12. println("job cancel: $b1")
  13. delay(1300L)
  14. println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  15. val b2 = job.cancel() // cancels the job, job already canceld, return false
  16. println("job cancel: $b2")
  17. println("main: Now I can quit.")
  18. }

运行上面的代码,将会输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  4. CurrentThread: Thread[main,5,main]
  5. Job is alive: true Job is completed: false
  6. job cancel: true
  7. Job is alive: false Job is completed: true
  8. job cancel: false
  9. main: Now I can quit.

我们可以看出,当job还在运行时,isAlive是true,isCompleted是false。当调用job.cancel取消该协程任务,cancel函数本身返回true, 此时协程的打印动作就停止了。此时,job的状态是isAlive是false,isCompleted是true。 如果,再次调用job.cancel函数,我们将会看到cancel函数返回的是false。

计算代码的协程取消失效

kotlinx 协程的所有suspend函数都是可以取消的。我们可以通过job的isActive状态来判断协程的状态,或者检查是否有抛出 CancellationException 时取消。

例如,协程正工作在循环计算中,并且不检查协程当前的状态, 那么调用cancel来取消协程将无法停止协程的运行, 如下面的示例所示:

  1. fun testCooperativeCancellation1() = runBlocking<Unit> {
  2. val job = launch(CommonPool) {
  3. var nextPrintTime = 0L
  4. var i = 0
  5. while (i < 20) { // computation loop
  6. val currentTime = System.currentTimeMillis()
  7. if (currentTime >= nextPrintTime) {
  8. println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
  9. nextPrintTime = currentTime + 500L
  10. }
  11. }
  12. }
  13. delay(3000L)
  14. println("CurrentThread: ${Thread.currentThread()}")
  15. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  16. val b1 = job.cancel() // cancels the job
  17. println("job cancel1: $b1")
  18. println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  19. delay(30000L)
  20. val b2 = job.cancel() // cancels the job, job already canceld, return false
  21. println("job cancel2: $b2")
  22. println("main: Now I can quit.")
  23. }

运行上面的代码,输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. ...
  4. I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  5. CurrentThread: Thread[main,5,main]
  6. Before cancel, Job is alive: true Job is completed: false
  7. job cancel1: true
  8. After Cancel, Job is alive: false Job is completed: true
  9. I'm sleeping 7 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  10. ...
  11. I'm sleeping 18 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  12. I'm sleeping 19 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  13. job cancel2: false
  14. main: Now I can quit.

我们可以看出,即使我们调用了cancel函数,当前的job状态isAlive是false了,但是协程的代码依然一直在运行,并没有停止。

计算代码协程的有效取消

有两种方法可以使计算代码取消成功。

方法一: 显式检查取消状态isActive

我们直接给出实现的代码:

  1. fun testCooperativeCancellation2() = runBlocking<Unit> {
  2. val job = launch(CommonPool) {
  3. var nextPrintTime = 0L
  4. var i = 0
  5. while (i < 20) { // computation loop
  6. if (!isActive) {
  7. return@launch
  8. }
  9. val currentTime = System.currentTimeMillis()
  10. if (currentTime >= nextPrintTime) {
  11. println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
  12. nextPrintTime = currentTime + 500L
  13. }
  14. }
  15. }
  16. delay(3000L)
  17. println("CurrentThread: ${Thread.currentThread()}")
  18. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  19. val b1 = job.cancel() // cancels the job
  20. println("job cancel1: $b1")
  21. println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  22. delay(3000L)
  23. val b2 = job.cancel() // cancels the job, job already canceld, return false
  24. println("job cancel2: $b2")
  25. println("main: Now I can quit.")
  26. }

运行这段代码,输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  4. I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  5. I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  6. I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  7. I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  8. CurrentThread: Thread[main,5,main]
  9. Before cancel, Job is alive: true Job is completed: false
  10. job cancel1: true
  11. After Cancel, Job is alive: false Job is completed: true
  12. job cancel2: false
  13. main: Now I can quit.

正如您所看到的, 现在这个循环可以被取消了。这里的isActive属性是CoroutineScope中的属性。这个接口的定义是:

  1. public interface CoroutineScope {
  2. public val isActive: Boolean
  3. public val context: CoroutineContext
  4. }

该接口用于通用协程构建器的接收器,以便协程中的代码可以方便的访问其isActive状态值(取消状态),以及其上下文CoroutineContext信息。

方法二: 循环调用一个挂起函数yield()

该方法实质上是通过job的isCompleted状态值来捕获CancellationException完成取消功能。

我们只需要在while循环体中循环调用yield()来检查该job的取消状态,如果已经被取消,那么isCompleted值将会是true,yield函数就直接抛出CancellationException异常,从而完成取消的功能:

  1. val job = launch(CommonPool) {
  2. var nextPrintTime = 0L
  3. var i = 0
  4. while (i < 20) { // computation loop
  5. yield()
  6. val currentTime = System.currentTimeMillis()
  7. if (currentTime >= nextPrintTime) {
  8. println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
  9. nextPrintTime = currentTime + 500L
  10. }
  11. }
  12. }

运行上面的代码,输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  3. I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  4. I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
  5. I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
  6. I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
  7. I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  8. CurrentThread: Thread[main,5,main]
  9. Before cancel, Job is alive: true Job is completed: false
  10. job cancel1: true
  11. After Cancel, Job is alive: false Job is completed: true
  12. job cancel2: false
  13. main: Now I can quit.

如果我们想看看yield函数抛出的异常,我们可以加上try catch打印出日志:

  1. try {
  2. yield()
  3. } catch (e: Exception) {
  4. println("$i ${e.message}")
  5. }

我们可以看到类似:Job was cancelled 这样的信息。

这个yield函数的实现是:

  1. suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
  2. val context = cont.context
  3. val job = context[Job]
  4. if (job != null && job.isCompleted) throw job.getCompletionException()
  5. if (cont !is DispatchedContinuation<Unit>) return@sc Unit
  6. if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
  7. cont.dispatchYield(job, Unit)
  8. COROUTINE_SUSPENDED
  9. }

如果调用此挂起函数时,当前协程的Job已经完成 (isActive = false, isCompleted = true),当前协程将以CancellationException取消。

在finally中的协程代码

当我们取消一个协程任务时,如果有try {...} finally {...}代码块,那么finally {…}中的代码会被正常执行完毕:

  1. fun finallyCancelDemo() = runBlocking {
  2. val job = launch(CommonPool) {
  3. try {
  4. repeat(1000) { i ->
  5. println("I'm sleeping $i ...")
  6. delay(500L)
  7. }
  8. } finally {
  9. println("I'm running finally")
  10. }
  11. }
  12. delay(2000L)
  13. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  14. job.cancel()
  15. println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  16. delay(2000L)
  17. println("main: Now I can quit.")
  18. }

运行这段代码,输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...
  5. Before cancel, Job is alive: true Job is completed: false
  6. I'm running finally
  7. After cancel, Job is alive: false Job is completed: true
  8. main: Now I can quit.

我们可以看出,在调用cancel之后,就算当前协程任务Job已经结束了,finally{...}中的代码依然被正常执行。

但是,如果我们在finally{...}中放入挂起函数:

  1. fun finallyCancelDemo() = runBlocking {
  2. val job = launch(CommonPool) {
  3. try {
  4. repeat(1000) { i ->
  5. println("I'm sleeping $i ...")
  6. delay(500L)
  7. }
  8. } finally {
  9. println("I'm running finally")
  10. delay(1000L)
  11. println("And I've delayed for 1 sec ?")
  12. }
  13. }
  14. delay(2000L)
  15. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  16. job.cancel()
  17. println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  18. delay(2000L)
  19. println("main: Now I can quit.")
  20. }

运行上述代码,我们将会发现只输出了一句:I’m running finally。因为主线程在挂起函数delay(1000L)以及后面的打印逻辑还没执行完,就已经结束退出。

  1. } finally {
  2. println("I'm running finally")
  3. delay(1000L)
  4. println("And I've delayed for 1 sec ?")
  5. }

协程执行不可取消的代码块

如果我们想要上面的例子中的finally{...}完整执行,不被取消函数操作所影响,我们可以使用 run 函数和 NonCancellable 上下文将相应的代码包装在 run (NonCancellable) {…} 中, 如下面的示例所示:

  1. fun testNonCancellable() = runBlocking {
  2. val job = launch(CommonPool) {
  3. try {
  4. repeat(1000) { i ->
  5. println("I'm sleeping $i ...")
  6. delay(500L)
  7. }
  8. } finally {
  9. run(NonCancellable) {
  10. println("I'm running finally")
  11. delay(1000L)
  12. println("And I've just delayed for 1 sec because I'm non-cancellable")
  13. }
  14. }
  15. }
  16. delay(2000L)
  17. println("main: I'm tired of waiting!")
  18. job.cancel()
  19. delay(2000L)
  20. println("main: Now I can quit.")
  21. }

运行输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...
  5. main: I'm tired of waiting!
  6. I'm running finally
  7. And I've just delayed for 1 sec because I'm non-cancellable
  8. main: Now I can quit.

设置协程超时时间

我们通常取消协同执行的原因给协程的执行时间设定一个执行时间上限。我们也可以使用 withTimeout 函数来给一个协程任务的执行设定最大执行时间,超出这个时间,就直接终止掉。代码示例如下:

  1. fun testTimeouts() = runBlocking {
  2. withTimeout(3000L) {
  3. repeat(100) { i ->
  4. println("I'm sleeping $i ...")
  5. delay(500L)
  6. }
  7. }
  8. }

运行上述代码,我们将会看到如下输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...
  5. I'm sleeping 4 ...
  6. I'm sleeping 5 ...
  7. Exception in thread "main" kotlinx.coroutines.experimental.TimeoutException: Timed out waiting for 3000 MILLISECONDS
  8. at kotlinx.coroutines.experimental.TimeoutExceptionCoroutine.run(Scheduled.kt:110)
  9. at kotlinx.coroutines.experimental.EventLoopImpl$DelayedRunnableTask.invoke(EventLoop.kt:199)
  10. at kotlinx.coroutines.experimental.EventLoopImpl$DelayedRunnableTask.invoke(EventLoop.kt:195)
  11. at kotlinx.coroutines.experimental.EventLoopImpl.processNextEvent(EventLoop.kt:111)
  12. at kotlinx.coroutines.experimental.BlockingCoroutine.joinBlocking(Builders.kt:205)
  13. at kotlinx.coroutines.experimental.BuildersKt.runBlocking(Builders.kt:150)
  14. at kotlinx.coroutines.experimental.BuildersKt.runBlocking$default(Builders.kt:142)
  15. at com.easy.kotlin.CancellingCoroutineDemo.testTimeouts(CancellingCoroutineDemo.kt:169)
  16. at com.easy.kotlin.CancellingCoroutineDemoKt.main(CancellingCoroutineDemo.kt:193)

由 withTimeout 抛出的 TimeoutException 是 CancellationException 的一个子类。这个TimeoutException类型定义如下:

  1. private class TimeoutException(
  2. time: Long,
  3. unit: TimeUnit,
  4. @JvmField val coroutine: Job
  5. ) : CancellationException("Timed out waiting for $time $unit")

如果您需要在超时时执行一些附加操作, 则可以把逻辑放在 try {…} catch (e: CancellationException) {…} 代码块中。例如:

  1. try {
  2. ccd.testTimeouts()
  3. } catch (e: CancellationException) {
  4. println("I am timed out!")
  5. }