Reading and Writing Data

Non-blocking I/O

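In the overview we briefly mentioned that Tokio's I/O types implement non-blocking, asynchronous variants of std::io::Read and std::io::Write, called AsyncRead and AsyncWrite. These traits are an integral part of Tokio's I/O story, and understanding them matters whenever you work with I/O code.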

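Note: in this section we will mostly talk about AsyncRead. AsyncWrite is almost exactly the same, except that it writes data to an I/O resource (such as a TCP socket) instead of reading from it.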

So, let's take a look at what AsyncRead has to offer:

    use std::io::Read;

    pub trait AsyncRead: Read {
        // ...
        // various provided methods
        // ...
    }

Hm, what is going on here? AsyncRead is really just Read from std::io plus an additional contract. The documentation for AsyncRead says:

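This trait inherits from std::io::Read and indicates that an I/O object is non-blocking. All non-blocking I/O objects must return an error when bytes are unavailable instead of blocking the current thread.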

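That last part is the critical bit. If you implement AsyncRead for a type, you promise that calling read on it will never block. Instead, whenever the operation would have had to block (for example because no data is available), read is expected to return an io::ErrorKind::WouldBlock error. The provided poll_read method relies on this: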

    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
        match self.read(buf) {
            Ok(t) => Ok(Async::Ready(t)),
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        }
    }

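This code should look familiar. If you squint a little, poll_read looks a lot like Future::poll, and that is because it very nearly is one. A type that implements AsyncRead behaves essentially like a Future that you can try to read data from: it tells you whether it is Ready (some data was read) or NotReady (no data is available yet, so you need to call poll_read again later).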
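To make the WouldBlock contract concrete, here is a minimal sketch that uses only the standard library. The `Async` enum, `MockSource`, and the free function `poll_read` are hypothetical stand-ins written for this illustration. They are not Tokio's or futures' real types, and the sketch assumes a source whose availability is scripted in advance.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for futures' `Async<T>`; not the real type.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical in-memory source. Each queued entry is either a chunk of
/// data or `None`, meaning "nothing is available right now".
struct MockSource {
    events: VecDeque<Option<Vec<u8>>>,
}

impl MockSource {
    fn new(events: Vec<Option<Vec<u8>>>) -> Self {
        MockSource { events: events.into() }
    }
}

impl Read for MockSource {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.events.pop_front() {
            // Data is available: copy as much as fits into `buf` and keep
            // any remainder queued for the next call.
            Some(Some(chunk)) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    self.events.push_front(Some(chunk[n..].to_vec()));
                }
                Ok(n)
            }
            // No data yet: report WouldBlock instead of blocking the thread.
            Some(None) => Err(io::Error::new(ErrorKind::WouldBlock, "no data yet")),
            // Script exhausted: signal end of input.
            None => Ok(0),
        }
    }
}

/// Same shape as the provided method shown above, written as a free
/// function so that it works with any `Read` implementation.
fn poll_read<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<Async<usize>> {
    match reader.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}
```

With a source created as `MockSource::new(vec![None, Some(b"hi".to_vec())])`, the first `poll_read` call reports `NotReady` and the second reports `Ready(2)`. This mirrors the "try again later" behavior described above.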

Working with I/O futures

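Since AsyncRead (and AsyncWrite) are pretty much futures, you can easily embed them in your own futures and call poll_read on them just as you would poll any other embedded future. You can even use the try_ready! macro to propagate errors and NotReady as appropriate. We will talk more about using these traits directly in the next section. In many cases, though, Tokio makes things simpler by providing a number of useful combinators in tokio::io that perform common I/O operations on top of AsyncRead and AsyncWrite. In general, they wrap an AsyncRead or AsyncWrite type, implement Future, and complete when the given read or write operation has completed.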

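The first handy I/O combinator is read_exact. It takes an implementor of AsyncRead and a mutable buffer (&mut [u8]), and returns a Future that reads exactly enough bytes to fill the buffer. Internally, the returned future just keeps track of how many bytes it has read so far, and keeps issuing poll_read on the AsyncRead (returning NotReady if necessary) until it has exactly filled the buffer. At that point it resolves as Ready with the filled buffer, handed back together with the reader. Let's take a look: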

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    let addr = "127.0.0.1:12345".parse().unwrap();
    let read_8_fut = TcpStream::connect(&addr)
        .and_then(|stream| {
            // We need to create a buffer for read_exact to write into.
            // A Vec<u8> is a good starting point.
            // read_exact will read buf.len() bytes, so we need
            // to make sure the Vec isn't empty!
            let buf = vec![0; 8];

            // read_exact returns a Future that resolves when
            // buf.len() bytes have been read from stream.
            tokio::io::read_exact(stream, buf)
        })
        .inspect(|(_stream, buf)| {
            // Notice that we get both the buffer and the stream back
            // here, so that we can now continue using the stream to
            // send a reply for example.
            println!("got eight bytes: {:x?}", buf);
        });

    // We can now either chain more futures onto read_8_fut,
    // or if all we wanted to do was read and print those 8
    // bytes, we can just use tokio::run to run it (taking
    // care to map Future::Item and Future::Error to ()).
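The progress tracking that read_exact performs internally can be sketched with the standard library alone. Everything named here (`Async`, `Trickle`, `ReadExactSketch`) is hypothetical and exists only for this illustration. The sketch also assumes that running out of input before the buffer is full is reported as an UnexpectedEof error.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for futures' `Async<T>`.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical reader that hands out one byte per successful call and
/// reports WouldBlock on every other call.
struct Trickle {
    data: Vec<u8>,
    pos: usize,
    block_next: bool,
}

impl Read for Trickle {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.block_next {
            self.block_next = false;
            return Err(ErrorKind::WouldBlock.into());
        }
        self.block_next = true;
        if self.pos >= self.data.len() || buf.is_empty() {
            return Ok(0);
        }
        buf[0] = self.data[self.pos];
        self.pos += 1;
        Ok(1)
    }
}

/// Sketch of a read_exact-style state machine: it remembers how many
/// bytes are already in the buffer and resumes from there on each poll.
struct ReadExactSketch<R> {
    reader: R,
    buf: Vec<u8>,
    filled: usize,
}

impl<R: Read> ReadExactSketch<R> {
    fn new(reader: R, buf: Vec<u8>) -> Self {
        ReadExactSketch { reader, buf, filled: 0 }
    }

    /// Reads until the buffer is full or the reader has no data for now.
    fn poll(&mut self) -> io::Result<Async<()>> {
        while self.filled < self.buf.len() {
            match self.reader.read(&mut self.buf[self.filled..]) {
                // Assumption: early end of input is an error.
                Ok(0) => return Err(io::Error::new(ErrorKind::UnexpectedEof, "early eof")),
                Ok(n) => self.filled += n,
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(Async::NotReady),
                Err(e) => return Err(e),
            }
        }
        Ok(Async::Ready(()))
    }

    /// Hands back the reader and the buffer, as the combinator does.
    fn into_inner(self) -> (R, Vec<u8>) {
        (self.reader, self.buf)
    }
}
```

Each call to `poll` picks up where the previous one stopped, so a caller only needs to poll again after a `NotReady` and can call `into_inner` once `Ready` is returned.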

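The second commonly used I/O combinator is write_all. It takes an implementor of AsyncWrite and a buffer (&[u8]), and returns a future that writes every byte of the buffer to the AsyncWrite using poll_write. When the future resolves, the entire buffer has been written out and flushed. We can combine it with read_exact to echo back whatever the server says: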

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    // Same address as in the previous example.
    let addr = "127.0.0.1:12345".parse().unwrap();
    let echo_fut = TcpStream::connect(&addr)
        .and_then(|stream| {
            // We're going to read the first 32 bytes the server sends us
            // and then just echo them back:
            let buf = vec![0; 32];
            // First, we need to read the server's message
            tokio::io::read_exact(stream, buf)
        })
        .and_then(|(stream, buf)| {
            // Then, we use write_all to write the entire buffer back:
            tokio::io::write_all(stream, buf)
        })
        .inspect(|(_stream, buf)| {
            println!("echoed back {} bytes: {:x?}", buf.len(), buf);
        });

    // As before, we can chain more futures onto echo_fut,
    // or declare ourselves finished and run it with tokio::run.

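Tokio also ships with an I/O combinator that implements the kind of copying done in the example above. It is (perhaps unsurprisingly) called copy. copy takes an AsyncRead and an AsyncWrite, and continuously writes all the bytes read from the AsyncRead into the AsyncWrite until poll_read indicates that the input has been closed and all bytes have been written out and flushed to the output. This is the combinator we used in our echo server. It greatly simplifies the example above, and also makes it work for any amount of server data.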

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    // Same address as in the previous examples.
    let addr = "127.0.0.1:12345".parse().unwrap();
    let echo_fut = TcpStream::connect(&addr)
        .and_then(|stream| {
            // First, we need to get a separate read and write handle for
            // the connection so that we can forward one to the other.
            // See "Split I/O resources" below for more details.
            let (reader, writer) = stream.split();
            // Then, we can use copy to send all the read bytes to the
            // writer, and return how many bytes it read/wrote.
            tokio::io::copy(reader, writer)
        })
        .inspect(|(bytes_copied, _reader, _writer)| {
            println!("echoed back {} bytes", bytes_copied);
        });

Pretty neat!
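The loop that copy runs can be sketched in a blocking form with the standard library. This is only an analogue under that assumption: `copy_sketch` is a hypothetical helper, it blocks where the real combinator would return NotReady, and the four-byte buffer is chosen only to force several iterations.

```rust
use std::io::{self, Read, Write};

/// Blocking sketch of the copy loop: read a chunk, write all of it out,
/// and repeat until the reader reports end of input.
fn copy_sketch<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
    // Deliberately tiny buffer so that larger inputs need several rounds.
    let mut buf = [0u8; 4];
    let mut total = 0u64;
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            // Input is closed: flush whatever is pending and report the
            // number of bytes that were copied.
            writer.flush()?;
            return Ok(total);
        }
        writer.write_all(&buf[..n])?;
        total += n as u64;
    }
}
```

Because the loop only stops when read returns zero bytes, it handles any amount of input, which is the property that makes copy a better fit for the echo example than a fixed-size read_exact.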

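The combinators we have discussed so far all perform fairly low-level operations: reading bytes, writing bytes, copying bytes. Often, though, you want to operate on a higher-level representation, such as lines. Tokio has you covered there too. lines takes an AsyncRead and returns a Stream that yields each line from the input until there are no more lines to read: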

    use tokio::net::tcp::TcpStream;
    use tokio::prelude::*;

    // Same address as in the previous examples.
    let addr = "127.0.0.1:12345".parse().unwrap();
    let lines_fut = TcpStream::connect(&addr).and_then(|stream| {
        // We want to parse out each line we receive on stream.
        // To do that, we may need to buffer input for a little while
        // (if the server sends two lines in one packet for example).
        // Because of that, lines requires that the AsyncRead it is
        // given *also* implements BufRead. This may be familiar if
        // you've ever used the lines() method from std::io::BufRead.
        // Luckily, BufReader from the standard library gives us that!
        let stream = std::io::BufReader::new(stream);
        tokio::io::lines(stream).for_each(|line| {
            println!("server sent us the line: {}", line);
            // This closure is called for each line we receive,
            // and returns a Future that represents the work we
            // want to do before accepting the next line.
            // In this case, we just wanted to print, so we
            // don't need to do anything more.
            Ok(())
        })
    });

There are many more I/O combinators in tokio::io, so check whether one already exists before you decide to write your own.

Split I/O resources

Both the copy example above and the echo server contained this mysterious-looking snippet:

    let (reader, writer) = socket.split();
    let bytes_copied = tokio::io::copy(reader, writer);

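As the comment above explains, we split the TcpStream (socket) into a read half and a write half, and use the copy combinator discussed above to produce a Future that asynchronously copies all the data from the read half to the write half. But why is this split needed in the first place? After all, AsyncRead::poll_read and AsyncWrite::poll_write each just take &mut self.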

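To answer that, we need to think back to Rust's ownership system. Recall that Rust only allows a single mutable reference to a given variable at any one time. But we have to pass two arguments to copy: one for where to read from and one for where to write to. Once we pass a mutable reference to the TcpStream as one of the arguments, we can no longer construct a second mutable reference to it to pass as the other argument. We know that copy will not read from and write to its arguments at the same time, but that is not expressed in copy's type signature.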

Enter split, a provided method on the AsyncRead trait that is available for types that also implement AsyncWrite. If we look at its signature, we see:

    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
        where Self: AsyncWrite { ... }

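The returned ReadHalf implements AsyncRead, and the WriteHalf implements AsyncWrite. Crucially, we now have two separate pointers into our type, which we can pass around independently. This comes in handy for copy, but it also means that we can hand each half to a different future and handle reads and writes completely independently. Behind the scenes, split ensures that if we try to read and write at the same time, only one of them happens at a time.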
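One way to picture what split provides is two handles that share ownership of a single I/O object and take turns accessing it. The sketch below uses only the standard library and is not how Tokio implements split: it guards the shared object with a blocking Mutex purely for illustration, whereas a non-blocking runtime must never block the thread. `Loopback`, `ReadHalfSketch`, `WriteHalfSketch`, and `split_sketch` are hypothetical names.

```rust
use std::io::{self, Cursor, Read, Write};
use std::sync::{Arc, Mutex};

/// Hypothetical duplex resource: reads come from `incoming`, writes are
/// collected in `outgoing`.
struct Loopback {
    incoming: Cursor<Vec<u8>>,
    outgoing: Vec<u8>,
}

impl Read for Loopback {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.incoming.read(buf)
    }
}

impl Write for Loopback {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.outgoing.write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Each half holds a shared handle to the same underlying object.
struct ReadHalfSketch<T> {
    inner: Arc<Mutex<T>>,
}

struct WriteHalfSketch<T> {
    inner: Arc<Mutex<T>>,
}

/// Consumes the I/O object and returns two independently usable halves.
fn split_sketch<T: Read + Write>(io: T) -> (ReadHalfSketch<T>, WriteHalfSketch<T>) {
    let shared = Arc::new(Mutex::new(io));
    (
        ReadHalfSketch { inner: Arc::clone(&shared) },
        WriteHalfSketch { inner: shared },
    )
}

impl<T: Read> Read for ReadHalfSketch<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // The lock ensures reads and writes never touch `T` at once.
        self.inner.lock().unwrap().read(buf)
    }
}

impl<T: Write> Write for WriteHalfSketch<T> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.lock().unwrap().write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.lock().unwrap().flush()
    }
}
```

With the two halves in hand, a call such as `std::io::copy(&mut reader, &mut writer)` type-checks, because it borrows two distinct values mutably instead of borrowing one value twice.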

Transports

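Turning an AsyncRead into a Stream (as lines does) or an AsyncWrite into a Sink is fairly common in applications that perform I/O. Such applications often want to abstract away how bytes are read from or written to the wire, and have most of their code deal with more convenient "request" and "response" types. This is often known as "framing": instead of viewing a connection as just bytes in and bytes out, you view it as "frames" of application data being received and sent. A framed stream of bytes is often referred to as a "transport".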

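Transports are typically implemented using a codec. For example, lines represents a very simple codec that splits a byte string on the newline character, \n, and parses each frame as a string before passing it to the application. Tokio provides helpers for implementing new codecs in tokio::codec: you implement the Encoder and Decoder traits for your transport, and use Framed::new to make a Sink + Stream out of your byte stream (such as a TcpStream). It is almost like magic! There are also variants for codecs that cover only the read side or only the write side (like lines). Let's look at a simple implementation of a line-based codec (even though LinesCodec already exists):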

    extern crate bytes;
    use bytes::{BufMut, BytesMut};
    use tokio::codec::{Decoder, Encoder};
    use tokio::prelude::*;

    // This is where we'd keep track of any extra book-keeping information
    // our transport needs to operate.
    struct LinesCodec;

    // Turns string errors into std::io::Error
    fn bad_utf8<E>(_: E) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidData, "Unable to decode input as UTF8")
    }

    // First, we implement encoding, because it's so straightforward.
    // Just write out the bytes of the string followed by a newline!
    // Easy-peasy.
    impl Encoder for LinesCodec {
        type Item = String;
        type Error = std::io::Error;

        fn encode(&mut self, line: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
            // Note that we're given a BytesMut here to write into.
            // BytesMut comes from the bytes crate, and aims to give
            // efficient read/write access to a buffer. To use it,
            // we have to reserve memory before we try to write to it.
            buf.reserve(line.len() + 1);
            // And now, we write out our stuff!
            buf.put(line);
            buf.put_u8(b'\n');
            Ok(())
        }
    }

    // The decoding is a little trickier, because we need to look for
    // newline characters. We also need to handle *two* cases: the "normal"
    // case where we're just asked to find the next string in a bunch of
    // bytes, and the "end" case where the input has ended, and we need
    // to find any remaining strings (the last of which may not end with a
    // newline!).
    impl Decoder for LinesCodec {
        type Item = String;
        type Error = std::io::Error;

        // Find the next line in buf!
        fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            Ok(if let Some(offset) = buf.iter().position(|b| *b == b'\n') {
                // We found a newline character in this buffer!
                // Cut out the line from the buffer so we don't return it again.
                let line = buf.split_to(offset + 1);
                // And then parse it as UTF-8, leaving out the trailing newline.
                Some(
                    std::str::from_utf8(&line[..line.len() - 1])
                        .map_err(bad_utf8)?
                        .to_string(),
                )
            } else {
                // There are no newlines in this buffer, so no lines to speak of.
                // Tokio will make sure to call this again when we have more bytes.
                None
            })
        }

        // Find the next line in buf when there will be no more data coming.
        fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            Ok(match self.decode(buf)? {
                Some(frame) => {
                    // There's a regular line here, so we may as well just return that.
                    Some(frame)
                },
                None => {
                    // There are no more lines in buf!
                    // We know there are no more bytes coming though,
                    // so we just return the remainder, if any.
                    if buf.is_empty() {
                        None
                    } else {
                        Some(
                            std::str::from_utf8(&buf.take()[..])
                                .map_err(bad_utf8)?
                                .to_string(),
                        )
                    }
                }
            })
        }
    }
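To see how the two decoding cases interact when data arrives in pieces, here is a sketch of the same logic using only the standard library, with a plain Vec<u8> in place of BytesMut. The function names `decode_line`, `decode_line_eof`, and `frames_from_chunks` are hypothetical, and the driver loop is an assumption about how a framed transport would call the decoder, not Tokio's actual code.

```rust
use std::string::FromUtf8Error;

/// Counterpart of `decode`: returns the next complete line, if any, and
/// removes it (including its newline) from the front of the buffer.
fn decode_line(buf: &mut Vec<u8>) -> Result<Option<String>, FromUtf8Error> {
    match buf.iter().position(|b| *b == b'\n') {
        Some(offset) => {
            let mut line: Vec<u8> = buf.drain(..=offset).collect();
            line.pop(); // drop the trailing '\n'
            String::from_utf8(line).map(Some)
        }
        // No newline yet: wait for more bytes.
        None => Ok(None),
    }
}

/// Counterpart of `decode_eof`: once no more input will arrive, whatever
/// is left in the buffer is returned as a final, unterminated line.
fn decode_line_eof(buf: &mut Vec<u8>) -> Result<Option<String>, FromUtf8Error> {
    match decode_line(buf)? {
        Some(line) => Ok(Some(line)),
        None if buf.is_empty() => Ok(None),
        None => String::from_utf8(std::mem::take(buf)).map(Some),
    }
}

/// Assumed driver: append each incoming chunk to the buffer, drain all
/// complete lines, and finish with the end-of-input variant.
fn frames_from_chunks(chunks: &[&[u8]]) -> Result<Vec<String>, FromUtf8Error> {
    let mut buf = Vec::new();
    let mut frames = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(chunk);
        while let Some(line) = decode_line(&mut buf)? {
            frames.push(line);
        }
    }
    while let Some(line) = decode_line_eof(&mut buf)? {
        frames.push(line);
    }
    Ok(frames)
}
```

Feeding the chunks "hel", "lo\nwor", and "ld\nbye" through `frames_from_chunks` yields the frames "hello", "world", and "bye": the first two come from the normal case, and the last one only appears because the end-of-input case returns the unterminated remainder.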