Working with framed streams

Tokio has helpers to transform a stream of bytes into a stream of frames. Examples of byte streams include TCP connections, pipes, file objects, and the standard input and output file descriptors. In Rust, byte streams are easy to identify because they implement the Read and Write traits; in Tokio, they implement the asynchronous counterparts, AsyncRead and AsyncWrite.

One of the simplest forms of framed message is the line-delimited message: each message ends with a \n character. Let's look at how one would implement a stream of line-delimited messages with Tokio.
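To make the framing rule concrete, here is a minimal std-only sketch with no Tokio involved; to_wire and from_wire are hypothetical helper names. Framing appends \n to each message, and unframing a buffer of complete frames splits on \n. The sketch assumes messages never contain \n themselves, because that byte is reserved as the delimiter.

```rust
/// Hypothetical helper: frames each message by appending the '\n' delimiter.
/// Assumes no message contains '\n'; such a message would be read back as two.
fn to_wire(messages: &[&str]) -> Vec<u8> {
    let mut bytes = Vec::new();
    for msg in messages {
        bytes.extend_from_slice(msg.as_bytes());
        bytes.push(b'\n');
    }
    bytes
}

/// Hypothetical helper: recovers every complete message from `bytes`.
/// Bytes after the last '\n' form an incomplete frame and are ignored here.
fn from_wire(bytes: &[u8]) -> Vec<String> {
    let mut messages = Vec::new();
    let mut start = 0;
    for (i, &b) in bytes.iter().enumerate() {
        if b == b'\n' {
            // The delimiter itself is not part of the message.
            messages.push(String::from_utf8_lossy(&bytes[start..i]).into_owned());
            start = i + 1;
        }
    }
    messages
}
```

The ignored trailing bytes in from_wire are exactly the problem a decoder has to solve: on a real stream, a read can end in the middle of a frame.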

Writing a codec

A codec implements the tokio_codec::Decoder and tokio_codec::Encoder traits. Its job is to convert frames to and from bytes. These traits are used together with the tokio_codec::Framed struct, which provides buffering, decoding, and encoding of byte streams.

Let's look at a simplified version of the LinesCodec struct, which implements decoding and encoding of line-delimited messages.

```rust
pub struct LinesCodec {
    // Index of the next byte to examine for a `\n` character.
    // This is used to optimize searching.
    // For example, if `decode` was called with `abc`, it would hold `3`,
    // because that is the next index to examine.
    // The next time `decode` is called with `abcde\n`, the method will
    // only look at `de\n` before returning.
    next_index: usize,
}
```

As the comments explain, bytes are buffered until a full line has arrived, so searching for \n from the beginning of the buffer every time data is received would rescan bytes already known not to contain one. It is more efficient to remember the buffer length from the previous call and resume the search from there when new data arrives.
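The saving is easy to measure. Below is a std-only sketch, not Tokio's code: a hypothetical LineScanner buffers incoming chunks in a Vec<u8>, resumes its search at next_index, and counts the bytes it examines. It replays the abc / abcde\n scenario from the comments; for comparison, the hypothetical naive_scan_cost rescans from index 0 on every call.

```rust
/// Hypothetical scanner that remembers where the previous search stopped.
struct LineScanner {
    buf: Vec<u8>,
    next_index: usize,
    bytes_examined: usize,
}

impl LineScanner {
    fn new() -> Self {
        LineScanner { buf: Vec::new(), next_index: 0, bytes_examined: 0 }
    }

    /// Appends a chunk and returns the index of the first '\n', if any.
    /// This sketch does not consume lines, so a hit leaves `next_index` as is.
    fn push(&mut self, chunk: &[u8]) -> Option<usize> {
        self.buf.extend_from_slice(chunk);
        // Only bytes that have not been scanned before are examined.
        for i in self.next_index..self.buf.len() {
            self.bytes_examined += 1;
            if self.buf[i] == b'\n' {
                return Some(i);
            }
        }
        // Nothing found: the next search can start at the current end.
        self.next_index = self.buf.len();
        None
    }
}

/// Bytes examined by the naive approach, which rescans the whole buffer
/// from index 0 every time a chunk arrives.
fn naive_scan_cost(chunks: &[&[u8]]) -> usize {
    let mut buf = Vec::new();
    let mut examined = 0;
    for chunk in chunks {
        buf.extend_from_slice(chunk);
        for &b in &buf {
            examined += 1;
            if b == b'\n' {
                return examined;
            }
        }
    }
    examined
}
```

For the two chunks abc and de\n, the scanner examines 6 bytes while the naive approach examines 9; the gap widens as a long line trickles in over many small reads.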

The Decoder::decode method is called when data is received on the underlying stream. The method can produce a frame or return Ok(None) to signify that it needs more data to produce a frame. The decode method is responsible for removing the data that no longer needs to be buffered by splitting it off using the BytesMut methods. If the data is not removed, the buffer will keep growing.
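To illustrate the Ok(None) contract and why consumed bytes must be removed, here is a std-only sketch that uses a Vec<u8> in place of BytesMut, so Vec::drain stands in for BytesMut::split_to. The hypothetical decode_line returns None until a full line is buffered, and the hypothetical decode_all keeps calling it until it returns None, leaving any partial frame in the buffer for the next read. For brevity it uses String::from_utf8_lossy, which replaces invalid UTF-8 instead of reporting it.

```rust
/// Hypothetical std-only decoder: removes and returns one line from `buf`,
/// or returns `None` if no complete line is buffered yet.
fn decode_line(buf: &mut Vec<u8>) -> Option<String> {
    let newline_index = buf.iter().position(|b| *b == b'\n')?;
    // Remove the frame *and* its delimiter so the buffer does not keep growing.
    let frame: Vec<u8> = buf.drain(..=newline_index).collect();
    // Drop the trailing '\n'; it belongs to the protocol, not the data.
    Some(String::from_utf8_lossy(&frame[..frame.len() - 1]).into_owned())
}

/// Decodes every complete line currently in `buf`; a trailing partial
/// line stays in `buf` until more data arrives.
fn decode_all(buf: &mut Vec<u8>) -> Vec<String> {
    let mut lines = Vec::new();
    while let Some(line) = decode_line(buf) {
        lines.push(line);
    }
    lines
}
```

Given the buffer one\ntwo\nthr, decode_all yields one and two and leaves thr buffered; once ee\n arrives, the next call yields three and the buffer is empty again.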

Let’s look at how Decoder::decode is implemented for LinesCodec.

```rust
extern crate bytes;
extern crate tokio_codec;

use std::io;
use std::str;

use bytes::BytesMut;
use tokio_codec::Decoder;

struct LinesCodec {
    next_index: usize,
}

impl Decoder for LinesCodec {
    type Item = String;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
        // Look for a byte with the value '\n' in buf. Start searching from the search start index.
        if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n') {
            // Found a '\n' in the buffer.
            // The index of the '\n' is the sum of the start position and the offset found.
            let newline_index = newline_offset + self.next_index;
            // Split the buffer at the index of the '\n' + 1 to include the '\n'.
            // `split_to` returns a new buffer with the contents up to the index.
            // The buffer on which `split_to` is called will now start at this index.
            let line = buf.split_to(newline_index + 1);
            // The remaining buffer starts a new frame, so set the search start
            // index back to 0. Do this before the early return below.
            self.next_index = 0;
            // Trim the `\n` from the line because it's part of the protocol,
            // not the data.
            let line = &line[..line.len() - 1];
            // Convert the bytes to a string. Invalid UTF-8 sent by the peer is
            // reported as an error instead of panicking.
            let line = str::from_utf8(line)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
            // Return Ok(Some(...)) to signal that a full frame has been produced.
            Ok(Some(line.to_string()))
        } else {
            // '\n' not found in the buffer.
            // Tell the next call to start searching after the current length of the buffer
            // since all of it was scanned and no '\n' was found.
            self.next_index = buf.len();
            // Ok(None) signifies that more data is needed to produce a full frame.
            Ok(None)
        }
    }
}
```

The Encoder::encode method is called when a frame must be written to the underlying stream. The frame must be written into the buffer received as a parameter. The data written to that buffer is then written to the stream as soon as the stream is ready to accept it.

Let’s now look at how Encoder::encode is implemented for LinesCodec.

```rust
extern crate bytes;
extern crate tokio_codec;

use std::io;

use bytes::{BufMut, BytesMut};
use tokio_codec::Encoder;

struct LinesCodec {
    next_index: usize,
}

impl Encoder for LinesCodec {
    type Item = String;
    type Error = io::Error;

    fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
        // It's important to reserve the amount of space needed. The `bytes` API
        // does not grow the buffers implicitly.
        // Reserve the length of the string + 1 for the '\n'.
        buf.reserve(line.len() + 1);
        // String implements IntoBuf, a trait used by the `bytes` API to work with
        // types that can be expressed as a sequence of bytes.
        buf.put(line);
        // Put the '\n' in the buffer.
        buf.put_u8(b'\n');
        // Return Ok to signal that no error occurred.
        Ok(())
    }
}
```

Encoding is usually simpler than decoding because there is no partial frame to track: here we simply reserve the space needed and write the data to the buffer.
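The same steps can be sketched with std types only, assuming a Vec<u8> output buffer in place of BytesMut. A Vec grows on its own, so reserve is only an optimization here, unlike with the bytes API described above. The hypothetical encode_line also rejects lines containing \n, a check the simplified codec above does not perform: an embedded delimiter would make the receiver see two frames instead of one.

```rust
use std::io;

/// Hypothetical std-only encoder: appends `line` plus the '\n' delimiter to `buf`.
fn encode_line(line: &str, buf: &mut Vec<u8>) -> Result<(), io::Error> {
    // An embedded '\n' would be decoded as two separate frames.
    if line.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "line contains a newline",
        ));
    }
    // Reserve room for the payload plus one byte for the delimiter.
    buf.reserve(line.len() + 1);
    buf.extend_from_slice(line.as_bytes());
    buf.push(b'\n');
    Ok(())
}
```

Encoding hi and then there leaves hi\nthere\n in the buffer; a rejected line leaves the buffer untouched because the check runs before anything is written.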

Using a codec

The simplest way to use a codec is with the Framed struct, a wrapper around a codec that handles buffering automatically. The Framed struct is both a Stream and a Sink, so you can receive frames from it and send frames to it.
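Conceptually, the read side of Framed reads whatever bytes are available, appends them to a buffer, and decodes frames until the codec reports that it needs more data. A blocking, std-only approximation is sketched below; read_lines and its chunk_size parameter are hypothetical, and a std::io::Cursor stands in for a socket. The real Framed does this asynchronously and also implements the write side as a Sink.

```rust
use std::io::{self, Read};

/// Hypothetical blocking analogue of Framed's read side: reads `reader` in
/// chunks of at most `chunk_size` bytes (must be non-zero, since a zero-length
/// read would look like EOF) and returns every complete line plus any
/// trailing bytes that did not end in '\n'.
fn read_lines<R: Read>(mut reader: R, chunk_size: usize) -> io::Result<(Vec<String>, Vec<u8>)> {
    let mut buf = Vec::new();
    let mut chunk = vec![0u8; chunk_size];
    let mut lines = Vec::new();
    loop {
        let n = reader.read(&mut chunk)?;
        if n == 0 {
            // EOF: whatever is left in `buf` is an incomplete frame.
            return Ok((lines, buf));
        }
        buf.extend_from_slice(&chunk[..n]);
        // Decode as many frames as the buffer now holds.
        while let Some(pos) = buf.iter().position(|b| *b == b'\n') {
            let frame: Vec<u8> = buf.drain(..=pos).collect();
            // `frame[..pos]` excludes the trailing '\n'.
            lines.push(String::from_utf8_lossy(&frame[..pos]).into_owned());
        }
    }
}
```

With a chunk size of 3, the input alpha\nbeta\ngam arrives in reads that never line up with frame boundaries, yet the sketch still yields alpha and beta and hands back gam as an incomplete frame.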

You can create a Framed struct from any type that implements the AsyncRead and AsyncWrite traits, either with Framed::new, as in the example below, or with the AsyncRead::framed method.

```rust
extern crate futures;
extern crate tokio;
extern crate tokio_codec;

use futures::prelude::*;
use tokio::net::TcpStream;
use tokio_codec::{Framed, LinesCodec};

fn main() {
    let addr = "127.0.0.1:5000".parse().expect("invalid socket address");

    let client = TcpStream::connect(&addr).and_then(|sock| {
        // Wrap the socket so that it yields and accepts whole lines.
        let framed_sock = Framed::new(sock, LinesCodec::new());
        // `Framed` is a `Stream` of decoded frames; print each one.
        framed_sock.for_each(|line| {
            println!("Received line {}", line);
            Ok(())
        })
    });

    // Futures are lazy: nothing happens until the future is run.
    tokio::run(client.map_err(|e| eprintln!("connection error: {}", e)));
}
```
