Reading and Writing Data

Non-blocking I/O

In the overview we mentioned briefly that Tokio’s I/O types implement non-blocking variants of std::io::Read and std::io::Write called AsyncRead and AsyncWrite. These are an integral part of Tokio’s I/O story, and are important to understand when working with I/O code.

Note: in this section, we’ll primarily talk about AsyncRead, but AsyncWrite is pretty much exactly the same, just for writing data to an I/O resource (like a TCP socket) instead of reading from it.

So, let’s take a look at AsyncRead and see what all the fuss is about:

    use std::io::Read;

    pub trait AsyncRead: Read {
        // ...
        // various provided methods
        // ...
    }

Huh. What’s going on here? Well, AsyncRead is really just Read from std::io, along with an additional contract. The documentation for AsyncRead reads:

This trait inherits from std::io::Read and indicates that an I/O object is non-blocking. All non-blocking I/O objects must return an error when bytes are unavailable instead of blocking the current thread.

That last part is critical. If you implement AsyncRead for a type, you are promising that calling read on it will never block. Instead, you are expected to return an io::ErrorKind::WouldBlock error to indicate that the operation would have blocked (for example, because there was no data available yet) if it weren’t non-blocking. The provided poll_read method relies on this:

    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
        match self.read(buf) {
            Ok(t) => Ok(Async::Ready(t)),
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        }
    }

This code should look familiar. If you squint a little, poll_read looks a lot like Future::poll. And that’s because that’s almost exactly what it is! A type that implements AsyncRead essentially behaves like a future that you can try to read data out of, and it will inform you whether it is Ready (and some data was read) or NotReady (and you’ll have to poll_read again later).
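To watch this translation happen without a Tokio runtime, here is a minimal std-only sketch. The Async enum is a local stand-in for the futures 0.1 type of the same name, and MockSocket is a hypothetical non-blocking source whose script decides when data has "arrived"; neither is Tokio’s API. The poll_read function uses the same match as the provided method above, so a WouldBlock from read surfaces as NotReady, and a read of 0 bytes surfaces as Ready(0) once the input is closed.

```rust
use std::io::{self, Read};

// Local stand-in for futures 0.1's Async, so this sketch needs only std.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Same logic as the provided poll_read: WouldBlock becomes NotReady.
fn poll_read<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<Async<usize>> {
    match reader.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}

// Hypothetical non-blocking source. Each entry is either a chunk of bytes
// that has "arrived" (Some) or a moment with nothing to read (None).
// Once the script is exhausted, the peer has closed the connection.
struct MockSocket {
    script: Vec<Option<Vec<u8>>>,
}

impl Read for MockSocket {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.script.is_empty() {
            return Ok(0); // end of input
        }
        match self.script.remove(0) {
            // Honor the AsyncRead contract: never block, report WouldBlock.
            None => Err(io::Error::new(io::ErrorKind::WouldBlock, "no data yet")),
            Some(chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    // Keep whatever did not fit for the next read.
                    self.script.insert(0, Some(chunk[n..].to_vec()));
                }
                Ok(n)
            }
        }
    }
}

fn main() {
    let mut socket = MockSocket { script: vec![None, Some(b"hi".to_vec())] };
    let mut buf = [0u8; 8];
    // Prints NotReady, then Ready(2), then Ready(0) once the input is closed.
    for _ in 0..3 {
        println!("{:?}", poll_read(&mut socket, &mut buf).unwrap());
    }
}
```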

Working with I/O futures

Since AsyncRead (and AsyncWrite) are pretty much futures, you can easily embed them in your own futures and poll_read them just as you would poll any other embedded Future. You can even use try_ready! to propagate errors and NotReady as appropriate. We’ll talk more about using these traits directly in the next section. However, to make life simpler in a number of situations, Tokio provides useful combinators in tokio::io for performing common I/O operations on top of AsyncRead and AsyncWrite. In general, these wrap an AsyncRead or AsyncWrite type in a type that implements Future and completes when the given read or write operation has finished.

The first handy I/O combinator is read_exact. It takes an implementor of AsyncRead and a buffer (anything that implements AsMut<[u8]>, such as a Vec<u8>) as arguments, and returns a Future that reads exactly enough bytes to fill the buffer. Internally, the returned future just keeps track of how many bytes it has read thus far, and keeps calling poll_read on the AsyncRead (returning NotReady if necessary) until it has exactly filled the buffer. At that point, it resolves to Ready((reader, buf)), handing back the reader together with the filled buffer. If the input ends before the buffer is full, the future fails with an UnexpectedEof error. Let’s take a look:

    extern crate tokio;

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    fn main() {
        let addr = "127.0.0.1:12345".parse().unwrap();
        let read_8_fut = TcpStream::connect(&addr)
            .and_then(|stream| {
                // We need to create a buffer for read_exact to write into.
                // A Vec<u8> is a good starting point.
                // read_exact will read buf.len() bytes, so we need
                // to make sure the Vec isn't empty!
                let buf = vec![0u8; 8];
                // read_exact returns a Future that resolves when
                // buf.len() bytes have been read from stream.
                tokio::io::read_exact(stream, buf)
            })
            .inspect(|(_stream, buf)| {
                // Notice that we get both the stream and the buffer back
                // here, so that we can now continue using the stream to
                // send a reply, for example.
                println!("got eight bytes: {:x?}", buf);
            });
        // We can now either chain more futures onto read_8_fut,
        // or if all we wanted to do was read and print those 8
        // bytes, we can just use tokio::run to run it (taking
        // care to map Future::Item and Future::Error to ()), e.g.:
        // tokio::run(read_8_fut.map(|_| ()).map_err(|e| eprintln!("error: {}", e)));
    }
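The bookkeeping described for read_exact can be modeled with std alone. This is a simplified sketch, not Tokio’s implementation: Async and try_ready! are local stand-ins for the futures 0.1 items of the same names, and ReadExact, read_exact, and MockSocket are hypothetical. Because pos lives in the struct, try_ready! can bail out with NotReady in the middle of the loop and the next poll resumes exactly where the previous one stopped.

```rust
use std::io::{self, Read};

// Local stand-in for futures 0.1's Async.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Local stand-in for futures 0.1's try_ready!: yield the Ready value, or
// return NotReady / the error from the enclosing poll function.
macro_rules! try_ready {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(t)) => t,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(e) => return Err(From::from(e)),
        }
    };
}

fn poll_read<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<Async<usize>> {
    match reader.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}

// Hypothetical future: remembers how far into `buf` it has read.
struct ReadExact<R> {
    reader: Option<R>,
    buf: Vec<u8>,
    pos: usize,
}

fn read_exact<R: Read>(reader: R, buf: Vec<u8>) -> ReadExact<R> {
    ReadExact { reader: Some(reader), buf, pos: 0 }
}

impl<R: Read> ReadExact<R> {
    fn poll(&mut self) -> io::Result<Async<(R, Vec<u8>)>> {
        while self.pos < self.buf.len() {
            let reader = self.reader.as_mut().expect("polled after completion");
            let n = try_ready!(poll_read(reader, &mut self.buf[self.pos..]));
            if n == 0 {
                // Input closed before the buffer was full.
                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
            }
            self.pos += n;
        }
        // Done: hand back both the reader and the filled buffer.
        let reader = self.reader.take().expect("polled after completion");
        Ok(Async::Ready((reader, std::mem::take(&mut self.buf))))
    }
}

// Hypothetical scripted source: Some(bytes) arrive, None means "not yet".
struct MockSocket {
    script: Vec<Option<Vec<u8>>>,
}

impl Read for MockSocket {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.script.is_empty() {
            return Ok(0);
        }
        match self.script.remove(0) {
            None => Err(io::Error::new(io::ErrorKind::WouldBlock, "no data yet")),
            Some(chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    self.script.insert(0, Some(chunk[n..].to_vec()));
                }
                Ok(n)
            }
        }
    }
}

fn main() {
    let socket = MockSocket { script: vec![Some(b"ab".to_vec()), None, Some(b"cdef".to_vec())] };
    let mut fut = read_exact(socket, vec![0u8; 4]);
    loop {
        match fut.poll() {
            Ok(Async::Ready((_socket, buf))) => {
                println!("filled: {:?}", buf);
                break;
            }
            // A real executor would wait for a readiness notification here.
            Ok(Async::NotReady) => println!("not ready yet"),
            Err(e) => {
                println!("error: {}", e);
                break;
            }
        }
    }
}
```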

A second I/O combinator that is often useful is write_all. It takes an implementor of AsyncWrite and a buffer (anything that implements AsRef<[u8]>, such as a Vec<u8>) as arguments, and returns a Future that writes all the bytes of the buffer into the AsyncWrite using poll_write. When the Future resolves, the entire buffer has been written out, and you get the writer and the buffer back. Note that write_all does not flush the writer; if you need that, follow it with tokio::io::flush. We can combine this with read_exact to echo whatever the server says back to it:

    extern crate tokio;

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    fn main() {
        let addr = "127.0.0.1:12345".parse().unwrap();
        let echo_fut = TcpStream::connect(&addr)
            .and_then(|stream| {
                // We're going to read the first 32 bytes the server sends us
                // and then just echo them back:
                let buf = vec![0u8; 32];
                // First, we need to read the server's message
                tokio::io::read_exact(stream, buf)
            })
            .and_then(|(stream, buf)| {
                // Then, we use write_all to write the entire buffer back:
                tokio::io::write_all(stream, buf)
            })
            .inspect(|(_stream, buf)| {
                println!("echoed back {} bytes: {:x?}", buf.len(), buf);
            });
        // As before, we can chain more futures onto echo_fut,
        // or declare ourselves finished and run it with tokio::run.
    }
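For comparison with read_exact, here is a std-only sketch of the bookkeeping behind write_all. It is a simplified model, not Tokio’s code: Async is a local stand-in, and WriteAll, write_all, and SlowSink (a hypothetical writer that accepts only a few bytes per call and reports WouldBlock on every other call) exist only for illustration. Partial writes are retried until the whole buffer is out, a writer that accepts zero bytes is treated as an error, and completion never calls flush. Tokio 0.1’s write_all likewise leaves flushing to a separate step such as tokio::io::flush.

```rust
use std::io::{self, Write};

// Local stand-in for futures 0.1's Async.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical congested writer: every other call reports WouldBlock, and
// at most `per_call` bytes are accepted per successful write.
struct SlowSink {
    data: Vec<u8>,
    per_call: usize,
    blocked: bool,
    flushes: usize,
}

impl SlowSink {
    fn new(per_call: usize) -> Self {
        SlowSink { data: Vec::new(), per_call, blocked: false, flushes: 0 }
    }
}

impl Write for SlowSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.blocked = !self.blocked;
        if self.blocked {
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "send buffer full"));
        }
        let n = buf.len().min(self.per_call);
        self.data.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flushes += 1;
        Ok(())
    }
}

// Hypothetical future: remembers how much of `buf` has been written.
struct WriteAll<W> {
    writer: Option<W>,
    buf: Vec<u8>,
    pos: usize,
}

fn write_all<W: Write>(writer: W, buf: Vec<u8>) -> WriteAll<W> {
    WriteAll { writer: Some(writer), buf, pos: 0 }
}

impl<W: Write> WriteAll<W> {
    fn poll(&mut self) -> io::Result<Async<(W, Vec<u8>)>> {
        while self.pos < self.buf.len() {
            let writer = self.writer.as_mut().expect("polled after completion");
            match writer.write(&self.buf[self.pos..]) {
                // A writer that accepts nothing would make us spin forever.
                Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero, "zero-length write")),
                Ok(n) => self.pos += n,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
                Err(e) => return Err(e),
            }
        }
        // Everything is written. Note that nothing here calls flush().
        let writer = self.writer.take().expect("polled after completion");
        Ok(Async::Ready((writer, std::mem::take(&mut self.buf))))
    }
}

fn main() {
    let mut fut = write_all(SlowSink::new(3), b"hello, server".to_vec());
    let mut polls = 0;
    let (sink, buf) = loop {
        polls += 1;
        if let Async::Ready(done) = fut.poll().unwrap() {
            break done;
        }
    };
    println!("wrote {} bytes in {} polls, flushes: {}", buf.len(), polls, sink.flushes);
}
```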

Tokio also comes with an I/O combinator to implement this kind of copying. It is (perhaps unsurprisingly) called copy. copy takes an AsyncRead and an AsyncWrite, and continuously writes all the bytes read out from the AsyncRead into the AsyncWrite until poll_read indicates that the input has been closed (by returning Ready(0)) and all the bytes have been written out and flushed to the output. It then resolves to a tuple of the number of bytes copied, the reader, and the writer. This is the combinator we used in our echo server! It greatly simplifies our example from above, and also makes it work for any amount of server data!

    extern crate tokio;

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    fn main() {
        let addr = "127.0.0.1:12345".parse().unwrap();
        let echo_fut = TcpStream::connect(&addr)
            .and_then(|stream| {
                // First, we need to get a separate read and write handle for
                // the connection so that we can forward one to the other.
                // See "Split I/O resources" below for more details.
                let (reader, writer) = stream.split();
                // Then, we can use copy to send all the read bytes to the
                // writer, and return how many bytes it read/wrote.
                tokio::io::copy(reader, writer)
            })
            .inspect(|(bytes_copied, _reader, _writer)| {
                println!("echoed back {} bytes", bytes_copied);
            });
    }
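Here is a std-only sketch of the loop a copy future runs, under stated assumptions: Async and the try_nb! helper macro are local stand-ins, and CopyFuture, copy, and Flaky (a reader that reports WouldBlock before every real read) are hypothetical names. Unlike Tokio’s copy, which resolves to (bytes_copied, reader, writer), this version resolves only to the byte count and keeps the reader and writer in its fields, to keep the sketch short. The structure follows the description above: read a chunk, write all of it out, repeat until a read returns 0, then flush.

```rust
use std::io::{self, Read, Write};

// Local stand-in for futures 0.1's Async.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Local helper: unwrap Ok, turn WouldBlock into an early
// `return Ok(Async::NotReady)`, and propagate any other error.
macro_rules! try_nb {
    ($e:expr) => {
        match $e {
            Ok(t) => t,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
            Err(e) => return Err(e),
        }
    };
}

// Hypothetical copy future with a deliberately tiny 4-byte buffer.
struct CopyFuture<R, W> {
    reader: R,
    writer: W,
    buf: [u8; 4],
    pos: usize,
    cap: usize,
    read_done: bool,
    amt: u64,
}

fn copy<R: Read, W: Write>(reader: R, writer: W) -> CopyFuture<R, W> {
    CopyFuture { reader, writer, buf: [0; 4], pos: 0, cap: 0, read_done: false, amt: 0 }
}

impl<R: Read, W: Write> CopyFuture<R, W> {
    fn poll(&mut self) -> io::Result<Async<u64>> {
        loop {
            // Buffer drained and input still open: read another chunk.
            if self.pos == self.cap && !self.read_done {
                let n = try_nb!(self.reader.read(&mut self.buf));
                if n == 0 {
                    self.read_done = true; // a 0-byte read means the input is closed
                } else {
                    self.pos = 0;
                    self.cap = n;
                }
            }
            // Write out whatever is buffered.
            while self.pos < self.cap {
                let n = try_nb!(self.writer.write(&self.buf[self.pos..self.cap]));
                if n == 0 {
                    return Err(io::Error::new(io::ErrorKind::WriteZero, "zero-length write"));
                }
                self.pos += n;
                self.amt += n as u64;
            }
            // Input closed and everything written: flush, then finish.
            if self.read_done {
                try_nb!(self.writer.flush());
                return Ok(Async::Ready(self.amt));
            }
        }
    }
}

// Hypothetical reader that reports WouldBlock before every real read.
struct Flaky<R> {
    inner: R,
    block_next: bool,
}

impl<R: Read> Read for Flaky<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.block_next = !self.block_next;
        if self.block_next {
            Err(io::Error::new(io::ErrorKind::WouldBlock, "not yet"))
        } else {
            self.inner.read(buf)
        }
    }
}

fn main() {
    let input: &[u8] = b"hello, world";
    let mut fut = copy(Flaky { inner: input, block_next: false }, Vec::new());
    loop {
        if let Async::Ready(n) = fut.poll().unwrap() {
            println!("copied {} bytes: {}", n, String::from_utf8_lossy(&fut.writer));
            break;
        }
    }
}
```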

Pretty neat!

The combinators we’ve talked about so far are all for pretty low-level operations: read these bytes, write these bytes, copy these bytes. Often, though, you want to operate on higher-level representations, like “lines”. Tokio has you covered there too! lines takes an AsyncRead and returns a Stream that yields each line from the input until there are no more lines to read:

    extern crate tokio;

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    fn main() {
        let addr = "127.0.0.1:12345".parse().unwrap();
        let lines_fut = TcpStream::connect(&addr).and_then(|stream| {
            // We want to parse out each line we receive on stream.
            // To do that, we may need to buffer input for a little while
            // (if the server sends two lines in one packet for example).
            // Because of that, lines requires that the AsyncRead it is
            // given *also* implements BufRead. This may be familiar if
            // you've ever used the lines() method from std::io::BufRead.
            // Luckily, BufReader from the standard library gives us that!
            let stream = std::io::BufReader::new(stream);
            tokio::io::lines(stream).for_each(|line| {
                println!("server sent us the line: {}", line);
                // This closure is called for each line we receive,
                // and returns a Future that represents the work we
                // want to do before accepting the next line.
                // In this case, we just wanted to print, so we
                // don't need to do anything more.
                Ok(())
            })
        });
    }
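The comment above points to std::io::BufRead::lines. For comparison, here is the blocking, std-only version of the same idea; collect_lines is a hypothetical helper name, and a byte slice stands in for the socket. BufRead::lines strips the \n (and a \r right before it) from each line, yields a final line even without a trailing newline, and reports invalid UTF-8 as an error. Unlike tokio::io::lines, it blocks the calling thread whenever the underlying reader does.

```rust
use std::io::{self, BufRead, BufReader, Read};

// Blocking analogue of the lines example: wrap a reader in a BufReader and
// let BufRead::lines split the input on '\n'.
fn collect_lines<R: Read>(input: R) -> io::Result<Vec<String>> {
    BufReader::new(input).lines().collect()
}

fn main() -> io::Result<()> {
    // A &[u8] implements Read, so it can stand in for a socket here.
    let server_output: &[u8] = b"first line\nsecond line\r\nno trailing newline";
    for line in collect_lines(server_output)? {
        println!("server sent us the line: {}", line);
    }
    Ok(())
}
```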

There are also plenty more I/O combinators in tokio::io that you may want to take a look at before you decide to write your own!

Split I/O resources

Both the copy example above and the echo server contained this mysterious-looking snippet:

    let (reader, writer) = socket.split();
    let bytes_copied = tokio::io::copy(reader, writer);

As the comments in the copy example explain, we split the TcpStream (socket) into a read “half” and a write “half”, and use the copy combinator we discussed above to produce a Future that asynchronously copies all the data from the read half to the write half. But why is this “split” required in the first place? After all, AsyncRead::poll_read and AsyncWrite::poll_write just take &mut self.

To answer that, we need to think back to Rust’s ownership system a little. Recall that Rust only allows you to have a single mutable reference to a given variable at a time. But we have to pass two arguments to copy, one for where to read from, and one for where to write to. However, once we pass a mutable reference to the TcpStream as one of the arguments, we cannot also construct a second mutable reference to it to pass as the second argument! We know that copy won’t read from and write to them at the same time, but that’s not expressed in copy’s types.

Enter split, a provided method on the AsyncRead trait when the type also implements AsyncWrite. If we look at the signature, we see:

    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
        where Self: AsyncWrite { ... }

The returned ReadHalf implements AsyncRead, and the WriteHalf implements AsyncWrite. And crucially, we now have two separate handles to our I/O object, which we can pass around separately. This comes in handy for copy, but it also means that we can pass each half to a different future, and handle the reads and writes completely independently! Behind the scenes, split ensures that if we try to read and write at the same time, only one of them happens at a time.
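To make the idea concrete, here is a std-only sketch of a split. It is not Tokio’s implementation: Tokio 0.1 coordinates the two halves with a BiLock from the futures crate, whereas this sketch uses Arc<Mutex<T>>, and Loopback, ReadHalf, WriteHalf, split, and round_trip are hypothetical names defined here. The point it demonstrates is the ownership one: round_trip needs a reader and a writer at the same time, which the two halves can provide even though they refer to the same resource.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};

// Hypothetical in-memory duplex resource standing in for a socket:
// writes append to a queue, reads drain from the front of it.
#[derive(Default)]
struct Loopback {
    queue: VecDeque<u8>,
}

impl Read for Loopback {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.queue.len());
        for (slot, byte) in buf.iter_mut().zip(self.queue.drain(..n)) {
            *slot = byte;
        }
        Ok(n)
    }
}

impl Write for Loopback {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.queue.extend(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Both halves share ownership of the resource; the Mutex makes sure only
// one half touches it at a time.
struct ReadHalf<T> {
    inner: Arc<Mutex<T>>,
}

struct WriteHalf<T> {
    inner: Arc<Mutex<T>>,
}

fn split<T: Read + Write>(io: T) -> (ReadHalf<T>, WriteHalf<T>) {
    let inner = Arc::new(Mutex::new(io));
    (ReadHalf { inner: Arc::clone(&inner) }, WriteHalf { inner })
}

impl<T: Read> Read for ReadHalf<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.lock().unwrap().read(buf)
    }
}

impl<T: Write> Write for WriteHalf<T> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.lock().unwrap().write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.lock().unwrap().flush()
    }
}

// Needs a reader and a writer at the same time, like copy does.
fn round_trip<R: Read, W: Write>(reader: &mut R, writer: &mut W, msg: &[u8]) -> io::Result<Vec<u8>> {
    writer.write_all(msg)?;
    writer.flush()?;
    let mut echoed = Vec::new();
    reader.read_to_end(&mut echoed)?;
    Ok(echoed)
}

fn main() -> io::Result<()> {
    let (mut reader, mut writer) = split(Loopback::default());
    // With a plain Loopback, round_trip(&mut lb, &mut lb, ..) would be
    // rejected by the borrow checker; with two halves it compiles.
    let echoed = round_trip(&mut reader, &mut writer, b"ping")?;
    println!("{}", String::from_utf8_lossy(&echoed));
    Ok(())
}
```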

Transports

Turning an AsyncRead into a Stream (like lines does) or an AsyncWrite into a Sink is pretty common in applications that need to do I/O. They often want to abstract away the way bytes are retrieved from or put on the wire, and let most of their application code deal with more convenient “request” and “response” types. This is often known as “framing”: instead of viewing your connections as consisting of just bytes in/bytes out, you view them as “frames” of application data that are received and sent. A framed stream of bytes is often referred to as a “transport”.

Transports are typically implemented using a codec. For example, lines represents a very simple codec that separates a byte string by the newline character, \n, and parses each frame as a string before passing it to the application. Tokio provides helpers for implementing new codecs in tokio::codec: you implement the Encoder and Decoder traits for your codec type, and use Framed::new to make a Sink + Stream from your byte stream (like a TcpStream). It’s almost like magic! There are also versions for doing just the read or write side of a codec (FramedRead and FramedWrite). Let’s take a look at writing a simple implementation of a line-based codec (even though tokio::codec already provides a LinesCodec):

    extern crate bytes;
    extern crate tokio;

    use bytes::{BufMut, BytesMut};
    use tokio::codec::{Decoder, Encoder};

    // This is where we'd keep track of any extra book-keeping information
    // our transport needs to operate.
    struct LinesCodec;

    // Turns string errors into std::io::Error
    fn bad_utf8<E>(_: E) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidData, "Unable to decode input as UTF8")
    }

    // First, we implement encoding, because it's so straightforward.
    // Just write out the bytes of the string followed by a newline!
    // Easy-peasy.
    impl Encoder for LinesCodec {
        type Item = String;
        type Error = std::io::Error;

        fn encode(&mut self, line: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
            // Note that we're given a BytesMut here to write into.
            // BytesMut comes from the bytes crate, and aims to give
            // efficient read/write access to a buffer. To use it,
            // we have to reserve memory before we try to write to it.
            buf.reserve(line.len() + 1);
            // And now, we write out our stuff!
            buf.put_slice(line.as_bytes());
            buf.put_u8(b'\n');
            Ok(())
        }
    }

    // The decoding is a little trickier, because we need to look for
    // newline characters. We also need to handle *two* cases: the "normal"
    // case where we're just asked to find the next string in a bunch of
    // bytes, and the "end" case where the input has ended, and we need
    // to find any remaining strings (the last of which may not end with a
    // newline!)
    impl Decoder for LinesCodec {
        type Item = String;
        type Error = std::io::Error;

        // Find the next line in buf!
        fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            Ok(if let Some(offset) = buf.iter().position(|b| *b == b'\n') {
                // We found a newline character in this buffer!
                // Cut out the line from the buffer so we don't return it again.
                let line = buf.split_to(offset + 1);
                // And then parse it as UTF-8, leaving out the trailing newline.
                Some(
                    std::str::from_utf8(&line[..line.len() - 1])
                        .map_err(bad_utf8)?
                        .to_string(),
                )
            } else {
                // There are no newlines in this buffer, so no lines to speak of.
                // Tokio will make sure to call this again when we have more bytes.
                None
            })
        }

        // Find the next line in buf when there will be no more data coming.
        fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            Ok(match self.decode(buf)? {
                Some(frame) => {
                    // There's a regular line here, so we may as well just return that.
                    Some(frame)
                }
                None => {
                    // There are no more lines in buf!
                    // We know there are no more bytes coming though,
                    // so we just return the remainder, if any.
                    if buf.is_empty() {
                        None
                    } else {
                        Some(
                            std::str::from_utf8(&buf.take()[..])
                                .map_err(bad_utf8)?
                                .to_string(),
                        )
                    }
                }
            })
        }
    }
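To see how decode and decode_eof are meant to be called, here is a std-only sketch of the loop a framed reader runs over a codec. It is a simplification, not Tokio’s FramedRead code: SimpleDecoder is a hypothetical trait shaped like Decoder but working on a Vec<u8> instead of a BytesMut, Lines mirrors the LinesCodec decoder above, and decode_all is a made-up driver. Bytes are appended as they arrive, decode is called until it returns None, and once the input has ended decode_eof is called until it too returns None, which is how the last line without a trailing newline gets through.

```rust
use std::io;

// Hypothetical, std-only stand-in for tokio::codec::Decoder that works on
// a Vec<u8> buffer instead of a BytesMut.
trait SimpleDecoder {
    type Item;
    fn decode(&mut self, buf: &mut Vec<u8>) -> io::Result<Option<Self::Item>>;
    fn decode_eof(&mut self, buf: &mut Vec<u8>) -> io::Result<Option<Self::Item>>;
}

fn bad_utf8<E>(_: E) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8")
}

// Same logic as the LinesCodec decoder above.
struct Lines;

impl SimpleDecoder for Lines {
    type Item = String;

    fn decode(&mut self, buf: &mut Vec<u8>) -> io::Result<Option<String>> {
        match buf.iter().position(|b| *b == b'\n') {
            Some(offset) => {
                // Remove the line (including '\n') from the front of the buffer.
                let mut line: Vec<u8> = buf.drain(..=offset).collect();
                line.pop(); // drop the '\n'
                String::from_utf8(line).map(Some).map_err(bad_utf8)
            }
            None => Ok(None),
        }
    }

    fn decode_eof(&mut self, buf: &mut Vec<u8>) -> io::Result<Option<String>> {
        if let Some(line) = self.decode(buf)? {
            return Ok(Some(line));
        }
        if buf.is_empty() {
            return Ok(None);
        }
        // Whatever is left is a final line without a trailing newline.
        String::from_utf8(std::mem::take(buf)).map(Some).map_err(bad_utf8)
    }
}

// Simplified framed-read loop: feed chunks as they "arrive", pull out every
// complete frame, then drain the remainder with decode_eof.
fn decode_all<D: SimpleDecoder>(decoder: &mut D, chunks: &[&str]) -> io::Result<Vec<D::Item>> {
    let mut buf = Vec::new();
    let mut frames = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(chunk.as_bytes());
        while let Some(frame) = decoder.decode(&mut buf)? {
            frames.push(frame);
        }
    }
    while let Some(frame) = decoder.decode_eof(&mut buf)? {
        frames.push(frame);
    }
    Ok(frames)
}

fn main() -> io::Result<()> {
    // Lines split across chunks, plus a last line with no newline.
    let frames = decode_all(&mut Lines, &["hel", "lo\nwor", "ld\nbye"])?;
    println!("{:?}", frames);
    Ok(())
}
```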

Next up: Using AsyncRead and AsyncWrite directly