Futures: In Depth

Futures, hinted at earlier in the guide, are the building block used to manageasynchronous logic. They are the underlying asynchronous abstraction used byTokio.

The future implementation is provided by the futures crate. However, forconvenience, Tokio re-exports a number of the types.

What Are Futures?

A future is a value that represents the completion of an asynchronouscomputation. Usually, the future completes due to an event that happenselsewhere in the system. While we’ve been looking at things from the perspectiveof basic I/O, you can use a future to represent a wide range of events, e.g.:

  • A database query that’s executing in a thread pool. When the queryfinishes, the future is completed, and its value is the result of the query.

  • An RPC invocation to a server. When the server replies, the future iscompleted, and its value is the server’s response.

  • A timeout. When time is up, the future is completed, and its value is().

  • A long-running CPU-intensive task, running on a thread pool. When the taskfinishes, the future is completed, and its value is the return value of thetask.

  • Reading bytes from a socket. When the bytes are ready, the future iscompleted – and depending on the buffering strategy, the bytes might bereturned directly, or written as a side-effect into some existing buffer.

The entire point of the future abstraction is to allow asynchronous functions,i.e., functions that cannot immediately return a value, to be able to returnsomething.

For example, an asynchronous HTTP client could provide a get function thatlooks like this:

  1. pub fn get(&self, uri: &str) -> ResponseFuture { ... }

Then, the user of the library would use the function as so:

  1. let response_future = client.get("https://www.example.com");

Now, the response_future isn’t the actual response. It is a future that willcomplete once the response is received. However, since the caller has a concretething (the future), they can start to use it. For example, they may chaincomputations to perform once the response is received or they might pass thefuture to a function.

  1. let response_is_ok = response_future
  2. .map(|response| {
  3. response.status().is_ok()
  4. });
  5. track_response_success(response_is_ok);

All of those actions taken with the future don’t immediately perform any work.They cannot because they don’t have the actual HTTP response. Instead, theydefine the work to be done when the response future completes.

Both the futures crate and Tokio come with a collection of combinatorfunctions that can be used to work with futures.

Implementing Future

Implementing the Future is pretty common when using Tokio, so it is importantto be comfortable with it.

As discussed in the previous section, Rust futures are poll based. This is aunique aspect of the Rust future library. Most future libraries for otherprogramming languages use a push based model where callbacks are supplied to thefuture and the computation invokes the callback immediately with the computationresult.

Using a poll based model offers many advantages, including being a zero costabstraction, i.e., using Rust futures has no added overhead compared to writingthe asynchronous code by hand.

The Future trait is as follows:

  1. trait Future {
  2. /// The type of the value returned when the future completes.
  3. type Item;
  4. /// The type representing errors that occurred while processing the
  5. /// computation.
  6. type Error;
  7. fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
  8. }

Usually, when you implement a Future, you will be defining a computation thatis a composition of sub (or inner) futures. In this case, the future implementation triesto call the inner future(s) and returns NotReady if the inner futures are notready.

The following example is a future that is composed of another future thatreturns a usize and will double that value:

  1. # #![deny(deprecated)]
  2. # extern crate futures;
  3. # use futures::*;
  4. pub struct Doubler<T> {
  5. inner: T,
  6. }
  7. pub fn double<T>(inner: T) -> Doubler<T> {
  8. Doubler { inner }
  9. }
  10. impl<T> Future for Doubler<T>
  11. where T: Future<Item = usize>
  12. {
  13. type Item = usize;
  14. type Error = T::Error;
  15. fn poll(&mut self) -> Result<Async<usize>, T::Error> {
  16. match self.inner.poll()? {
  17. Async::Ready(v) => Ok(Async::Ready(v * 2)),
  18. Async::NotReady => Ok(Async::NotReady),
  19. }
  20. }
  21. }
  22. # pub fn main() {}

When the Doubler future is polled, it polls its inner future. If the innerfuture is not ready, the Doubler future returns NotReady. If the innerfuture is ready, then the Doubler future doubles the return value and returnsReady.

Because the matching pattern above is common, the futures crate provides amacro: try_ready!. It is similar to try! or ?, but it also returns onNotReady. The above poll function can be rewritten using try_ready! asfollows:

  1. # #![deny(deprecated)]
  2. # #[macro_use]
  3. # extern crate futures;
  4. # use futures::*;
  5. # pub struct Doubler<T> {
  6. # inner: T,
  7. # }
  8. #
  9. # impl<T> Future for Doubler<T>
  10. # where T: Future<Item = usize>
  11. # {
  12. # type Item = usize;
  13. # type Error = T::Error;
  14. #
  15. fn poll(&mut self) -> Result<Async<usize>, T::Error> {
  16. let v = try_ready!(self.inner.poll());
  17. Ok(Async::Ready(v * 2))
  18. }
  19. # }
  20. # pub fn main() {}

Returning NotReady

The last section handwaved a bit and said that once a Future transitioned to theready state, the executor is notified. This enables the executor to be efficientin scheduling tasks.

When a function returns Async::NotReady, it signals that it is currently not ina ready state and is unable to complete the operation. It is critical that theexecutor is notified when the state transitions to “ready”. Otherwise, the taskwill hang infinitely, never getting run again.

For most future implementations, this is done transitively. When a futureimplementation is a combination of sub futures, the outer future only returnsNotReady when at least one inner future returned NotReady. Thus, the outerfuture will transition to a ready state once the inner future transitions to aready state. In this case, the NotReady contract is already satisfied as theinner future will notify the executor when it becomes ready.

Innermost futures, sometimes called “resources”, are the ones responsible fornotifying the executor. This is done by calling notify on the task returnedby task::current().

We will be exploring implementing resources and the task system in more depth ina later section. The key take away here is do not return NotReady unless yougot NotReady from an inner future.

A More Complicated Future

Let’s look at a slightly more complicated future implementation. In this case, wewill implement a future that takes a host name, does DNS resolution, thenestablishes a connection to the remote host. We assume a resolve functionexists that looks like this:

  1. pub fn resolve(host: &str) -> ResolveFuture;

where ResolveFuture is a future returning a SocketAddr.

The steps to implement the future are:

  • Call resolve to get a ResolveFuture instance.
  • Call ResolveFuture::poll until it returns a SocketAddr.
  • Pass the SocketAddr to TcpStream::connect.
  • Call ConnectFuture::poll until it returns the TcpStream.
  • Complete the outer future with the TcpStream.
    We will use an enum to track the state of the future as it advances throughthese steps.
  1. # extern crate tokio;
  2. # use tokio::net::tcp::ConnectFuture;
  3. # pub struct ResolveFuture;
  4. enum State {
  5. // Currently resolving the host name
  6. Resolving(ResolveFuture),
  7. // Establishing a TCP connection to the remote host
  8. Connecting(ConnectFuture),
  9. }
  10. # pub fn main() {}

And the ResolveAndConnect future is defined as:

  1. # pub struct State;
  2. pub struct ResolveAndConnect {
  3. state: State,
  4. }

Now, the implementation:

  1. # #![deny(deprecated)]
  2. # #[macro_use]
  3. # extern crate futures;
  4. # extern crate tokio;
  5. # use tokio::net::tcp::{ConnectFuture, TcpStream};
  6. # use futures::prelude::*;
  7. # use std::io;
  8. # pub struct ResolveFuture;
  9. # enum State {
  10. # Resolving(ResolveFuture),
  11. # Connecting(ConnectFuture),
  12. # }
  13. # fn resolve(host: &str) -> ResolveFuture { unimplemented!() }
  14. # impl Future for ResolveFuture {
  15. # type Item = ::std::net::SocketAddr;
  16. # type Error = io::Error;
  17. # fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  18. # unimplemented!();
  19. # }
  20. # }
  21. #
  22. # pub struct ResolveAndConnect {
  23. # state: State,
  24. # }
  25. pub fn resolve_and_connect(host: &str) -> ResolveAndConnect {
  26. let state = State::Resolving(resolve(host));
  27. ResolveAndConnect { state }
  28. }
  29. impl Future for ResolveAndConnect {
  30. type Item = TcpStream;
  31. type Error = io::Error;
  32. fn poll(&mut self) -> Result<Async<TcpStream>, io::Error> {
  33. use self::State::*;
  34. loop {
  35. let addr = match self.state {
  36. Resolving(ref mut fut) => {
  37. try_ready!(fut.poll())
  38. }
  39. Connecting(ref mut fut) => {
  40. return fut.poll();
  41. }
  42. };
  43. // If we reach here, the state was `Resolving`
  44. // and the call to the inner Future returned `Ready`
  45. let connecting = TcpStream::connect(&addr);
  46. self.state = Connecting(connecting);
  47. }
  48. }
  49. }
  50. # pub fn main() {}

This illustrates how Future implementations are state machines. This futurecan be in either of two states:

  • Resolving
  • Connecting
    Each time poll is called, we try to advance the state machine to the nextstate.

Now, the future is basically a re-implementation of the combinator AndThen, so we wouldprobably just use that combinator.

  1. # #![deny(deprecated)]
  2. # #[macro_use]
  3. # extern crate futures;
  4. # extern crate tokio;
  5. # use tokio::net::tcp::{ConnectFuture, TcpStream};
  6. # use futures::prelude::*;
  7. # use std::io;
  8. # pub struct ResolveFuture;
  9. # fn resolve(host: &str) -> ResolveFuture { unimplemented!() }
  10. # impl Future for ResolveFuture {
  11. # type Item = ::std::net::SocketAddr;
  12. # type Error = io::Error;
  13. # fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  14. # unimplemented!();
  15. # }
  16. # }
  17. # pub fn dox(my_host: &str) {
  18. # let _ =
  19. resolve(my_host)
  20. .and_then(|addr| TcpStream::connect(&addr))
  21. # ;
  22. # }
  23. # pub fn main() {}

This is much shorter and does the same thing.

Next up: Tasks