I/O概叙

Rust标准库提供对网络和I/O的支持,例如TCP连接,UDP套接字,读取和写入文件等。但是,这些操作都是同步或阻塞的,这意味着当它们被调用时,当前线程可能会停止执行并进入睡眠状态,直到它被解除阻塞。例如,std::io::Read中的read方法会阻塞当前线程,直到能读取到数据。在使用Future的时候,这种行为会有问题,因为我们希望在等待I/O完成时继续执行我们可能拥有的其他Future。

为了实现这一点,Tokio提供了许多标准库I/O资源的非阻塞版本,例如文件操作TCPUDPUnix套接字。这些操作为长时间运行的操作(如接受新的TCP连接)返回Future,并实现无阻塞 std::io::Readstd::io::Write的async版本,名为AsyncReadAsyncWrite

例如,如果没有可用的数据,非阻塞读写不会阻塞当前线程。相反,它们会立即返回 WouldBlock错误,同时保证(Future::poll)已安排当前任务在以后可以取得进展时被唤醒,例如当网络数据包到达时。

通过使用非阻塞的Tokio I/O类型,如果一个执行I/O操作的Future不能立即执行,也不会阻止其他Future的执行,它只是返回 NotReady,并依赖于任务通知task notification,使poll方法再次被调用,那时该I/O应该会成功且不会阻塞。

在幕后,Tokio使用miotokio-fs跟踪不同futures等待的各种I/O资源的状态,只要其中任何一个的状态发生变化,操作系统就会通知它。

一个服务器例子

要了解它如何工作,请考虑以下echo服务器的实现:

  1. use tokio::prelude::*;
  2. use tokio::net::TcpListener;
  3. // Set up a listening socket, just like in std::net
  4. let addr = "127.0.0.1:12345".parse().unwrap();
  5. let listener = TcpListener::bind(&addr)
  6. .expect("unable to bind TCP listener");
  7. // Listen for incoming connections.
  8. // This is similar to the iterator of incoming connections that
  9. // .incoming() from std::net::TcpListener, produces, except that
  10. // it is an asynchronous Stream of tokio::net::TcpStream instead
  11. // of an Iterator of std::net::TcpStream.
  12. let incoming = listener.incoming();
  13. // Since this is a Stream, not an Iterator, we use the for_each
  14. // combinator to specify what should happen each time a new
  15. // connection becomes available.
  16. let server = incoming
  17. .map_err(|e| eprintln!("accept failed = {:?}", e))
  18. .for_each(|socket| {
  19. // Each time we get a connection, this closure gets called.
  20. // We want to construct a Future that will read all the bytes
  21. // from the socket, and write them back on that same socket.
  22. //
  23. // If this were a TcpStream from the standard library, a read or
  24. // write here would block the current thread, and prevent new
  25. // connections from being accepted or handled. However, this
  26. // socket is a Tokio TcpStream, which implements non-blocking
  27. // I/O! So, if we read or write from this socket, and the
  28. // operation would block, the Future will just return NotReady
  29. // and then be polled again in the future.
  30. //
  31. // While we *could* write our own Future combinator that does an
  32. // (async) read followed by an (async) write, we'll instead use
  33. // tokio::io::copy, which already implements that. We split the
  34. // TcpStream into a read "half" and a write "half", and use the
  35. // copy combinator to produce a Future that asynchronously
  36. // copies all the data from the read half to the write half.
  37. let (reader, writer) = socket.split();
  38. let bytes_copied = tokio::io::copy(reader, writer);
  39. let handle_conn = bytes_copied.map(|amt| {
  40. println!("wrote {:?} bytes", amt)
  41. }).map_err(|err| {
  42. eprintln!("I/O error {:?}", err)
  43. });
  44. // handle_conn here is still a Future, so it hasn't actually
  45. // done any work yet. We *could* return it here; then for_each
  46. // would wait for it to complete before it accepts the next
  47. // connection. However, we want to be able to handle multiple
  48. // connections in parallel, so we instead spawn the future and
  49. // return an "empty" future that immediately resolves so that
  50. // Tokio will _simultaneously_ accept new connections and
  51. // service this one.
  52. tokio::spawn(handle_conn)
  53. });
  54. // The `server` variable above is itself a Future, and hasn't actually
  55. // done any work yet to set up the server. We need to run it on a Tokio
  56. // runtime for the server to really get up and running:
  57. tokio::run(server);

更多例子,请参考 这里.