Wakers

Wakers are the missing piece. This is the system by which a resource is able to notify the waiting task that the resource has become ready to continue some operation.

Let’s look at the Future::poll definition again:

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;

The Context argument to poll has a waker() method. This method returns a Waker bound to the current task. The Waker has a wake() method. Calling this method signals to the executor that the associated task should be scheduled for execution. Resources call wake() when they transition to a ready state to notify the executor that polling the task will be able to make progress.
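
The calls described above can be tried in isolation. The following is a minimal sketch, not part of Mini Tokio: `CountingWaker` and `count_wakes` are hypothetical names, and the waker is built with the standard library's `std::task::Wake` trait, chosen here only because it needs no extra crate.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Hypothetical waker backend that only counts how often it is woken.
// A real executor would schedule the associated task here instead.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// Builds a `Context`, uses its waker the way a resource would, and
// returns the number of wake notifications that were recorded.
fn count_wakes() -> usize {
    let state = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(state.clone());
    let cx = Context::from_waker(&waker);

    // A resource keeps a clone of the waker for later...
    let stored: Waker = cx.waker().clone();
    // ...and calls `wake()` once it is ready. `wake()` consumes the waker.
    stored.wake();
    // `wake_by_ref()` sends the same notification without consuming it.
    cx.waker().wake_by_ref();

    state.wakes.load(Ordering::SeqCst)
}

fn main() {
    println!("wake notifications: {}", count_wakes()); // prints 2
}
```

Cloning the waker is what lets a resource hold on to it after `poll` has returned.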

Updating Delay

We can update Delay to use wakers:

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};
    use std::thread;

    struct Delay {
        when: Instant,
    }

    impl Future for Delay {
        type Output = &'static str;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
            -> Poll<&'static str>
        {
            if Instant::now() >= self.when {
                println!("Hello world");
                Poll::Ready("done")
            } else {
                // Get a handle to the waker for the current task
                let waker = cx.waker().clone();
                let when = self.when;

                // Spawn a timer thread.
                thread::spawn(move || {
                    let now = Instant::now();

                    if now < when {
                        thread::sleep(when - now);
                    }

                    waker.wake();
                });

                Poll::Pending
            }
        }
    }

Now, once the requested duration has elapsed, the calling task is notified and the executor can ensure the task is scheduled again. The next step is to update mini-tokio to listen for wake notifications.

There are still a few remaining issues with our Delay implementation. We will fix them later.

When a future returns Poll::Pending, it must ensure that the waker is signalled at some point. Forgetting to do this results in the task hanging indefinitely.

Forgetting to wake a task after returning Poll::Pending is a common source of bugs.
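
To see why the wake call matters, here is a minimal sketch of a driver that sleeps until it is woken. `ThreadWaker` and `block_on` are hypothetical helpers written for this illustration (they are not the Mini Tokio code developed below), and the waker is built with `std::task::Wake` to avoid extra dependencies. If `Delay` returned `Poll::Pending` without arranging for `wake()` to be called, the `thread::park()` call would have nothing to wake it for this task and the program would hang.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            // Hand a clone of the waker to a timer thread.
            let waker = cx.waker().clone();
            let when = self.when;
            thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    thread::sleep(when - now);
                }
                waker.wake();
            });
            Poll::Pending
        }
    }
}

// Hypothetical waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical helper: polls `future` on the current thread and parks
// between polls. Returns the output and the number of polls it took.
fn block_on<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = future.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        // Sleep until a waker unparks this thread. `park` may also return
        // spuriously, which only costs an extra poll.
        thread::park();
    }
}

fn main() {
    let when = Instant::now() + Duration::from_millis(50);
    let (out, polls) = block_on(Delay { when });
    println!("{} after {} polls", out, polls);
}
```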

Recall the first iteration of Delay. Here was the future implementation:

    impl Future for Delay {
        type Output = &'static str;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
            -> Poll<&'static str>
        {
            if Instant::now() >= self.when {
                println!("Hello world");
                Poll::Ready("done")
            } else {
                // Ignore this line for now.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

Before returning Poll::Pending, we called cx.waker().wake_by_ref(). This is to satisfy the future contract. By returning Poll::Pending, we are responsible for signalling the waker. Because we didn’t implement the timer thread yet, we signalled the waker inline. Doing so results in the future being immediately re-scheduled and executed again, at which point it will probably still not be ready to complete.

Notice that you are allowed to signal the waker more often than necessary. In this particular case, we signal the waker even though we are not ready to continue the operation at all. There is nothing wrong with this besides some wasted CPU cycles. However, this particular implementation will result in a busy loop.
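
The cost of that busy loop can be made visible by counting polls. The sketch below is an illustration under assumptions: `BusyDelay` is the first iteration of Delay under a different name, while `Flag` and `count_polls` are hypothetical helpers that stand in for an executor which re-polls a task whenever it has been woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

// First iteration of `Delay`: it wakes itself inline instead of using a timer.
struct BusyDelay {
    when: Instant,
}

impl Future for BusyDelay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical waker that records a pending wake notification.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Polls the future each time it has been woken and returns the poll count.
fn count_polls(delay: Duration) -> u64 {
    // Start with the flag set: a freshly spawned task is scheduled once.
    let flag = Arc::new(Flag(AtomicBool::new(true)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(BusyDelay { when: Instant::now() + delay });
    let mut polls = 0;
    // `swap` consumes the notification; the loop stops if nobody woke the task.
    while flag.0.swap(false, Ordering::SeqCst) {
        polls += 1;
        if future.as_mut().poll(&mut cx).is_ready() {
            break;
        }
    }
    polls
}

fn main() {
    let polls = count_polls(Duration::from_millis(5));
    println!("polled {} times while waiting 5 ms", polls);
}
```

The exact count depends on the machine, but every poll before the deadline is work that a timer-driven wake would have avoided.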

Updating Mini Tokio

The next step is updating Mini Tokio to receive waker notifications. We want the executor to only run tasks when they are woken, and to do this, Mini Tokio will provide its own waker. When the waker is invoked, its associated task is queued to be executed. Mini Tokio passes this waker to the future when it polls the future.

The updated Mini Tokio will use a channel to store scheduled tasks. Channels allow tasks to be queued for execution from any thread. Wakers must be Send and Sync, so the channel handle stored inside them must be Sync as well. We use the channel from the crossbeam crate, because the standard library's mpsc::Sender is not Sync on Rust versions before 1.72.

The Send and Sync traits are marker traits related to concurrency provided by Rust. Types that can be sent to a different thread are Send. Most types are Send, but something like Rc is not. Types that can be concurrently accessed through immutable references are Sync. A type can be Send but not Sync — a good example is Cell, which can be modified through an immutable reference, and is thus not safe to access concurrently.

For more details, see the related chapter in the Rust book.
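
These bounds can be checked at compile time. The following sketch uses two hypothetical helper functions, `assert_send` and `assert_sync`, which do nothing at run time; a call only compiles if the type satisfies the bound. The lines that would fail to compile are left as comments.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};

// Hypothetical helpers: each call compiles only if `T` meets the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

// Returns `true` so a caller has something to observe; the real check is
// that this function compiles at all.
fn send_sync_checks() -> bool {
    // `Cell` can be moved to another thread...
    assert_send::<Cell<i32>>();
    // ...but not shared between threads. Uncommenting this is a compile error:
    // assert_sync::<Cell<i32>>();

    // `Rc` is not `Send`. Uncommenting this is a compile error:
    // assert_send::<std::rc::Rc<i32>>();

    // `Arc<Mutex<T>>` is the usual way to share mutable state across threads.
    assert_send::<Arc<Mutex<i32>>>();
    assert_sync::<Arc<Mutex<i32>>>();

    // A `Mutex` makes a `Send`-only type `Sync`.
    assert_sync::<Mutex<Cell<i32>>>();
    true
}

fn main() {
    println!("checks compiled: {}", send_sync_checks());
}
```

The last check is the property the Task struct later in this section relies on when it wraps its future in a Mutex.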

Add the following dependency to your Cargo.toml to pull in channels.

    crossbeam = "0.7"

Then, update the MiniTokio struct.

    use crossbeam::channel;
    use std::sync::Arc;

    struct MiniTokio {
        scheduled: channel::Receiver<Arc<Task>>,
        sender: channel::Sender<Arc<Task>>,
    }

    struct Task {
        // This will be filled in soon.
    }

Wakers are Sync and can be cloned. When wake() is called, the task must be scheduled for execution. To implement this, we use a channel: calling wake() on the waker pushes the task into the send half of the channel. Our Task structure will implement the wake logic. To do this, it needs to contain both the spawned future and the channel send half.

    use crossbeam::channel;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    struct Task {
        // The `Mutex` is to make `Task` implement `Sync`. Only
        // one thread accesses `future` at any given time. The
        // `Mutex` is not required for correctness. Real Tokio
        // does not use a mutex here, but real Tokio has
        // more lines of code than can fit in a single tutorial
        // page.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
        executor: channel::Sender<Arc<Task>>,
    }

    impl Task {
        fn schedule(self: &Arc<Self>) {
            // `send` returns a `Result`; it only fails when the
            // receiving half has been dropped, which we ignore here.
            let _ = self.executor.send(self.clone());
        }
    }

To schedule the task, the Arc is cloned and sent through the channel. Now, we need to hook our schedule function up to std::task::Waker. The standard library provides a low-level API to do this using manual vtable construction. This strategy provides maximum flexibility to implementors, but requires a fair amount of unsafe boilerplate code. Instead of using RawWakerVTable directly, we will use the ArcWake utility provided by the futures crate. This allows us to implement a simple trait to expose our Task struct as a waker.
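
For comparison, here is roughly what the manual vtable route looks like for the simplest possible case: a waker that does nothing. This is a sketch for illustration only. The names `vt_clone`, `vt_wake`, `vt_wake_by_ref`, `vt_drop`, `noop_waker` and `poll_ready_once` are hypothetical, and a waker that actually scheduled a task would also have to manage a reference-counted data pointer in each of the four functions.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// The four entries every waker vtable needs. This waker carries no data,
// so each function ignores the pointer and there is nothing to free.
unsafe fn vt_clone(_data: *const ()) -> RawWaker {
    RawWaker::new(ptr::null(), &VTABLE)
}
unsafe fn vt_wake(_data: *const ()) {}
unsafe fn vt_wake_by_ref(_data: *const ()) {}
unsafe fn vt_drop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(vt_clone, vt_wake, vt_wake_by_ref, vt_drop);

// Builds a waker whose `wake()` does nothing.
fn noop_waker() -> Waker {
    // SAFETY: the vtable functions never dereference the data pointer and
    // are safe to call from any thread, which is what `Waker` requires.
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}

// Polls an already-completed future once using the no-op waker.
fn poll_ready_once() -> Poll<i32> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = future::ready(7);
    Pin::new(&mut fut).poll(&mut cx)
}

fn main() {
    println!("{:?}", poll_ready_once()); // prints Ready(7)
}
```

Even this trivial waker needs an `unsafe` block and a safety argument, which is the boilerplate a helper trait hides.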

Add the following dependency to your Cargo.toml to pull in futures.

    futures = "0.3"

Then implement futures::task::ArcWake.

    use futures::task::ArcWake;
    use std::sync::Arc;

    impl ArcWake for Task {
        fn wake_by_ref(arc_self: &Arc<Self>) {
            arc_self.schedule();
        }
    }

When the timer thread above calls waker.wake(), the task is pushed into the channel. Next, we implement receiving and executing the tasks in the MiniTokio::run() function.

    use futures::task;
    use std::task::Context;

    impl MiniTokio {
        fn run(&self) {
            while let Ok(task) = self.scheduled.recv() {
                task.poll();
            }
        }

        /// Initialize a new mini-tokio instance.
        fn new() -> MiniTokio {
            let (sender, scheduled) = channel::unbounded();

            MiniTokio { scheduled, sender }
        }

        /// Spawn a future onto the mini-tokio instance.
        ///
        /// The given future is wrapped with the `Task` harness and pushed into the
        /// `scheduled` queue. The future will be executed when `run` is called.
        fn spawn<F>(&self, future: F)
        where
            F: Future<Output = ()> + Send + 'static,
        {
            Task::spawn(future, &self.sender);
        }
    }

    impl Task {
        fn poll(self: Arc<Self>) {
            // Create a waker from the `Task` instance. This
            // uses the `ArcWake` impl from above.
            let waker = task::waker(self.clone());
            let mut cx = Context::from_waker(&waker);

            // No other thread ever tries to lock the future
            let mut future = self.future.try_lock().unwrap();

            // Poll the future
            let _ = future.as_mut().poll(&mut cx);
        }

        // Spawns a new task with the given future.
        //
        // Initializes a new Task harness containing the given future and pushes it
        // onto `sender`. The receiver half of the channel will get the task and
        // execute it.
        fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
        where
            F: Future<Output = ()> + Send + 'static,
        {
            let task = Arc::new(Task {
                future: Mutex::new(Box::pin(future)),
                executor: sender.clone(),
            });

            let _ = sender.send(task);
        }
    }

Multiple things are happening here. First, MiniTokio::run() is implemented. The function runs in a loop receiving scheduled tasks from the channel. As tasks are pushed into the channel when they are woken, these tasks are able to make progress when executed.

Additionally, the MiniTokio::new() and MiniTokio::spawn() functions are adjusted to use a channel rather than a VecDeque. When new tasks are spawned, they are given a clone of the sender half of the channel, which the task can use to schedule itself on the runtime.

The Task::poll() function creates the waker using the ArcWake utility from the futures crate. The waker is used to create a task::Context. That task::Context is passed to poll.
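
The same wake, queue and poll cycle can be tried without any external crates. The sketch below is a simplified variant under stated assumptions, not the Mini Tokio code above: it swaps crossbeam for `std::sync::mpsc`, swaps `ArcWake` for the standard library's `std::task::Wake` trait, and uses a hypothetical `run_all` function in place of `spawn` and `run`. `Delay` is changed to `Output = ()` and counts completions so that the result can be observed.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// `Delay` adapted for this sketch: `Output = ()` plus a completion counter.
struct Delay {
    when: Instant,
    completed: Arc<AtomicUsize>,
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.when {
            self.completed.fetch_add(1, Ordering::SeqCst);
            return Poll::Ready(());
        }
        let (waker, when) = (cx.waker().clone(), self.when);
        thread::spawn(move || {
            thread::sleep(when.saturating_duration_since(Instant::now()));
            waker.wake();
        });
        Poll::Pending
    }
}

struct Task {
    // `None` once the future has completed, so it is never polled again.
    future: Mutex<Option<BoxFuture>>,
    // Wrapped in a `Mutex` so `Task` is `Sync` whether or not `Sender` is.
    executor: Mutex<Sender<Arc<Task>>>,
}

impl Wake for Task {
    // Waking a task queues it for another poll.
    fn wake(self: Arc<Self>) {
        let _ = self.executor.lock().unwrap().send(self.clone());
    }
}

impl Task {
    fn poll(self: Arc<Self>) {
        let waker = Waker::from(self.clone());
        let mut cx = Context::from_waker(&waker);
        let mut slot = self.future.lock().unwrap();
        let finished = match slot.as_mut() {
            Some(future) => future.as_mut().poll(&mut cx).is_ready(),
            None => false,
        };
        if finished {
            *slot = None;
        }
    }
}

// Hypothetical stand-in for `MiniTokio::spawn` followed by `MiniTokio::run`.
fn run_all(futures: Vec<BoxFuture>) {
    let (sender, scheduled) = mpsc::channel::<Arc<Task>>();
    for future in futures {
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            executor: Mutex::new(sender.clone()),
        });
        let _ = sender.send(task);
    }
    // Drop our own sender so `recv` fails, ending the loop, once every task
    // (and the sender each one holds) has been dropped.
    drop(sender);
    while let Ok(task) = scheduled.recv() {
        task.poll();
    }
}

// Runs two delays to completion and returns how many finished.
fn demo() -> usize {
    let completed = Arc::new(AtomicUsize::new(0));
    let mut futures: Vec<BoxFuture> = Vec::new();
    for &ms in &[10u64, 20] {
        let when = Instant::now() + Duration::from_millis(ms);
        futures.push(Box::pin(Delay { when, completed: completed.clone() }));
    }
    run_all(futures);
    completed.load(Ordering::SeqCst)
}

fn main() {
    println!("completed tasks: {}", demo()); // prints 2
}
```

One design difference is worth noting: because `run_all` drops its own sender, the receive loop ends once the last task is gone, whereas `MiniTokio::run()` above keeps a sender alive and therefore waits for new tasks indefinitely.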