Using AsyncRead and AsyncWrite directly

So far, we have primarily talked about AsyncRead and AsyncWrite in the context of I/O combinators provided by Tokio. While these are often enough, sometimes you need to implement your own combinators that want to perform asynchronous reads and writes directly.

Reading data with AsyncRead

The heart of AsyncRead is the poll_read method. It maps the WouldBlock error that indicates that an I/O read operation would have blocked into NotReady, which in turn lets us interoperate with the world of futures. When you write a Future (or something like it, such as Stream) that internally contains an AsyncRead, poll_read is likely the method you will be interacting with.
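To make that mapping concrete, here is a minimal sketch that uses only the standard library. The Async enum is a local stand-in for the one in the futures crate, and poll_read_sketch and BlocksOnce are hypothetical names made up for illustration; none of this is Tokio's actual implementation. In particular, the sketch does not arrange for any task to be notified, which a real poll_read must do before it returns NotReady.

```rust
use std::io::{self, Read};

// Local stand-in for the futures crate's `Async` type, defined here only
// so that the sketch compiles without external crates.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical helper: turn "this read would have blocked" into NotReady
// and pass every other outcome through unchanged.
fn poll_read_sketch<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<Async<usize>> {
    match reader.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}

// Toy reader (an assumption for this example) that reports WouldBlock on
// its first call and yields the bytes "hi" on every later call.
struct BlocksOnce {
    blocked: bool,
}

impl Read for BlocksOnce {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.blocked {
            self.blocked = true;
            return Err(io::ErrorKind::WouldBlock.into());
        }
        let data = b"hi";
        let n = data.len().min(buf.len());
        buf[..n].copy_from_slice(&data[..n]);
        Ok(n)
    }
}
```

Polling a BlocksOnce through poll_read_sketch yields NotReady on the first call and Ready with the number of bytes read on the second.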

The important thing to keep in mind with poll_read is that it follows the same contract as Future::poll. Specifically, it cannot return NotReady unless it has arranged for the current task to be notified when it can make progress again. This fact is what lets us call poll_read inside of poll in our own Futures; we know that we are upholding the contract of poll when we forward a NotReady from poll_read, because poll_read follows that same contract!

The exact mechanism Tokio uses to ensure that poll_read later notifies the current task is out of scope for this section, but you can read more about it in the non-blocking I/O section of Tokio internals if you’re interested.

With all that said, let’s look at how we might implement the read_exact method ourselves!

    extern crate tokio;
    #[macro_use]
    extern crate futures;

    fn main() {}

    use std::io;
    use tokio::prelude::*;

    // This is going to be our Future.
    // In the common case, this is set to Some(Reading),
    // but we'll set it to None when we return Async::Ready
    // so that we can return the reader and the buffer.
    struct ReadExact<R, T>(Option<Reading<R, T>>);

    struct Reading<R, T> {
        // This is the stream we're reading from.
        reader: R,
        // This is the buffer we're reading into.
        buffer: T,
        // And this is how far into the buffer we've written.
        pos: usize,
    }

    // We want to be able to construct a ReadExact over anything
    // that implements AsyncRead, and any buffer that can be
    // thought of as a &mut [u8].
    fn read_exact<R, T>(reader: R, buffer: T) -> ReadExact<R, T>
    where
        R: AsyncRead,
        T: AsMut<[u8]>,
    {
        ReadExact(Some(Reading {
            reader,
            buffer,
            // Initially, we've read no bytes into buffer.
            pos: 0,
        }))
    }

    impl<R, T> Future for ReadExact<R, T>
    where
        R: AsyncRead,
        T: AsMut<[u8]>,
    {
        // When we've filled up the buffer, we want to return both the buffer
        // with the data that we read and the reader itself.
        type Item = (R, T);
        type Error = io::Error;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            match self.0 {
                Some(Reading {
                    ref mut reader,
                    ref mut buffer,
                    ref mut pos,
                }) => {
                    let buffer = buffer.as_mut();
                    // Check that we haven't finished
                    while *pos < buffer.len() {
                        // Try to read data into the remainder of the buffer.
                        // Just like read in std::io::Read, poll_read *can* read
                        // fewer bytes than the length of the buffer it is given,
                        // and we need to handle that by looking at its return
                        // value, which is the number of bytes actually read.
                        //
                        // Notice that we are using try_ready! here, so if poll_read
                        // returns NotReady (or an error), we will do the same!
                        // We uphold the contract that we have arranged to be
                        // notified later because poll_read follows that same
                        // contract, and _it_ returned NotReady.
                        let n = try_ready!(reader.poll_read(&mut buffer[*pos..]));
                        *pos += n;

                        // If no bytes were read, but there was no error, this
                        // generally implies that the reader will provide no more
                        // data (for example, because the TCP connection was closed
                        // by the other side).
                        if n == 0 {
                            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
                        }
                    }
                }
                None => panic!("poll a ReadExact after it's done"),
            }

            // We need to return the reader and the buffer, which we can only
            // do by moving them out of self. We do this by taking our state
            // and leaving `None`. This _should_ be fine, because poll()
            // requires callers to not call poll() again after Ready has been
            // returned, so we should only ever see Some(Reading) when poll()
            // is called.
            let reading = self.0.take().expect("must have seen Some above");
            Ok(Async::Ready((reading.reader, reading.buffer)))
        }
    }
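One detail worth dwelling on is why pos lives in the struct rather than in a local variable: poll may return NotReady part-way through the buffer, and the next call has to pick up exactly where the previous one stopped. The following standard-library-only sketch imitates that behaviour. Async is again a local stand-in for the futures type, and Trickle, poll_fill and drive are hypothetical names invented for this example. The busy loop in drive is only there to keep the sketch self-contained; a real executor would wait for a notification instead of polling in a loop.

```rust
use std::io::{self, Read};

// Local stand-in for the futures crate's `Async` type.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical reader that reports WouldBlock on every odd-numbered call
// and hands out a single byte on every even-numbered call, imitating a
// slow non-blocking socket. Once its data is used up it returns Ok(0).
struct Trickle {
    data: Vec<u8>,
    sent: usize,
    calls: usize,
}

impl Read for Trickle {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.calls += 1;
        if self.calls % 2 == 1 {
            return Err(io::ErrorKind::WouldBlock.into());
        }
        if self.sent == self.data.len() || buf.is_empty() {
            return Ok(0);
        }
        buf[0] = self.data[self.sent];
        self.sent += 1;
        Ok(1)
    }
}

// Same loop shape as ReadExact::poll: keep reading into the unfilled part
// of the buffer, and bail out with NotReady if the reader would block.
// `pos` is owned by the caller so that progress survives across calls.
fn poll_fill<R: Read>(reader: &mut R, buffer: &mut [u8], pos: &mut usize) -> io::Result<Async<()>> {
    while *pos < buffer.len() {
        let n = match reader.read(&mut buffer[*pos..]) {
            Ok(n) => n,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
            Err(e) => return Err(e),
        };
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
        }
        *pos += n;
    }
    Ok(Async::Ready(()))
}

// Polls until the buffer is full and returns it together with the number
// of times NotReady was observed along the way.
fn drive(data: &[u8], want: usize) -> io::Result<(Vec<u8>, usize)> {
    let mut reader = Trickle { data: data.to_vec(), sent: 0, calls: 0 };
    let mut buffer = vec![0u8; want];
    let mut pos = 0;
    let mut not_ready = 0;
    loop {
        match poll_fill(&mut reader, &mut buffer, &mut pos)? {
            Async::Ready(()) => return Ok((buffer, not_ready)),
            Async::NotReady => not_ready += 1,
        }
    }
}
```

Calling drive(b"abc", 3) fills the buffer across several polls, while drive(b"ab", 3) fails with UnexpectedEof because the reader runs dry before the buffer is full.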

Writing data with AsyncWrite

Just like poll_read is the core piece of AsyncRead, poll_write is the core of AsyncWrite. Like poll_read, it maps the WouldBlock error that indicates that an I/O write operation would have blocked into NotReady, which again lets us interoperate with the world of futures. AsyncWrite also has a poll_flush, which provides an asynchronous analogue to Write’s flush method. The role of poll_flush is to make sure that any bytes previously written by poll_write are, well, flushed onto the underlying I/O resource (written out in network packets, for example). Similar to poll_write, it wraps around Write::flush, and maps a WouldBlock error into NotReady to indicate that the flushing is still ongoing.
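If the difference between writing and flushing feels abstract, the synchronous Write trait shows the same split. The sketch below uses std::io::BufWriter, which is blocking and is not an AsyncWrite, purely to illustrate that bytes accepted by a write are not necessarily on the underlying resource until a flush. The function name and the 64-byte capacity are arbitrary choices for this example.

```rust
use std::io::{self, BufWriter, Write};

// Writes `data` through a BufWriter wrapped around a Vec<u8> and reports
// how many bytes had reached the Vec before and after the call to flush().
// This assumes `data` is shorter than the 64-byte buffer, so that the
// BufWriter holds on to it instead of passing it straight through.
fn visible_before_and_after_flush(data: &[u8]) -> io::Result<(usize, usize)> {
    let mut writer = BufWriter::with_capacity(64, Vec::new());
    writer.write_all(data)?;
    // The write succeeded, but the bytes are still sitting in the buffer.
    let before = writer.get_ref().len();
    writer.flush()?;
    // Only now have they been pushed down to the underlying Vec.
    let after = writer.get_ref().len();
    Ok((before, after))
}
```

poll_flush plays the role of flush() here, with the added twist that it can return NotReady while the flushing is still in progress.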

AsyncWrite’s poll_write and poll_flush follow the same contract as Future::poll and AsyncRead::poll_read, namely that if they return NotReady, they have arranged for the current task to be notified when they can make progress again. As with poll_read, this means that we can safely call these methods in our own futures, and know that we are also following the contract.

Tokio uses the same mechanism to manage notifications for poll_write and poll_flush as it does for poll_read, and you can read more about it in the non-blocking I/O section of Tokio internals.

Shutdown

AsyncWrite also adds one method that is not part of Write: shutdown. From its documentation:

Initiates or attempts to shut down this writer, returning success when the I/O connection has completely shut down. This method is intended to be used for asynchronous shutdown of I/O connections. For example this is suitable for implementing shutdown of a TLS connection or calling TcpStream::shutdown on a proxied connection. Protocols sometimes need to flush out final pieces of data or otherwise perform a graceful shutdown handshake, reading/writing more data as appropriate. This method is the hook for such protocols to implement the graceful shutdown logic.

This sums shutdown up pretty nicely: it is a way to tell the writer that no more data is coming, and that it should indicate this in whatever way the underlying I/O protocol requires. For TCP connections, for example, this usually entails closing the writing side of the TCP channel so that the other end receives an end-of-file in the form of a read that returns 0 bytes. You can often think of shutdown as what you would have done synchronously in the implementation of Drop; it’s just that in the asynchronous world, you can’t easily do something in Drop because you need to have an executor that keeps polling your writer!

Note that calling shutdown on a write “half” of a type that implements AsyncWrite and AsyncRead does not shut down the read “half”. You can usually still continue reading data as you please until the other side shuts down their corresponding write “half”.
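The TCP behaviour described above can be observed with the blocking std::net API, which is used here only because it needs no executor; it is not Tokio's shutdown method. This sketch assumes that the loopback interface is available, and half_close_demo is a hypothetical name chosen for the example.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};

// Returns the byte count of the server's read after the client closed its
// write half, followed by the reply the client could still receive.
fn half_close_demo() -> io::Result<(usize, Vec<u8>)> {
    // Bind to port 0 so that the operating system picks a free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _) = listener.accept()?;

    // Close only the client's write half.
    client.shutdown(Shutdown::Write)?;

    // The server sees end-of-file in the form of a read that returns 0 bytes.
    let mut scratch = [0u8; 8];
    let eof = server.read(&mut scratch)?;

    // The client's read half is still open, so it can keep receiving data.
    server.write_all(b"pong")?;
    let mut reply = [0u8; 4];
    client.read_exact(&mut reply)?;

    Ok((eof, reply.to_vec()))
}
```

The first element of the result is the zero-byte read that signals end-of-file to the server; the second shows that the client, despite having shut down its write half, still received the reply.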

An example of using AsyncWrite

Without further ado, let’s take a look at how we might implement write_all!

    extern crate tokio;
    #[macro_use]
    extern crate futures;

    fn main() {}

    use std::io;
    use tokio::prelude::*;

    // This is going to be our Future.
    // It'll seem awfully familiar to ReadExact above!
    // In the common case, this is set to Some(Writing),
    // but we'll set it to None when we return Async::Ready
    // so that we can return the writer and the buffer.
    struct WriteAll<W, T>(Option<Writing<W, T>>);

    struct Writing<W, T> {
        // This is the stream we're writing into.
        writer: W,
        // This is the buffer we're writing from.
        buffer: T,
        // And this is how much of the buffer we've written.
        pos: usize,
    }

    // We want to be able to construct a WriteAll over anything
    // that implements AsyncWrite, and any buffer that can be
    // thought of as a &[u8].
    fn write_all<W, T>(writer: W, buffer: T) -> WriteAll<W, T>
    where
        W: AsyncWrite,
        T: AsRef<[u8]>,
    {
        WriteAll(Some(Writing {
            writer,
            buffer,
            // Initially, we've written none of the bytes from buffer.
            pos: 0,
        }))
    }

    impl<W, T> Future for WriteAll<W, T>
    where
        W: AsyncWrite,
        T: AsRef<[u8]>,
    {
        // When we've written out the entire buffer, we want to return
        // both the buffer and the writer so that the user can re-use them.
        type Item = (W, T);
        type Error = io::Error;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            match self.0 {
                Some(Writing {
                    ref mut writer,
                    ref buffer,
                    ref mut pos,
                }) => {
                    let buffer = buffer.as_ref();
                    // Check that we haven't finished
                    while *pos < buffer.len() {
                        // Try to write the remainder of the buffer into the writer.
                        // Just like write in std::io::Write, poll_write *can* write
                        // fewer bytes than the length of the buffer it is given,
                        // and we need to handle that by looking at its return
                        // value, which is the number of bytes actually written.
                        //
                        // We are using try_ready! here, just like in poll_read in
                        // ReadExact, so that if poll_write returns NotReady (or an
                        // error), we will do the same! We uphold the contract that
                        // we have arranged to be notified later because poll_write
                        // follows that same contract, and _it_ returned NotReady.
                        let n = try_ready!(writer.poll_write(&buffer[*pos..]));
                        *pos += n;

                        // If no bytes were written, but there was no error, this
                        // generally implies that something weird happened under us.
                        // We make sure to turn this into an error for the caller to
                        // deal with.
                        if n == 0 {
                            return Err(io::Error::new(
                                io::ErrorKind::WriteZero,
                                "zero-length write",
                            ));
                        }
                    }
                }
                None => panic!("poll a WriteAll after it's done"),
            }

            // We use the same trick as in ReadExact to ensure that we can return
            // the buffer and the writer once the entire buffer has been written out.
            let writing = self.0.take().expect("must have seen Some above");
            Ok(Async::Ready((writing.writer, writing.buffer)))
        }
    }
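To see why the loop and the zero-length check matter, it helps to run the same logic against a writer that deliberately accepts only a few bytes at a time. The sketch below does this with the blocking std::io::Write trait, so there is no NotReady case to handle. Chunky and write_all_counting are hypothetical names invented for this example and are not part of Tokio or the standard library.

```rust
use std::io::{self, Write};

// Hypothetical writer that accepts at most `chunk` bytes per call and
// stops accepting anything once it has stored `limit` bytes in total.
struct Chunky {
    out: Vec<u8>,
    chunk: usize,
    limit: usize,
}

impl Write for Chunky {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let room = self.limit - self.out.len();
        let n = buf.len().min(self.chunk).min(room);
        self.out.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Same loop shape as WriteAll::poll, minus the NotReady handling: keep
// writing the unwritten tail of the buffer and treat a zero-length write
// as an error. Returns how many write calls were needed.
fn write_all_counting<W: Write>(writer: &mut W, buffer: &[u8]) -> io::Result<usize> {
    let mut pos = 0;
    let mut calls = 0;
    while pos < buffer.len() {
        let n = writer.write(&buffer[pos..])?;
        calls += 1;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::WriteZero, "zero-length write"));
        }
        pos += n;
    }
    Ok(calls)
}
```

With chunk set to 2 and plenty of room, writing b"hello" takes three calls. With limit set to 3, the writer fills up part-way through and the loop reports WriteZero instead of spinning forever on writes that make no progress.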
