Buffered writes

The other half of the framing API is the write_frame(frame) function. This function writes an entire frame to the socket. In order to minimize write syscalls, writes will be buffered. A write buffer is maintained and frames are encoded to this buffer before being written to the socket. However, unlike read_frame(), the entire frame is not always buffered to a byte array before writing to the socket.

Consider a bulk stream frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame is a frame head, which consists of the $ character followed by the data length in bytes. The majority of the frame is the contents of the Bytes value. If the data is large, copying it to an intermediate buffer would be costly.

To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.

We will not attempt a full implementation of write_frame() as part of the tutorial. See the full implementation here.

First, the Connection struct is updated:

  1. use tokio::io::BufWriter;
  2. use tokio::net::TcpStream;
  3. use bytes::BytesMut;
  4. pub struct Connection {
  5. stream: BufWriter<TcpStream>,
  6. buffer: BytesMut,
  7. }
  8. impl Connection {
  9. pub fn new(stream: TcpStream) -> Connection {
  10. Connection {
  11. stream: BufWriter::new(stream),
  12. buffer: BytesMut::with_capacity(4096),
  13. }
  14. }
  15. }

Next, write_frame() is implemented.

  1. use tokio::io::{self, AsyncWriteExt};
  2. use mini_redis::Frame;
  3. async fn write_value(&mut self, frame: &Frame)
  4. -> io::Result<()>
  5. {
  6. match frame {
  7. Frame::Simple(val) => {
  8. self.stream.write_u8(b'+').await?;
  9. self.stream.write_all(val.as_bytes()).await?;
  10. self.stream.write_all(b"\r\n").await?;
  11. }
  12. Frame::Error(val) => {
  13. self.stream.write_u8(b'-').await?;
  14. self.stream.write_all(val.as_bytes()).await?;
  15. self.stream.write_all(b"\r\n").await?;
  16. }
  17. Frame::Integer(val) => {
  18. self.stream.write_u8(b':').await?;
  19. self.write_decimal(*val).await?;
  20. }
  21. Frame::Null => {
  22. self.stream.write_all(b"$-1\r\n").await?;
  23. }
  24. Frame::Bulk(val) => {
  25. let len = val.len();
  26. self.stream.write_u8(b'$').await?;
  27. self.write_decimal(len as u64).await?;
  28. self.stream.write_all(val).await?;
  29. self.stream.write_all(b"\r\n").await?;
  30. }
  31. Frame::Array(_val) => unimplemented!(),
  32. }
  33. self.stream.flush().await;
  34. Ok(())
  35. }

The functions used here are provided by AsyncWriteExt. They are available on TcpStream as well, but it would not be advisable to issue single byte writes without the intermediate buffer.

The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.

Another alternative would be to not call flush() in write_frame(). Instead, provide a flush() function on Connection. This would allow the caller to write queue multiple small frames in the write buffer then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis’ goals, so we decided to include the flush().await call in fn write_frame().