Non-blocking I/O

This section describes the network resources and drivers provided by Tokio. This component provides one of Tokio's primary functions: non-blocking, event-driven networking provided by the appropriate operating system primitives (epoll, kqueue, IOCP, ...). It is modeled after the resource and driver pattern described in the previous section.

The network driver is built using mio, and network resources are backed by types that implement Evented.

This guide will be focused on TCP types. The other network resources (UDP, Unix sockets, pipes, etc.) follow the same pattern.

The network resource

Network resources are types, such as TcpListener and TcpStream, that are composed of the network handle and a reference to the driver that is powering the resource. Initially, when the resource is first created, the driver pointer may be None:

    # extern crate tokio;
    # use tokio::net::TcpListener;
    # use std::net::SocketAddr;
    # fn dox(addr: SocketAddr) {
    let listener = TcpListener::bind(&addr).unwrap();
    # }

In this case, the reference to the driver is not yet set. However, if a constructor that takes a Handle reference is used, then the driver reference will be set to the driver represented by the given handle:

    # extern crate tokio;
    # use tokio::net::TcpListener;
    # use tokio::reactor::Handle;
    # use std::net::TcpListener as StdListener;
    # fn dox(std_listener: StdListener, my_reactor_handle: &Handle) {
    let listener = TcpListener::from_std(std_listener, my_reactor_handle).unwrap();
    # }

Once a driver is associated with a resource, it is set for the lifetime of the resource and cannot be changed. The associated driver is responsible for receiving operating system events for the network resource and notifying the tasks that have expressed interest in the resource.

Using the resource

Resource types include non-blocking functions that are prefixed with poll_ and that include Async in the return type. These are the functions that are linked with the task system; they should be used from tasks as part of Future implementations. For example, TcpStream provides poll_read and poll_write. TcpListener provides poll_accept.

Here is a task that uses poll_accept to accept inbound sockets from a listener and handle them by spawning a new task:

    # #[macro_use]
    # extern crate futures;
    # extern crate tokio;
    # use futures::{Future, Poll};
    # use std::io;
    # use tokio::net::{TcpListener, TcpStream};
    # fn process(_socket: TcpStream) -> impl Future<Item = (), Error = ()> {
    #     futures::future::ok(())
    # }
    struct Acceptor {
        listener: TcpListener,
    }

    impl Future for Acceptor {
        type Item = ();
        type Error = io::Error;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            loop {
                let (socket, _) = try_ready!(self.listener.poll_accept());

                // Spawn a task to process the socket
                tokio::spawn(process(socket));
            }
        }
    }

Resource types may also include functions that return futures. These are helpers that use the poll_ functions to provide additional functionality. For example, TcpStream provides a connect function that returns a future. This future will complete once the TcpStream has established a connection with a peer (or failed attempting to do so).

Using combinators to connect a TcpStream:

    # extern crate tokio;
    # use tokio::prelude::*;
    # use tokio::net::TcpStream;
    # use std::io;
    # fn process<T>(t: T) -> impl Future<Item = (), Error = io::Error> {
    #     Ok(()).into_future()
    # }
    # fn dox() {
    # let addr = "127.0.0.1:0".parse().unwrap();
    tokio::spawn({
        let connect_future = TcpStream::connect(&addr);

        connect_future
            .and_then(|socket| process(socket))
            .map_err(|_| panic!())
    });
    # }

Futures may also be used directly from other future implementations:

    # #[macro_use]
    # extern crate futures;
    # extern crate tokio;
    # use futures::{Async, Future, Poll};
    # use std::io;
    # use tokio::net::{ConnectFuture, TcpStream};
    # fn process(_socket: TcpStream) -> impl Future<Item = (), Error = ()> {
    #     futures::future::ok(())
    # }
    struct ConnectAndProcess {
        connect: ConnectFuture,
    }

    impl Future for ConnectAndProcess {
        type Item = ();
        type Error = io::Error;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            let socket = try_ready!(self.connect.poll());
            tokio::spawn(process(socket));
            Ok(Async::Ready(()))
        }
    }

Registering the resource with the driver

When using TcpListener::poll_accept (or any poll_ function), if the resource is ready to return immediately then it will do so. In the case of poll_accept, being ready means that there is a socket waiting to be accepted in the queue. If the resource is *not* ready, i.e. there is no pending socket to accept, then the resource asks the driver to notify the current task once it becomes ready.

The first time NotReady is returned by a resource, if the resource was not explicitly assigned a driver using a Handle argument, the resource will register itself with a driver instance. This is done by looking at the network driver associated with the current execution context.

The default driver for the execution context is stored using a thread-local, set using with_default, and accessed using Handle::current. It is the runtime's responsibility to ensure that the task is polled from within the closure passed to with_default. A call to Handle::current accesses the thread-local set by with_default in order to return the handle to the driver for the current execution context.
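As a rough sketch of what a runtime does here, assuming the tokio-reactor 0.1 API in which with_default takes the handle, a tokio-executor Enter guard, and a closure:

    # extern crate tokio_executor;
    # extern crate tokio_reactor;
    # fn dox() {
    let reactor = tokio_reactor::Reactor::new().unwrap();
    let handle = reactor.handle();

    // Mark this thread as an execution context.
    let mut enter = tokio_executor::enter().unwrap();

    // Any task polled inside this closure observes `handle` via
    // `Handle::current` (or lazily via `Handle::default`).
    tokio_reactor::with_default(&handle, &mut enter, |_enter| {
        // ... poll tasks here ...
    });
    # }

The reactor itself still has to be turned somewhere for the registered resources to make progress; see "Running the driver" below.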

Handle::current vs Handle::default

Both Handle::current and Handle::default return a Handle instance. They are, however, subtly different. Most often, Handle::default is the desired behavior.

Handle::current immediately reads the thread-local variable storing the driver for the current execution context. This means that Handle::current must be called from an execution context that set the default driver. Handle::current should be used when the handle is going to be sent to a different execution context and the user wishes a specific reactor to be used (see below for an example).

On the other hand, Handle::default lazily reads the thread-local variable. This allows getting a Handle instance from outside of an execution context. When the resource is used, the handle will access the thread-local variable as described in the previous section.

For example:

    # extern crate tokio;
    # use tokio::prelude::*;
    # use tokio::net::TcpListener;
    # use tokio::reactor::Handle;
    # use std::net::SocketAddr;
    # fn process<T>(t: T) -> impl Future<Item = (), Error = ()> {
    #     Ok(()).into_future()
    # }
    fn main() {
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let std_listener = ::std::net::TcpListener::bind(&addr).unwrap();
        let listener = TcpListener::from_std(std_listener, &Handle::default()).unwrap();

        tokio::run({
            listener.incoming().for_each(|socket| {
                tokio::spawn(process(socket));
                Ok(())
            })
            .map_err(|_| panic!("error"))
        });
    }

In this example, incoming() returns a stream that is implemented by calling poll_accept, and for_each drives it as a future. That future is run on a runtime, which has a network driver configured as part of the execution context. When poll_accept is called from within the execution context, that is when the thread-local is read and the driver is associated with the TcpListener instance.
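To make the relationship concrete, here is a sketch of how a stream like the one returned by incoming() could be built on top of poll_accept. This is illustrative only; the real Incoming type in tokio is implemented differently:

    # #[macro_use]
    # extern crate futures;
    # extern crate tokio;
    # use futures::{Async, Poll, Stream};
    # use std::io;
    # use tokio::net::{TcpListener, TcpStream};
    struct Incoming {
        listener: TcpListener,
    }

    impl Stream for Incoming {
        type Item = TcpStream;
        type Error = io::Error;

        fn poll(&mut self) -> Poll<Option<TcpStream>, io::Error> {
            // `try_ready!` returns `NotReady` early; the driver will notify
            // the task once a connection is pending.
            let (socket, _addr) = try_ready!(self.listener.poll_accept());
            Ok(Async::Ready(Some(socket)))
        }
    }
    # fn main() {}

The first call to poll is what triggers the lazy driver registration described above.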

However, if tokio-threadpool is used directly, then tasks spawned onto the threadpool executor will not have access to a reactor:

    # extern crate tokio;
    # extern crate tokio_threadpool;
    # use tokio_threadpool::*;
    # use tokio::prelude::*;
    # use tokio::net::TcpListener;
    # fn dox() {
    # let addr = "127.0.0.1:0".parse().unwrap();
    let pool = ThreadPool::new();
    let listener = TcpListener::bind(&addr).unwrap();

    pool.spawn({
        listener.incoming().for_each(|socket| {
            // This will never get called due to the listener not being
            // able to function.
            unreachable!();
    # Ok(())
        })
        .map_err(|_| panic!("error"))
    });
    # }

In order to make the above example work, a reactor must be set for the threadpool's execution context. See building a runtime for more details. Alternatively, a Handle obtained with Handle::current could be used:

    # extern crate futures;
    # extern crate tokio;
    # extern crate tokio_threadpool;
    # use tokio::prelude::*;
    # use tokio::net::TcpListener;
    # use tokio_threadpool::*;
    # use tokio::reactor::Handle;
    # use futures::future;
    # use std::net::SocketAddr;
    # fn dox() {
    # let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let pool = ThreadPool::new();

    // This does not run on the pool.
    tokio::run(future::lazy(move || {
        // Get the handle
        let handle = Handle::current();

        let std_listener = std::net::TcpListener::bind(&addr).unwrap();

        // This eagerly links the listener with the handle for the current
        // reactor.
        let listener = TcpListener::from_std(std_listener, &handle).unwrap();

        pool.spawn({
            listener.incoming().for_each(|socket| {
                // Do something with the socket
                Ok(())
            })
            .map_err(|_| panic!())
        });

        Ok(())
    }));
    # }

The network driver

The driver that powers all of Tokio's network types is the Reactor type in the tokio-reactor crate. It is implemented using mio. A call to Reactor::turn uses mio::Poll::poll to get operating system events for registered network resources, then notifies the registered tasks for each network resource using the task system. The tasks get scheduled to run on their associated executors, and when a task runs it sees the network resource as ready: calls to poll_* functions return Async::Ready.

Linking the driver with resources

The driver must track each resource that is registered with it. While the actual implementation is more complex, it can be thought of as a shared reference to a cell holding shared state, similar to:

    struct Registration {
        // The registration needs to know its ID. This allows it to remove
        // state from the reactor when it is dropped.
        id: Id,

        // The task that owns the resource and is registered to receive
        // readiness notifications from the driver.
        //
        // If `task` is `Some`, we **definitely** know that the resource
        // is not ready because we have not yet received an operating system
        // event. This allows avoiding syscalls that will return `NotReady`.
        //
        // If `task` is `None`, then the resource **might** be ready. We can
        // try the syscall, but it might still return `NotReady`.
        task: Option<task::Task>,
    }

    struct TcpListener {
        mio_listener: mio::TcpListener,
        registration: Option<Arc<Mutex<Registration>>>,
    }

    struct Reactor {
        poll: mio::Poll,
        resources: HashMap<Id, Arc<Mutex<Registration>>>,
    }

This is not the real implementation, but a simplified version to demonstrate the behavior. In practice, there is no Mutex, cells are not allocated per resource instance, and the reactor does not use a HashMap. The real implementation can be found in the tokio-reactor source.

When the resource is first used, it is registered with the driver:

    impl TcpListener {
        fn poll_accept(&mut self) -> Poll<TcpStream, io::Error> {
            // If the registration is not set, this will associate the
            // `TcpListener` with the current execution context's reactor.
            let registration = self.registration.get_or_insert_with(|| {
                // Access the thread-local variable that tracks the reactor.
                Reactor::with_current(|reactor| {
                    // Registers the listener, which implements `mio::Evented`.
                    // `register` returns the registration instance for the
                    // resource.
                    reactor.register(&self.mio_listener)
                })
            });

            if registration.task.is_none() {
                // The task is `None`; this means the resource **might** be
                // ready.
                match self.mio_listener.accept() {
                    Ok(socket) => {
                        let socket = mio_socket_to_tokio(socket);
                        return Ok(Async::Ready(socket));
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        // The resource is not ready; fall through to task
                        // registration.
                    }
                    Err(e) => {
                        // All other errors are returned to the caller.
                        return Err(e);
                    }
                }
            }

            // The task is set even if it is already `Some`. This handles the
            // case where the resource is moved to a different task than the
            // one stored in `self.task`.
            registration.task = Some(task::current());
            Ok(Async::NotReady)
        }
    }

Note that there is only a single task field per resource. The implication is that a resource can only be used from a single task at a time. If TcpListener::poll_accept returns NotReady, registering the current task, and the listener is then sent to a different task that calls poll_accept and sees NotReady, then the second task is the only one that will receive a notification once a socket is ready to be accepted.

Resources may support tracking different tasks for different operations. For example, TcpStream internally has two task fields: one for notifying on read readiness and one for notifying on write readiness. This allows TcpStream::poll_read and TcpStream::poll_write to be called from different tasks.
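Continuing the simplified model from above, a stream resource could carry one task slot per direction. The field names here are illustrative, not what tokio-reactor actually uses:

    struct StreamRegistration {
        id: Id,

        // Task to notify when the stream becomes readable.
        read_task: Option<task::Task>,

        // Task to notify when the stream becomes writable.
        write_task: Option<task::Task>,
    }

poll_read would take and set read_task, poll_write would take and set write_task, and the driver would notify whichever slot matches the readiness bits of the received event.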

The evented types are registered with the driver's mio::Poll instance as part of the register function used above. Again, this guide uses a simplified implementation which does not match the actual one in tokio-reactor but is sufficient for understanding how tokio-reactor behaves.

    impl Reactor {
        fn register<T: mio::Evented>(&mut self, evented: &T) -> Arc<Mutex<Registration>> {
            // Generate a unique identifier for this registration. This
            // identifier can be converted to and from a Mio Token.
            let id = generate_unique_identifier();

            // Register the I/O type with Mio
            self.poll.register(
                evented,
                id.into_token(),
                mio::Ready::all(),
                mio::PollOpt::edge());

            let registration = Arc::new(Mutex::new(Registration {
                id,
                task: None,
            }));

            self.resources.insert(id, registration.clone());

            registration
        }
    }

Running the driver

The driver needs to run in order for its associated resources to function. If the driver does not run, the resources will never become ready. Running the driver is handled automatically when using a Runtime, but it is useful to understand how it works. If you are interested in the real implementation, the tokio-reactor source is the best reference.

When resources are registered with the driver, they are also registered with Mio. Running the driver performs the following steps in a loop:

1) Call Poll::poll to get operating system events.
2) Dispatch all events to the appropriate resources via the registration.

The steps above are done by calling Reactor::turn. The looping part is up to us. This is typically done in a background thread or embedded in the executor as a Park implementation. See the runtime guide for more details.

    # extern crate tokio_reactor;
    # fn dox() {
    # let mut reactor = tokio_reactor::Reactor::new().unwrap();
    loop {
        // `None` means never timeout, blocking until we receive an operating
        // system event.
        reactor.turn(None);
    }
    # }
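For example, a runtime could dedicate a background thread to this loop, keeping the reactor's Handle around so that resources can reach the driver. A sketch, assuming the tokio-reactor 0.1 API:

    # extern crate tokio_reactor;
    # fn dox() {
    use std::thread;

    let mut reactor = tokio_reactor::Reactor::new().unwrap();

    // Keep a handle so that resources created elsewhere can register with
    // this driver (e.g. via `TcpListener::from_std`).
    let handle = reactor.handle();

    thread::spawn(move || loop {
        // Block until at least one operating system event arrives, then
        // dispatch the events to the registered resources.
        reactor.turn(None).unwrap();
    });
    # let _ = handle;
    # }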

The implementation of turn does the following:

    fn turn(&mut self, timeout: Option<Duration>) {
        // Create storage for operating system events. This shouldn't be
        // created each time `turn` is called, but doing so does not impact
        // behavior.
        let mut events = mio::Events::with_capacity(1024);

        self.poll.poll(&mut events, timeout);

        for event in &events {
            let id = Id::from_token(event.token());

            if let Some(registration) = self.resources.get(&id) {
                if let Some(task) = registration.lock().unwrap().task.take() {
                    task.notify();
                }
            }
        }
    }

Notifying the task results in the task getting scheduled on its executor. When the task runs again, it will call the poll_accept function again. This time, the task slot will be None. This means the syscall should be attempted, and this time poll_accept will probably return an accepted socket (spurious events are permitted, so the syscall may still find nothing to accept, in which case the task simply registers itself again).