Loops

The select! macro is often used in loops. This section will go over some examples to show common ways of using the select! macro in a loop. We start by selecting over multiple channels:

  1. use tokio::sync::mpsc;
  2. #[tokio::main]
  3. async fn main() {
  4. let (tx1, mut rx1) = mpsc::channel(128);
  5. let (tx2, mut rx2) = mpsc::channel(128);
  6. let (tx3, mut rx3) = mpsc::channel(128);
  7. loop {
  8. let msg = tokio::select! {
  9. Some(msg) = rx1.recv() => msg,
  10. Some(msg) = rx2.recv() => msg,
  11. Some(msg) = rx3.recv() => msg,
  12. else => { break }
  13. };
  14. println!("Got {}", msg);
  15. }
  16. println!("All channels have been closed.");
  17. }

This example selects over the three channel receivers. When a message is received on any channel, it is written to STDOUT. When a channel is closed, recv() returns with None. By using pattern matching, the select! macro continues waiting on the remaining channels. When all channels are closed, the else branch is evaluated and the loop is terminated.

The select! macro randomly picks branches to check first for readiness. When multiple channels have pending values, a random channel will be picked to receive from. This is to handle the case where the receive loop processes messages slower than they are pushed into the channels, meaning that the channels start to fill up. If select! did not randomly pick a branch to check first, on each iteration of the loop, rx1 would be checked first. If rx1 always contained a new message, the remaining channels would never be checked.

If when select! is evaluated, multiple channels have pending messages, only one channel has a value popped. All other channels remain untouched, and their messages stay in those channels until the next loop iteration. No messages are lost.

Resuming an async operation

Now we will show how to run an asynchronous operation across multiple calls to select!. In this example, we have an MPSC channel with item type i32, and an asynchronous function. We want to run the asynchronous function until it completes or an even integer is received on the channel.

  1. async fn action() {
  2. // Some asynchronous logic
  3. }
  4. #[tokio::main]
  5. async fn main() {
  6. let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
  7. let operation = action();
  8. tokio::pin!(operation);
  9. loop {
  10. tokio::select! {
  11. _ = &mut operation => break,
  12. Some(v) = rx.recv() => {
  13. if v % 2 == 0 {
  14. break;
  15. }
  16. }
  17. }
  18. }
  19. }

Note how, instead of calling action() in the select! macro, it is called outside the loop. The return of action() is assigned to operation without calling .await. Then we call tokio::pin! on operation.

Inside the select! loop, instead of passing in operation, we pass in &mut operation. The operation variable is tracking the in-flight asynchronous operation. Each iteration of the loop uses the same operation instead of issuing a new call to action().

The other select! branch receives a message from the channel. If the message is even, we are done looping. Otherwise, start the select! again.

This is the first time we use tokio::pin!. We aren’t going to get into the details of pinning yet. The thing to note is that, to .await a reference, the value being referenced must be pinned or implement Unpin.

If we remove the tokio::pin! line and try to compile, we get the following error:

  1. error[E0599]: no method named `poll` found for struct
  2. `std::pin::Pin<&mut &mut impl std::future::Future>`
  3. in the current scope
  4. --> src/main.rs:16:9
  5. |
  6. 16 | / tokio::select! {
  7. 17 | | _ = &mut operation => break,
  8. 18 | | Some(v) = rx.recv() => {
  9. 19 | | if v % 2 == 0 {
  10. ... |
  11. 22 | | }
  12. 23 | | }
  13. | |_________^ method not found in
  14. | `std::pin::Pin<&mut &mut impl std::future::Future>`
  15. |
  16. = note: the method `poll` exists but the following trait bounds
  17. were not satisfied:
  18. `impl std::future::Future: std::marker::Unpin`
  19. which is required by
  20. `&mut impl std::future::Future: std::future::Future`

This error isn’t very clear and we haven’t talked much about Future yet either. For now, think of Future as the trait that must be implemented by a value in order to call .await on it. If you hit an error about Future not being implemented when attempting to call .await on a reference, then the future probably needs to be pinned.

Read more about Pin on the standard library.

Modifying a branch

Let’s look at a slightly more complicated loop. We have:

  1. A channel of i32 values.
  2. An async operation to perform on i32 values.

The logic we want to implement is:

  1. Wait for an even number on the channel.
  2. Start the asynchronous operation using the even number as input.
  3. Wait for the operation, but at the same time listen for more even numbers on the channel.
  4. If a new even number is received before the existing operation completes, abort the existing operation and start it over with the new even number.
  1. async fn action(input: Option<i32>) -> Option<String> {
  2. // If the input is `None`, return `None`.
  3. // This could also be written as `let i = input?;`
  4. let i = match input {
  5. Some(input) => input,
  6. None => return None,
  7. };
  8. // async logic here
  9. }
  10. #[tokio::main]
  11. async fn main() {
  12. let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
  13. let mut done = false;
  14. let operation = action(None);
  15. tokio::pin!(operation);
  16. tokio::spawn(async move {
  17. let _ = tx.send(1).await;
  18. let _ = tx.send(3).await;
  19. let _ = tx.send(2).await;
  20. });
  21. loop {
  22. tokio::select! {
  23. res = &mut operation, if !done => {
  24. done = true;
  25. if let Some(v) = res {
  26. println!("GOT = {}", v);
  27. return;
  28. }
  29. }
  30. Some(v) = rx.recv() => {
  31. if v % 2 == 0 {
  32. // `.set` is a method on `Pin`.
  33. operation.set(action(Some(v)));
  34. done = false;
  35. }
  36. }
  37. }
  38. }
  39. }

We use a similar strategy as the previous example. The async fn is called outside of the loop and assigned to operation. The operation variable is pinned. The loop selects on both operation and the channel receiver.

Notice how action takes Option<i32> as an argument. Before we receive the first even number, we need to instantiate operation to something. We make action take Option and return Option. If None is passed in, None is returned. The first loop iteration, operation completes immediately with None.

This example uses some new syntax. The first branch includes , if !done. This is a branch precondition. Before explaining how it works, lets look at what happens if the precondition is omitted. Leaving out , if !done and running the example results in the following output:

  1. thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
  2. note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

This error happens when attempting to use operation after it has already completed. Usually, when using .await, the value being awaited is consumed. In this example, we await on a reference. This means operation is still around after it has completed.

To avoid this panic, we must take care to disable the first branch if operation has completed. The done variable is used to track whether or not operation completed. A select! branch may include a precondition. This precondition is checked before select! awaits on the branch. If the condition evaluates to false then the branch is disabled. The done variable is initialized to false. When operation completes, done is set to true. The next loop iteration will disable the operation branch. When an even message is received from the channel, operation is reset and done is set to false.