Runtime model

Applications written using Tokio are organized across a large number of small, non-blocking tasks. A Tokio task is similar to a goroutine or an Erlang process, but is non-blocking. Tasks are designed to be lightweight, can be spawned quickly, and maintain low scheduling overhead. Because they are non-blocking, operations that are not able to finish immediately must still return immediately. Instead of returning the result of the operation, they return a value indicating that the operation is in progress.

Non-blocking execution

A Tokio task is implemented using the Future trait:

    extern crate futures;

    use futures::{future, Async, Future, Poll};

    // Stand-in resource for the example: an already-resolved future.
    type MyResource = future::FutureResult<(), ()>;

    struct MyTask {
        my_resource: MyResource,
    }

    impl MyTask {
        // Application logic for a value produced by the resource (stubbed).
        fn process(&self, _value: ()) {}
        // Application logic for a resource error (stubbed).
        fn process_err(&self, _err: ()) {}
    }

    impl Future for MyTask {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            match self.my_resource.poll() {
                // The resource produced a value: process it and complete.
                Ok(Async::Ready(value)) => {
                    self.process(value);
                    Ok(Async::Ready(()))
                }
                // The resource is not ready, so neither is the task.
                Ok(Async::NotReady) => Ok(Async::NotReady),
                // The resource failed: handle the error and complete.
                Err(err) => {
                    self.process_err(err);
                    Ok(Async::Ready(()))
                }
            }
        }
    }

    fn main() {}

Tasks are submitted to an executor using tokio::spawn or by calling a spawn method on an executor object. The poll function drives the task. No work is done without calling poll. It is the executor’s job to call poll on the task until Ready(()) is returned.

MyTask will receive a value from my_resource and process it. Once the value has been processed, the task has completed its logic and is done. This is represented by returning Ok(Async::Ready(())).

However, in order to complete processing, the task depends on my_resource providing a value. Given that my_resource is a non-blocking resource, it may or may not be ready to provide the value when my_resource.poll() is called. If it is ready, it returns Ok(Async::Ready(value)). If it is not ready, it returns Ok(Async::NotReady).

When the resource is not ready to provide a value, the task itself is not ready to complete, so the task’s poll function returns NotReady as well.

At some point in the future, the resource will become ready to provide the value. The resource uses the task system to signal to the executor that it is ready. The executor schedules the task, which leads to MyTask::poll being called again. This time, given that my_resource is ready, the value will be returned from my_resource.poll() and the task is able to complete.
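The same wake-up sequence can be traced with the standard library's std::future::Future and std::task::Waker, which newer Rust uses in place of the futures 0.1 API shown above: Poll::Pending corresponds to Async::NotReady, cx.waker().clone() to task::current(), and Waker::wake to Task::notify. This is a minimal sketch under those assumptions, not Tokio code; Shared, complete, and CountingWaker are hypothetical names, and the counting waker stands in for a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical state shared between the resource and whatever readies it.
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

// Leaf resource: Ready once a value has been supplied, Pending until then.
struct MyResource(Arc<Mutex<Shared>>);

impl Future for MyResource {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Register interest: keep a handle to the current task.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Called by the event source (socket, background thread, ...) when the value arrives.
fn complete(shared: &Arc<Mutex<Shared>>, value: u32) {
    let waker = {
        let mut s = shared.lock().unwrap();
        s.value = Some(value);
        s.waker.take()
    };
    // Notify outside the lock so the executor can poll the task again.
    if let Some(waker) = waker {
        waker.wake();
    }
}

struct MyTask {
    my_resource: MyResource,
    processed: Option<u32>,
}

impl Future for MyTask {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let polled = Pin::new(&mut self.my_resource).poll(cx);
        match polled {
            Poll::Ready(value) => {
                self.processed = Some(value); // stand-in for process(value)
                Poll::Ready(())
            }
            // The resource is not ready, so neither is the task.
            Poll::Pending => Poll::Pending,
        }
    }
}

// Counts notifications; stands in for an executor's scheduling logic.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Returns (first poll was Pending, number of wake-ups, processed value).
fn demo() -> (bool, usize, Option<u32>) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let mut task = MyTask { my_resource: MyResource(shared.clone()), processed: None };
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let first_pending = Pin::new(&mut task).poll(&mut cx).is_pending();
    complete(&shared, 42); // readiness change: the task is notified once
    let finished = Pin::new(&mut task).poll(&mut cx).is_ready();
    assert!(finished);
    (first_pending, counter.0.load(Ordering::SeqCst), task.processed)
}
```

Calling demo() shows the first poll returning Pending, exactly one notification once complete supplies the value, and the second poll completing the task.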

Cooperative scheduling

Cooperative scheduling is used to schedule tasks on executors. A single executor is expected to manage many tasks across a small set of threads; there will be far more tasks than threads. There is also no preemption: when a task is scheduled to execute, it blocks the current thread until its poll function returns.

Because of this, it is important for implementations of poll to only execute for very short periods of time. For I/O bound applications, this usually happens automatically. However, if a task must run a longer computation, it should defer work to a blocking pool or break up the computation into smaller chunks and yield back to the executor after each chunk.

Task system

The task system is the system by which resources notify executors of readiness changes. A task is composed of non-blocking logic that consumes resources. In the example above, MyTask uses a single resource, my_resource, but there is no limit to the number of resources that a task can consume.

When a task is executing and attempts to use a resource that is not ready, it becomes logically blocked on that resource, i.e., the task is not able to make further progress until that resource becomes ready. Tokio tracks which resources a task is currently blocked on so that the task can make forward progress. When a dependent resource becomes ready, the executor schedules the task. This is done by tracking when a task expresses interest in a resource.

When MyTask executes, attempts to consume my_resource, and my_resource returns NotReady, MyTask has implicitly expressed interest in the my_resource resource. At this point the task and the resource are linked. When the resource becomes ready, the task is scheduled again.

task::current and Task::notify

Tracking interest and notifying readiness changes is done with two APIs:

  • task::current
  • Task::notify

When my_resource.poll() is called, if the resource is ready, it immediately returns the value without using the task system. If the resource is not ready, it gets a handle to the current task by calling task::current() -> Task. This handle is obtained by reading a thread-local variable set by the executor.

Some external event (data received on the network, a background thread completing a computation, etc.) will result in my_resource becoming ready to produce its value. At that point, the logic that readies my_resource will call notify on the task handle obtained from task::current. This signals the readiness change to the executor, which then schedules the task for execution.

If multiple tasks have expressed interest in a resource, only the last task to have done so will be notified. Resources are intended to be used from a single task only.
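A small sketch of why only the last task is notified, assuming a resource that keeps a single slot for a task handle. It uses std::task::Waker as the analogue of a futures 0.1 Task; ReadinessSlot and CountingWaker are hypothetical names. Registering a second handle simply overwrites the first.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Counts how many times a given task handle was notified.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Hypothetical resource state with room for exactly one task handle.
#[derive(Default)]
struct ReadinessSlot {
    ready: bool,
    waker: Option<Waker>,
}

impl ReadinessSlot {
    // Polled while not ready: remember the caller, replacing any earlier one.
    fn register(&mut self, waker: &Waker) {
        self.waker = Some(waker.clone());
    }

    // Readiness change: notify whichever task registered last.
    fn set_ready(&mut self) {
        self.ready = true;
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

// Two tasks register; returns (first task's wake-ups, second task's wake-ups).
fn demo() -> (usize, usize) {
    let first = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let second = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let mut slot = ReadinessSlot::default();
    slot.register(&Waker::from(first.clone()));
    slot.register(&Waker::from(second.clone())); // overwrites the first handle
    slot.set_ready();
    assert!(slot.ready);
    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

The first task is never woken, which is why sharing such a resource between tasks can leave one of them stalled.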

Async::NotReady

Any function that returns Async must adhere to the following contract: when NotReady is returned, the current task must have been registered for notification on readiness change. The implication for resources is discussed in the above section. For task logic, this means that NotReady cannot be returned unless a resource has returned NotReady. By doing this, the contract is transitively upheld: the current task is registered for notification because NotReady has been received from the resource.

Great care must be taken to avoid returning NotReady without having received NotReady from a resource. For example, the following task implementation results in the task never completing.

    #![deny(deprecated)]
    #[macro_use]
    extern crate futures;

    use futures::{Async, Future, Poll};

    // Stand-in resources for the example.
    type Resource1 = futures::future::FutureResult<(), ()>;

    struct Resource2;

    impl Resource2 {
        fn new(_: ()) -> Self {
            Resource2
        }
    }

    impl Future for Resource2 {
        type Item = ();
        type Error = ();
        fn poll(&mut self) -> Poll<(), ()> {
            unimplemented!()
        }
    }

    enum BadTask {
        First(Resource1),
        Second(Resource2),
    }

    impl Future for BadTask {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            use self::BadTask::*;
            let value = match *self {
                First(ref mut resource) => {
                    try_ready!(resource.poll())
                }
                Second(ref mut resource) => {
                    try_ready!(resource.poll());
                    return Ok(Async::Ready(()));
                }
            };
            *self = Second(Resource2::new(value));
            // Bug: no resource returned NotReady, so the task is never notified.
            Ok(Async::NotReady)
        }
    }

    fn main() {}

The problem with the above implementation is that Ok(Async::NotReady) is returned right after transitioning the state to Second. During this transition, no resource has returned NotReady. When the task itself returns NotReady, it has violated the contract, as the task will not be notified in the future.
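To see the failure concretely, here is the same state machine sketched against std::future::Future rather than futures 0.1 (a swapped-in API). std::future::ready stands in for both resources, and CountingWaker is a hypothetical waker that counts notifications. The first poll returns Pending, yet no waker was ever registered, so the count stays at zero and an executor would never poll the task again.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Counts how many times the task was notified.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

enum BadTask {
    First(Ready<u32>),
    Second(Ready<u32>),
}

impl Future for BadTask {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let value = match &mut *self {
            BadTask::First(resource) => match Pin::new(resource).poll(cx) {
                Poll::Ready(value) => value,
                Poll::Pending => return Poll::Pending,
            },
            BadTask::Second(resource) => return Pin::new(resource).poll(cx),
        };
        // Transition to the second state...
        *self = BadTask::Second(ready(value * 2));
        // ...then BUG: return Pending although no resource returned Pending,
        // so no waker was registered and nothing will poll this task again.
        Poll::Pending
    }
}

// Returns (first poll was Pending, number of wake-ups received).
fn demo() -> (bool, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut task = BadTask::First(ready(21));
    let pending = Pin::new(&mut task).poll(&mut cx).is_pending();
    (pending, counter.0.load(Ordering::SeqCst))
}
```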

This situation is generally resolved by adding a loop:

    #![deny(deprecated)]
    #[macro_use]
    extern crate futures;

    use futures::{Async, Future, Poll};

    // Stand-in resources for the example.
    type Resource1 = futures::future::FutureResult<(), ()>;

    struct Resource2;

    impl Resource2 {
        fn new(_: ()) -> Self {
            Resource2
        }
    }

    impl Future for Resource2 {
        type Item = ();
        type Error = ();
        fn poll(&mut self) -> Poll<(), ()> {
            unimplemented!()
        }
    }

    enum BadTask {
        First(Resource1),
        Second(Resource2),
    }

    impl Future for BadTask {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            use self::BadTask::*;
            loop {
                let value = match *self {
                    First(ref mut resource) => {
                        // Returns NotReady early if the first resource is not ready.
                        try_ready!(resource.poll())
                    }
                    Second(ref mut resource) => {
                        try_ready!(resource.poll());
                        return Ok(Async::Ready(()));
                    }
                };
                // Transition, then loop to poll the new resource right away.
                *self = Second(Resource2::new(value));
            }
        }
    }

    fn main() {}

One way to think about it is that a task’s poll function must not return until it is unable to make any further progress due to its resources not being ready or it explicitly yields (see below).
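The loop-based fix carries over directly to std::future::Future. In this sketch (std types, not the futures 0.1 API used above), std::future::ready stands in for both resources and NoopWake is a hypothetical waker that ignores notifications. Because the loop polls the second resource immediately after the transition, a single poll completes the task, and Pending can only ever come from a resource.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that ignores notifications.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

enum GoodTask {
    First(Ready<u32>),
    Second(Ready<u32>),
}

impl Future for GoodTask {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        loop {
            let value = match &mut *self {
                GoodTask::First(resource) => match Pin::new(resource).poll(cx) {
                    Poll::Ready(value) => value,
                    // Pending is only returned because a resource returned it.
                    Poll::Pending => return Poll::Pending,
                },
                GoodTask::Second(resource) => return Pin::new(resource).poll(cx),
            };
            // Transition, then loop around to poll the new resource immediately.
            *self = GoodTask::Second(ready(value * 2));
        }
    }
}

// Polls the task once and returns the result.
fn demo() -> Poll<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut task = GoodTask::First(ready(21));
    Pin::new(&mut task).poll(&mut cx)
}
```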

Also note that functions that return Async must only be called from a task. In other words, these functions may only be called from code that has been submitted to tokio::spawn or another task spawn function.

Yielding

Sometimes a task must return NotReady without being blocked on a resource. This usually happens when the computation to run is large and the task wants to return control to the executor to allow it to execute other futures.

Yielding is done by notifying the current task and returning NotReady:

    extern crate futures;

    use futures::task;
    use futures::{Async, Poll};

    // Must only be called from within a task's poll function.
    fn poll_yield() -> Poll<(), ()> {
        // Yield the current task. The executor will poll this task on the
        // next iteration through its run list.
        task::current().notify();
        Ok(Async::NotReady)
    }

    fn main() {}

Yielding can be used to break up a CPU-expensive computation:

    extern crate futures;

    use futures::{task, Async, Future, Poll};

    struct Count {
        remaining: usize,
    }

    impl Future for Count {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            while self.remaining > 0 {
                self.remaining -= 1;
                // Yield every 10 iterations
                if self.remaining % 10 == 0 {
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
            }
            Ok(Async::Ready(()))
        }
    }

    fn main() {}
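The same computation can be sketched with std::future::Future, where yielding becomes cx.waker().wake_by_ref() followed by Poll::Pending. The driver run_counting_polls and the WokenFlag waker are hypothetical; they stand in for an executor that re-polls a task only after it has been notified, which makes the number of polls visible.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Count {
    remaining: usize,
}

impl Future for Count {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        while self.remaining > 0 {
            self.remaining -= 1;
            // Yield every 10 iterations: notify ourselves, then return.
            if self.remaining % 10 == 0 {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        Poll::Ready(())
    }
}

// Records whether the task asked to be polled again.
struct WokenFlag(AtomicBool);

impl Wake for WokenFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Hypothetical single-task driver: re-polls only after a wake-up and returns
// the number of polls, or None if the task went Pending without being woken.
fn run_counting_polls<F: Future + Unpin>(mut fut: F) -> Option<usize> {
    let flag = Arc::new(WokenFlag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return Some(polls);
        }
        if !flag.0.swap(false, Ordering::SeqCst) {
            return None; // a real executor would simply never poll it again
        }
    }
}
```

With remaining set to 25, the task is polled four times: it yields at 20, at 10, and once more when the counter reaches 0 (because 0 % 10 == 0), then completes on the fourth poll.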

Executors

Executors are responsible for driving many tasks to completion. A task is spawned onto an executor, at which point the executor calls its poll function when needed. The executor hooks into the task system to receive resource readiness notifications.

By decoupling the task system from the executor implementation, the specific execution and scheduling logic can be left to the executor implementation. Tokio provides two executor implementations, each with unique characteristics: current_thread and thread_pool.

When a task is first spawned onto the executor, the executor wraps it with Spawn. This binds the task logic with the task state (this is mostly required for legacy reasons). Executors will typically store the task on the heap, usually by storing it in a Box or an Arc. When the executor picks a task for execution, it calls Spawn::poll_future_notify. This function ensures that the task context is set to the thread-local variable such that task::current is able to read it.
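The thread-local hand-off described above can be sketched with standard-library types. This is an illustration under assumptions, not how futures 0.1 implements Spawn: CURRENT, enter, and current are hypothetical names, and a std::task::Waker stands in for the task handle. The executor installs the handle around the call into the task logic, and resource code reads it back.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

thread_local! {
    // Handle of the task currently being polled on this thread, if any.
    static CURRENT: RefCell<Option<Waker>> = RefCell::new(None);
}

// Executor side: install `waker` while the task logic `f` runs, then restore
// the previous value. (Not panic-safe; a real executor would use a guard.)
fn enter<R>(waker: &Waker, f: impl FnOnce() -> R) -> R {
    let previous = CURRENT.with(|c| c.replace(Some(waker.clone())));
    let out = f();
    CURRENT.with(|c| *c.borrow_mut() = previous);
    out
}

// Resource side: the analogue of task::current(); panics outside a task.
fn current() -> Waker {
    CURRENT.with(|c| c.borrow().clone().expect("current() called outside of a task"))
}

// Counts notifications delivered to the task handle.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// "Polls" task logic that grabs the current handle, then notifies it.
fn demo() -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let handle = enter(&waker, current);
    handle.wake(); // a later readiness change
    counter.0.load(Ordering::SeqCst)
}
```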

When calling poll_future_notify, the executor also passes in a notify handle and an identifier. These arguments are included in the task handle returned by task::current and are how the task is linked to the executor.

The notify handle is an implementation of Notify and the identifier is a value that the executor uses to look up the current task. When Task::notify is called, the notify function on the notify handle is called with the supplied identifier. The implementation of this function is responsible for performing the scheduling logic.

One strategy for implementing an executor is to store each task in a Box and to use a linked list to track tasks that are scheduled for execution. When Notify::notify is called, the task associated with the identifier is pushed onto the end of the scheduled linked list. When the executor runs, it pops from the front of the linked list and executes the task as described above.

Note that this section does not describe how the executor is run. The details of this are left to the executor implementation. One option is for the executor to spawn one or more threads and dedicate these threads to draining the scheduled linked list. Another is to provide a MyExecutor::run function that blocks the current thread and drains the scheduled linked list.
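The queue-based strategy can be sketched with standard-library types. This is not Tokio's implementation: MyExecutor, TaskWaker, and YieldTimes are hypothetical; a std::task::Wake implementation plays the role of the Notify handle, the task's index in a Vec serves as the identifier, and a VecDeque replaces the linked list. run is the blocking variant: it drains the queue on the current thread.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxedTask = Pin<Box<dyn Future<Output = ()>>>;
type RunQueue = Arc<Mutex<VecDeque<usize>>>;

// Notify handle: the task identifier plus a reference to the run queue.
struct TaskWaker {
    id: usize,
    queue: RunQueue,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        // Like Notify::notify: push the identifier onto the end of the queue.
        self.queue.lock().unwrap().push_back(self.id);
    }
}

// Hypothetical single-threaded executor (not a Tokio type).
struct MyExecutor {
    tasks: Vec<Option<BoxedTask>>, // heap-allocated tasks, indexed by identifier
    queue: RunQueue,
}

impl MyExecutor {
    fn new() -> Self {
        MyExecutor { tasks: Vec::new(), queue: Arc::new(Mutex::new(VecDeque::new())) }
    }

    fn spawn(&mut self, task: impl Future<Output = ()> + 'static) {
        let id = self.tasks.len();
        let boxed: BoxedTask = Box::pin(task);
        self.tasks.push(Some(boxed));
        self.queue.lock().unwrap().push_back(id); // new tasks start out scheduled
    }

    // Blocks the current thread, popping from the front until the queue is
    // empty. Returns how many times a task was polled.
    fn run(&mut self) -> usize {
        let mut polls = 0;
        loop {
            let next = self.queue.lock().unwrap().pop_front();
            let id = match next {
                Some(id) => id,
                None => return polls,
            };
            let waker = Waker::from(Arc::new(TaskWaker { id, queue: self.queue.clone() }));
            let mut cx = Context::from_waker(&waker);
            let done = match self.tasks[id].as_mut() {
                Some(task) => {
                    polls += 1;
                    task.as_mut().poll(&mut cx).is_ready()
                }
                None => false, // stale notification for a finished task
            };
            if done {
                self.tasks[id] = None; // completed tasks are dropped
            }
        }
    }
}

// Hypothetical task that yields `n` times before completing.
struct YieldTimes(u32);

impl Future for YieldTimes {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
```

Spawning YieldTimes(2) and YieldTimes(0) and calling run() polls the first task three times and the second once, interleaving them in queue order. Stale identifiers are tolerated because finished tasks are skipped.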

Resources, drivers, and runtimes

Resources are leaf futures, i.e. futures that are not implemented in terms of other futures. They are the types that use the task system described above to interact with the executor. Resource types include TCP and UDP sockets, timers, channels, file handles, etc. Tokio applications rarely need to implement resources. Instead, they use resources provided by Tokio or third-party crates.

Oftentimes, a resource cannot function by itself and requires a driver. For example, Tokio TCP sockets are backed by a Reactor. The reactor is the socket resource driver. A single driver may power large numbers of resource instances. In order to use the resource, the driver must be running somewhere in the process. Tokio provides drivers for network resources (tokio-reactor), file resources (tokio-fs), and timers (tokio-timer). Providing decoupled driver components allows users to pick and choose which components they wish to use. Each driver can be used standalone or combined with other drivers.

Because of this, in order to use Tokio and successfully execute tasks, an application must start an executor and the necessary drivers for the resources that the application’s tasks depend on. This requires significant boilerplate. To manage the boilerplate, Tokio offers a couple of runtime options. A runtime is an executor bundled with all necessary drivers to power Tokio’s resources. Instead of managing all the various Tokio components individually, a runtime is created and started in a single call.

Tokio offers a concurrent runtime and a single-threaded runtime. The concurrent runtime is backed by a multi-threaded, work-stealing executor. The single-threaded runtime executes all tasks and drivers on the current thread. The user may pick the runtime with the characteristics best suited for the application.

Future

As mentioned above, tasks are implemented using the Future trait. This trait is not limited to implementing tasks. A Future is a value that represents a non-blocking computation that will complete sometime in the future. A task is a computation with no output. Many resources in Tokio are represented with Future implementations. For example, a timeout is a Future that completes once the deadline has been reached.

The trait includes a number of combinators that are useful for working with future values.

Applications are built by either implementing Future for application-specific types or defining application logic using combinators. Often, a mix of both strategies is most successful.
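As an illustration of the combinator style, here is a hand-written map combinator over std::future::Future. Map, map, NoopWake, and poll_once are hypothetical names for this sketch; the futures crate ships its own combinators, whose exact types differ. The sketch requires Unpin futures and closures to keep the pinning simple.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Applies `f` to the output of `inner` once `inner` completes.
struct Map<Fut, F> {
    inner: Fut,
    f: Option<F>,
}

impl<Fut, F, T> Future for Map<Fut, F>
where
    Fut: Future + Unpin,
    F: FnOnce(Fut::Output) -> T,
    F: Unpin,
{
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let polled = Pin::new(&mut self.inner).poll(cx);
        match polled {
            Poll::Ready(value) => {
                let f = self.f.take().expect("Map polled after completion");
                Poll::Ready(f(value))
            }
            // Not ready: the inner future has already registered the waker.
            Poll::Pending => Poll::Pending,
        }
    }
}

fn map<Fut, F, T>(inner: Fut, f: F) -> Map<Fut, F>
where
    Fut: Future,
    F: FnOnce(Fut::Output) -> T,
{
    Map { inner, f: Some(f) }
}

// Hypothetical waker that ignores notifications.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future once with a waker that ignores notifications.
fn poll_once<Fut: Future + Unpin>(fut: &mut Fut) -> Poll<Fut::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Chaining map(map(ready(20u32), |x: u32| x + 1), |x: u32| x * 2) and polling it once yields Poll::Ready(42). Each layer forwards Pending unchanged, so the contract described earlier holds transitively.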
