A few loose ends

Recall that when we implemented the Delay future, we noted that a few things remained to be fixed. Rust’s asynchronous model allows a single future to migrate across tasks while it executes. Consider the following:

    use futures::future::poll_fn;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Poll;
    use std::time::{Duration, Instant};

    // `Delay` is the future implemented earlier.
    #[tokio::main]
    async fn main() {
        let when = Instant::now() + Duration::from_millis(10);
        let mut delay = Some(Delay { when });

        poll_fn(move |cx| {
            // Poll the `Delay` once on the current (main) task.
            let mut delay = delay.take().unwrap();
            let res = Pin::new(&mut delay).poll(cx);
            assert!(res.is_pending());

            // Move the partially polled `Delay` to a new task.
            tokio::spawn(async move {
                delay.await;
            });

            Poll::Ready(())
        }).await;
    }

The poll_fn function creates a Future instance using a closure. The snippet above creates a Delay instance, polls it once, then sends the Delay instance to a new task where it is awaited. In this example, Delay::poll is called more than once with different Waker instances. Our earlier implementation did not handle this case: the wrong task is notified, so the spawned task would sleep forever.

When implementing a future, it is critical to assume that each call to poll could supply a different Waker instance. The poll function must update any previously recorded waker with the new one.
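The update rule can be looked at in isolation. The following is a minimal sketch that uses only the standard library; `CountingWaker`, `register`, and `demo_register` are hypothetical names introduced for illustration and are not part of Tokio or of the Delay code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that counts how many times it has been woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical helper: stores `current` in `slot` unless the waker that is
/// already stored would wake the same task.
fn register(slot: &mut Option<Waker>, current: &Waker) {
    let up_to_date = slot
        .as_ref()
        .map_or(false, |stored| stored.will_wake(current));
    if !up_to_date {
        *slot = Some(current.clone());
    }
}

/// Registers two different wakers in turn, fires the stored one, and returns
/// the wake counts of the first and second waker.
fn demo_register() -> (usize, usize) {
    let first = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let second = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let first_waker = Waker::from(first.clone());
    let second_waker = Waker::from(second.clone());

    let mut slot = None;
    register(&mut slot, &first_waker); // as if polled on the first task
    register(&mut slot, &second_waker); // as if polled later on another task

    // Only the most recently supplied waker is notified.
    if let Some(waker) = slot.take() {
        waker.wake();
    }

    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

Calling `demo_register()` is expected to report that only the second waker was woken, since it replaced the first one in the slot before the notification was sent.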

To fix our earlier implementation, we could do something like this:

    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};
    use std::thread;
    use std::time::{Duration, Instant};

    struct Delay {
        when: Instant,
        waker: Option<Arc<Mutex<Waker>>>,
    }

    impl Future for Delay {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            // First, if this is the first time the future is called, spawn the
            // timer thread. If the timer thread is already running, ensure the
            // stored `Waker` matches the current task's waker.
            if let Some(waker) = &self.waker {
                let mut waker = waker.lock().unwrap();

                // Check if the stored waker matches the current task's waker.
                // This is necessary as the `Delay` future instance may move to
                // a different task between calls to `poll`. If this happens, the
                // waker contained by the given `Context` will differ and we
                // must update our stored waker to reflect this change.
                if !waker.will_wake(cx.waker()) {
                    *waker = cx.waker().clone();
                }
            } else {
                let when = self.when;
                let waker = Arc::new(Mutex::new(cx.waker().clone()));
                self.waker = Some(waker.clone());

                // This is the first time `poll` is called, spawn the timer thread.
                thread::spawn(move || {
                    let now = Instant::now();

                    if now < when {
                        thread::sleep(when - now);
                    }

                    // The duration has elapsed. Notify the caller by invoking
                    // the waker.
                    let waker = waker.lock().unwrap();
                    waker.wake_by_ref();
                });
            }

            // Once the waker is stored and the timer thread is started, it is
            // time to check if the delay has completed. This is done by
            // checking the current instant. If the duration has elapsed, then
            // the future has completed and `Poll::Ready` is returned.
            if Instant::now() >= self.when {
                Poll::Ready(())
            } else {
                // The duration has not elapsed, the future has not completed so
                // return `Poll::Pending`.
                //
                // The `Future` trait contract requires that when `Pending` is
                // returned, the future ensures that the given waker is signalled
                // once the future should be polled again. In our case, by
                // returning `Pending` here, we are promising that we will
                // invoke the given waker included in the `Context` argument
                // once the requested duration has elapsed. We ensure this by
                // spawning the timer thread above.
                //
                // If we forget to invoke the waker, the task will hang
                // indefinitely.
                Poll::Pending
            }
        }
    }

It is a bit involved, but the idea is that on each call to poll, the future checks whether the supplied waker matches the previously recorded one. If the two wakers match, there is nothing else to do. If they do not match, the recorded waker must be updated.
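One way to observe this behavior without a runtime is to poll the future by hand with two distinct wakers. The sketch below repeats a condensed copy of the Delay shown above so that it compiles on its own; `FlagWaker` and `poll_across_two_wakers` are hypothetical names, and the 200 ms and 500 ms durations are arbitrary values chosen for the illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

// Condensed copy of the `Delay` future shown above.
struct Delay {
    when: Instant,
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Some(waker) = &self.waker {
            // Later polls: replace the stored waker if the task changed.
            let mut waker = waker.lock().unwrap();
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            // First poll: record the waker and start the timer thread.
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());
            thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    thread::sleep(when - now);
                }
                waker.lock().unwrap().wake_by_ref();
            });
        }

        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

// Hypothetical stand-in for a task: records whether it was woken.
struct FlagWaker(AtomicBool);

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Polls one `Delay` with two different wakers, as if it had moved between
// tasks, and reports whether the first and the second waker were woken.
fn poll_across_two_wakers() -> (bool, bool) {
    let first = Arc::new(FlagWaker(AtomicBool::new(false)));
    let second = Arc::new(FlagWaker(AtomicBool::new(false)));
    let first_waker = Waker::from(first.clone());
    let second_waker = Waker::from(second.clone());

    let mut delay = Delay {
        when: Instant::now() + Duration::from_millis(200),
        waker: None,
    };

    // First poll: spawns the timer thread and records the first waker.
    let mut cx = Context::from_waker(&first_waker);
    assert!(Pin::new(&mut delay).poll(&mut cx).is_pending());

    // Second poll with a different waker: the stored waker is replaced.
    let mut cx = Context::from_waker(&second_waker);
    assert!(Pin::new(&mut delay).poll(&mut cx).is_pending());

    // Give the timer thread time to fire before reading the flags.
    thread::sleep(Duration::from_millis(500));
    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

With the waker update in place, only the second waker should be flagged. If the `will_wake` check and replacement were removed, the timer thread would instead notify the first waker, which corresponds to the hang described earlier.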

Notify utility

We demonstrated how a Delay future could be implemented by hand using wakers. Wakers are the foundation of how asynchronous Rust works. Usually, it is not necessary to drop down to that level. For example, in the case of Delay, we could implement it entirely with async/await by using the tokio::sync::Notify utility. This utility provides a basic task notification mechanism. It handles the details of wakers, including making sure that the recorded waker matches the current task.

Using Notify, we can implement a delay function using async/await like this:

    use tokio::sync::Notify;
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use std::thread;

    async fn delay(dur: Duration) {
        let when = Instant::now() + dur;
        let notify = Arc::new(Notify::new());
        let notify2 = notify.clone();

        thread::spawn(move || {
            let now = Instant::now();

            if now < when {
                thread::sleep(when - now);
            }

            notify2.notify_one();
        });

        notify.notified().await;
    }