使用构建流

Tokio有助手将字节流转换为帧流。 字节流的示例包括TCP连接,管道,文件对象以及标准输入和输出文件描述符。 在Rust中,流很容易识别,因为它们实现了读写 trait

框架消息的最简单形式之一是行分隔消息。 每条消息都以\ n字符结尾。 让我们看一下如何使用tokio实现行分隔消息流。

编写编解码器

编解码器实现了tokio_codec :: Decodertokio_codec :: Encoder trait。 它的工作是将帧转换为字节和从字节转换。 这些 trait与tokio_codec :: Framed结构一起使用,以提供字节流的缓冲,解码和编码。

让我们看一下LinesCodec结构的简化版本,它实现了行分隔消息的解码和编码。

  1. pub struct LinesCodec {
  2. // Stored index of the next index to examine for a `\n` character.
  3. // This is used to optimize searching.
  4. // For example, if `decode` was called with `abc`, it would hold `3`,
  5. // because that is the next index to examine.
  6. // The next time `decode` is called with `abcde\n`, the method will
  7. // only look at `de\n` before returning.
  8. next_index: usize,
  9. }

这里的注释解释了,由于字节被缓冲直到找到一行,因此每次收到数据时从缓冲区的开头搜索\ n是很浪费的。 保持缓冲区的最后长度并在收到新数据时从那里开始搜索更有效。

当在底层流上接收数据时,调用Decoder :: decode方法。 该方法可以生成一个帧或返回Ok(None)来表示它需要更多的数据来生成一个帧。 解码方法负责通过使用BytesMut方法将其拆分来删除不再需要缓冲的数据。 如果未删除数据,缓冲区将继续增长。

我们来看看如何为LinesCodec实现Decoder :: decode

  1. fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
  2. // Look for a byte with the value '\n' in buf. Start searching from the search start index.
  3. if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n')
  4. {
  5. // Found a '\n' in the string.
  6. // The index of the '\n' is at the sum of the start position + the offset found.
  7. let newline_index = newline_offset + self.next_index;
  8. // Split the buffer at the index of the '\n' + 1 to include the '\n'.
  9. // `split_to` returns a new buffer with the contents up to the index.
  10. // The buffer on which `split_to` is called will now start at this index.
  11. let line = buf.split_to(newline_index + 1);
  12. // Trim the `\n` from the buffer because it's part of the protocol,
  13. // not the data.
  14. let line = &line[..line.len() - 1];
  15. // Convert the bytes to a string and panic if the bytes are not valid utf-8.
  16. let line = str::from_utf8(&line).expect("invalid utf8 data");
  17. // Set the search start index back to 0.
  18. self.next_index = 0;
  19. // Return Ok(Some(...)) to signal that a full frame has been produced.
  20. Ok(Some(line.to_string()))
  21. } else {
  22. // '\n' not found in the string.
  23. // Tell the next call to start searching after the current length of the buffer
  24. // since all of it was scanned and no '\n' was found.
  25. self.next_index = buf.len();
  26. // Ok(None) signifies that more data is needed to produce a full frame.
  27. Ok(None)
  28. }
  29. }

当必须将帧写入底层流时,将调用Encoder :: encode方法。 必须将帧写入作为参数接收的缓冲区。 写入缓冲区的数据将在准备好发送数据时写入流。

现在让我们看看如何为LinesCodec实现Encoder :: encode。

  1. fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
  2. // It's important to reserve the amount of space needed. The `bytes` API
  3. // does not grow the buffers implicitly.
  4. // Reserve the length of the string + 1 for the '\n'.
  5. buf.reserve(line.len() + 1);
  6. // String implements IntoBuf, a trait used by the `bytes` API to work with
  7. // types that can be expressed as a sequence of bytes.
  8. buf.put(line);
  9. // Put the '\n' in the buffer.
  10. buf.put_u8(b'\n');
  11. // Return ok to signal that no error occured.
  12. Ok(())
  13. }

编码信息通常更简单。 这里我们只需保留所需的空间并将数据写入缓冲区。

使用编解码器

使用编解码器的最简单方法是使用Framed结构。 它是实现自动缓冲的编解码器的包装器。 Framed结构体既是Stream又是Sink。 因此,您可以从中接收帧并向其发送帧。

您可以使用AsyncRead :: framed方法使用任何实现AsyncRead和AsyncWrite trait的类型创建Framed结构。

  1. TcpStream::connect(&addr).and_then(|sock| {
  2. let framed_sock = Framed::new(sock, LinesCodec::new());
  3. framed_sock.for_each(|line| {
  4. println!("Received line {}", line);
  5. Ok(())
  6. })
  7. });