9.6 协程执行的取消

我们知道,启动函数launch返回一个Job引用当前协程,该Job引用可用于取消正在运行协程:

  1. fun testCancellation() = runBlocking<Unit> {
  2. val job = launch(CommonPool) {
  3. repeat(1000) { i ->
  4. println("I'm sleeping $i ... CurrentThread: ${Thread.currentThread()}")
  5. delay(500L)
  6. }
  7. }
  8. delay(1300L)
  9. println("CurrentThread: ${Thread.currentThread()}")
  10. println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  11. val b1 = job.cancel() // cancels the job
  12. println("job cancel: $b1")
  13. delay(1300L)
  14. println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  15. val b2 = job.cancel() // cancels the job, job already canceld, return false
  16. println("job cancel: $b2")
  17. println("main: Now I can quit.")
  18. }

运行上面的代码,将会输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  4. CurrentThread: Thread[main,5,main]
  5. Job is alive: true Job is completed: false
  6. job cancel: true
  7. Job is alive: false Job is completed: true
  8. job cancel: false
  9. main: Now I can quit.

我们可以看出,当job还在运行时,isAlive是true,isCompleted是false。当调用job.cancel取消该协程任务,cancel函数本身返回true, 此时协程的打印动作就停止了。此时,job的状态是isAlive是false,isCompleted是true。 如果,再次调用job.cancel函数,我们将会看到cancel函数返回的是false。

9.6.1 计算代码的协程取消失效

kotlin 协程的所有suspend 函数都是可以取消的。我们可以通过job的isActive状态来判断协程的状态,或者检查是否有抛出 CancellationException 时取消。

例如,协程正工作在循环计算中,并且不检查协程当前的状态, 那么调用cancel来取消协程将无法停止协程的运行, 如下面的示例所示:

  1. fun testCooperativeCancellation1() = runBlocking<Unit> {
  2. val job = launch(CommonPool) {
  3. var nextPrintTime = 0L
  4. var i = 0
  5. while (i < 20) { // computation loop
  6. val currentTime = System.currentTimeMillis()
  7. if (currentTime >= nextPrintTime) {
  8. println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
  9. nextPrintTime = currentTime + 500L
  10. }
  11. }
  12. }
  13. delay(3000L)
  14. println("CurrentThread: ${Thread.currentThread()}")
  15. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  16. val b1 = job.cancel() // cancels the job
  17. println("job cancel1: $b1")
  18. println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  19. delay(30000L)
  20. val b2 = job.cancel() // cancels the job, job already canceld, return false
  21. println("job cancel2: $b2")
  22. println("main: Now I can quit.")
  23. }

运行上面的代码,输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. ...
  4. I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  5. CurrentThread: Thread[main,5,main]
  6. Before cancel, Job is alive: true Job is completed: false
  7. job cancel1: true
  8. After Cancel, Job is alive: false Job is completed: true
  9. I'm sleeping 7 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  10. ...
  11. I'm sleeping 18 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  12. I'm sleeping 19 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  13. job cancel2: false
  14. main: Now I can quit.

我们可以看出,即使我们调用了cancel函数,当前的job状态isAlive是false了,但是协程的代码依然一直在运行,并没有停止。

9.6.2 计算代码协程的有效取消

有两种方法可以使计算代码取消成功。

方法一: 显式检查取消状态isActive

我们直接给出实现的代码:

  1. fun testCooperativeCancellation2() = runBlocking<Unit> {
  2. val job = launch(CommonPool) {
  3. var nextPrintTime = 0L
  4. var i = 0
  5. while (i < 20) { // computation loop
  6. if (!isActive) {
  7. return@launch
  8. }
  9. val currentTime = System.currentTimeMillis()
  10. if (currentTime >= nextPrintTime) {
  11. println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
  12. nextPrintTime = currentTime + 500L
  13. }
  14. }
  15. }
  16. delay(3000L)
  17. println("CurrentThread: ${Thread.currentThread()}")
  18. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  19. val b1 = job.cancel() // cancels the job
  20. println("job cancel1: $b1")
  21. println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  22. delay(3000L)
  23. val b2 = job.cancel() // cancels the job, job already canceld, return false
  24. println("job cancel2: $b2")
  25. println("main: Now I can quit.")
  26. }

运行这段代码,输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  3. I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  4. I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  5. I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  6. I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  7. I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  8. CurrentThread: Thread[main,5,main]
  9. Before cancel, Job is alive: true Job is completed: false
  10. job cancel1: true
  11. After Cancel, Job is alive: false Job is completed: true
  12. job cancel2: false
  13. main: Now I can quit.

正如您所看到的, 现在这个循环可以被取消了。这里的isActive属性是CoroutineScope中的属性。这个接口的定义是:

  1. public interface CoroutineScope {
  2. public val isActive: Boolean
  3. public val context: CoroutineContext
  4. }

该接口用于通用协程构建器的接收器,以便协程中的代码可以方便的访问其isActive状态值(取消状态),以及其上下文CoroutineContext信息。

方法二: 循环调用一个挂起函数yield()

该方法实质上是通过job的isCompleted状态值来捕获CancellationException完成取消功能。

我们只需要在while循环体中循环调用yield()来检查该job的取消状态,如果已经被取消,那么isCompleted值将会是true,yield函数就直接抛出CancellationException异常,从而完成取消的功能:

  1. val job = launch(CommonPool) {
  2. var nextPrintTime = 0L
  3. var i = 0
  4. while (i < 20) { // computation loop
  5. yield()
  6. val currentTime = System.currentTimeMillis()
  7. if (currentTime >= nextPrintTime) {
  8. println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
  9. nextPrintTime = currentTime + 500L
  10. }
  11. }
  12. }

运行上面的代码,输出:

  1. I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  3. I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  4. I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
  5. I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
  6. I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
  7. I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
  8. CurrentThread: Thread[main,5,main]
  9. Before cancel, Job is alive: true Job is completed: false
  10. job cancel1: true
  11. After Cancel, Job is alive: false Job is completed: true
  12. job cancel2: false
  13. main: Now I can quit.

如果我们想看看yield函数抛出的异常,我们可以加上try catch打印出日志:

  1. try {
  2. yield()
  3. } catch (e: Exception) {
  4. println("$i ${e.message}")
  5. }

我们可以看到类似:Job was cancelled 这样的信息。

这个yield函数的实现是:

  1. suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
  2. val context = cont.context
  3. val job = context[Job]
  4. if (job != null && job.isCompleted) throw job.getCompletionException()
  5. if (cont !is DispatchedContinuation<Unit>) return@sc Unit
  6. if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
  7. cont.dispatchYield(job, Unit)
  8. COROUTINE_SUSPENDED
  9. }

如果调用此挂起函数时,当前协程的Job已经完成 (isActive = false, isCompleted = true),当前协程将以CancellationException取消。

9.6.3 在finally中的协程代码

当我们取消一个协程任务时,如果有try {...} finally {...}代码块,那么finally {…}中的代码会被正常执行完毕:

  1. fun finallyCancelDemo() = runBlocking {
  2. val job = launch(CommonPool) {
  3. try {
  4. repeat(1000) { i ->
  5. println("I'm sleeping $i ...")
  6. delay(500L)
  7. }
  8. } finally {
  9. println("I'm running finally")
  10. }
  11. }
  12. delay(2000L)
  13. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  14. job.cancel()
  15. println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  16. delay(2000L)
  17. println("main: Now I can quit.")
  18. }

运行这段代码,输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...
  5. Before cancel, Job is alive: true Job is completed: false
  6. I'm running finally
  7. After cancel, Job is alive: false Job is completed: true
  8. main: Now I can quit.

我们可以看出,在调用cancel之后,就算当前协程任务Job已经结束了,finally{...}中的代码依然被正常执行。

但是,如果我们在finally{...}中放入挂起函数:

  1. fun finallyCancelDemo() = runBlocking {
  2. val job = launch(CommonPool) {
  3. try {
  4. repeat(1000) { i ->
  5. println("I'm sleeping $i ...")
  6. delay(500L)
  7. }
  8. } finally {
  9. println("I'm running finally")
  10. delay(1000L)
  11. println("And I've delayed for 1 sec ?")
  12. }
  13. }
  14. delay(2000L)
  15. println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  16. job.cancel()
  17. println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
  18. delay(2000L)
  19. println("main: Now I can quit.")
  20. }

运行上述代码,我们将会发现只输出了一句:I’m running finally。因为主线程在挂起函数delay(1000L)以及后面的打印逻辑还没执行完,就已经结束退出。

  1. finally {
  2. println("I'm running finally")
  3. delay(1000L)
  4. println("And I've delayed for 1 sec ?")
  5. }

9.6.4 协程执行不可取消的代码块

如果我们想要上面的例子中的finally{...}完整执行,不被取消函数操作所影响,我们可以使用 run 函数和 NonCancellable 上下文将相应的代码包装在 run (NonCancellable) {…} 中, 如下面的示例所示:

  1. fun testNonCancellable() = runBlocking {
  2. val job = launch(CommonPool) {
  3. try {
  4. repeat(1000) { i ->
  5. println("I'm sleeping $i ...")
  6. delay(500L)
  7. }
  8. } finally {
  9. run(NonCancellable) {
  10. println("I'm running finally")
  11. delay(1000L)
  12. println("And I've just delayed for 1 sec because I'm non-cancellable")
  13. }
  14. }
  15. }
  16. delay(2000L)
  17. println("main: I'm tired of waiting!")
  18. job.cancel()
  19. delay(2000L)
  20. println("main: Now I can quit.")
  21. }

运行输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. I'm sleeping 3 ...
  5. main: I'm tired of waiting!
  6. I'm running finally
  7. And I've just delayed for 1 sec because I'm non-cancellable
  8. main: Now I can quit.