Combinators

Future implementations often follow similar patterns. To help reduce boilerplate, the futures crate provides a number of utilities, called “combinators”, that abstract these patterns. Many of these combinators exist as functions on the Future trait.

Building blocks

Let’s revisit the future implementations from the previous pages and see how they can be simplified by using combinators.

map

The map combinator takes a future and returns a new future that applies a function to the value yielded by the first future.

This was the Display future previously implemented:

    #[macro_use]
    extern crate futures;
    extern crate tokio;

    use futures::{Future, Async, Poll};
    use std::fmt;

    struct Display<T>(T);

    impl<T> Future for Display<T>
    where
        T: Future,
        T::Item: fmt::Display,
    {
        type Item = ();
        type Error = T::Error;

        fn poll(&mut self) -> Poll<(), T::Error> {
            let value = try_ready!(self.0.poll());
            println!("{}", value);
            Ok(Async::Ready(()))
        }
    }

    fn main() {
        // Stand-in for the HelloWorld future from the previous pages.
        let HelloWorld = futures::future::ok::<_, ()>("hello");
        let future = Display(HelloWorld);
        tokio::run(future);
    }

With the map combinator, it becomes:

    extern crate tokio;
    extern crate futures;

    use futures::Future;

    fn main() {
        // Stand-in for the HelloWorld future from the previous pages.
        let HelloWorld = futures::future::ok::<_, ()>("hello");
        let future = HelloWorld.map(|value| {
            println!("{}", value);
        });

        tokio::run(future);
    }

This is how map is implemented:

    #[macro_use]
    extern crate futures;

    use futures::{Future, Async, Poll};

    pub struct Map<A, F> where A: Future {
        future: A,
        f: Option<F>,
    }

    impl<U, A, F> Future for Map<A, F>
    where
        A: Future,
        F: FnOnce(A::Item) -> U,
    {
        type Item = U;
        type Error = A::Error;

        fn poll(&mut self) -> Poll<U, A::Error> {
            let value = try_ready!(self.future.poll());
            let f = self.f.take().expect("cannot poll Map twice");
            Ok(Async::Ready(f(value)))
        }
    }

    fn main() {}

Comparing Map with our Display implementation, it is clear that the two are very similar. Where Display calls println!, Map passes the value to the function.

and_then

Now, let’s use the and_then combinator to rewrite the future that established a TCP stream and wrote “hello world” to the peer.

The and_then combinator allows sequencing two asynchronous operations. Once the first operation completes, the value is passed to a function. The function uses that value to produce a new future, and that future is then executed. The difference between and_then and map is that and_then’s function returns a future, whereas map’s function returns a value.
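The same distinction exists on the synchronous Result type in the standard library, which may make it easier to see. The sketch below is only an analogy and involves no futures; parse_port and check_range are hypothetical helpers invented for the illustration.

```rust
// Synchronous analogy using std's `Result`; no futures are involved.

/// Hypothetical helper: parse a port number from text.
fn parse_port(text: &str) -> Result<u16, String> {
    text.parse::<u16>().map_err(|e| e.to_string())
}

/// Hypothetical helper: a second fallible step that depends on the first.
fn check_range(port: u16) -> Result<u16, String> {
    if port >= 1024 {
        Ok(port)
    } else {
        Err(format!("port {} is reserved", port))
    }
}

/// `map`: the closure returns a plain value, which is wrapped for us.
fn describe(text: &str) -> Result<String, String> {
    parse_port(text).map(|port| format!("port {}", port))
}

/// `and_then`: the closure returns another `Result`, so the second
/// operation runs only once the first has succeeded.
fn validated(text: &str) -> Result<u16, String> {
    parse_port(text).and_then(check_range)
}

fn main() {
    println!("{:?}", describe("8080"));
    println!("{:?}", validated("8080"));
    println!("{:?}", validated("80"));
}
```

With futures the shape is the same, except that the closure passed to and_then returns a future rather than a Result.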

The original implementation is on the previous page. Once updated to use combinators, it becomes:

    extern crate tokio;
    extern crate bytes;
    extern crate futures;

    use tokio::io;
    use tokio::net::TcpStream;
    use futures::Future;

    fn main() {
        let addr = "127.0.0.1:1234".parse().unwrap();

        let future = TcpStream::connect(&addr)
            .and_then(|socket| {
                io::write_all(socket, b"hello world")
            })
            .map(|_| println!("write complete"))
            .map_err(|_| println!("failed"));

        tokio::run(future);
    }

Further computations may be sequenced by chaining calls to and_then. For example:

    extern crate tokio;
    extern crate bytes;
    extern crate futures;

    use tokio::io;
    use tokio::net::TcpStream;
    use futures::Future;

    fn main() {
        let addr = "127.0.0.1:1234".parse().unwrap();

        let future = TcpStream::connect(&addr)
            .and_then(|socket| {
                io::write_all(socket, b"hello world")
            })
            .and_then(|(socket, _)| {
                // read exactly 11 bytes
                io::read_exact(socket, vec![0; 11])
            })
            .and_then(|(_socket, buf)| {
                println!("got {:?}", buf);
                Ok(())
            })
            // `tokio::run` requires the error type to be `()`.
            .map_err(|_| println!("failed"));

        tokio::run(future);
    }

The future returned by and_then executes identically to the future we implemented by hand on the previous page.

Essential combinators

It is worth spending some time with the Future trait and module documentation to gain familiarity with the full set of available combinators. This guide will provide a very quick overview.

Concrete futures

Any value can be turned into an immediately complete future. There are a few functions in the future module for creating such a future:

  • ok, analogous to Result::Ok, converts the provided value into an immediately ready future that yields back the value.
  • err, analogous to Result::Err, converts the provided error into an immediately ready future that fails with the error.
  • result lifts a result to an immediately complete future.

In addition, there is also a function, lazy, which allows constructing a future given a closure. The closure is not invoked immediately; instead, it is invoked the first time the future is polled.
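To make the deferred behavior of lazy concrete, here is a minimal sketch that uses only the standard library. The Async, Poll, and Future definitions are simplified stand-ins that mirror the shape of the futures 0.1 types, and this Lazy takes a closure returning a Result for brevity; none of it is the crate's own code. The lazy_demo helper is hypothetical.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Simplified stand-ins that mirror the shape of the futures 0.1 types.
// They are NOT the crate's definitions; they only let the sketch run with
// the standard library alone.
#[allow(dead_code)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Stand-in for the idea behind `lazy`: hold on to a closure and call it
/// on the first poll rather than at construction time.
struct Lazy<F> {
    f: Option<F>,
}

fn lazy<F, T, E>(f: F) -> Lazy<F>
where
    F: FnOnce() -> Result<T, E>,
{
    Lazy { f: Some(f) }
}

impl<F, T, E> Future for Lazy<F>
where
    F: FnOnce() -> Result<T, E>,
{
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<T, E> {
        let f = self.f.take().expect("cannot poll Lazy twice");
        f().map(Async::Ready)
    }
}

/// Hypothetical helper: reports whether the closure had run before the
/// first poll and whether it had run after it.
fn lazy_demo() -> (bool, bool) {
    let ran = Rc::new(Cell::new(false));
    let flag = ran.clone();

    let mut future = lazy(move || {
        flag.set(true);
        Ok::<u32, ()>(7)
    });

    let before = ran.get();
    let _ = future.poll();
    (before, ran.get())
}

fn main() {
    let (before, after) = lazy_demo();
    println!("ran before poll: {}, ran after poll: {}", before, after);
}
```

Constructing the future does no work; the closure only runs once something polls it.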

IntoFuture

A crucial API to know about is the IntoFuture trait, which is a trait for values that can be converted into futures. Most APIs that you think of as taking futures actually work with this trait instead. The key reason: the trait is implemented for Result, allowing you to return Result values in many places that futures are expected.

Most combinator closures that return a future actually return an instance of IntoFuture.
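The following sketch shows roughly how such a conversion trait can make a Result acceptable wherever a future is expected. It uses only the standard library: Async, Poll, Future, IntoFuture, and FutureResult are simplified stand-ins modelled on the futures 0.1 names, not the crate's definitions, and resolve is a hypothetical helper.

```rust
// Simplified stand-ins that mirror the shape of the futures 0.1 types.
// They are NOT the crate's definitions.
#[allow(dead_code)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Stand-in for the immediately complete future a `Result` converts into.
struct FutureResult<T, E>(Option<Result<T, E>>);

impl<T, E> Future for FutureResult<T, E> {
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<T, E> {
        self.0
            .take()
            .expect("cannot poll FutureResult twice")
            .map(Async::Ready)
    }
}

/// Stand-in conversion trait for values that can become futures.
trait IntoFuture {
    type Future: Future<Item = Self::Item, Error = Self::Error>;
    type Item;
    type Error;
    fn into_future(self) -> Self::Future;
}

// Every future converts into itself.
impl<F: Future> IntoFuture for F {
    type Future = F;
    type Item = F::Item;
    type Error = F::Error;

    fn into_future(self) -> F {
        self
    }
}

// A `Result` converts into an immediately complete future.
impl<T, E> IntoFuture for Result<T, E> {
    type Future = FutureResult<T, E>;
    type Item = T;
    type Error = E;

    fn into_future(self) -> FutureResult<T, E> {
        FutureResult(Some(self))
    }
}

/// Hypothetical helper that accepts anything convertible into a future
/// and polls it until it completes.
fn resolve<I: IntoFuture>(value: I) -> Result<I::Item, I::Error> {
    let mut future = value.into_future();
    loop {
        match future.poll() {
            Ok(Async::Ready(item)) => return Ok(item),
            Ok(Async::NotReady) => continue,
            Err(error) => return Err(error),
        }
    }
}

fn main() {
    // A plain `Result` is accepted because it implements `IntoFuture`.
    println!("{:?}", resolve(Ok::<u32, String>(5)));
    // An actual future is accepted as well.
    let failed = FutureResult(Some(Err::<u32, String>("boom".to_string())));
    println!("{:?}", resolve(failed));
}
```

This is why a closure passed to and_then can simply end with Ok(value): the Result is converted into a future behind the scenes.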

Adapters

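Like Iterator, the Future trait includes a broad range of “adapter” methods. These methods all consume the future, returning a new future providing the requested behavior. Using these adapter combinators, it is possible to, for example, change the value or error type of a future (map, map_err) or run another future once the first has completed (and_then).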

When to use combinators

Using combinators can reduce a lot of boilerplate, but they are not always a good fit. Due to the limitations described below, implementing Future manually is going to be common.

Functional style

Closures passed to combinators must be 'static. This means it is not possible to pass references into the closure. Ownership of all state must be moved into the closure. The reason for this is that lifetimes are based on the stack. With asynchronous code, the ability to rely on the stack is lost.
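The constraint can be seen without any futures at all. In the sketch below, defer is a hypothetical helper that stores a closure to be called later, much as a combinator stores its closure until the future is polled; make_counter is likewise invented for the illustration.

```rust
/// Hypothetical helper: box a closure so it can be called later, after the
/// caller's stack frame may be gone. The `'static` bound forbids borrows of
/// local variables.
fn defer<F>(f: F) -> Box<dyn FnMut() -> usize>
where
    F: FnMut() -> usize + 'static,
{
    Box::new(f)
}

fn make_counter() -> Box<dyn FnMut() -> usize> {
    let mut seen = Vec::new();

    // This would not compile, because the closure would borrow `seen`,
    // which is dropped when `make_counter` returns:
    //
    //     defer(|| { seen.push(()); seen.len() })

    // Moving `seen` into the closure transfers ownership, so the closure
    // no longer depends on this stack frame.
    defer(move || {
        seen.push(());
        seen.len()
    })
}

fn main() {
    let mut counter = make_counter();
    println!("{}", counter());
    println!("{}", counter());
}
```

The state lives inside the closure and travels with it, which is the same pattern the combinator examples in this section follow.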

Because of this, code written using combinators ends up being very functional in style. Let’s compare Future combinators with synchronous Result combinators.

    use std::io;

    struct Data;

    fn get_data() -> Result<Data, io::Error> {
        // ...
        unimplemented!()
    }

    fn get_ok_data() -> Result<Vec<Data>, io::Error> {
        let mut dst = vec![];

        for _ in 0..10 {
            get_data().and_then(|data| {
                dst.push(data);
                Ok(())
            });
        }

        Ok(dst)
    }

This works because the closure passed to and_then is able to obtain a mutable borrow of dst. The Rust compiler is able to guarantee that dst will outlive the closure.

However, when using futures, it is no longer possible to borrow dst. Instead, dst must be passed around. Something like:

    extern crate futures;

    use futures::{stream, Future, Stream};
    use std::io;

    struct Data;

    fn get_data() -> impl Future<Item = Data, Error = io::Error> {
        // ...
        futures::future::ok(Data)
    }

    fn get_ok_data() -> impl Future<Item = Vec<Data>, Error = io::Error> {
        let dst = vec![];

        // Start with an unbounded stream that uses unit values.
        stream::repeat(())
            // Only take 10. This is how the for loop is simulated using a
            // functional style.
            .take(10)
            // The `fold` combinator is used here because, in order to be
            // functional, the state must be moved into the combinator. In this
            // case, the state is the `dst` vector.
            .fold(dst, move |mut dst, _| {
                // Once again, the `dst` vector must be moved into the nested
                // closure.
                get_data().and_then(move |item| {
                    dst.push(item);

                    // The state must be included as part of the return value,
                    // so `dst` is returned.
                    Ok(dst)
                })
            })
    }

    fn main() {}

Another strategy, which tends to work best with immutable data, is to store the data in an Arc and clone handles into the closures. One case in which this works well is sharing configuration values in multiple closures. For example:

    extern crate futures;

    use futures::{future, Future};
    use std::io;
    use std::sync::Arc;

    fn get_message() -> impl Future<Item = String, Error = io::Error> {
        // ...
        futures::future::ok("".to_string())
    }

    fn print_multi() -> impl Future<Item = (), Error = io::Error> {
        let name = Arc::new("carl".to_string());

        let futures: Vec<_> = (0..10).map(|_| {
            // Clone the `name` handle. This allows multiple concurrent
            // futures to access the name to print.
            let name = name.clone();

            get_message()
                .and_then(move |message| {
                    println!("Hello {}, {}", name, message);
                    Ok(())
                })
        })
        .collect();

        future::join_all(futures)
            .map(|_| ())
    }

Returning futures

Because combinators often use closures as part of their type signature, it is not possible to name the future type. This, in turn, means that the future type cannot be used as part of a function’s signature. When passing a future as a function argument, generics can be used in almost all cases. For example:

    extern crate futures;

    use futures::Future;

    fn get_message() -> impl Future<Item = String> {
        // ...
        futures::future::ok::<_, ()>("".to_string())
    }

    fn with_future<T: Future<Item = String>>(f: T) {
        // ...
        drop(f);
    }

    fn main() {
        let my_future = get_message().map(|message| {
            format!("MESSAGE = {}", message)
        });

        with_future(my_future);
    }

However, for returning futures, it isn’t as simple. There are a few options with pros and cons:

Use impl Future

As of Rust version 1.26, the language feature impl Trait can be used for returning combinator futures. This allows writing the following:

    extern crate futures;

    use futures::Future;

    fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
    where
        F: Future<Item = i32>,
    {
        f.map(|i| i + 10)
    }

The add_10 function has a return type that is “something that implements Future” with the specified associated types. This allows returning a future without explicitly naming the future type.

The pros to this approach are that it is zero overhead and covers a wide variety of cases. However, there is a problem when returning futures from different code branches. For example:

    if some_condition {
        return get_message()
            .map(|message| format!("MESSAGE = {}", message));
    } else {
        return futures::future::ok("My MESSAGE".to_string());
    }

Returning from multiple branches

This results in rustc outputting a compilation error of error[E0308]: if and else have incompatible types. Functions returning impl Future must still have a single return type. The impl Future syntax just means that the return type does not have to be named. However, each combinator has a different type, so the types being returned in each conditional branch are different.

Given the above scenario, there are two options. The first is to change the function to return a trait object. The second is to use the Either type:

    extern crate futures;

    use futures::Future;
    use futures::future::{self, Either};

    fn get_message() -> impl Future<Item = String> {
        future::ok::<_, ()>("".to_string())
    }

    fn my_op() -> impl Future<Item = String> {
        let some_condition = true;

        if some_condition {
            return Either::A(get_message()
                .map(|message| format!("MESSAGE = {}", message)));
        } else {
            return Either::B(
                future::ok("My MESSAGE".to_string()));
        }
    }

    fn main() {}

This ensures that the function has a single return type: Either.

In situations where there are more than two branches, either Either enums must be nested (Either<Either<A, B>, C>) or a custom, multi-variant enum must be defined.
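As a rough illustration of the custom enum option, the sketch below defines a three-variant enum whose poll simply delegates to whichever variant is active. It uses only the standard library: Async, Poll, Future, Ready, and Map are simplified stand-ins modelled on futures 0.1, and OneOf3, message, and run are hypothetical names invented here.

```rust
// Simplified stand-ins that mirror the shape of the futures 0.1 types.
// They are NOT the crate's definitions.
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Stand-in leaf future that is ready on the first poll.
struct Ready<T>(Option<T>);

impl<T> Future for Ready<T> {
    type Item = T;
    type Error = ();

    fn poll(&mut self) -> Poll<T, ()> {
        Ok(Async::Ready(self.0.take().expect("cannot poll Ready twice")))
    }
}

/// Same shape as the `Map` shown earlier on this page.
struct Map<A, F> {
    future: A,
    f: Option<F>,
}

impl<U, A, F> Future for Map<A, F>
where
    A: Future,
    F: FnOnce(A::Item) -> U,
{
    type Item = U;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<U, A::Error> {
        match self.future.poll()? {
            Async::Ready(value) => {
                let f = self.f.take().expect("cannot poll Map twice");
                Ok(Async::Ready(f(value)))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

/// Hypothetical three-way counterpart to `Either`.
enum OneOf3<A, B, C> {
    First(A),
    Second(B),
    Third(C),
}

impl<A, B, C> Future for OneOf3<A, B, C>
where
    A: Future,
    B: Future<Item = A::Item, Error = A::Error>,
    C: Future<Item = A::Item, Error = A::Error>,
{
    type Item = A::Item;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<A::Item, A::Error> {
        // Delegate to whichever branch was taken.
        match *self {
            OneOf3::First(ref mut a) => a.poll(),
            OneOf3::Second(ref mut b) => b.poll(),
            OneOf3::Third(ref mut c) => c.poll(),
        }
    }
}

/// Three branches, three different future types, one return type.
fn message(kind: u8) -> impl Future<Item = String, Error = ()> {
    let base = Ready(Some("hello".to_string()));
    match kind {
        0 => OneOf3::First(base),
        1 => OneOf3::Second(Map { future: base, f: Some(|s: String| s.to_uppercase()) }),
        _ => OneOf3::Third(Map { future: base, f: Some(|s: String| format!("{}!", s)) }),
    }
}

/// Minimal driver for the sketch; a real executor would not spin.
fn run<F: Future>(mut future: F) -> Result<F::Item, F::Error> {
    loop {
        if let Async::Ready(value) = future.poll()? {
            return Ok(value);
        }
    }
}

fn main() {
    for kind in 0..3 {
        println!("{:?}", run(message(kind)));
    }
}
```

A dedicated enum tends to read better than deeply nested Either values, at the cost of writing the delegating poll once.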

This scenario comes up often when trying to conditionally return errors. Consider:

    extern crate futures;

    use futures::{future::{self, Either}, Future};

    fn is_valid(_: &str) -> bool { true }

    fn get_message() -> impl Future<Item = String, Error = &'static str> {
        future::ok("".to_string())
    }

    fn my_operation(arg: String) -> impl Future<Item = String> {
        if is_valid(&arg) {
            return Either::A(get_message().map(|message| {
                format!("MESSAGE = {}", message)
            }));
        }

        Either::B(future::err("something went wrong"))
    }

    fn main() {}

In order to return early when an error has been encountered, an Either variant must be used to contain the error future.

Associated types

Traits with functions that return futures must include an associated type for that future. For example, consider a simplified version of the Tower Service trait:

    extern crate futures;

    use futures::Future;

    pub trait Service {
        /// Requests handled by the service.
        type Request;

        /// Responses given by the service.
        type Response;

        /// Errors produced by the service.
        type Error;

        /// The future response value.
        type Future: Future<Item = Self::Response, Error = Self::Error>;

        fn call(&mut self, req: Self::Request) -> Self::Future;
    }

In order to implement this trait, the future returned by call must be nameable and set to the Future associated type. In this case, impl Future does not work and the future must either be boxed as a trait object or a custom future must be defined.
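A minimal sketch of the boxed variant might look as follows. It uses only the standard library: Async, Poll, Future, Ready, Map, and the impl of Future for Box are simplified stand-ins modelled on futures 0.1, Service is the simplified trait from above, and Greeter and run are hypothetical names invented here.

```rust
// Simplified stand-ins that mirror the shape of the futures 0.1 types.
// They are NOT the crate's definitions.
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

// A boxed future is itself a future; polling forwards to the contents.
impl<F: ?Sized + Future> Future for Box<F> {
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<F::Item, F::Error> {
        (**self).poll()
    }
}

/// Stand-in leaf future that is ready on the first poll.
struct Ready<T>(Option<T>);

impl<T> Future for Ready<T> {
    type Item = T;
    type Error = ();

    fn poll(&mut self) -> Poll<T, ()> {
        Ok(Async::Ready(self.0.take().expect("cannot poll Ready twice")))
    }
}

/// Same shape as the `Map` shown earlier on this page.
struct Map<A, F> {
    future: A,
    f: Option<F>,
}

impl<U, A, F> Future for Map<A, F>
where
    A: Future,
    F: FnOnce(A::Item) -> U,
{
    type Item = U;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<U, A::Error> {
        match self.future.poll()? {
            Async::Ready(value) => {
                let f = self.f.take().expect("cannot poll Map twice");
                Ok(Async::Ready(f(value)))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

trait Service {
    type Request;
    type Response;
    type Error;
    type Future: Future<Item = Self::Response, Error = Self::Error>;

    fn call(&mut self, req: Self::Request) -> Self::Future;
}

/// Hypothetical service that greets whoever is named in the request.
struct Greeter;

impl Service for Greeter {
    type Request = String;
    type Response = String;
    type Error = ();
    // The closure inside `Map` has no nameable type, so the associated
    // type is a boxed trait object instead.
    type Future = Box<dyn Future<Item = String, Error = ()>>;

    fn call(&mut self, req: String) -> Self::Future {
        Box::new(Map {
            future: Ready(Some(req)),
            f: Some(|name: String| format!("Hello, {}", name)),
        })
    }
}

/// Minimal driver for the sketch; a real executor would not spin.
fn run<F: Future>(mut future: F) -> Result<F::Item, F::Error> {
    loop {
        if let Async::Ready(value) = future.poll()? {
            return Ok(value);
        }
    }
}

fn main() {
    let mut service = Greeter;
    println!("{:?}", run(service.call("carl".to_string())));
}
```

The alternative, a custom future, would replace the Box with a named struct that implements Future by hand.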

Trait objects

Another strategy is to return a boxed future as a trait object:

    extern crate futures;

    use std::io;
    use futures::Future;

    fn foo() -> Box<Future<Item = u32, Error = io::Error> + Send> {
        // ...
        unimplemented!()
    }

    fn main() {}

The pro of this strategy is that it is easy to write Box. It is also able to handle the “branching” described above with an arbitrary number of branches:

    extern crate futures;

    use futures::{future, Future};

    fn is_valid(_: &str) -> bool { true }

    fn get_message() -> impl Future<Item = String, Error = &'static str> {
        future::ok("".to_string())
    }

    fn my_operation(arg: String) -> Box<Future<Item = String, Error = &'static str> + Send> {
        if is_valid(&arg) {
            if arg == "foo" {
                return Box::new(get_message().map(|message| {
                    format!("FOO = {}", message)
                }));
            } else {
                return Box::new(get_message().map(|message| {
                    format!("MESSAGE = {}", message)
                }));
            }
        }

        Box::new(future::err("something went wrong"))
    }

    fn main() {}

The downside is that the boxing approach requires more overhead. An allocation is required to store the returned future value. In addition, whenever the future is used, Rust needs to dispatch to it dynamically through a vtable lookup at runtime. This can make boxed futures slightly slower in practice, though the difference is often not noticeable.

There is one caveat that can trip up authors trying to use a Box<Future<…>>, particularly with tokio::run. By default, Box<Future<…>> is not Send and cannot be sent across threads, even if the future contained in the box is Send.

To make a boxed future Send, it must be annotated as such:

    fn my_operation() -> Box<Future<Item = String, Error = &'static str> + Send> {
        // ...
    }

Implement Future by hand

Finally, when the above strategies fail, it is always possible to fall back on implementing Future by hand. Doing so provides full control, but comes at a cost of additional boilerplate given that no combinator functions can be used with this approach.

When to use combinators

Combinators are a powerful way to reduce boilerplate in your Tokio-based application, but as discussed in this section, they are not a silver bullet. It is common to implement custom futures as well as custom combinators. This raises the question of when combinators should be used versus implementing Future by hand.

As per the discussion above, if the future type must be nameable and a Box is not acceptable overhead, then combinators cannot be used. Besides this, it depends on the complexity of the state that must be passed around between combinators.

Scenarios in which the state must be accessed concurrently from multiple combinators may be a good case for implementing a Future by hand.

TODO: This section needs to be expanded with examples. If you have ideas to improve this section, visit the doc-push repo and open an issue with your thoughts.
