Channels

Deferred values provide a convenient way to transfer a single value between coroutines. Channels provide a way to transfer a stream of values.

Channel basics

A Channel is conceptually very similar to BlockingQueue. One key difference is that instead of a blocking put operation it has a suspending send, and instead of a blocking take operation it has a suspending receive.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking {
    //sampleStart
        val channel = Channel<Int>()
        launch {
            // this might be heavy CPU-consuming computation or async logic,
            // we'll just send five squares
            for (x in 1..5) channel.send(x * x)
        }
        // here we print five received integers:
        repeat(5) { println(channel.receive()) }
        println("Done!")
    //sampleEnd
    }

You can get the full code here.

The output of this code is:

    1
    4
    9
    16
    25
    Done!

Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming. On the receiver side it is convenient to use a regular for loop to receive elements from the channel.

Conceptually, a close is like sending a special close token to the channel. The iteration stops as soon as this close token is received, so all elements sent before the close are guaranteed to be received:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking {
    //sampleStart
        val channel = Channel<Int>()
        launch {
            for (x in 1..5) channel.send(x * x)
            channel.close() // we're done sending
        }
        // here we print received values using `for` loop (until the channel is closed)
        for (y in channel) println(y)
        println("Done!")
    //sampleEnd
    }

You can get the full code here.

Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common. It is a part of the producer-consumer pattern that is often found in concurrent code. You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary to the common sense that results must be returned from functions.

There is a convenient coroutine builder named produce that makes it easy to do it right on the producer side, and an extension function consumeEach that replaces a for loop on the consumer side:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
        for (x in 1..5) send(x * x)
    }

    fun main() = runBlocking {
    //sampleStart
        val squares = produceSquares()
        squares.consumeEach { println(it) }
        println("Done!")
    //sampleEnd
    }

You can get the full code here.

Pipelines

A pipeline is a pattern where one coroutine is producing a possibly infinite stream of values:

    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1
        while (true) send(x++) // infinite stream of integers starting from 1
    }

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results. In the example below, the numbers are just squared:

    fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
        for (x in numbers) send(x * x)
    }

The main code starts and connects the whole pipeline:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking {
    //sampleStart
        val numbers = produceNumbers() // produces integers from 1 and on
        val squares = square(numbers) // squares integers
        repeat(5) {
            println(squares.receive()) // print first five
        }
        println("Done!") // we are done
        coroutineContext.cancelChildren() // cancel children coroutines
    //sampleEnd
    }

    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1
        while (true) send(x++) // infinite stream of integers starting from 1
    }

    fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
        for (x in numbers) send(x * x)
    }

You can get the full code here.

All functions that create coroutines are defined as extensions on CoroutineScope, so that we can rely on structured concurrency to make sure that we don’t have lingering global coroutines in our application.

Prime numbers with pipeline

Let’s take pipelines to the extreme with an example that generates prime numbers using a pipeline of coroutines. We start with an infinite sequence of numbers.

    fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
        var x = start
        while (true) send(x++) // infinite stream of integers from start
    }

The following pipeline stage filters an incoming stream of numbers, removing all the numbers that are divisible by the given prime number:

    fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
        for (x in numbers) if (x % prime != 0) send(x)
    }

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, and launching a new pipeline stage for each prime number found:

    numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

The following example prints the first ten prime numbers, running the whole pipeline in the context of the main thread. Since all the coroutines are launched in the scope of the main runBlocking coroutine, we don’t have to keep an explicit list of all the coroutines we have started. We use the cancelChildren extension function to cancel all the children coroutines after we have printed the first ten prime numbers.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking {
    //sampleStart
        var cur = numbersFrom(2)
        repeat(10) {
            val prime = cur.receive()
            println(prime)
            cur = filter(cur, prime)
        }
        coroutineContext.cancelChildren() // cancel all children to let main finish
    //sampleEnd
    }

    fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
        var x = start
        while (true) send(x++) // infinite stream of integers from start
    }

    fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
        for (x in numbers) if (x % prime != 0) send(x)
    }

You can get the full code here.

The output of this code is:

    2
    3
    5
    7
    11
    13
    17
    19
    23
    29

Note that you can build the same pipeline using the iterator coroutine builder from the standard library. Replace produce with iterator, send with yield, receive with next, ReceiveChannel with Iterator, and get rid of the coroutine scope. You will not need runBlocking either. However, the benefit of a pipeline that uses channels as shown above is that it can actually use multiple CPU cores if you run it in the Dispatchers.Default context.
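Following that recipe, a minimal sketch of the iterator-based version of the prime pipeline might look like this:

    fun numbersFrom(start: Int): Iterator<Int> = iterator {
        var x = start
        while (true) yield(x++) // infinite stream of integers from start
    }

    fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
        for (x in numbers) if (x % prime != 0) yield(x)
    }

    fun main() {
        var cur = numbersFrom(2)
        repeat(10) {
            val prime = cur.next()
            println(prime)
            cur = filter(cur, prime)
        }
    }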

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve other suspending invocations (like asynchronous calls to remote services), and such pipelines cannot be built using sequence/iterator, because they do not allow arbitrary suspension, unlike produce, which is fully asynchronous.
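For illustration, a hypothetical stage with such a suspending call might be sketched as follows; enrich and fetchName are assumed names, and the delay merely stands in for a real remote call:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    // a pipeline stage that performs a suspending call per element
    fun CoroutineScope.enrich(ids: ReceiveChannel<Int>): ReceiveChannel<String> = produce {
        for (id in ids) send(fetchName(id)) // arbitrary suspension is allowed inside produce
    }

    // assumed placeholder for an asynchronous call to a remote service
    suspend fun fetchName(id: Int): String {
        delay(100) // stands in for a real network call
        return "user-$id"
    }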

Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves. Let us start with a producer coroutine that periodically produces integers (ten numbers per second):

    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1 // start from 1
        while (true) {
            send(x++) // produce next
            delay(100) // wait 0.1s
        }
    }

Then we can have several processor coroutines. In this example, they just print their id and the received number:

    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        for (msg in channel) {
            println("Processor #$id received $msg")
        }
    }

Now let us launch five processors and let them work for almost a second. See what happens:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking<Unit> {
    //sampleStart
        val producer = produceNumbers()
        repeat(5) { launchProcessor(it, producer) }
        delay(950)
        producer.cancel() // cancel producer coroutine and thus kill them all
    //sampleEnd
    }

    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1 // start from 1
        while (true) {
            send(x++) // produce next
            delay(100) // wait 0.1s
        }
    }

    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        for (msg in channel) {
            println("Processor #$id received $msg")
        }
    }

You can get the full code here.

The output will be similar to the following one, although the processor ids that receive each particular integer may be different:

    Processor #2 received 1
    Processor #4 received 2
    Processor #0 received 3
    Processor #1 received 4
    Processor #3 received 5
    Processor #2 received 6
    Processor #4 received 7
    Processor #0 received 8
    Processor #1 received 9
    Processor #3 received 10

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration over the channel that the processor coroutines are doing.

Also, pay attention to how we explicitly iterate over the channel with a for loop to perform fan-out in the launchProcessor code. Unlike consumeEach, this for-loop pattern is perfectly safe to use from multiple coroutines. If one of the processor coroutines fails, the others will still process the channel, whereas a processor written via consumeEach always consumes (cancels) the underlying channel on its normal or abnormal completion.
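For contrast, a processor written via consumeEach might look like the sketch below (the function name is illustrative); because consumeEach cancels the underlying channel when its block completes, this variant would stop all the other processors and is therefore not suitable for fan-out:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun CoroutineScope.launchConsumeEachProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        // consumeEach cancels the channel when this block completes,
        // normally or exceptionally
        channel.consumeEach { msg ->
            println("Processor #$id received $msg")
        }
    }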

Fan-in

Multiple coroutines may send to the same channel. For example, let us have a channel of strings, and a suspending function that repeatedly sends a specified string to this channel with a specified delay:

    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
        while (true) {
            delay(time)
            channel.send(s)
        }
    }

Now, let us see what happens if we launch a couple of coroutines sending strings (in this example we launch them in the context of the main thread as the main coroutine's children):

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking {
    //sampleStart
        val channel = Channel<String>()
        launch { sendString(channel, "foo", 200L) }
        launch { sendString(channel, "BAR!", 500L) }
        repeat(6) { // receive first six
            println(channel.receive())
        }
        coroutineContext.cancelChildren() // cancel all children to let main finish
    //sampleEnd
    }

    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
        while (true) {
            delay(time)
            channel.send(s)
        }
    }

You can get the full code here.

The output is:

    foo
    foo
    BAR!
    foo
    foo
    BAR!

Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when the sender and the receiver meet each other (aka rendezvous). If send is invoked first, it suspends until receive is invoked; if receive is invoked first, it suspends until send is invoked.

Both the Channel() factory function and the produce builder take an optional capacity parameter to specify the buffer size. A buffer allows senders to send multiple elements before suspending, similar to a BlockingQueue with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking<Unit> {
    //sampleStart
        val channel = Channel<Int>(4) // create buffered channel
        val sender = launch { // launch sender coroutine
            repeat(10) {
                println("Sending $it") // print before sending each element
                channel.send(it) // will suspend when buffer is full
            }
        }
        // don't receive anything... just wait....
        delay(1000)
        sender.cancel() // cancel sender coroutine
    //sampleEnd
    }

You can get the full code here.

It prints “Sending” five times using a buffered channel with a capacity of four:

    Sending 0
    Sending 1
    Sending 2
    Sending 3
    Sending 4

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
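The produce builder accepts the same capacity parameter, so the sender side could equally be written as a buffered producer; a minimal sketch (the function name is illustrative):

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun CoroutineScope.produceBuffered(): ReceiveChannel<Int> = produce(capacity = 4) {
        repeat(10) {
            println("Sending $it") // print before sending each element
            send(it) // suspends once the 4-element buffer is full
        }
    }

    fun main() = runBlocking<Unit> {
        val numbers = produceBuffered()
        delay(1000) // don't receive anything... just wait....
        numbers.cancel() // cancel the producer coroutine
    }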

Channels are fair

Send and receive operations to channels are fair with respect to the order of their invocation from multiple coroutines. They are served in first-in, first-out order, e.g. the first coroutine to invoke receive gets the element. In the following example two coroutines, “ping” and “pong”, are receiving the “ball” object from the shared “table” channel.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    //sampleStart
    data class Ball(var hits: Int)

    fun main() = runBlocking {
        val table = Channel<Ball>() // a shared table
        launch { player("ping", table) }
        launch { player("pong", table) }
        table.send(Ball(0)) // serve the ball
        delay(1000) // delay 1 second
        coroutineContext.cancelChildren() // game over, cancel them
    }

    suspend fun player(name: String, table: Channel<Ball>) {
        for (ball in table) { // receive the ball in a loop
            ball.hits++
            println("$name $ball")
            delay(300) // wait a bit
            table.send(ball) // send the ball back
        }
    }
    //sampleEnd

You can get the full code here.

The “ping” coroutine is started first, so it is the first one to receive the ball. Even though the “ping” coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets received by the “pong” coroutine, because it was already waiting for it:

    ping Ball(hits=1)
    pong Ball(hits=2)
    ping Ball(hits=3)
    pong Ball(hits=4)

Note that sometimes channels may produce executions that look unfair due to the nature of the executor that is being used. See this issue for details.

Ticker channels

A ticker channel is a special rendezvous channel that produces Unit every time a given delay passes since the last consumption from this channel. Though it may seem useless standalone, it is a useful building block for creating complex time-based produce pipelines and operators that do windowing and other time-dependent processing. A ticker channel can be used in select to perform an “on tick” action.

To create such a channel, use the ticker factory function. To indicate that no further elements are needed, call the ReceiveChannel.cancel method on it.
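As a minimal sketch of the “on tick” usage mentioned above (the 100 ms period and the auxiliary data channel are assumptions made for illustration), a ticker channel can participate in a select expression like this:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    import kotlinx.coroutines.selects.select

    fun main() = runBlocking<Unit> {
        val tickerChannel = ticker(delayMillis = 100) // a tick every 100 ms
        val data = Channel<String>()
        launch {
            delay(250)
            data.send("payload") // some data arrives between ticks
        }
        repeat(4) {
            select<Unit> { // react to whichever event happens first
                tickerChannel.onReceive { println("tick") }
                data.onReceive { println("received $it") }
            }
        }
        tickerChannel.cancel() // indicate that no more ticks are needed
    }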

Now let’s see how it works in practice:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking<Unit> {
        val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
        var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Initial element is available immediately: $nextElement") // no initial delay

        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
        println("Next element is not ready in 50 ms: $nextElement")

        nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
        println("Next element is ready in 100 ms: $nextElement")

        // Emulate large consumption delays
        println("Consumer pauses for 150ms")
        delay(150)
        // Next element is available immediately
        nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Next element is available immediately after large consumer delay: $nextElement")
        // Note that the pause between `receive` calls is taken into account and next element arrives faster
        nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
        println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

        tickerChannel.cancel() // indicate that no more elements are needed
    }

You can get the full code here.

It prints the following lines:

    Initial element is available immediately: kotlin.Unit
    Next element is not ready in 50 ms: null
    Next element is ready in 100 ms: kotlin.Unit
    Consumer pauses for 150ms
    Next element is available immediately after large consumer delay: kotlin.Unit
    Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

Note that ticker is aware of possible consumer pauses and, by default, adjusts the delay of the next produced element if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a mode parameter equal to TickerMode.FIXED_DELAY can be specified to maintain a fixed delay between elements.
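A minimal sketch of constructing such a ticker, assuming the same 100 ms delay as above:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*

    fun main() = runBlocking<Unit> {
        val tickerChannel = ticker(
            delayMillis = 100,
            initialDelayMillis = 0,
            mode = TickerMode.FIXED_DELAY // maintain a fixed delay between elements instead of a fixed rate
        )
        repeat(3) {
            tickerChannel.receive()
            println("tick $it")
        }
        tickerChannel.cancel() // indicate that no more elements are needed
    }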