Streams

Streams are similar to futures, but instead of yielding a single value, they asynchronously yield zero or more values. They can be thought of as asynchronous iterators.

Just like futures, streams are able to represent a wide range of things as long as those things produce discrete values at different points sometime in the future. For instance:

  • UI events caused by the user interacting with a GUI in different ways. As events happen, the stream yields a message for each one to your app.
  • Push notifications from a server. Sometimes a request/response model is not what you need. A client can establish a notification stream with a server to receive messages from the server without specifically requesting them.
  • Incoming socket connections. As different clients connect to a server, the connections stream will yield socket connections.

The Stream trait

Just like Future, implementing Stream is common when using Tokio. The Stream trait is as follows:

    trait Stream {
        /// The type of the value yielded by the stream.
        type Item;

        /// The type representing errors that occurred while processing the computation.
        type Error;

        /// The function that will be repeatedly called to see if the stream has
        /// another value it can yield
        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
    }

The Item associated type is the type yielded by the stream. The Error associated type is the type of the error yielded when something unexpected happens. The poll function is very similar to Future’s poll function. The only difference is that, this time, Option<Self::Item> is returned.

Stream implementations have the poll function called many times. When the next value is ready, Ok(Async::Ready(Some(value))) is returned. When the stream is not ready to yield a value, Ok(Async::NotReady) is returned. When the stream is exhausted and will yield no further values, Ok(Async::Ready(None)) is returned. Just like with futures, streams must not return Async::NotReady unless Async::NotReady was obtained by an inner stream or future.
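The return values described above can be sketched in a few lines. The sketch assumes a std-only mirror of the futures 0.1 Async, Poll, and Stream definitions so that it compiles without external crates. Countdown and drain are hypothetical names used only for illustration, and drain stands in for a real consumer, which would be a future driven by an executor.

```rust
// Std-only mirror of the futures 0.1 types (an assumption of this sketch;
// the real definitions live in the `futures` crate).
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical stream that counts down from `remaining` to 1, then ends.
pub struct Countdown {
    remaining: u32,
}

impl Countdown {
    pub fn new(from: u32) -> Countdown {
        Countdown { remaining: from }
    }
}

impl Stream for Countdown {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<u32>, ()> {
        if self.remaining == 0 {
            // Exhausted: no further values will be produced.
            return Ok(Async::Ready(None));
        }
        let value = self.remaining;
        self.remaining -= 1;
        // A value is available right away.
        Ok(Async::Ready(Some(value)))
    }
}

/// Polls until the stream ends and collects the values. This only works for
/// streams that are always ready, because the sketch has no executor.
pub fn drain<S: Stream>(mut stream: S) -> Result<Vec<S::Item>, S::Error> {
    let mut out = Vec::new();
    loop {
        match stream.poll()? {
            Async::Ready(Some(value)) => out.push(value),
            Async::Ready(None) => return Ok(out),
            Async::NotReady => panic!("no executor available to wait for the next value"),
        }
    }
}
```

Calling drain(Countdown::new(3)) polls four times: three polls produce values and the fourth reports the end of the stream.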

When the stream encounters an error, Err(error) is returned. Returning an error does not signify that the stream is exhausted. The error may be transient and the caller may try calling poll again in the future and values may be produced again. If the error is fatal, then the next call to poll should return Ok(Async::Ready(None)).
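A transient error can be illustrated with a stream that fails once and then keeps producing values. As before, this is only a sketch built on a std-only mirror of the futures 0.1 types. Flaky and drain_tolerant are hypothetical names, and whether a real caller should keep polling after an error depends on the stream in question.

```rust
// Std-only mirror of the futures 0.1 types (an assumption of this sketch).
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical stream that replays a scripted list of results. An `Err`
/// entry is reported once and the stream carries on afterwards.
pub struct Flaky {
    steps: std::vec::IntoIter<Result<u32, String>>,
}

impl Flaky {
    pub fn new(steps: Vec<Result<u32, String>>) -> Flaky {
        Flaky { steps: steps.into_iter() }
    }
}

impl Stream for Flaky {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<u32>, String> {
        match self.steps.next() {
            Some(Ok(value)) => Ok(Async::Ready(Some(value))),
            // Transient error: the stream is not finished yet.
            Some(Err(error)) => Err(error),
            // Script exhausted: the stream is done.
            None => Ok(Async::Ready(None)),
        }
    }
}

/// Keeps polling after errors and returns the values and errors seen.
pub fn drain_tolerant<S: Stream>(mut stream: S) -> (Vec<S::Item>, Vec<S::Error>) {
    let mut values = Vec::new();
    let mut errors = Vec::new();
    loop {
        match stream.poll() {
            Ok(Async::Ready(Some(value))) => values.push(value),
            Ok(Async::Ready(None)) => return (values, errors),
            Ok(Async::NotReady) => panic!("no executor available to wait for the next value"),
            // The error does not end the stream, so poll again.
            Err(error) => errors.push(error),
        }
    }
}
```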

Fibonacci

The following example shows how to implement the Fibonacci sequence as a stream.

    extern crate futures;

    use futures::{Stream, Poll, Async};

    pub struct Fibonacci {
        curr: u64,
        next: u64,
    }

    impl Fibonacci {
        fn new() -> Fibonacci {
            Fibonacci {
                curr: 1,
                next: 1,
            }
        }
    }

    impl Stream for Fibonacci {
        type Item = u64;

        // The stream will never yield an error
        type Error = ();

        fn poll(&mut self) -> Poll<Option<u64>, ()> {
            let curr = self.curr;
            let next = curr + self.next;

            self.curr = self.next;
            self.next = next;

            Ok(Async::Ready(Some(curr)))
        }
    }

To use the stream, a future must be built that consumes it. The following future will take a stream and display 10 items from it.

    #[macro_use]
    extern crate futures;

    use futures::{Future, Stream, Poll, Async};
    use std::fmt;

    pub struct Display10<T> {
        stream: T,
        curr: usize,
    }

    impl<T> Display10<T> {
        fn new(stream: T) -> Display10<T> {
            Display10 {
                stream,
                curr: 0,
            }
        }
    }

    impl<T> Future for Display10<T>
    where
        T: Stream,
        T::Item: fmt::Display,
    {
        type Item = ();
        type Error = T::Error;

        fn poll(&mut self) -> Poll<(), Self::Error> {
            while self.curr < 10 {
                // `try_ready!` returns early if the stream is not ready or fails.
                let value = match try_ready!(self.stream.poll()) {
                    Some(value) => value,
                    // There were fewer than 10 values to display, terminate the
                    // future.
                    None => break,
                };

                println!("value #{} = {}", self.curr, value);
                self.curr += 1;
            }

            Ok(Async::Ready(()))
        }
    }

Now, the Fibonacci sequence can be displayed:

    extern crate tokio;

    // `Fibonacci` and `Display10` are the types defined above.
    fn main() {
        let fib = Fibonacci::new();
        let display = Display10::new(fib);

        tokio::run(display);
    }

Getting asynchronous

So far, the Fibonacci stream is synchronous. Let's make it asynchronous by waiting a second between values. To do this, tokio::timer::Interval is used. Interval is itself a stream that yields () values at the requested time interval. Calling Interval::poll between intervals results in Async::NotReady being returned.

The Fibonacci stream is updated as follows:

    #[macro_use]
    extern crate futures;
    extern crate tokio;

    use tokio::timer::Interval;
    use futures::{Stream, Poll, Async};
    use std::time::Duration;

    pub struct Fibonacci {
        interval: Interval,
        curr: u64,
        next: u64,
    }

    impl Fibonacci {
        fn new(duration: Duration) -> Fibonacci {
            Fibonacci {
                interval: Interval::new_interval(duration),
                curr: 1,
                next: 1,
            }
        }
    }

    impl Stream for Fibonacci {
        type Item = u64;

        // The stream will never yield an error
        type Error = ();

        fn poll(&mut self) -> Poll<Option<u64>, ()> {
            // Wait until the next interval
            try_ready!(
                self.interval.poll()
                    // The interval can fail if the Tokio runtime is unavailable.
                    // In this example, the error is ignored.
                    .map_err(|_| ())
            );

            let curr = self.curr;
            let next = curr + self.next;

            self.curr = self.next;
            self.next = next;

            Ok(Async::Ready(Some(curr)))
        }
    }

The Display10 future already supports asynchronicity so it does not need to be updated.
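The reason no update is needed is that try_ready! returns from poll as soon as the inner stream reports Async::NotReady, and the consumer's state (its counter) lives in the struct, so a later poll resumes where the previous one stopped. The sketch below illustrates this with a std-only mirror of the futures 0.1 types and a simplified mirror of the try_ready! macro. EveryOther, Sum3, and drive are hypothetical names. EveryOther merely stands in for a timer-backed stream: a real stream must only return NotReady when an inner stream or future did, and drive replaces the executor with a plain loop.

```rust
// Std-only mirror of the futures 0.1 types (an assumption of this sketch).
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

// Simplified mirror of `try_ready!`: unwrap a ready value, otherwise
// return early from the enclosing `poll`.
macro_rules! try_ready {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(t)) => t,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(e) => return Err(From::from(e)),
        }
    };
}

/// Hypothetical stream that is not ready on every other poll.
pub struct EveryOther {
    pending_next: bool,
    next: u64,
}

impl EveryOther {
    pub fn new() -> EveryOther {
        EveryOther { pending_next: true, next: 1 }
    }
}

impl Stream for EveryOther {
    type Item = u64;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<u64>, ()> {
        if self.pending_next {
            self.pending_next = false;
            return Ok(Async::NotReady);
        }
        self.pending_next = true;
        let value = self.next;
        self.next += 1;
        Ok(Async::Ready(Some(value)))
    }
}

/// Hypothetical consumer shaped like `Display10`: sums the first 3 values.
pub struct Sum3<S> {
    stream: S,
    seen: usize,
    total: u64,
}

impl<S: Stream<Item = u64>> Sum3<S> {
    pub fn new(stream: S) -> Sum3<S> {
        Sum3 { stream, seen: 0, total: 0 }
    }

    pub fn poll(&mut self) -> Poll<u64, S::Error> {
        while self.seen < 3 {
            // Returns `NotReady` here if the stream has nothing yet;
            // `seen` and `total` are kept for the next call.
            match try_ready!(self.stream.poll()) {
                Some(value) => {
                    self.total += value;
                    self.seen += 1;
                }
                None => break,
            }
        }
        Ok(Async::Ready(self.total))
    }
}

/// Stand-in for an executor: polls until done, counting the polls.
pub fn drive(mut task: Sum3<EveryOther>) -> (u64, usize) {
    let mut polls = 0;
    loop {
        polls += 1;
        match task.poll() {
            Ok(Async::Ready(total)) => return (total, polls),
            Ok(Async::NotReady) => continue,
            Err(()) => unreachable!("EveryOther never fails"),
        }
    }
}
```

With these definitions, drive(Sum3::new(EveryOther::new())) needs four polls to sum the values 1, 2, and 3, because each NotReady suspends the consumer without losing its progress.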

To run the throttled Fibonacci sequence, include an interval:

    extern crate tokio;

    use std::time::Duration;

    // `Fibonacci` and `Display10` are the types defined above.
    fn main() {
        let fib = Fibonacci::new(Duration::from_secs(1));
        let display = Display10::new(fib);

        tokio::run(display);
    }

Combinators

Just like futures, streams come with a number of combinators for reducing boilerplate. Many of these combinators exist as functions on the Stream trait.

The Fibonacci stream can be rewritten using the unfold function:

    extern crate futures;

    use futures::{stream, Stream};

    fn fibonacci() -> impl Stream<Item = u64, Error = ()> {
        stream::unfold((1, 1), |(curr, next)| {
            let new_next = curr + next;

            Some(Ok((curr, (next, new_next))))
        })
    }

Just like with futures, using stream combinators requires a functional style of programming. Also, impl Stream is used to return the stream from the function. The strategies for returning futures apply equally to returning streams.
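To make the behavior of unfold less opaque, here is a simplified sketch of how such a combinator could work: it stores the state, hands it to the closure on each poll, and keeps the returned state for the next poll. This is not the futures 0.1 implementation. In particular, the real unfold accepts a closure returning a future, while this mirror assumes the closure returns a Result directly. The Async, Poll, and Stream definitions are std-only mirrors, and first_values is a hypothetical helper.

```rust
// Std-only mirror of the futures 0.1 types (an assumption of this sketch).
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Simplified stand-in for the stream returned by `stream::unfold`.
pub struct Unfold<St, F> {
    state: Option<St>,
    f: F,
}

pub fn unfold<St, T, E, F>(init: St, f: F) -> Unfold<St, F>
where
    F: FnMut(St) -> Option<Result<(T, St), E>>,
{
    Unfold { state: Some(init), f }
}

impl<St, T, E, F> Stream for Unfold<St, F>
where
    F: FnMut(St) -> Option<Result<(T, St), E>>,
{
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<Option<T>, E> {
        // Take the state out; if it is gone, the stream has ended.
        let state = match self.state.take() {
            Some(state) => state,
            None => return Ok(Async::Ready(None)),
        };
        match (self.f)(state) {
            Some(Ok((item, next_state))) => {
                self.state = Some(next_state);
                Ok(Async::Ready(Some(item)))
            }
            Some(Err(error)) => Err(error),
            // The closure returned `None`: end of the stream.
            None => Ok(Async::Ready(None)),
        }
    }
}

pub fn fibonacci() -> impl Stream<Item = u64, Error = ()> {
    unfold((1u64, 1u64), |(curr, next)| {
        let new_next = curr + next;
        // The error type is spelled out so the sketch needs no inference.
        Some(Ok::<_, ()>((curr, (next, new_next))))
    })
}

/// Hypothetical helper: polls the stream until `n` values were produced.
pub fn first_values(n: usize) -> Vec<u64> {
    let mut stream = fibonacci();
    let mut out = Vec::new();
    while out.len() < n {
        match stream.poll() {
            Ok(Async::Ready(Some(value))) => out.push(value),
            // End of stream, not ready, or error: stop collecting.
            _ => break,
        }
    }
    out
}
```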

Display10 is reimplemented using take and for_each:

    extern crate tokio;
    extern crate futures;

    use futures::Stream;

    // `fibonacci()` is the unfold-based function defined above.
    fn main() {
        tokio::run(
            fibonacci().take(10)
                .for_each(|num| {
                    println!("{}", num);
                    Ok(())
                })
        );
    }

The take combinator limits the Fibonacci stream to 10 values. The for_each combinator asynchronously iterates the stream values. for_each consumes the stream and returns a future that completes once the closure has been called for each stream value. It is the asynchronous equivalent of a Rust for loop.
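For comparison, the synchronous counterpart can be written with standard library iterators only. The sketch below is not part of Tokio or futures. fibonacci_iter and first_ten are hypothetical names, and the loop collects the values instead of printing them so the result is easy to inspect.

```rust
/// Synchronous counterpart of the `fibonacci()` stream, built on
/// `std::iter::successors`. Each state is a `(curr, next)` pair.
pub fn fibonacci_iter() -> impl Iterator<Item = u64> {
    std::iter::successors(Some((1u64, 1u64)), |&(curr, next)| {
        Some((next, curr + next))
    })
    .map(|(curr, _)| curr)
}

/// `take(10)` bounds the sequence just like the stream combinator, and the
/// `for` loop plays the role that `for_each` plays for streams.
pub fn first_ten() -> Vec<u64> {
    let mut values = Vec::new();
    for num in fibonacci_iter().take(10) {
        values.push(num);
    }
    values
}
```

The difference is that the for loop blocks the thread until every value is available, while for_each returns a future that the runtime can suspend whenever the stream is not ready.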

Essential combinators

It is worth spending some time with the Stream trait and module documentation to gain some familiarity with the full set of available combinators. This guide will provide a very quick overview.

Concrete streams

The stream module contains functions for converting values and iterators into streams.

  • once converts the provided value into an immediately ready stream that yields a single item: the provided value.
  • iter_ok and iter_result both take IntoIterator values and convert them into an immediately ready stream that yields the iterator values.
  • empty returns a stream that immediately yields None.

For example:

    extern crate tokio;
    extern crate futures;

    use futures::{stream, Stream};

    fn main() {
        let values = vec!["one", "two", "three"];

        tokio::run(
            stream::iter_ok(values).for_each(|value| {
                println!("{}", value);
                Ok(())
            })
        );
    }
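What "immediately ready" means can be sketched with a simplified mirror of iter_ok: every poll forwards the iterator's next value without ever returning Async::NotReady. This is an illustration under stated assumptions, not the futures 0.1 source. The types are std-only mirrors, the error type is fixed to () here (the real function is generic over it), and collect_ready is a hypothetical helper.

```rust
// Std-only mirror of the futures 0.1 types (an assumption of this sketch).
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Simplified stand-in for the stream returned by `stream::iter_ok`.
pub struct IterOk<I> {
    iter: I,
}

pub fn iter_ok<I: IntoIterator>(values: I) -> IterOk<I::IntoIter> {
    IterOk { iter: values.into_iter() }
}

impl<I: Iterator> Stream for IterOk<I> {
    type Item = I::Item;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<I::Item>, ()> {
        // Always ready: `Some(value)` while the iterator has items,
        // `None` once it is exhausted.
        Ok(Async::Ready(self.iter.next()))
    }
}

/// Hypothetical helper: collects values until the stream ends.
pub fn collect_ready<S: Stream>(mut stream: S) -> Vec<S::Item> {
    let mut out = Vec::new();
    loop {
        match stream.poll() {
            Ok(Async::Ready(Some(value))) => out.push(value),
            // End of stream, not ready, or error: stop collecting.
            _ => return out,
        }
    }
}
```

In this mirror, a one-element iterator behaves like once and an empty iterator behaves like empty.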

Adapters

Like Iterator, the Stream trait includes a broad range of “adapter” methods. These methods all consume the stream, returning a new stream providing the requested behavior. Using these adapter combinators, it is possible to: