Implementing futures

Implementing futures is very common when using Tokio. Let’s start with a very basic future that performs no asynchronous logic and simply returns a message (the venerable “hello world”).

The Future trait

The Future trait is as follows:

    trait Future {
        /// The type of the value returned when the future completes.
        type Item;

        /// The type representing errors that occurred while processing the computation.
        type Error;

        /// The function that will be repeatedly called to see if the future
        /// has completed or not. The `Async` enum can either be `Ready` or
        /// `NotReady` and indicates whether the future is ready to produce
        /// a value or not.
        fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
    }

Let’s implement it for our “hello world” future:

    extern crate futures;

    // `Poll` is a type alias for `Result<Async<T>, E>`
    use futures::{Future, Async, Poll};

    struct HelloWorld;

    impl Future for HelloWorld {
        type Item = String;
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            Ok(Async::Ready("hello world".to_string()))
        }
    }

The Item and Error associated types define the types returned by the future once it completes. Item is the success value and Error is returned when the future encounters an error while processing. By convention, infallible futures set Error to ().

Futures use a poll-based model. The consumer of a future repeatedly calls the poll function. The future then attempts to complete. If the future is able to complete, it returns Async::Ready(value). If the future is unable to complete due to being blocked on an internal resource (such as a TCP socket), it returns Async::NotReady.

When a future’s poll function is called, the implementation will synchronously do as much work as possible until it is logically blocked on some asynchronous event that has not occurred yet. The future implementation then saves its state internally so that the next time poll is called (after an external event is received), it resumes processing from the point it left off. Work is not repeated.
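To make the “work is not repeated” point concrete, here is a minimal sketch. It does not use the futures crate: Async, Poll and Future are local stand-ins that mirror the definitions quoted above, and ThreeSteps and drive are hypothetical names invented for illustration. The counter only simulates waiting. A real future returns NotReady because an inner resource did, and the runtime, not a busy loop, decides when to poll again.

```rust
// Local stand-ins mirroring the trait shown above (assumption: written
// here only so the sketch compiles without external crates).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical future that needs three polls to finish. Its progress is
/// stored in `step`, so every poll resumes where the previous one stopped.
struct ThreeSteps {
    step: u32,
}

impl Future for ThreeSteps {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<u32, ()> {
        // Advance by exactly one step per call; earlier steps are not redone.
        self.step += 1;
        if self.step < 3 {
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(self.step))
        }
    }
}

/// Hand-written driver (not how Tokio schedules work): polls until the
/// future completes and reports the value plus the number of polls used.
fn drive<F: Future>(mut future: F) -> Result<(F::Item, u32), F::Error> {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Async::Ready(value) = future.poll()? {
            return Ok((value, polls));
        }
    }
}

fn main() {
    let (value, polls) = drive(ThreeSteps { step: 0 }).unwrap();
    println!("completed with {} after {} polls", value, polls);
}
```

The driver never resets the future between calls; all progress lives in the future’s own fields.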

The hello world future requires no asynchronous processing and is immediately ready, so it returns Ok(Async::Ready(value)).

Running the future

Tokio is responsible for running futures to completion. This is done by passing the future to tokio::run.

The tokio::run function accepts futures where both Item and Error are set to (). This is because Tokio only executes futures; it does not do anything with the values they produce. The user of Tokio is required to fully process all values within the future.

In our case, let’s print the future’s value to STDOUT. We will do that by implementing a Display future.

    extern crate futures;

    use futures::{Future, Async, Poll};
    use std::fmt;

    struct Display<T>(T);

    impl<T> Future for Display<T>
    where
        T: Future,
        T::Item: fmt::Display,
    {
        type Item = ();
        type Error = T::Error;

        fn poll(&mut self) -> Poll<(), T::Error> {
            let value = match self.0.poll() {
                Ok(Async::Ready(value)) => value,
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(err) => return Err(err),
            };

            println!("{}", value);
            Ok(Async::Ready(()))
        }
    }

Display takes a future that yields an item that can be displayed. When it is polled, it first tries to poll the inner future. If the inner future is not ready then Display cannot complete. In this case, Display also returns NotReady.

poll implementations must never return NotReady unless they received NotReady by calling an inner future. This will be explained in more detail in a later section.

The Display future fails when the inner future fails. The error is passed up to the caller unchanged.
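The error path can be checked by hand. The sketch below again uses local stand-ins for Async, Poll and Future rather than the futures crate, and Failing is a hypothetical inner future that exists only for this illustration.

```rust
use std::fmt;

// Local stand-ins mirroring the trait shown above (assumption: written
// here only so the sketch compiles without external crates).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical inner future that fails on every poll.
struct Failing;

impl Future for Failing {
    type Item = String;
    type Error = &'static str;

    fn poll(&mut self) -> Poll<String, &'static str> {
        Err("inner future failed")
    }
}

/// Same shape as the Display future above.
struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = match self.0.poll() {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            // The inner error is returned as-is; nothing is printed.
            Err(err) => return Err(err),
        };

        println!("{}", value);
        Ok(Async::Ready(()))
    }
}

fn main() {
    match Display(Failing).poll() {
        Ok(Async::Ready(())) => println!("done"),
        Ok(Async::NotReady) => println!("not ready"),
        Err(err) => println!("error: {}", err),
    }
}
```

Because Display sets its Error type to T::Error, the caller sees exactly the error the inner future produced.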

When HelloWorld is combined with Display, both the Item and Error types are () and the future can be executed by Tokio:

    extern crate tokio;

    // `HelloWorld` and `Display` are the types defined above.
    fn main() {
        let future = Display(HelloWorld);
        tokio::run(future);
    }

Running this prints “hello world” to standard output.
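To see why the Display wrapper is needed before the future can be run, here is a self-contained sketch that swaps tokio::run for a toy run function with the same Item = () and Error = () requirement. The toy run is an assumption made for illustration and is not how Tokio executes futures: it simply polls in a loop, whereas a real runtime waits to be notified. Async, Poll and Future are local stand-ins for the definitions quoted earlier.

```rust
use std::fmt;

// Local stand-ins mirroring the trait shown above (assumption: written
// here only so the sketch compiles without external crates).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

struct HelloWorld;

impl Future for HelloWorld {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<String, ()> {
        Ok(Async::Ready("hello world".to_string()))
    }
}

struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = match self.0.poll() {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(err),
        };

        println!("{}", value);
        Ok(Async::Ready(()))
    }
}

/// Toy stand-in for a runner (hypothetical): it only accepts futures that
/// have already consumed their own value and error, and it busy-polls.
fn run<F>(mut future: F)
where
    F: Future<Item = (), Error = ()>,
{
    loop {
        match future.poll() {
            Ok(Async::Ready(())) | Err(()) => return,
            Ok(Async::NotReady) => {} // poll again
        }
    }
}

fn main() {
    // run(HelloWorld); // would not compile: `Item` is `String`, not `()`
    run(Display(HelloWorld)); // prints "hello world"
}
```

The commented-out call shows the constraint at work: HelloWorld on its own yields a String, so something has to consume that value before the future fits the runner’s signature.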

Cleaning things up

The pattern of waiting on an inner future is common enough that there is a helper macro: try_ready!.

The poll function can be rewritten using the macro as follows:

    #[macro_use]
    extern crate futures;

    use futures::{Future, Async, Poll};
    use std::fmt;

    struct Display<T>(T);

    impl<T> Future for Display<T>
    where
        T: Future,
        T::Item: fmt::Display,
    {
        type Item = ();
        type Error = T::Error;

        fn poll(&mut self) -> Poll<(), T::Error> {
            let value = try_ready!(self.0.poll());
            println!("{}", value);
            Ok(Async::Ready(()))
        }
    }

    fn main() {}
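To see what the macro saves, it can help to write an equivalent by hand. The sketch below defines a local my_try_ready! macro (a hypothetical name; it approximates the behaviour described above and is not copied from the futures crate) on top of local stand-ins for Async, Poll and Future. NeverReady is likewise a hypothetical inner future used only to exercise the early-return path.

```rust
use std::fmt;

// Local stand-ins mirroring the trait shown above (assumption: written
// here only so the sketch compiles without external crates).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

// Hypothetical equivalent of the helper macro: unwrap a ready value,
// otherwise return early from the enclosing `poll` function.
macro_rules! my_try_ready {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(From::from(err)),
        }
    };
}

struct HelloWorld;

impl Future for HelloWorld {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<String, ()> {
        Ok(Async::Ready("hello world".to_string()))
    }
}

/// Hypothetical inner future that is never ready.
struct NeverReady;

impl Future for NeverReady {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<String, ()> {
        Ok(Async::NotReady)
    }
}

struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = my_try_ready!(self.0.poll());
        println!("{}", value);
        Ok(Async::Ready(()))
    }
}

fn main() {
    // Prints "hello world", then Ok(Ready(())).
    println!("{:?}", Display(HelloWorld).poll());
    // The inner future is not ready, so nothing is displayed: Ok(NotReady).
    println!("{:?}", Display(NeverReady).poll());
}
```

The match inside the macro is the same three-arm pattern that the first version of Display wrote out in full.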
