Adapters

Functions that take a Stream and return another Stream are often called ‘stream adapters’, as they’re a form of the ‘adapter pattern’. Common stream adapters include map, take, and filter.
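To make the idea concrete, here is a minimal sketch of a hand-written adapter. It uses the synchronous `Iterator` trait from the standard library as a stand-in for `Stream`, on the assumption that the shape of an adapter (consume one sequence, return another) carries over. The names `doubled` and `demo` are hypothetical and exist only for illustration.

```rust
/// A hand-written adapter: consumes any iterator of numbers and returns a
/// new iterator that yields each value doubled.
pub fn doubled<I>(input: I) -> impl Iterator<Item = u32>
where
    I: Iterator<Item = u32>,
{
    input.map(|n| n * 2)
}

/// Chains the custom adapter with the built-in `filter` and `take` adapters.
pub fn demo() -> Vec<u32> {
    doubled(1..=10)
        // Keep only the doubled values that are divisible by three.
        .filter(|n| n % 3 == 0)
        // Stop after the first two matches.
        .take(2)
        .collect()
}
```

Because each adapter returns another value of the same kind, calls can be chained freely. The stream versions follow the same pattern, except that values are produced asynchronously.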

Let’s update the Mini-Redis example so that it exits: after receiving three messages, it stops iterating. This is done with take, an adapter that limits the stream to yield at most n messages.

    let messages = subscriber
        .into_stream()
        .take(3);

Running the program again, we get:

    got = Ok(Message { channel: "numbers", content: b"1" })
    got = Ok(Message { channel: "numbers", content: b"two" })
    got = Ok(Message { channel: "numbers", content: b"3" })

This time the program ends.
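The reason the program now ends can be sketched with a source that never finishes on its own. The example below is an illustration only: it uses a standard-library iterator as a synchronous stand-in for the subscription stream, and `endless_numbers` and `first_three` are hypothetical names.

```rust
/// Stand-in for a subscription that never ends on its own: yields 1, 2, 3, ...
/// (effectively endless for the purposes of this sketch).
pub fn endless_numbers() -> impl Iterator<Item = u64> {
    1u64..
}

/// Collects from the endless source. Without `take`, `collect` would keep
/// pulling values and never return; `take(3)` ends the sequence after three.
pub fn first_three() -> Vec<u64> {
    endless_numbers().take(3).collect()
}
```

The same reasoning applies to the stream: once take has yielded its third item, it reports that the stream is finished, so the loop consuming it terminates.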

Now, let’s limit the stream to single-digit numbers. We check this by testing the message length: the filter adapter drops any message that does not match the predicate.

    let messages = subscriber
        .into_stream()
        .filter(|msg| match msg {
            Ok(msg) if msg.content.len() == 1 => true,
            _ => false,
        })
        .take(3);

Running the program again, we get:

    got = Ok(Message { channel: "numbers", content: b"1" })
    got = Ok(Message { channel: "numbers", content: b"3" })
    got = Ok(Message { channel: "numbers", content: b"6" })

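Note that the order in which adapters are applied matters. Calling filter and then take is different from calling take and then filter: the first yields the first three single-digit messages, whereas the second inspects only the first three messages and keeps whichever of those are single digits.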
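The effect of ordering can be checked with a small sketch. It uses standard-library iterators as a synchronous stand-in for the stream, and it assumes the publisher sends the six payloads listed in `PAYLOADS` (the values between "3" and "6" are an assumption, since only part of the sequence appears in the output above). The function names are hypothetical.

```rust
/// Payloads assumed to be published on the channel, in order.
pub const PAYLOADS: [&str; 6] = ["1", "two", "3", "four", "five", "6"];

/// `filter` first, then `take`: keep one-byte payloads and stop after
/// three matches have been found.
pub fn filter_then_take() -> Vec<&'static str> {
    PAYLOADS
        .iter()
        .copied()
        .filter(|p| p.len() == 1)
        .take(3)
        .collect()
}

/// `take` first, then `filter`: look at only the first three payloads and
/// keep the one-byte payloads among them.
pub fn take_then_filter() -> Vec<&'static str> {
    PAYLOADS
        .iter()
        .copied()
        .take(3)
        .filter(|p| p.len() == 1)
        .collect()
}
```

Under these assumptions, `filter_then_take` returns "1", "3", and "6", while `take_then_filter` returns only "1" and "3", because "two" uses up one of the three slots before the filter runs.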

Finally, we will tidy up the output by stripping the Ok(Message { ... }) part of the output. This is done with map. Because this is applied after filter, we know the message is Ok, so we can use unwrap().

    let messages = subscriber
        .into_stream()
        .filter(|msg| match msg {
            Ok(msg) if msg.content.len() == 1 => true,
            _ => false,
        })
        .map(|msg| msg.unwrap().content)
        .take(3);

Now, the output is:

    got = b"1"
    got = b"3"
    got = b"6"

Another option would be to combine the filter and map steps into a single call using filter_map.
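A rough sketch of what the combined step might look like follows. It is not the Mini-Redis code itself: `Message` here is a simplified hypothetical stand-in, the error type is a plain `String` chosen for illustration, and the pipeline runs over a standard-library iterator rather than a stream. The helper names `msg`, `single_digit`, and `first_three_digits` are also hypothetical.

```rust
/// Simplified stand-in for the Mini-Redis message type.
#[derive(Debug)]
pub struct Message {
    pub channel: String,
    pub content: Vec<u8>,
}

/// Convenience constructor for a successful message on the "numbers" channel.
pub fn msg(content: &str) -> Result<Message, String> {
    Ok(Message {
        channel: "numbers".to_string(),
        content: content.as_bytes().to_vec(),
    })
}

/// Filter and map in one step: returns the payload of an `Ok` message whose
/// content is exactly one byte long, and `None` for everything else
/// (longer payloads and errors alike).
pub fn single_digit(msg: Result<Message, String>) -> Option<Vec<u8>> {
    match msg {
        Ok(msg) if msg.content.len() == 1 => Some(msg.content),
        _ => None,
    }
}

/// Applies the combined step, then keeps the first three results.
pub fn first_three_digits(input: Vec<Result<Message, String>>) -> Vec<Vec<u8>> {
    input.into_iter().filter_map(single_digit).take(3).collect()
}
```

One benefit of this form is that the unwrap() call disappears: the match that decides whether to keep a message also extracts its content, so there is no separate step that relies on the earlier filter having run.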

There are more adapters available. See the StreamExt trait documentation for the full list.