Iteration

Currently, the Rust programming language does not support async for loops. Instead, iterating streams is done using a while let loop paired with StreamExt::next().

  1. use tokio_stream::StreamExt;
  2. #[tokio::main]
  3. async fn main() {
  4. let mut stream = tokio_stream::iter(&[1, 2, 3]);
  5. while let Some(v) = stream.next().await {
  6. println!("GOT = {:?}", v);
  7. }
  8. }

Like iterators, the next() method returns Option<T> where T is the stream’s value type. Receiving None indicates that stream iteration is terminated.

Mini-Redis broadcast

Let’s go over a slightly more complicated example using the Mini-Redis client.

Full code can be found here.

  1. use tokio_stream::StreamExt;
  2. use mini_redis::client;
  3. async fn publish() -> mini_redis::Result<()> {
  4. let mut client = client::connect("127.0.0.1:6379").await?;
  5. // Publish some data
  6. client.publish("numbers", "1".into()).await?;
  7. client.publish("numbers", "two".into()).await?;
  8. client.publish("numbers", "3".into()).await?;
  9. client.publish("numbers", "four".into()).await?;
  10. client.publish("numbers", "five".into()).await?;
  11. client.publish("numbers", "6".into()).await?;
  12. Ok(())
  13. }
  14. async fn subscribe() -> mini_redis::Result<()> {
  15. let client = client::connect("127.0.0.1:6379").await?;
  16. let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
  17. let messages = subscriber.into_stream();
  18. tokio::pin!(messages);
  19. while let Some(msg) = messages.next().await {
  20. println!("got = {:?}", msg);
  21. }
  22. Ok(())
  23. }
  24. #[tokio::main]
  25. async fn main() -> mini_redis::Result<()> {
  26. tokio::spawn(async {
  27. publish().await
  28. });
  29. subscribe().await?;
  30. println!("DONE");
  31. Ok(())
  32. }

A task is spawned to publish messages to the Mini-Redis server on the “numbers” channel. Then, on the main task, we subscribe to the “numbers” channel and display received messages.

After subscribing, into_stream() is called on the returned subscriber. This consumes the Subscriber, returning a stream that yields messages as they arrive. Before we start iterating the messages, note that the stream is pinned to the stack using tokio::pin!. Calling next() on a stream requires the stream to be pinned. The into_stream() function returns a stream that is not pin, we must explicitly pin it in order to iterate it.

A Rust value is “pinned” when it can no longer be moved in memory. A key property of a pinned value is that pointers can be taken to the pinned data and the caller can be confident the pointer stays valid. This feature is used by async/await to support borrowing data across .await points.

If we forget to pin the stream, we get an error like this:

  1. error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  2. --> streams/src/main.rs:29:36
  3. |
  4. 29 | while let Some(msg) = messages.next().await {
  5. | ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
  6. |
  7. = note: required because it appears within the type `impl Future`
  8. = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
  9. = note: required because it appears within the type `impl Stream`
  10. = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
  11. = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
  12. = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
  13. = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
  14. = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
  15. = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

If you hit an error message like this, try pinning the value!

Before trying to run this, start the Mini-Redis server:

  1. $ mini-redis-server

Then try running the code. We will see the messages outputted to STDOUT.

  1. got = Ok(Message { channel: "numbers", content: b"1" })
  2. got = Ok(Message { channel: "numbers", content: b"two" })
  3. got = Ok(Message { channel: "numbers", content: b"3" })
  4. got = Ok(Message { channel: "numbers", content: b"four" })
  5. got = Ok(Message { channel: "numbers", content: b"five" })
  6. got = Ok(Message { channel: "numbers", content: b"6" })

Some early messages may be dropped as there is a race between subscribing and publishing. The program never exits. A subscription to a Mini-Redis channel stays active as long as the server is active.

Let’s see how we can work with streams to expand on this program.