9.10 通道

延迟对象提供了一种在协程之间传输单个值的方法。而通道(Channel)提供了一种传输数据流的方法。通道是使用 SendChannel 和使用 ReceiveChannel 之间的非阻塞通信。

9.10.1 通道 vs 阻塞队列

通道的概念类似于 阻塞队列(BlockingQueue)。在Java的Concurrent包中,BlockingQueue很好的解决了多线程中如何高效安全“传输”数据的问题。它有两个常用的方法如下:

  • E take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空, 阻塞进入等待状态直到BlockingQueue有新的数据被加入;

  • put(E e): 把对象 e 加到BlockingQueue里, 如果BlockQueue没有空间,则调用此方法的线程被阻塞,直到BlockingQueue里面有空间再继续。

通道跟阻塞队列一个关键的区别是:通道有挂起的操作, 而不是阻塞的, 同时它可以关闭。

代码示例:

  1. package com.easy.kotlin
  2. import kotlinx.coroutines.experimental.CommonPool
  3. import kotlinx.coroutines.experimental.channels.Channel
  4. import kotlinx.coroutines.experimental.launch
  5. import kotlinx.coroutines.experimental.runBlocking
  6. class ChannelsDemo {
  7. fun testChannel() = runBlocking<Unit> {
  8. val channel = Channel<Int>()
  9. launch(CommonPool) {
  10. for (x in 1..10) channel.send(x * x)
  11. }
  12. println("channel = ${channel}")
  13. // here we print five received integers:
  14. repeat(10) { println(channel.receive()) }
  15. println("Done!")
  16. }
  17. }
  18. fun main(args: Array<String>) {
  19. val cd = ChannelsDemo()
  20. cd.testChannel()
  21. }

运行输出:

  1. channel = kotlinx.coroutines.experimental.channels.RendezvousChannel@2e817b38
  2. 1
  3. 4
  4. 9
  5. 16
  6. 25
  7. 36
  8. 49
  9. 64
  10. 81
  11. 100
  12. Done!

我们可以看出使用Channel<Int>()背后调用的是会合通道RendezvousChannel(),会合通道中没有任何缓冲区。send函数被挂起直到另外一个协程调用receive函数, 然后receive函数挂起直到另外一个协程调用send函数。它是一个完全无锁的实现。

9.10.2 关闭通道和迭代遍历元素

与队列不同, 通道可以关闭, 以指示没有更多的元素。在接收端, 可以使用 for 循环从通道接收元素。代码示例:

  1. fun testClosingAndIterationChannels() = runBlocking {
  2. val channel = Channel<Int>()
  3. launch(CommonPool) {
  4. for (x in 1..5) channel.send(x * x)
  5. channel.close() // 我们结束 sending
  6. }
  7. // 打印通道中的值,直到通道关闭
  8. for (x in channel) println(x)
  9. println("Done!")
  10. }

其中, close函数在这个通道上发送一个特殊的 “关闭令牌”。这是一个幂等运算:对此函数的重复调用不起作用, 并返回 “false”。此函数执行后,isClosedForSend返回 “true”。但是, ReceiveChannelisClosedForReceive在所有之前发送的元素收到之后才返回 “true”。

我们把上面的代码加入打印日志:

  1. fun testClosingAndIterationChannels() = runBlocking {
  2. val channel = Channel<Int>()
  3. launch(CommonPool) {
  4. for (x in 1..5) {
  5. channel.send(x * x)
  6. }
  7. println("Before Close => isClosedForSend = ${channel.isClosedForSend}")
  8. channel.close() // 我们结束 sending
  9. println("After Close => isClosedForSend = ${channel.isClosedForSend}")
  10. }
  11. // 打印通道中的值,直到通道关闭
  12. for (x in channel) {
  13. println("${x} => isClosedForReceive = ${channel.isClosedForReceive}")
  14. }
  15. println("Done! => isClosedForReceive = ${channel.isClosedForReceive}")
  16. }

运行输出:

  1. 1 => isClosedForReceive = false
  2. 4 => isClosedForReceive = false
  3. 9 => isClosedForReceive = false
  4. 16 => isClosedForReceive = false
  5. 25 => isClosedForReceive = false
  6. Before Close => isClosedForSend = false
  7. After Close => isClosedForSend = true
  8. Done! => isClosedForReceive = true

9.10.3 生产者-消费者模式

使用协程生成元素序列的模式非常常见。这是在并发代码中经常有的生产者-消费者模式。代码示例:

  1. fun produceSquares() = produce<Int>(CommonPool) {
  2. for (x in 1..7) send(x * x)
  3. }
  4. fun consumeSquares() = runBlocking{
  5. val squares = produceSquares()
  6. squares.consumeEach { println(it) }
  7. println("Done!")
  8. }

这里的produce函数定义如下:

  1. public fun <E> produce(
  2. context: CoroutineContext,
  3. capacity: Int = 0,
  4. block: suspend ProducerScope<E>.() -> Unit
  5. ): ProducerJob<E> {
  6. val channel = Channel<E>(capacity)
  7. return ProducerCoroutine(newCoroutineContext(context), channel).apply {
  8. initParentJob(context[Job])
  9. block.startCoroutine(this, this)
  10. }
  11. }

其中,参数说明如下:

参数名 说明
context 协程上下文
capacity 通道缓存容量大小 (默认没有缓存)
block 协程代码块

produce函数会启动一个新的协程, 协程中发送数据到通道来生成数据流,并以 ProducerJob 对象返回对协程的引用。ProducerJob继承了Job, ReceiveChannel类型。