9.11 管道

9.11.1 生产无限序列

管道(Pipeline)是一种模式, 我们可以用一个协程生产无限序列:

  1. fun produceNumbers() = produce<Long>(CommonPool) {
  2. var x = 1L
  3. while (true) send(x++) // infinite stream of integers starting from 1
  4. }

我们的消费序列的函数如下:

  1. fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
  2. for (x in numbers) send(x * x)
  3. }

主代码启动并连接整个管线:

  1. fun testPipeline() = runBlocking {
  2. val numbers = produceNumbers() // produces integers from 1 and on
  3. val squares = consumeNumbers(numbers) // squares integers
  4. //for (i in 1..6) println(squares.receive())
  5. while (true) {
  6. println(squares.receive())
  7. }
  8. println("Done!")
  9. squares.cancel()
  10. numbers.cancel()
  11. }

运行上面的代码,我们将会发现控制台在打印一个无限序列,完全没有停止的意思。

9.11.2 管道与无穷质数序列

我们使用协程管道来生成一个无穷质数序列。

我们从无穷大的自然数序列开始:

  1. fun numbersProducer(context: CoroutineContext, start: Int) = produce<Int>(context) {
  2. var n = start
  3. while (true) send(n++) // infinite stream of integers from start
  4. }

这次我们引入一个显式上下文参数context, 以便调用方可以控制我们的协程运行的位置。

下面的管道将筛选传入的数字流, 过滤掉可以被当前质数整除的所有数字:

  1. fun filterPrimes(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
  2. for (x in numbers) if (x % prime != 0) send(x)
  3. }

现在我们通过从2开始, 从当前通道中取一个质数, 并为找到的每个质数启动新的管道阶段, 从而构建出我们的管道:

  1. numbersFrom(2) -> filterPrimes(2) -> filterPrimes(3) -> filterPrimes(5) -> filterPrimes(7) ...

测试无穷质数序列:

  1. fun producePrimesSequences() = runBlocking {
  2. var producerJob = numbersProducer(context, 2)
  3. while (true) {
  4. val prime = producerJob.receive()
  5. print("${prime} \t")
  6. producerJob = filterPrimes(context, producerJob, prime)
  7. }
  8. }

运行上面的代码,我们将会看到控制台一直在无限打印出质数序列:

Kotlin极简教程

9.11.3 通道缓冲区

我们可以给通道设置一个缓冲区:

  1. fun main(args: Array<String>) = runBlocking<Unit> {
  2. val channel = Channel<Int>(4) // 创建一个缓冲区容量为4的通道
  3. launch(context) {
  4. repeat(10) {
  5. println("Sending $it")
  6. channel.send(it) // 当缓冲区已满的时候, send将会挂起
  7. }
  8. }
  9. delay(1000)
  10. }

输出:

  1. Sending 0
  2. Sending 1
  3. Sending 2
  4. Sending 3
  5. Sending 4