Getting asynchronous

Futures are all about managing asynchronicity. Implementing a future thatcompletes asynchonously requires correctly handling receiving Async::NotReadyfrom the inner future.

Let’s start by implementing a future that establishes a TCP socket with a remotepeer and extracts the peer socket address, writing it to STDOUT.

  1. # #![deny(deprecated)]
  2. extern crate tokio;
  3. #[macro_use]
  4. extern crate futures;
  5. use tokio::net::{TcpStream, tcp::ConnectFuture};
  6. use futures::{Future, Async, Poll};
  7. struct GetPeerAddr {
  8. connect: ConnectFuture,
  9. }
  10. impl Future for GetPeerAddr {
  11. type Item = ();
  12. type Error = ();
  13. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  14. match self.connect.poll() {
  15. Ok(Async::Ready(socket)) => {
  16. println!("peer address = {}", socket.peer_addr().unwrap());
  17. Ok(Async::Ready(()))
  18. }
  19. Ok(Async::NotReady) => Ok(Async::NotReady),
  20. Err(e) => {
  21. println!("failed to connect: {}", e);
  22. Ok(Async::Ready(()))
  23. }
  24. }
  25. }
  26. }
  27. fn main() {
  28. let addr = "192.168.0.1:1234".parse().unwrap();
  29. let connect_future = TcpStream::connect(&addr);
  30. let get_peer_addr = GetPeerAddr {
  31. connect: connect_future,
  32. };
  33. # if false {
  34. tokio::run(get_peer_addr);
  35. # }
  36. }

The implementation of GetPeerAddr is very similar to the Display future fromthe previous page. The primary difference is, in this case,self.connect.poll() will (probably) return Async::NotReady a number of timesbefore returning the connected socket. When this happens, our future returnsNotReady.

GetPeerAddr contains ConnectFuture, a future that completes once a TCPstream has been established. This future is returned by TcpStream::connect.

When GetPeerAddr is passed to tokio::run, Tokio will repeatedly call polluntil Ready is returned. The exact mechanism by which this happens isdescribed in later chapters.

When implementing Future, Async::NotReady must not be returned unlessAsync::NotReady was obtained when calling poll on an inner future. One wayto think about it is, when a future is polled, it must do as much work as it canuntil it either completes or becomes blocked on an inner future.

Chaining computations

Now, let’s take the connect future and update it to write “hello world” once theTCP socket has been established.

  1. # #![deny(deprecated)]
  2. extern crate tokio;
  3. extern crate bytes;
  4. #[macro_use]
  5. extern crate futures;
  6. use tokio::io::AsyncWrite;
  7. use tokio::net::{TcpStream, tcp::ConnectFuture};
  8. use bytes::{Bytes, Buf};
  9. use futures::{Future, Async, Poll};
  10. use std::io::{self, Cursor};
  11. // HelloWorld has two states, namely waiting to connect to the socket
  12. // and already connected to the socket
  13. enum HelloWorld {
  14. Connecting(ConnectFuture),
  15. Connected(TcpStream, Cursor<Bytes>),
  16. }
  17. impl Future for HelloWorld {
  18. type Item = ();
  19. type Error = io::Error;
  20. fn poll(&mut self) -> Poll<(), io::Error> {
  21. use self::HelloWorld::*;
  22. loop {
  23. match self {
  24. Connecting(ref mut f) => {
  25. let socket = try_ready!(f.poll());
  26. let data = Cursor::new(Bytes::from_static(b"hello world"));
  27. *self = Connected(socket, data);
  28. }
  29. Connected(ref mut socket, ref mut data) => {
  30. // Keep trying to write the buffer to the socket as long as the
  31. // buffer has more bytes available for consumption
  32. while data.has_remaining() {
  33. try_ready!(socket.write_buf(data));
  34. }
  35. return Ok(Async::Ready(()));
  36. }
  37. }
  38. }
  39. }
  40. }
  41. fn main() {
  42. let addr = "127.0.0.1:1234".parse().unwrap();
  43. let connect_future = TcpStream::connect(&addr);
  44. let hello_world = HelloWorld::Connecting(connect_future);
  45. # let hello_world = futures::future::ok::<(), io::Error>(());
  46. // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
  47. tokio::run(hello_world.map_err(|e| println!("{0}", e)))
  48. }

It is very common to implement futures as an enum of the possiblestates. This allows the future implementation to track its stateinternally by transitioning between the enum’s variants.

This future is represented as an enumeration of states:

  • Connecting
  • Writing “hello world” to the socket.
    The future starts in the connecting state with an inner future of typeConnectFuture. It repeatedly polls this future until the socket is returned.The state is then transitioned to Connected.

From the Connected state, the future writes data to the socket. This is donewith the write_buf function. I/O functions are covered in more detail in thenext section. Briefly, write_buf is a non-blocking function towrite data to the socket. If the socket is not ready to accept the write,NotReady is returned. If some data (but not necessarily all) was written,Ready(n) is returned, where n is the number of written bytes. The cursor isalso advanced.

Once in the Connected state, the future must loop as long as there is dataleft to write. Because write_buf is wrapped with try_ready!(), whenwrite_buf returns NotReady, our poll function returns with NotReady.

At some point in the future, our poll function is called again. Because it isin the Connected state, it jumps directly to writing data.

Note the loops are important. Many future implementations contain loops.These loops are necessary because poll cannot return until either all the datais written to the socket, or an inner future (ConnectFuture orwrite_buf) returns NotReady.

Next up: Combinators