取消与超时

这一部分包含了协程的取消与超时。

取消协程的执行

在一个长时间运行的应用程序中,你也许需要对你的后台协程进行细粒度的控制。 比如说,一个用户也许关闭了一个启动了协程的界面,那么现在协程的执行结果已经不再被需要了,这时,它应该是可以被取消的。 该 launch 函数返回了一个可以被用来取消运行中的协程的 Job

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val job = launch {
  5. repeat(1000) { i ->
  6. println("job: I'm sleeping $i ...")
  7. delay(500L)
  8. }
  9. }
  10. delay(1300L) // 延迟一段时间
  11. println("main: I'm tired of waiting!")
  12. job.cancel() // 取消该作业
  13. job.join() // 等待作业执行结束
  14. println("main: Now I can quit.")
  15. //sampleEnd
  16. }

可以在这里获取完整代码。

取消与超时 - 图1

程序执行后的输出如下:

  1. job: I'm sleeping 0 ...
  2. job: I'm sleeping 1 ...
  3. job: I'm sleeping 2 ...
  4. main: I'm tired of waiting!
  5. main: Now I can quit.

一旦 main 函数调用了 job.cancel,我们在其它的协程中就看不到任何输出,因为它被取消了。 这里也有一个可以使 Job 挂起的函数 cancelAndJoin 它合并了对 cancel 以及 join 的调用。

取消是协作的

协程的取消是 协作 的。一段协程代码必须协作才能被取消。 所有 kotlinx.coroutines 中的挂起函数都是 可被取消的 。它们检查协程的取消, 并在取消时抛出 CancellationException。 然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的,就如如下示例代码所示:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val startTime = System.currentTimeMillis()
  5. val job = launch(Dispatchers.Default) {
  6. var nextPrintTime = startTime
  7. var i = 0
  8. while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
  9. // 每秒打印消息两次
  10. if (System.currentTimeMillis() >= nextPrintTime) {
  11. println("job: I'm sleeping ${i++} ...")
  12. nextPrintTime += 500L
  13. }
  14. }
  15. }
  16. delay(1300L) // 等待一段时间
  17. println("main: I'm tired of waiting!")
  18. job.cancelAndJoin() // 取消一个作业并且等待它结束
  19. println("main: Now I can quit.")
  20. //sampleEnd
  21. }

可以在这里获取完整代码。

取消与超时 - 图2

运行示例代码,并且我们可以看到它连续打印出了“I’m sleeping”,甚至在调用取消后, 作业仍然执行了五次循环迭代并运行到了它结束为止。

The same problem can be observed by catching a CancellationException and not rethrowing it:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val job = launch(Dispatchers.Default) {
  5. repeat(5) { i ->
  6. try {
  7. // print a message twice a second
  8. println("job: I'm sleeping $i ...")
  9. delay(500)
  10. } catch (e: Exception) {
  11. // log the exception
  12. println(e)
  13. }
  14. }
  15. }
  16. delay(1300L) // delay a bit
  17. println("main: I'm tired of waiting!")
  18. job.cancelAndJoin() // cancels the job and waits for its completion
  19. println("main: Now I can quit.")
  20. //sampleEnd
  21. }

You can get the full code here.

取消与超时 - 图3

While catching Exception is an anti-pattern, this issue may surface in more subtle ways, like when using the runCatching function, which does not rethrow CancellationException.

使计算代码可取消

我们有两种方法来使执行计算的代码可以被取消。第一种方法是定期调用挂起函数来检查取消。对于这种目的 yield 是一个好的选择。 另一种方法是显式的检查取消状态。让我们试试第二种方法。

将前一个示例中的 while (i < 5) 替换为 while (isActive) 并重新运行它。

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val startTime = System.currentTimeMillis()
  5. val job = launch(Dispatchers.Default) {
  6. var nextPrintTime = startTime
  7. var i = 0
  8. while (isActive) { // 可以被取消的计算循环
  9. // 每秒打印消息两次
  10. if (System.currentTimeMillis() >= nextPrintTime) {
  11. println("job: I'm sleeping ${i++} ...")
  12. nextPrintTime += 500L
  13. }
  14. }
  15. }
  16. delay(1300L) // 等待一段时间
  17. println("main: I'm tired of waiting!")
  18. job.cancelAndJoin() // 取消该作业并等待它结束
  19. println("main: Now I can quit.")
  20. //sampleEnd
  21. }

可以在这里获取完整代码。

取消与超时 - 图4

你可以看到,现在循环被取消了。isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。

finally 中释放资源

我们通常使用如下的方法处理在被取消时抛出 CancellationException 的可被取消的挂起函数。比如说,try {……} finally {……} 表达式以及 Kotlin 的 use 函数一般在协程被取消的时候执行它们的终结动作:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val job = launch {
  5. try {
  6. repeat(1000) { i ->
  7. println("job: I'm sleeping $i ...")
  8. delay(500L)
  9. }
  10. } finally {
  11. println("job: I'm running finally")
  12. }
  13. }
  14. delay(1300L) // 延迟一段时间
  15. println("main: I'm tired of waiting!")
  16. job.cancelAndJoin() // 取消该作业并且等待它结束
  17. println("main: Now I can quit.")
  18. //sampleEnd
  19. }

可以在这里获取完整代码。

取消与超时 - 图5

joincancelAndJoin 等待了所有的终结动作执行完毕, 所以运行示例得到了下面的输出:

  1. job: I'm sleeping 0 ...
  2. job: I'm sleeping 1 ...
  3. job: I'm sleeping 2 ...
  4. main: I'm tired of waiting!
  5. job: I'm running finally
  6. main: Now I can quit.

运行不能取消的代码块

在前一个例子中任何尝试在 finally 块中调用挂起函数的行为都会抛出 CancellationException,因为这里持续运行的代码是可以被取消的。通常,这并不是一个问题,所有良好的关闭操作(关闭一个文件、取消一个作业、或是关闭任何一种通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。然而,在真实的案例中,当你需要挂起一个被取消的协程,你可以将相应的代码包装在 withContext(NonCancellable) {……} 中,并使用 withContext 函数以及 NonCancellable 上下文,见如下示例所示:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val job = launch {
  5. try {
  6. repeat(1000) { i ->
  7. println("job: I'm sleeping $i ...")
  8. delay(500L)
  9. }
  10. } finally {
  11. withContext(NonCancellable) {
  12. println("job: I'm running finally")
  13. delay(1000L)
  14. println("job: And I've just delayed for 1 sec because I'm non-cancellable")
  15. }
  16. }
  17. }
  18. delay(1300L) // 延迟一段时间
  19. println("main: I'm tired of waiting!")
  20. job.cancelAndJoin() // 取消该作业并等待它结束
  21. println("main: Now I can quit.")
  22. //sampleEnd
  23. }

可以在这里获取完整代码。

取消与超时 - 图6

超时

在实践中绝大多数取消一个协程的理由是它有可能超时。 当你手动追踪一个相关 Job 的引用并启动了一个单独的协程在延迟后取消追踪,这里已经准备好使用 withTimeout 函数来做这件事。 来看看示例代码:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. withTimeout(1300L) {
  5. repeat(1000) { i ->
  6. println("I'm sleeping $i ...")
  7. delay(500L)
  8. }
  9. }
  10. //sampleEnd
  11. }

可以在这里获取完整代码。

取消与超时 - 图7

运行后得到如下输出:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

withTimeout 抛出了 TimeoutCancellationException,它是 CancellationException 的子类。 我们之前没有在控制台上看到堆栈跟踪信息的打印。这是因为在被取消的协程中 CancellationException 被认为是协程执行结束的正常原因。 然而,在这个示例中我们在 main 函数中正确地使用了 withTimeout

由于取消只是一个例外,所有的资源都使用常用的方法来关闭。 如果你需要做一些各类使用超时的特别的额外操作,可以使用类似 withTimeoutwithTimeoutOrNull 函数,并把这些会超时的代码包装在 try {...} catch (e: TimeoutCancellationException) {...} 代码块中,而 withTimeoutOrNull 通过返回 null 来进行超时操作,从而替代抛出一个异常:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking {
  3. //sampleStart
  4. val result = withTimeoutOrNull(1300L) {
  5. repeat(1000) { i ->
  6. println("I'm sleeping $i ...")
  7. delay(500L)
  8. }
  9. "Done" // 在它运行得到结果之前取消它
  10. }
  11. println("Result is $result")
  12. //sampleEnd
  13. }

可以在这里获取完整代码。

取消与超时 - 图8

运行这段代码时不再抛出异常:

  1. I'm sleeping 0 ...
  2. I'm sleeping 1 ...
  3. I'm sleeping 2 ...
  4. Result is null

Asynchronous timeout and resources

The timeout event in withTimeout is asynchronous with respect to the code running in its block and may happen at any time, even right before the return from inside of the timeout block. Keep this in mind if you open or acquire some resource inside the block that needs closing or release outside of the block.

For example, here we imitate a closeable resource with the Resource class that simply keeps track of how many times it was created by incrementing the acquired counter and decrementing the counter in its close function. Now let us create a lot of coroutines, each of which creates a Resource at the end of the withTimeout block and releases the resource outside the block. We add a small delay so that it is more likely that the timeout occurs right when the withTimeout block is already finished, which will cause a resource leak.

  1. import kotlinx.coroutines.*
  2. //sampleStart
  3. var acquired = 0
  4. class Resource {
  5. init { acquired++ } // Acquire the resource
  6. fun close() { acquired-- } // Release the resource
  7. }
  8. fun main() {
  9. runBlocking {
  10. repeat(10_000) { // Launch 10K coroutines
  11. launch {
  12. val resource = withTimeout(60) { // Timeout of 60 ms
  13. delay(50) // Delay for 50 ms
  14. Resource() // Acquire a resource and return it from withTimeout block
  15. }
  16. resource.close() // Release the resource
  17. }
  18. }
  19. }
  20. // Outside of runBlocking all coroutines have completed
  21. println(acquired) // Print the number of resources still acquired
  22. }
  23. //sampleEnd

You can get the full code here.

取消与超时 - 图9

If you run the above code, you’ll see that it does not always print zero, though it may depend on the timings of your machine. You may need to tweak the timeout in this example to actually see non-zero values.

Note that incrementing and decrementing acquired counter here from 10K coroutines is completely thread-safe, since it always happens from the same thread, the one used by runBlocking. More on that will be explained in the chapter on coroutine context.

取消与超时 - 图10

To work around this problem you can store a reference to the resource in a variable instead of returning it from the withTimeout block.

  1. import kotlinx.coroutines.*
  2. var acquired = 0
  3. class Resource {
  4. init { acquired++ } // Acquire the resource
  5. fun close() { acquired-- } // Release the resource
  6. }
  7. fun main() {
  8. //sampleStart
  9. runBlocking {
  10. repeat(10_000) { // Launch 10K coroutines
  11. launch {
  12. var resource: Resource? = null // Not acquired yet
  13. try {
  14. withTimeout(60) { // Timeout of 60 ms
  15. delay(50) // Delay for 50 ms
  16. resource = Resource() // Store a resource to the variable if acquired
  17. }
  18. // We can do something else with the resource here
  19. } finally {
  20. resource?.close() // Release the resource if it was acquired
  21. }
  22. }
  23. }
  24. }
  25. // Outside of runBlocking all coroutines have completed
  26. println(acquired) // Print the number of resources still acquired
  27. //sampleEnd
  28. }

You can get the full code here.

取消与超时 - 图11

This example always prints zero. Resources do not leak.