I/O with Tokio

The tokio crate comes with TCP and UDP networking types. Unlike the types in std, Tokio’s networking types are based on the poll model and notify the task executor when their readiness state changes (data is received or write buffers are flushed). In the tokio::net module you’ll find types like TcpListener, TcpStream, and UdpSocket.

All of these types provide both a future API and a poll API.

The Tokio net types are powered by a Mio-based reactor that, by default, is started lazily on a background thread. See the reactor documentation for more details.

Using the Future API

We’ve already seen some of this earlier in the guide with the incoming function as well as the helpers found in tokio_io::io.

These helpers include:

  • incoming: A stream of inbound TCP connections.
  • read_exact: Read exactly n bytes into a buffer.
  • read_to_end: Read all bytes into a buffer.
  • write_all: Write the entire contents of a buffer.
  • copy: Copy bytes from one I/O handle to another.

Many of these helpers are generic over the AsyncRead and AsyncWrite traits. These traits are similar to Read and Write from std, but are only for types that are “future aware”, i.e. types that follow these mandated properties:

  • Calls to read or write are nonblocking: they never block the calling thread.

  • If a call would otherwise block, the function returns a value indicating so. When this happens, the current future’s task is scheduled to receive a notification when the I/O is ready again.

Note that users of AsyncRead and AsyncWrite types should use poll_read and poll_write instead of calling read and write directly.
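
To make the second property concrete, here is a minimal std-only sketch of turning a nonblocking read result into a readiness value. It does not use Tokio: Readiness is a hypothetical stand-in for Async, FlakySource is a made-up mock reader, and poll_read_sketch is an illustrative name, so treat it as a model of the contract rather than the library’s implementation.

```rust
use std::io::{self, Read};

/// Hypothetical stand-in for `Async` from the futures crate.
#[derive(Debug, PartialEq)]
pub enum Readiness<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical mock source: the first read reports "would block",
/// later reads hand out the stored bytes.
pub struct FlakySource {
    data: Vec<u8>,
    pos: usize,
    blocked_once: bool,
}

impl FlakySource {
    pub fn new(data: &[u8]) -> Self {
        FlakySource { data: data.to_vec(), pos: 0, blocked_once: false }
    }
}

impl Read for FlakySource {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.blocked_once {
            // A nonblocking source never waits; it reports WouldBlock instead.
            self.blocked_once = true;
            return Err(io::ErrorKind::WouldBlock.into());
        }
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        Ok(n)
    }
}

/// Map "would block" to `NotReady`; pass data and real errors through.
pub fn poll_read_sketch<R: Read>(
    src: &mut R,
    buf: &mut [u8],
) -> io::Result<Readiness<usize>> {
    match src.read(buf) {
        Ok(n) => Ok(Readiness::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Readiness::NotReady),
        Err(e) => Err(e),
    }
}
```

Calling poll_read_sketch twice on FlakySource::new(b"hello") yields NotReady first and Ready(5) second. The sketch leaves out the other half of the contract: a real future-aware type also arranges for the current task to be notified once the I/O is ready again.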

For example, here is how to accept connections, read 5 bytes from them, then write the 5 bytes back to the socket:

    #![deny(deprecated)]
    extern crate tokio;

    use tokio::io;
    use tokio::net::TcpListener;
    use tokio::prelude::*;

    fn main() {
        let addr = "127.0.0.1:6142".parse().unwrap();
        let listener = TcpListener::bind(&addr).unwrap();

        let server = listener.incoming().for_each(|socket| {
            println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());

            // Buffer that read_exact fills completely before resolving.
            let buf = vec![0; 5];

            let connection = io::read_exact(socket, buf)
                .and_then(|(socket, buf)| {
                    io::write_all(socket, buf)
                })
                .then(|_| Ok(())); // Just discard the socket and buffer

            // Spawn a new task that processes the socket:
            tokio::spawn(connection);

            Ok(())
        })
        // tokio::run expects a future whose error type is ().
        .map_err(|e| eprintln!("accept error = {:?}", e));

        // Start the runtime and drive the server until it completes.
        tokio::run(server);
    }
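
For comparison, the same read-then-write step can be sketched against the blocking Read and Write traits from std. This is not Tokio code, and echo_exact is a hypothetical name used only for illustration; the point is that the future-based helpers mirror familiar std operations while never blocking the calling thread.

```rust
use std::io::{self, Read, Write};

/// Read exactly `n` bytes from `stream`, then write them back to it.
/// Blocking counterpart of the read_exact + write_all chain.
pub fn echo_exact<S: Read + Write>(stream: &mut S, n: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0; n];
    // Fails with UnexpectedEof if fewer than `n` bytes are available.
    stream.read_exact(&mut buf)?;
    // Keeps writing until the whole buffer has been written.
    stream.write_all(&buf)?;
    Ok(buf)
}
```

Because the function is generic over Read + Write, it works on an in-memory std::io::Cursor as well as on a std::net::TcpStream, in the same way the Tokio helpers are generic over AsyncRead and AsyncWrite.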

Using the Poll API

The poll-based API is meant for implementing Future by hand, where you need to return Async. This is useful when you need to implement your own combinators that handle custom logic.

For example, this is how the read_exact future could be implemented for a TcpStream:

    #![deny(deprecated)]
    extern crate tokio;
    #[macro_use]
    extern crate futures;

    use std::mem;

    use tokio::io;
    use tokio::net::TcpStream;
    use tokio::prelude::*;

    pub struct ReadExact {
        state: State,
    }

    enum State {
        Reading {
            stream: TcpStream,
            buf: Vec<u8>,
            pos: usize,
        },
        Empty,
    }

    impl Future for ReadExact {
        type Item = (TcpStream, Vec<u8>);
        type Error = io::Error;

        fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> {
            match self.state {
                State::Reading {
                    ref mut stream,
                    ref mut buf,
                    ref mut pos
                } => {
                    // Keep reading until the buffer is full. try_ready!
                    // returns early if the stream is not ready or fails.
                    while *pos < buf.len() {
                        let n = try_ready!({
                            stream.poll_read(&mut buf[*pos..])
                        });
                        *pos += n;
                        if n == 0 {
                            let err = io::Error::new(
                                io::ErrorKind::UnexpectedEof,
                                "early eof");
                            return Err(err)
                        }
                    }
                }
                State::Empty => panic!("poll a ReadExact after it's done"),
            }

            // The buffer is full: take ownership of the stream and buffer.
            match mem::replace(&mut self.state, State::Empty) {
                State::Reading { stream, buf, .. } => {
                    Ok(Async::Ready((stream, buf)))
                }
                State::Empty => panic!(),
            }
        }
    }

    pub fn main() {}
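
The important detail in a hand-written future like this is that progress (pos and the partially filled buffer) survives every early return. The following std-only sketch models that behaviour without Tokio. Readiness, Trickle, ReadExactSketch, and drive are all hypothetical names, and the busy-polling driver is a demo shortcut: a real executor would wait for a notification instead of looping.

```rust
use std::io::{self, Read};
use std::mem;

/// Hypothetical stand-in for `Async` from the futures crate.
#[derive(Debug)]
pub enum Readiness<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical source: every other call reports "would block",
/// the calls in between hand out at most two bytes.
pub struct Trickle {
    data: Vec<u8>,
    pos: usize,
    blocked: bool,
}

impl Trickle {
    pub fn new(data: &[u8]) -> Self {
        Trickle { data: data.to_vec(), pos: 0, blocked: false }
    }
}

impl Read for Trickle {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.blocked = !self.blocked;
        if self.blocked {
            return Err(io::ErrorKind::WouldBlock.into());
        }
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(buf.len()).min(2);
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        Ok(n)
    }
}

/// Model of a read_exact state machine: `pos` and `buf` persist across polls.
pub struct ReadExactSketch<R> {
    source: Option<R>,
    buf: Vec<u8>,
    pos: usize,
}

impl<R: Read> ReadExactSketch<R> {
    pub fn new(source: R, len: usize) -> Self {
        ReadExactSketch { source: Some(source), buf: vec![0; len], pos: 0 }
    }

    pub fn poll(&mut self) -> io::Result<Readiness<(R, Vec<u8>)>> {
        let source = self.source.as_mut().expect("polled after completion");
        while self.pos < self.buf.len() {
            match source.read(&mut self.buf[self.pos..]) {
                Ok(0) => {
                    return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"))
                }
                Ok(n) => self.pos += n,
                // Not ready: return early, keeping `pos` for the next poll.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    return Ok(Readiness::NotReady)
                }
                Err(e) => return Err(e),
            }
        }
        let source = self.source.take().expect("source present until completion");
        Ok(Readiness::Ready((source, mem::take(&mut self.buf))))
    }
}

/// Demo-only driver: polls in a loop and counts `NotReady` results.
pub fn drive<R: Read>(mut fut: ReadExactSketch<R>) -> io::Result<(Vec<u8>, usize)> {
    let mut not_ready = 0;
    loop {
        match fut.poll()? {
            Readiness::Ready((_source, buf)) => return Ok((buf, not_ready)),
            Readiness::NotReady => not_ready += 1,
        }
    }
}
```

Driving ReadExactSketch::new(Trickle::new(b"abcdef"), 5) returns the bytes "abcde" after three NotReady results, which shows each poll resuming where the previous one stopped.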

Datagrams

Note that most of this discussion has been about I/O on byte streams, which UDP importantly is not! To accommodate this, the UdpSocket type provides a number of methods for working with datagrams conveniently:

  • send_dgram allows you to express sending a datagram as a future, returning an error if the entire datagram couldn’t be sent at once.
  • recv_dgram expresses reading a datagram into a buffer, yielding both the buffer and the address it came from.
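
The datagram semantics these methods build on can be seen with the blocking UdpSocket from std. The sketch below is not the Tokio API; roundtrip is a hypothetical helper, and the loopback address, 1024-byte buffer, and two-second timeout are arbitrary choices for the demo. It shows one datagram being sent whole and received together with the sender’s address.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Send one datagram over loopback and receive it again.
/// Returns the payload, the source address reported by the receiver,
/// and the address the sender was actually bound to.
pub fn roundtrip(payload: &[u8]) -> io::Result<(Vec<u8>, SocketAddr, SocketAddr)> {
    // Port 0 asks the operating system for any free port.
    let receiver = UdpSocket::bind("127.0.0.1:0")?;
    let sender = UdpSocket::bind("127.0.0.1:0")?;

    // Avoid hanging forever if the datagram is lost.
    receiver.set_read_timeout(Some(Duration::from_secs(2)))?;

    // The payload goes out as a single datagram, not as a byte stream.
    sender.send_to(payload, receiver.local_addr()?)?;

    // One recv_from call yields one datagram plus the address it came from.
    let mut buf = vec![0u8; 1024];
    let (len, from) = receiver.recv_from(&mut buf)?;
    buf.truncate(len);

    Ok((buf, from, sender.local_addr()?))
}
```

Unlike a TCP read, the receive call never returns half of one message joined to the start of the next, which is why datagram-oriented methods hand back the whole buffer and the peer address together.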