Buffered reads

The read_frame method waits for an entire frame to be received before returning. A single call to TcpStream::read() may return an arbitrary amount of data. It could contain an entire frame, a partial frame, or multiple frames. If a partial frame is received, the data is buffered and more data is read from the socket. If multiple frames are received, the first frame is returned and the rest of the data is buffered until the next call to read_frame.

To implement this, Connection needs a read buffer field. Data is read from the socket into the read buffer. When a frame is parsed, the corresponding data is removed from the buffer.
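The "parse, then remove from the buffer" step can be sketched with the standard library alone. This is only an illustration under stated assumptions: `take_frame` is a hypothetical helper, and it assumes a made-up framing rule in which every frame ends with a newline byte. The Redis protocol and the real parse_frame are more involved.

```rust
/// Hypothetical framing for illustration only: a frame is a run of bytes
/// terminated by b'\n'. This is NOT the Redis wire format.
///
/// Removes and returns the first complete frame from `buf` (without its
/// terminator). Returns `None` when only a partial frame is buffered.
fn take_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    // Find the terminator of the first frame; bail out if it has not
    // arrived yet, leaving the partial data in the buffer.
    let end = buf.iter().position(|&b| b == b'\n')?;

    // Remove the frame and its terminator from the front of the buffer.
    // Any bytes after it (the start of later frames) stay buffered.
    let mut frame: Vec<u8> = buf.drain(..=end).collect();
    frame.pop(); // drop the trailing b'\n'
    Some(frame)
}
```

With `PIN` buffered, `take_frame` returns `None`. Once `G\nPONG\nPA` is appended, two successive calls return `PING` and `PONG`, and the partial `PA` remains in the buffer for a later call.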

We will use BytesMut as the buffer type. This is a mutable version of Bytes.

    use bytes::BytesMut;
    use tokio::net::TcpStream;

    pub struct Connection {
        stream: TcpStream,
        buffer: BytesMut,
    }

    impl Connection {
        pub fn new(stream: TcpStream) -> Connection {
            Connection {
                stream,
                // Allocate the buffer with 4kb of capacity.
                buffer: BytesMut::with_capacity(4096),
            }
        }
    }

Next, we implement the read_frame() method.

    use tokio::io::AsyncReadExt;
    use bytes::Buf;
    use mini_redis::{Frame, Result};

    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        loop {
            // Attempt to parse a frame from the buffered data. If
            // enough data has been buffered, the frame is
            // returned.
            if let Some(frame) = self.parse_frame()? {
                return Ok(Some(frame));
            }

            // There is not enough buffered data to read a frame.
            // Attempt to read more data from the socket.
            //
            // On success, the number of bytes is returned. `0`
            // indicates "end of stream".
            if 0 == self.stream.read_buf(&mut self.buffer).await? {
                // The remote closed the connection. For this to be
                // a clean shutdown, there should be no data in the
                // read buffer. If there is, this means that the
                // peer closed the socket while sending a frame.
                if self.buffer.is_empty() {
                    return Ok(None);
                } else {
                    return Err("connection reset by peer".into());
                }
            }
        }
    }

Let’s break this down. The read_frame method operates in a loop. First, self.parse_frame() is called. This attempts to parse a Redis frame from self.buffer. If there is enough data to parse a frame, the frame is returned to the caller of read_frame(). Otherwise, we attempt to read more data from the socket into the buffer. After reading more data, parse_frame() is called again. This time, if enough data has been received, parsing may succeed.

When reading from the stream, a return value of 0 indicates that no more data will be received from the peer. If the read buffer still has data in it, this indicates a partial frame has been received and the connection is being terminated abruptly. This is an error condition and Err is returned.
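The control flow described above can be exercised without a socket. The following is a blocking, standard-library analogue of the loop, not the Tokio implementation: it reads from any `std::io::Read`, uses a hypothetical newline-terminated framing in place of Redis frames, and the free functions `parse_frame` and `read_frame` here are illustrative stand-ins for the methods on Connection.

```rust
use std::io::{self, Read};

/// Hypothetical framing for illustration only: frames end with b'\n'.
/// Removes and returns the first complete frame, if one is buffered.
fn parse_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    let end = buf.iter().position(|&b| b == b'\n')?;
    let mut frame: Vec<u8> = buf.drain(..=end).collect();
    frame.pop(); // drop the terminator
    Some(frame)
}

/// Blocking analogue of the read loop: parse first, read more only when
/// the buffered data does not yet hold a full frame.
fn read_frame<R: Read>(src: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<Vec<u8>>> {
    // Deliberately tiny so that frames span several reads.
    let mut chunk = [0u8; 4];
    loop {
        if let Some(frame) = parse_frame(buf) {
            return Ok(Some(frame));
        }

        let n = src.read(&mut chunk)?;
        if n == 0 {
            // End of stream. An empty buffer means a clean shutdown;
            // leftover bytes mean the peer stopped mid-frame.
            return if buf.is_empty() {
                Ok(None)
            } else {
                Err(io::Error::new(
                    io::ErrorKind::ConnectionReset,
                    "connection reset by peer",
                ))
            };
        }
        buf.extend_from_slice(&chunk[..n]);
    }
}
```

A byte slice implements `Read`, so `b"PING\nPONG\n"` can stand in for the socket: three calls yield `PING`, `PONG`, and then `Ok(None)`. With `b"PING\nPO"` as input, the second call returns an error because `PO` is still buffered when the stream ends.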

The Buf trait

When reading from the stream, read_buf is called. This version of the read function takes a value implementing BufMut from the bytes crate.

First, consider how we would implement the same read loop using read(). Vec<u8> could be used instead of BytesMut.

    use tokio::net::TcpStream;

    pub struct Connection {
        stream: TcpStream,
        buffer: Vec<u8>,
        cursor: usize,
    }

    impl Connection {
        pub fn new(stream: TcpStream) -> Connection {
            Connection {
                stream,
                // Allocate the buffer with 4kb of capacity.
                buffer: vec![0; 4096],
                cursor: 0,
            }
        }
    }

And the read_frame() function on Connection:

    use tokio::io::AsyncReadExt;
    use mini_redis::{Frame, Result};

    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        loop {
            if let Some(frame) = self.parse_frame()? {
                return Ok(Some(frame));
            }

            // Ensure the buffer has capacity
            if self.buffer.len() == self.cursor {
                // Grow the buffer
                self.buffer.resize(self.cursor * 2, 0);
            }

            // Read into the buffer, tracking the number
            // of bytes read
            let n = self.stream.read(
                &mut self.buffer[self.cursor..]).await?;

            if 0 == n {
                if self.cursor == 0 {
                    return Ok(None);
                } else {
                    return Err("connection reset by peer".into());
                }
            } else {
                // Update our cursor
                self.cursor += n;
            }
        }
    }

When working with byte arrays and read, we must also maintain a cursor tracking how much data has been buffered. We must make sure to pass the empty portion of the buffer to read(). Otherwise, we would overwrite buffered data. If our buffer gets filled up, we must grow the buffer in order to keep reading. In parse_frame() (not included), we would need to parse data contained by self.buffer[..self.cursor].
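To make the bookkeeping concrete, here is one way the omitted parse step might look with a manual cursor. It is a sketch under assumptions, not the tutorial's code: `CursorBuffer` is a hypothetical stand-in for the two fields on Connection, and frames are assumed to be newline-terminated rather than Redis frames.

```rust
/// Hypothetical stand-in for the `buffer` + `cursor` fields shown above.
struct CursorBuffer {
    buffer: Vec<u8>,
    /// Number of valid (received) bytes at the front of `buffer`.
    cursor: usize,
}

impl CursorBuffer {
    /// Hypothetical framing for illustration only: frames end with b'\n'.
    fn parse_frame(&mut self) -> Option<Vec<u8>> {
        // Only the filled portion holds received data; everything past
        // the cursor is zero padding and must not be parsed.
        let filled = &self.buffer[..self.cursor];
        let end = filled.iter().position(|&b| b == b'\n')?;
        let frame = filled[..end].to_vec();

        // Shift the unconsumed bytes to the front of the buffer and
        // move the cursor back by the number of bytes consumed.
        self.buffer.copy_within(end + 1..self.cursor, 0);
        self.cursor -= end + 1;
        Some(frame)
    }
}
```

Every consumer of the buffer has to repeat this slicing and cursor arithmetic correctly, which is the burden the next paragraphs address.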

Because pairing a byte array with a cursor is very common, the bytes crate provides an abstraction representing a byte array and cursor. The Buf trait is implemented by types from which data can be read. The BufMut trait is implemented by types into which data can be written. When passing a T: BufMut to read_buf(), the buffer’s internal cursor is automatically updated by read_buf. Because of this, in our version of read_frame, we do not need to manage our own cursor.

Additionally, when using Vec<u8>, the buffer must be initialized. vec![0; 4096] allocates an array of 4096 bytes and writes zero to every entry. When resizing the buffer, the new capacity must also be initialized with zeros. The initialization process is not free. When working with BytesMut and BufMut, capacity is uninitialized. The BytesMut abstraction prevents us from reading the uninitialized memory. This lets us avoid the initialization step.
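The difference between initialized length and reserved capacity can be seen with the standard library's `Vec<u8>`, used here as a rough analogy rather than a demonstration of BytesMut itself. The helper names `zeroed_buffer`, `reserved_buffer`, and `append` are hypothetical.

```rust
/// Zero-filled buffer: its length is `n`, and every byte has already
/// been written once before any data arrives.
fn zeroed_buffer(n: usize) -> Vec<u8> {
    vec![0; n]
}

/// Reserved buffer: room for at least `n` bytes, but a length of zero.
/// No byte is readable through the vector until it has been written.
fn reserved_buffer(n: usize) -> Vec<u8> {
    Vec::with_capacity(n)
}

/// Appends received bytes. The vector's length plays the role of the
/// cursor, so no separate index needs to be maintained.
fn append(buf: &mut Vec<u8>, data: &[u8]) {
    buf.extend_from_slice(data);
}
```

`zeroed_buffer(4096).len()` is 4096, while `reserved_buffer(4096).len()` is 0 with a capacity of at least 4096. After appending `PING` to the reserved buffer its length is 4, and only those four bytes can be read back.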