9.1 协程简介

从硬件发展来看,从最初的单核单CPU,到单核多CPU,多核多CPU,似乎已经到了极限了,但是单核CPU性能却还在不断提升。如果将程序分为IO密集型应用和CPU密集型应用,二者的发展历程大致如下:

IO密集型应用: 多进程->多线程->事件驱动->协程
CPU密集型应用:多进程->多线程

如果说多进程对于多CPU,多线程对应多核CPU,那么事件驱动和协程则是在充分挖掘不断提高性能的单核CPU的潜力。

常见的有性能瓶颈的API (例如网络 IO、文件 IO、CPU 或 GPU 密集型任务等),要求调用者阻塞(blocking)直到它们完成才能进行下一步。后来,我们又使用异步回调的方式来实现非阻塞,但是异步回调代码写起来并不简单。

协程提供了一种避免阻塞线程并用更简单、更可控的操作替代线程阻塞的方法:协程挂起。

协程主要是让原来要使用“异步+回调方式”写出来的复杂代码, 简化成可以用看似同步的方式写出来(对线程的操作进一步抽象)。这样我们就可以按串行的思维模型去组织原本分散在不同上下文中的代码逻辑,而不需要去处理复杂的状态同步问题。

协程最早的描述是由Melvin Conway于1958年给出:“subroutines who act as the master program”(与主程序行为类似的子例程)。此后他又在博士论文中给出了如下定义:

数据在后续调用中始终保持( The values of data local to a coroutine persist between successive calls 协程的局部)

当控制流程离开时,协程的执行被挂起,此后控制流程再次进入这个协程时,这个协程只应从上次离开挂起的地方继续 (The execution of a coroutine is suspended as control leaves it, only to carry on where it left off when control re-enters the coroutine at some later stage)。

协程的实现要维护一组局部状态,在重新进入协程前,保证这些状态不被改变,从而能顺利定位到之前的位置。

协程可以用来解决很多问题,比如nodejs的嵌套回调,Erlang以及Golang的并发模型实现等。

实质上,协程(coroutine)是一种用户态的轻量级线程。它由协程构建器(launch coroutine builder)启动。

下面我们通过代码实践来学习协程的相关内容。

9.1.1 搭建协程代码工程

首先,我们来新建一个Kotlin Gradle工程。生成标准gradle工程后,在配置文件build.gradle中,配置kotlinx-coroutines-core依赖:

添加 dependencies :

  1. compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.16'

kotlinx-coroutines还提供了下面的模块:

  1. compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-jdk8', version: '0.16'
  2. compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-nio', version: '0.16'
  3. compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-reactive', version: '0.16'

我们使用Kotlin最新的1.1.3-2 版本:

  1. buildscript {
  2. ext.kotlin_version = '1.1.3-2'
  3. ...
  4. dependencies {
  5. classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
  6. }
  7. }

其中,kotlin-gradle-plugin是Kotlin集成Gradle的插件。

另外,配置一下JCenter 的仓库:

  1. repositories {
  2. jcenter()
  3. }

9.1.2 简单协程示例

下面我们先来看一个简单的协程示例。

运行下面的代码:

  1. fun firstCoroutineDemo0() {
  2. launch(CommonPool) {
  3. delay(3000L, TimeUnit.MILLISECONDS)
  4. println("Hello,")
  5. }
  6. println("World!")
  7. Thread.sleep(5000L)
  8. }

你将会发现输出:

  1. World!
  2. Hello,

上面的这段代码:

  1. launch(CommonPool) {
  2. delay(3000L, TimeUnit.MILLISECONDS)
  3. println("Hello,")
  4. }

等价于:

  1. launch(CommonPool, CoroutineStart.DEFAULT, {
  2. delay(3000L, TimeUnit.MILLISECONDS)
  3. println("Hello, ")
  4. })

9.1.3 launch函数

这个launch函数定义在kotlinx.coroutines.experimental下面。

  1. public fun launch(
  2. context: CoroutineContext,
  3. start: CoroutineStart = CoroutineStart.DEFAULT,
  4. block: suspend CoroutineScope.() -> Unit
  5. ): Job {
  6. val newContext = newCoroutineContext(context)
  7. val coroutine = if (start.isLazy)
  8. LazyStandaloneCoroutine(newContext, block) else
  9. StandaloneCoroutine(newContext, active = true)
  10. coroutine.initParentJob(context[Job])
  11. start(block, coroutine, coroutine)
  12. return coroutine
  13. }

launch函数有3个入参:context、start、block,这些函数参数分别说明如下:

参数 说明
context 协程上下文
start 协程启动选项
block 协程真正要执行的代码块,必须是suspend修饰的挂起函数

这个launch函数返回一个Job类型,Job是协程创建的后台任务的概念,它持有该协程的引用。Job接口实际上继承自CoroutineContext类型。一个Job有如下三种状态:

State isActive isCompleted
New (optional initial state) 新建 (可选的初始状态) false false
Active (default initial state) 活动中(默认初始状态) true false
Completed (final state) 已结束(最终状态) false true

也就是说,launch函数它以非阻塞(non-blocking)当前线程的方式,启动一个新的协程后台任务,并返回一个Job类型的对象作为当前协程的引用。

另外,这里的delay()函数类似Thread.sleep()的功能,但更好的是:它不会阻塞线程,而只是挂起协程本身。当协程在等待时,线程将返回到池中, 当等待完成时, 协程将在池中的空闲线程上恢复。

9.1.4 CommonPool:共享线程池

我们再来看一下launch(CommonPool) {...}这段代码。

首先,这个CommonPool是代表共享线程池,它的主要作用是用来调度计算密集型任务的协程的执行。它的实现使用的是java.util.concurrent包下面的API。它首先尝试创建一个java.util.concurrent.ForkJoinPool(ForkJoinPool是一个可以执行ForkJoinTask的ExcuteService,它采用了work-stealing模式:所有在池中的线程尝试去执行其他线程创建的子任务,这样很少有线程处于空闲状态,更加高效);如果不可用,就使用java.util.concurrent.Executors来创建一个普通的线程池:Executors.newFixedThreadPool。相关代码在kotlinx/coroutines/experimental/CommonPool.kt中:

  1. private fun createPool(): ExecutorService {
  2. val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
  3. ?: return createPlainPool()
  4. if (!usePrivatePool) {
  5. Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
  6. ?.let { return it }
  7. }
  8. Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
  9. ?. let { return it }
  10. return createPlainPool()
  11. }
  12. private fun createPlainPool(): ExecutorService {
  13. val threadId = AtomicInteger()
  14. return Executors.newFixedThreadPool(defaultParallelism()) {
  15. Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
  16. }
  17. }

这个CommonPool对象类是CoroutineContext的子类型。它们的类型集成层次结构如下:

Kotlin极简教程

9.1.5 挂起函数

代码块中的delay(3000L, TimeUnit.MILLISECONDS)函数,是一个用suspend关键字修饰的函数,我们称之为挂起函数。挂起函数只能从协程代码内部调用,普通的非协程的代码不能调用。

挂起函数只允许由协程或者另外一个挂起函数里面调用, 例如我们在协程代码中调用一个挂起函数,代码示例如下:

  1. suspend fun runCoroutineDemo() {
  2. run(CommonPool) {
  3. delay(3000L, TimeUnit.MILLISECONDS)
  4. println("suspend,")
  5. }
  6. println("runCoroutineDemo!")
  7. Thread.sleep(5000L)
  8. }
  9. fun callSuspendFun() {
  10. launch(CommonPool) {
  11. runCoroutineDemo()
  12. }
  13. }

如果我们用Java中的Thread类来写类似功能的代码,上面的代码可以写成这样:

  1. fun threadDemo0() {
  2. Thread({
  3. Thread.sleep(3000L)
  4. println("Hello,")
  5. }).start()
  6. println("World!")
  7. Thread.sleep(5000L)
  8. }

输出结果也是:

  1. World!
  2. Hello,

另外, 我们不能使用Thread来启动协程代码。例如下面的写法编译器会报错:

  1. /**
  2. * 错误反例:用线程调用协程 error
  3. */
  4. fun threadCoroutineDemo() {
  5. Thread({
  6. delay(3000L, TimeUnit.MILLISECONDS) // error, Suspend functions are only allowed to be called from a coroutine or another suspend function
  7. println("Hello,")
  8. })
  9. println("World!")
  10. Thread.sleep(5000L)
  11. }