直接使用AsyncRead和AsyncWrite

到目前为止,我们都是在Tokio提供的I/O组合器场景下讨论了AsyncReadAsyncWrite。通常这些就够了,但有时您需要实现自己的组合器,直接执行异步读写。

用AsyncRead读取数据

AsyncRead的核心是poll_read方法。该方法检查Err类型是否为WouldBlock,如果是,表明I/O read操作可能被阻塞的,则返回NotReady,这就使我们可以与futures互操作。当你写一个内部包含AsyncRead的Future(或类似的东西,例如Stream)时,poll_read 很可能就是你将要与之交互的方法。

要记住一点:poll_read遵循与Future::poll相同的契约。具体而言,你不能返回NotReady,除非你已安排当前任务在取得进展时,会被通知再次被调用。基于此,我们可以在自己futures的poll方法内调用poll_read; 当我们从poll_read中转发一个NotReady的时候,我们知道这是遵循poll合约的,因为poll_read遵循相同的合约。

Tokio用于确保poll_read以后通知当前任务task的确切机制不在本节讨论的范围,但如果您感兴趣,可以在Tokio内部原理的非阻塞I/O中阅读更多相关内容。

有了这一切,让我们看看如何自己实现read_exact 这个方法!

  1. #[macro_use]
  2. extern crate futures;
  3. use std::io;
  4. use tokio::prelude::*;
  5. // This is going to be our Future.
  6. // In the common case, this is set to Some(Reading),
  7. // but we'll set it to None when we return Async::Ready
  8. // so that we can return the reader and the buffer.
  9. struct ReadExact<R, T>(Option<Reading<R, T>>);
  10. struct Reading<R, T> {
  11. // This is the stream we're reading from.
  12. reader: R,
  13. // This is the buffer we're reading into.
  14. buffer: T,
  15. // And this is how far into the buffer we've written.
  16. pos: usize,
  17. }
  18. // We want to be able to construct a ReadExact over anything
  19. // that implements AsyncRead, and any buffer that can be
  20. // thought of as a &mut [u8].
  21. fn read_exact<R, T>(reader: R, buffer: T) -> ReadExact<R, T>
  22. where
  23. R: AsyncRead,
  24. T: AsMut<[u8]>,
  25. {
  26. ReadExact(Some(Reading {
  27. reader,
  28. buffer,
  29. // Initially, we've read no bytes into buffer.
  30. pos: 0,
  31. }))
  32. }
  33. impl<R, T> Future for ReadExact<R, T>
  34. where
  35. R: AsyncRead,
  36. T: AsMut<[u8]>,
  37. {
  38. // When we've filled up the buffer, we want to return both the buffer
  39. // with the data that we read and the reader itself.
  40. type Item = (R, T);
  41. type Error = io::Error;
  42. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  43. match self.0 {
  44. Some(Reading {
  45. ref mut reader,
  46. ref mut buffer,
  47. ref mut pos,
  48. }) => {
  49. let buffer = buffer.as_mut();
  50. // Check that we haven't finished
  51. while *pos < buffer.len() {
  52. // Try to read data into the remainder of the buffer.
  53. // Just like read in std::io::Read, poll_read *can* read
  54. // fewer bytes than the length of the buffer it is given,
  55. // and we need to handle that by looking at its return
  56. // value, which is the number of bytes actually read.
  57. //
  58. // Notice that we are using try_ready! here, so if poll_read
  59. // returns NotReady (or an error), we will do the same!
  60. // We uphold the contract that we have arranged to be
  61. // notified later because poll_read follows that same
  62. // contract, and _it_ returned NotReady.
  63. let n = try_ready!(reader.poll_read(&mut buffer[*pos..]));
  64. *pos += n;
  65. // If no bytes were read, but there was no error, this
  66. // generally implies that the reader will provide no more
  67. // data (for example, because the TCP connection was closed
  68. // by the other side).
  69. if n == 0 {
  70. return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
  71. }
  72. }
  73. }
  74. None => panic!("poll a ReadExact after it's done"),
  75. }
  76. // We need to return the reader and the buffer, which we can only
  77. // do by moving them out of self. We do this by taking our state
  78. // and leaving `None`. This _should_ be fine, because poll()
  79. // requires callers to not call poll() again after Ready has been
  80. // returned, so we should only ever see Some(Reading) when poll()
  81. // is called.
  82. let reading = self.0.take().expect("must have seen Some above");
  83. Ok(Async::Ready((reading.reader, reading.buffer)))
  84. }
  85. }

用AsyncWrite写数据

就像poll_readAsyncRead的核心一样,poll_write也是AsyncWrite的核心部分。和poll_read一样,该方法检查Err类型是否为WouldBlock,如果是,则表明write操作将被阻塞,就返回NotReady,这再次让我们与futures互操作。AsyncWrite也有一个poll_flush,它提供了一个Write flush的异步版本。poll_flush确保先前通过poll_write写入的任何字节都被刷到底层I/O资源上(例如,发送网络数据包)。类似于poll_write,它封装了Write::flush,映射WouldBlock错误为NotReady,指示flush仍在进行中。

AsyncWritepoll_write,以及poll_flush都遵循与Future::pollAsyncRead::poll_read相同的合约,即如果你想返回NotReady,则必须保证当前任务能够被在可以进行下去的时候被通知。和poll_read一样,这意味着我们可以安全地在我们自己的futures中调用这些方法,因为我们知道我们也在遵守合同。

Tokio使用和poll_read相同的通知机制来通知poll_writepoll_flush,你可以在Tokio内部原理的非阻塞I/O中阅读更多相关内容。

关闭

AsyncWrite还添加了一个不属于Write的方法:shutdown。从它的文档:

启动或尝试关闭此writer,在I/O连接完全关闭时返回成功。

此方法旨在用于I/O连接的异步关闭。例如,这适用于实现TLS连接的关闭或调用TcpStream::shutdown来关闭代理连接proxied connection。一些协议有时需要清除最终的数据,或者发起优雅关闭握手,适当地读写更多数据。此方法就是实现这些协议所需的优雅关闭握手逻辑的钩子方法(扩展点)。

总结shutdown:它是一种告诉写一方不再有新数据产生的方法,并且它应该以底层I/O协议所需的任何方式指示。例如,对于TCP连接,这通常需要关闭TCP通道channel的写入端,这样,另一端就可以读到0字节,表明已到文件尾。通常,你可以将shutdown视为你要实现Drop时你需要同步地执行的方法; 只是在异步世界中,你不能在Drop简单地处理,因为你需要有一个执行器executor轮询你的writer!

请注意,在一个实现了AsyncWriteAsyncRead写半部分调用shutdown不会关闭读半部分。您通常可以继续随意读取数据,直到另一方关闭相应的写半

一个使用AsyncWrite的例子

废话少说,让我们来看看我们如何实现:

  1. #[macro_use]
  2. extern crate futures;
  3. use std::io;
  4. use tokio::prelude::*;
  5. // This is going to be our Future.
  6. // It'll seem awfully familiar to ReadExact above!
  7. // In the common case, this is set to Some(Writing),
  8. // but we'll set it to None when we return Async::Ready
  9. // so that we can return the writer and the buffer.
  10. struct WriteAll<W, T>(Option<Writing<W, T>>);
  11. struct Writing<W, T> {
  12. // This is the stream we're writing into.
  13. writer: W,
  14. // This is the buffer we're writing from.
  15. buffer: T,
  16. // And this is much of the buffer we've written.
  17. pos: usize,
  18. }
  19. // We want to be able to construct a WriteAll over anything
  20. // that implements AsyncWrite, and any buffer that can be
  21. // thought of as a &[u8].
  22. fn write_all<W, T>(writer: W, buffer: T) -> WriteAll<W, T>
  23. where
  24. W: AsyncWrite,
  25. T: AsRef<[u8]>,
  26. {
  27. WriteAll(Some(Writing {
  28. writer,
  29. buffer,
  30. // Initially, we've written none of the bytes from buffer.
  31. pos: 0,
  32. }))
  33. }
  34. impl<W, T> Future for WriteAll<W, T>
  35. where
  36. W: AsyncWrite,
  37. T: AsRef<[u8]>,
  38. {
  39. // When we've written out the entire buffer, we want to return
  40. // both the buffer and the writer so that the user can re-use them.
  41. type Item = (W, T);
  42. type Error = io::Error;
  43. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  44. match self.0 {
  45. Some(Writing {
  46. ref mut writer,
  47. ref buffer,
  48. ref mut pos,
  49. }) => {
  50. let buffer = buffer.as_ref();
  51. // Check that we haven't finished
  52. while *pos < buffer.len() {
  53. // Try to write the remainder of the buffer into the writer.
  54. // Just like write in std::io::Write, poll_write *can* write
  55. // fewer bytes than the length of the buffer it is given,
  56. // and we need to handle that by looking at its return
  57. // value, which is the number of bytes actually written.
  58. //
  59. // We are using try_ready! here, just like in poll_read in
  60. // ReadExact, so that if poll_write returns NotReady (or an
  61. // error), we will do the same! We uphold the contract that
  62. // we have arranged to be notified later because poll_write
  63. // follows that same contract, and _it_ returned NotReady.
  64. let n = try_ready!(writer.poll_write(&buffer[*pos..]));
  65. *pos += n;
  66. // If no bytes were written, but there was no error, this
  67. // generally implies that something weird happened under us.
  68. // We make sure to turn this into an error for the caller to
  69. // deal with.
  70. if n == 0 {
  71. return Err(io::Error::new(
  72. io::ErrorKind::WriteZero,
  73. "zero-length write",
  74. ));
  75. }
  76. }
  77. }
  78. None => panic!("poll a WriteAll after it's done"),
  79. }
  80. // We use the same trick as in ReadExact to ensure that we can return
  81. // the buffer and the writer once the entire buffer has been written out.
  82. let writing = self.0.take().expect("must have seen Some above");
  83. Ok(Async::Ready((writing.writer, writing.buffer)))
  84. }
  85. }