tokio::select!

The tokio::select! macro allows waiting on multiple async computations and returns when a single computation completes.

For example:

    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        tokio::spawn(async {
            let _ = tx1.send("one");
        });

        tokio::spawn(async {
            let _ = tx2.send("two");
        });

        tokio::select! {
            val = rx1 => {
                println!("rx1 completed first with {:?}", val);
            }
            val = rx2 => {
                println!("rx2 completed first with {:?}", val);
            }
        }
    }

Two oneshot channels are used. Either channel could complete first. The select! statement awaits both receivers and binds val to the output of whichever completes first, which is a Result wrapping the value sent by the task. When either tx1 or tx2 sends its value, the block for the matching branch is executed.

The branch that does not complete is dropped. In the example, each branch awaits the oneshot::Receiver for one channel. The oneshot::Receiver for the channel that has not yet completed is dropped.

Cancellation

With asynchronous Rust, cancellation is performed by dropping a future. Recall from “Async in depth” that async Rust operations are implemented using futures, and futures are lazy. The operation only proceeds when the future is polled. If the future is dropped, the operation cannot proceed because all associated state has been dropped.
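Both properties can be observed without a runtime. The following is a minimal sketch using only the standard library; the names NoopWaker, Observed, and cancel_by_drop are hypothetical and exist only for this illustration. It polls a future by hand once, then drops it, and records how far the future's body got.

```rust
use std::cell::Cell;
use std::future::{self, Future};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// What the sketch observed about the future's progress.
#[derive(Debug, PartialEq)]
struct Observed {
    started_before_poll: bool,
    started_after_poll: bool,
    finished: bool,
}

fn cancel_by_drop() -> Observed {
    let started = Rc::new(Cell::new(false));
    let finished = Rc::new(Cell::new(false));

    let (s, f) = (started.clone(), finished.clone());
    let mut fut = Box::pin(async move {
        s.set(true);
        // Stands in for an operation that is still waiting on something.
        future::pending::<()>().await;
        f.set(true);
    });

    // Futures are lazy: nothing has run yet.
    let started_before_poll = started.get();

    // Poll once by hand; the body runs up to the first pending await.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    let started_after_poll = started.get();

    // Dropping the future cancels it: the code after the await never runs.
    drop(fut);

    Observed {
        started_before_poll,
        started_after_poll,
        finished: finished.get(),
    }
}

fn main() {
    println!("{:?}", cancel_by_drop());
}
```

In this sketch the body only starts once the future is polled, and the statement after the pending await never executes because the future is dropped first.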

That said, sometimes an asynchronous operation will spawn background tasks or start other operations that run in the background. In the example above, a task is spawned to send a message back. Usually, the task will perform some computation to generate the value.

Futures or other types can implement Drop to clean up background resources. Tokio’s oneshot::Receiver implements Drop by sending a closed notification to the Sender half. The sender half can receive this notification and abort the in-progress operation by dropping it.

    use tokio::sync::oneshot;

    async fn some_operation() -> String {
        // Compute value here. A placeholder result keeps the example compiling.
        String::from("one")
    }

    #[tokio::main]
    async fn main() {
        let (mut tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        tokio::spawn(async {
            // Select on the operation and the oneshot's
            // `closed()` notification.
            tokio::select! {
                val = some_operation() => {
                    let _ = tx1.send(val);
                }
                _ = tx1.closed() => {
                    // `some_operation()` is canceled, the
                    // task completes and `tx1` is dropped.
                }
            }
        });

        tokio::spawn(async {
            let _ = tx2.send("two");
        });

        tokio::select! {
            val = rx1 => {
                println!("rx1 completed first with {:?}", val);
            }
            val = rx2 => {
                println!("rx2 completed first with {:?}", val);
            }
        }
    }
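The idea of a Drop implementation notifying the other half can be pictured with a small sketch using only the standard library. Everything here (Handle, Producer, pair, steps_before_cancel, and the shared AtomicBool flag) is hypothetical and is not how Tokio implements oneshot; it only illustrates the pattern of a handle whose Drop tells the producing side to stop.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical receiver-like handle: dropping it marks the pair as closed.
struct Handle {
    closed: Arc<AtomicBool>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical sender-like half that can observe the closed notification.
struct Producer {
    closed: Arc<AtomicBool>,
}

impl Producer {
    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }

    /// Runs up to `steps` units of work, checking for cancellation before
    /// each one. Returns the number of units actually completed.
    fn run(&self, steps: usize, mut on_step: impl FnMut(usize)) -> usize {
        let mut done = 0;
        for i in 0..steps {
            if self.is_closed() {
                break;
            }
            on_step(i);
            done += 1;
        }
        done
    }
}

fn pair() -> (Producer, Handle) {
    let closed = Arc::new(AtomicBool::new(false));
    (Producer { closed: closed.clone() }, Handle { closed })
}

/// Drops the handle during step `cancel_at` and reports how many of ten
/// steps the producer completed before noticing.
fn steps_before_cancel(cancel_at: usize) -> usize {
    let (producer, handle) = pair();
    let mut handle = Some(handle);
    producer.run(10, |i| {
        if i == cancel_at {
            // The consumer loses interest: drop its handle.
            drop(handle.take());
        }
    })
}

fn main() {
    println!("completed {} of 10 steps", steps_before_cancel(2));
}
```

The producer only notices the cancellation at its next check, which mirrors the example above: the spawned task reacts to closed() at an await point rather than being interrupted at an arbitrary instruction.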

The Future implementation

To better understand how select! works, let's look at what a hypothetical Future implementation would look like. This is a simplified version. In practice, select! includes additional functionality like randomly selecting the branch to poll first.

    use tokio::sync::oneshot;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct MySelect {
        rx1: oneshot::Receiver<&'static str>,
        rx2: oneshot::Receiver<&'static str>,
    }

    impl Future for MySelect {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
                println!("rx1 completed first with {:?}", val);
                return Poll::Ready(());
            }

            if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
                println!("rx2 completed first with {:?}", val);
                return Poll::Ready(());
            }

            Poll::Pending
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        // use tx1 and tx2

        MySelect {
            rx1,
            rx2,
        }.await;
    }

The MySelect future contains the futures from each branch. When MySelect is polled, the first branch is polled. If it is ready, the value is used and MySelect completes. Otherwise, the second branch is polled in the same way, and if neither branch is ready MySelect returns Poll::Pending. After .await receives the output from a future, the future is dropped. This results in the futures for both branches being dropped. As one branch did not complete, its operation is effectively cancelled.

Remember from the previous section:

When a future returns Poll::Pending, it must ensure the waker is signalled at some point in the future. Forgetting to do this results in the task hanging indefinitely.

There is no explicit usage of the Context argument in the MySelect implementation. Instead, the waker requirement is met by passing cx to the inner futures. As each inner future must also meet the waker requirement, by only returning Poll::Pending when receiving Poll::Pending from an inner future, MySelect also meets the waker requirement.
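The forwarding of the waker can be checked by hand with a sketch that uses only the standard library. The types below (Slot, SlotFuture, Race, CountingWaker) and the functions send and wake_demo are hypothetical stand-ins, not Tokio types: SlotFuture plays the role of a oneshot receiver, and Race has the same shape as MySelect. The sketch assumes that counting calls to wake is a fair proxy for "the task would be rescheduled".

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a oneshot channel: a value plus the waker to notify.
#[derive(Default)]
struct Slot {
    value: Option<&'static str>,
    waker: Option<Waker>,
}

/// Hypothetical receiver future: ready once a value is stored in its slot.
struct SlotFuture(Arc<Mutex<Slot>>);

impl Future for SlotFuture {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Waker requirement: remember whom to wake before returning Pending.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Stores a value and wakes whoever last polled the matching future.
fn send(slot: &Arc<Mutex<Slot>>, value: &'static str) {
    let waker = {
        let mut slot = slot.lock().unwrap();
        slot.value = Some(value);
        slot.waker.take()
    };
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// Same shape as MySelect: polls each branch in order and never touches
/// the waker itself, only passes `cx` through.
struct Race {
    a: SlotFuture,
    b: SlotFuture,
}

impl Future for Race {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Poll::Ready(value) = Pin::new(&mut self.a).poll(cx) {
            return Poll::Ready(value);
        }
        if let Poll::Ready(value) = Pin::new(&mut self.b).poll(cx) {
            return Poll::Ready(value);
        }
        Poll::Pending
    }
}

/// Waker that counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls a Race once, completes the second branch, and reports the number
/// of wake-ups seen along with the result of polling again.
fn wake_demo() -> (usize, Poll<&'static str>) {
    let slot_a = Arc::new(Mutex::new(Slot::default()));
    let slot_b = Arc::new(Mutex::new(Slot::default()));
    let mut race = Race { a: SlotFuture(slot_a.clone()), b: SlotFuture(slot_b.clone()) };

    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    // Neither branch is ready, so both inner futures register the outer waker.
    assert!(Pin::new(&mut race).poll(&mut cx).is_pending());

    send(&slot_b, "two");
    let wakes = counter.0.load(Ordering::SeqCst);
    (wakes, Pin::new(&mut race).poll(&mut cx))
}

fn main() {
    let (wakes, result) = wake_demo();
    println!("wakes = {}, result = {:?}", wakes, result);
}
```

Race never calls cx.waker() itself, yet completing the second branch still wakes the outer waker, because each inner future stored a clone of it when it returned Poll::Pending.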