Runtime Model

Now we will go over the Tokio / futures runtime model. Tokio is built on top of the futures crate and uses its runtime model. This allows it to interoperate with other libraries that also use the futures crate.

Note: This runtime model is very different from the async libraries found in other languages. While APIs can look similar at a high level, the way code gets executed differs.

Synchronous Model

First, let’s talk briefly about the synchronous (or blocking) model. This is the model that the Rust standard library uses.

    use std::io::prelude::*;
    use std::net::TcpStream;

    fn dox(mut socket: TcpStream) {
        // let socket = ...;
        let mut buf = [0; 1024];
        let n = socket.read(&mut buf).unwrap();
        // Do something with &buf[..n];
    }

When socket.read is called, either the socket has pending data in its receive buffer or it does not. If there is pending data, the call to read returns immediately and buf is filled with that data. However, if there is no pending data, read blocks the current thread until data is received. At that point, buf is filled with the newly received data and read returns.

In order to perform reads on many different sockets concurrently, a thread per socket is required. Using a thread per socket does not scale well to large numbers of sockets. This is known as the c10k problem.
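
As a rough sketch of the thread-per-socket approach, the hypothetical helper read_each_on_own_thread below spawns one thread per reader and performs a single blocking read on each. It is generic over Read so that in-memory readers can stand in for TcpStream; the helper name and the 1024-byte buffer are assumptions made for illustration.

```rust
use std::io::Read;
use std::thread;

/// Reads once from every socket, using one dedicated thread per socket.
/// Returns the number of bytes each read produced, in input order.
pub fn read_each_on_own_thread<R>(sockets: Vec<R>) -> Vec<usize>
where
    R: Read + Send + 'static,
{
    // One thread per socket: each blocking `read` ties up its own thread.
    let handles: Vec<thread::JoinHandle<usize>> = sockets
        .into_iter()
        .map(|mut socket| {
            thread::spawn(move || {
                let mut buf = [0u8; 1024];
                socket.read(&mut buf).unwrap()
            })
        })
        .collect();

    // Wait for every thread to finish its read.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The number of threads grows linearly with the number of sockets, which is the scaling cost described above.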

Non-blocking sockets

The way to avoid blocking a thread when performing an operation like read is to not block the thread! When the socket has no pending data in its receive buffer, the read function returns immediately, indicating that the socket was “not ready” to perform the read operation.

When using a Tokio TcpStream, a call to read always returns immediately, even if there is no pending data to read. In that case, it returns an error of kind ErrorKind::WouldBlock, and the caller is responsible for calling read again at a later time. The trick is to know when that “later time” is.

Another way to think about a non-blocking read is as “polling” the socket for data to read.
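
A minimal sketch of handling a “not ready” read, using only the standard library. The helper try_read and the FlakySource type are hypothetical names introduced here; FlakySource merely imitates a non-blocking socket by returning ErrorKind::WouldBlock a fixed number of times, and is not a Tokio type.

```rust
use std::io::{self, ErrorKind, Read};

/// Attempts a single non-blocking read.
/// Returns Ok(Some(n)) when n bytes were read and Ok(None) when the
/// source reported that it was not ready (ErrorKind::WouldBlock).
pub fn try_read<R: Read>(source: &mut R, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match source.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Hypothetical stand-in for a non-blocking socket: it is "not ready"
/// for a fixed number of reads, then hands out its data.
pub struct FlakySource {
    pub not_ready_reads: u32,
    pub data: Vec<u8>,
}

impl Read for FlakySource {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.not_ready_reads > 0 {
            self.not_ready_reads -= 1;
            return Err(io::Error::new(ErrorKind::WouldBlock, "no data yet"));
        }
        // Copy as much as fits and keep the remainder for later reads.
        let n = self.data.len().min(buf.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data = self.data.split_off(n);
        Ok(n)
    }
}
```

Each call to try_read is one “poll” of the source: the caller gets an answer right away and decides for itself when to try again.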

Polling Model

The strategy of polling a socket for data can be generalized to any operation. For example, a function to get a “widget” in the polling model would look something like this:

    fn poll_widget() -> Async<Widget> { ... }

This function returns an Async<Widget>, where Async is an enum with two variants: Ready(Widget) or NotReady. The Async enum is provided by the futures crate and is one of the building blocks of the polling model.
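
To make the shape of that enum concrete, here is a standalone sketch that does not depend on the futures crate. The Async enum below is a local stand-in with the same two variants; WidgetSource and its polls_until_ready counter are hypothetical, added only so that poll_widget has something to report.

```rust
/// Local stand-in mirroring the two variants of `futures::Async`.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

#[derive(Debug, PartialEq)]
pub struct Widget {
    pub id: u32,
}

/// Hypothetical widget source that needs a few polls before it is ready.
pub struct WidgetSource {
    polls_until_ready: u32,
}

impl WidgetSource {
    pub fn new(polls_until_ready: u32) -> Self {
        WidgetSource { polls_until_ready }
    }

    /// Never blocks: either hands out a widget or reports `NotReady`.
    pub fn poll_widget(&mut self) -> Async<Widget> {
        if self.polls_until_ready == 0 {
            Async::Ready(Widget { id: 1 })
        } else {
            self.polls_until_ready -= 1;
            Async::NotReady
        }
    }
}
```

A source created with WidgetSource::new(2) reports NotReady on the first two polls and Ready on the third.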

Now, let’s define an asynchronous task without combinators that uses this poll_widget function. The task will do the following:

  • Acquire a widget.
  • Print the widget to STDOUT.
  • Terminate the task.

To define a task, we implement the Future trait.

    #![deny(deprecated)]
    extern crate futures;

    use futures::{Async, Future};

    #[derive(Debug)]
    pub struct Widget;

    fn poll_widget() -> Async<Widget> { unimplemented!() }

    /// A task that polls a single widget and writes it to STDOUT.
    pub struct MyTask;

    impl Future for MyTask {
        // The value this future will have when ready
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Result<Async<()>, ()> {
            match poll_widget() {
                Async::Ready(widget) => {
                    println!("widget={:?}", widget);
                    Ok(Async::Ready(()))
                }
                Async::NotReady => {
                    Ok(Async::NotReady)
                }
            }
        }
    }

    fn main() {}

Important: Returning Async::NotReady has special meaning. See the next section for more details.

The key thing to note is that when MyTask::poll is called, it immediately tries to get the widget. If the call to poll_widget returns NotReady, then the task is unable to make further progress. The task then returns NotReady itself, indicating that it is not ready to complete processing.

The task implementation does not block. Instead, “sometime in the future”, the executor will call MyTask::poll again, and poll_widget will be called again. If poll_widget is ready to return a widget, then the task, in turn, is ready to print the widget. The task can then complete by returning Ready.
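
The two-step behaviour described above can be traced by calling poll by hand. This is only a sketch under stated assumptions: it uses a local Async enum instead of the futures crate, the task owns a hypothetical polls_until_widget counter in place of the free function poll_widget, and it records its output in a printed vector rather than writing to STDOUT so the effect can be inspected.

```rust
/// Local stand-in mirroring the two variants of `futures::Async`.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

#[derive(Debug, PartialEq)]
pub struct Widget;

/// Hypothetical variant of `MyTask` that decides for itself when the
/// widget becomes available and remembers what it would have printed.
pub struct MyTask {
    pub polls_until_widget: u32,
    pub printed: Vec<String>,
}

impl MyTask {
    // Stand-in for the free function `poll_widget`.
    fn poll_widget(&mut self) -> Async<Widget> {
        if self.polls_until_widget == 0 {
            Async::Ready(Widget)
        } else {
            self.polls_until_widget -= 1;
            Async::NotReady
        }
    }

    /// Same return shape as `Future::poll` in the futures 0.1 example.
    pub fn poll(&mut self) -> Result<Async<()>, ()> {
        match self.poll_widget() {
            Async::Ready(widget) => {
                self.printed.push(format!("widget={:?}", widget));
                Ok(Async::Ready(()))
            }
            // No widget yet: report that the task cannot make progress.
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}
```

With polls_until_widget set to 1, the first call to poll returns NotReady and records nothing; the second call records the widget and returns Ready.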

Executors

In order for the task to make progress, something has to call MyTask::poll. This is the job of an executor.

Executors are responsible for repeatedly calling poll on a task until Ready is returned. There are many different ways to do this. For example, the CurrentThread executor blocks the current thread and loops through all spawned tasks, calling poll on them. ThreadPool schedules tasks across a thread pool; it is also the default executor used by the runtime.

All tasks must be spawned on an executor or no work will be performed.

At the very simplest, an executor could look something like this:

    #![deny(deprecated)]
    extern crate futures;

    use futures::{Async, Future};
    use std::collections::VecDeque;

    pub struct SpinExecutor {
        // The tasks this executor is responsible for, held in
        // a double-ended queue
        tasks: VecDeque<Box<dyn Future<Item = (), Error = ()> + Send>>,
    }

    impl SpinExecutor {
        pub fn spawn<T>(&mut self, task: T)
        where T: Future<Item = (), Error = ()> + 'static + Send
        {
            self.tasks.push_back(Box::new(task));
        }

        pub fn run(&mut self) {
            // Pop tasks off the front in a tight loop
            while let Some(mut task) = self.tasks.pop_front() {
                match task.poll().unwrap() {
                    Async::Ready(_) => {}
                    Async::NotReady => {
                        // If the task is not ready, put it at the back of the queue
                        self.tasks.push_back(task);
                    }
                }
            }
        }
    }

    pub fn main() {}

Of course, this would not be very efficient. The executor spins in a busy loop and polls every task, even when a task will just return NotReady again.
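
To get a feel for the wasted work, the following sketch counts how many times a spin loop calls poll. It is standalone and does not use the futures crate: Async and Task are local stand-ins with the same poll shape, and Countdown is a hypothetical task that reports NotReady a fixed number of times before completing.

```rust
use std::collections::VecDeque;

/// Local stand-in mirroring the two variants of `futures::Async`.
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Stand-in trait with the same `poll` shape as the tasks above.
pub trait Task {
    fn poll(&mut self) -> Result<Async<()>, ()>;
}

/// Hypothetical task that is not ready for a fixed number of polls.
pub struct Countdown(pub u32);

impl Task for Countdown {
    fn poll(&mut self) -> Result<Async<()>, ()> {
        if self.0 == 0 {
            Ok(Async::Ready(()))
        } else {
            self.0 -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Runs the spin loop to completion and returns the total number of
/// `poll` calls it made.
pub fn spin(mut tasks: VecDeque<Box<dyn Task>>) -> usize {
    let mut polls = 0;
    while let Some(mut task) = tasks.pop_front() {
        polls += 1;
        match task.poll().unwrap() {
            Async::Ready(_) => {}
            // Not ready: requeue and try again on the next pass.
            Async::NotReady => tasks.push_back(task),
        }
    }
    polls
}
```

Running spin with Countdown(3) and Countdown(0) completes two tasks but makes five poll calls, three of which only learn that a task still cannot make progress.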

Ideally, there would be some way for the executor to know when the “readiness” state of a task changes, i.e. when a call to poll will return Ready. Then, the executor would look something like this:

    #![deny(deprecated)]
    extern crate futures;

    use futures::{Async, Future};
    use std::collections::VecDeque;

    pub struct SpinExecutor {
        ready_tasks: VecDeque<Box<dyn Future<Item = (), Error = ()>>>,
        not_ready_tasks: VecDeque<Box<dyn Future<Item = (), Error = ()>>>,
    }

    impl SpinExecutor {
        // Placeholder: stands in for blocking until a task's readiness changes
        fn sleep_until_tasks_are_ready(&self) {}

        pub fn run(&mut self) {
            loop {
                while let Some(mut task) = self.ready_tasks.pop_front() {
                    match task.poll().unwrap() {
                        Async::Ready(_) => {}
                        Async::NotReady => {
                            self.not_ready_tasks.push_back(task);
                        }
                    }
                }

                if self.not_ready_tasks.is_empty() {
                    return;
                }

                // Put the thread to sleep until there is work to do
                self.sleep_until_tasks_are_ready();
            }
        }
    }

    pub fn main() {}

Being able to get notified when a task goes from “not ready” to “ready” is the core of the futures task model.
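
One way to picture “sleep until notified” with only the standard library is a condition variable. The sketch below is an illustration under stated assumptions, not how Tokio or the futures crate implement notification: Shared, poll, start_producer, sleep_until_ready and run_with_notification are all hypothetical names, and a background thread plays the role of the event source.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical shared state: the value a task is waiting for, plus a
/// condition variable used to wake the sleeping executor thread.
pub struct Shared {
    value: Mutex<Option<u32>>,
    ready: Condvar,
}

/// Non-blocking check: takes the value if it has arrived.
fn poll(shared: &Shared) -> Option<u32> {
    shared.value.lock().unwrap().take()
}

/// Event source: produces the value later, then notifies the executor.
fn start_producer(shared: Arc<Shared>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        *shared.value.lock().unwrap() = Some(42);
        shared.ready.notify_one();
    })
}

/// Blocks the calling thread until a value is available.
/// The loop guards against spurious wakeups.
fn sleep_until_ready(shared: &Shared) {
    let mut guard = shared.value.lock().unwrap();
    while guard.is_none() {
        guard = shared.ready.wait(guard).unwrap();
    }
}

/// Polls, sleeps while not ready, and polls again once notified.
/// Returns the value and the number of polls it took.
pub fn run_with_notification() -> (u32, u32) {
    let shared = Arc::new(Shared {
        value: Mutex::new(None),
        ready: Condvar::new(),
    });
    let mut producer: Option<thread::JoinHandle<()>> = None;
    let mut polls = 0;

    loop {
        polls += 1;
        if let Some(value) = poll(&shared) {
            if let Some(handle) = producer.take() {
                handle.join().unwrap();
            }
            return (value, polls);
        }
        // Not ready: arrange for a notification, then sleep instead of spinning.
        if producer.is_none() {
            producer = Some(start_producer(Arc::clone(&shared)));
        }
        sleep_until_ready(&shared);
    }
}
```

Here the value is obtained with exactly two polls: one that finds nothing, and one made only after the producer has signalled that the state changed. A spinning loop would have polled continuously for the whole 20 milliseconds.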
