Building a runtime

The runtime ‒ all the pieces needed to run an event-driven application ‒ is already available. You don’t need to know this if you just want to use tokio. However, it may be useful to know what happens under the hood, both to gain some more understanding of the details in case something goes wrong, and to be able to customize it beyond what the runtime Builder supports.

We are going to build a single-threaded runtime, because it is slightly simpler to put together. The default multi-threaded one is not conceptually more complex, but it has more moving parts. Knowing the details here can be a stepping stone to reading the code of the default runtime.

A complete, working example of the things discussed here can be found in the git repository.

The Park trait

The asynchronous world is inherently about waiting for something to happen (and being able to wait for multiple things at once). It is no surprise there’s a trait to abstract over the waiting. It’s called Park.

The idea is that, if there’s nothing better to do, control is passed to the Park until something interesting happens and control is taken away from it again, or until some specified time passes. It is up to the Park how it spends this time. It can either do something useful (processing background jobs) or simply block the thread in some way.

Some Park implementations sit at the bottom of the stack ‒ they block the thread in some way. Others only delegate the park calls to some underlying object they wrap (with some added functionality), which allows stacking them on top of each other.
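To make the stacking idea more concrete, here is a minimal sketch using only the standard library. The SimplePark trait, Waker, BlockingPark and CountingPark are all hypothetical names made up for this illustration; the sketch is not the actual Park trait and leaves out details such as error handling. It only shows one bottom implementation that really blocks the thread and one layer that adds a bit of functionality and delegates the waiting.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical, simplified stand-in for a park abstraction (not tokio's trait).
trait SimplePark {
    /// Wait until woken up, or until `timeout` elapses (`None` means no limit).
    fn park(&mut self, timeout: Option<Duration>);
}

/// Wake-up flag shared between the parked thread and whoever wakes it.
#[derive(Clone, Default)]
struct Waker {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl Waker {
    fn wake(&self) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    }
}

/// Bottom of the stack: really blocks the thread.
struct BlockingPark {
    waker: Waker,
}

impl SimplePark for BlockingPark {
    fn park(&mut self, timeout: Option<Duration>) {
        let (lock, cvar) = &*self.waker.inner;
        let mut woken = lock.lock().unwrap();
        match timeout {
            None => {
                // Loop to guard against spurious wake-ups.
                while !*woken {
                    woken = cvar.wait(woken).unwrap();
                }
            }
            Some(t) => {
                if !*woken {
                    // A single wait; returning early is acceptable here.
                    let (guard, _) = cvar.wait_timeout(woken, t).unwrap();
                    woken = guard;
                }
            }
        }
        // Consume the notification, if there was one.
        *woken = false;
    }
}

/// A layer: adds functionality (here, just counting) and delegates the waiting.
struct CountingPark<P> {
    inner: P,
    parks: usize,
}

impl<P: SimplePark> SimplePark for CountingPark<P> {
    fn park(&mut self, timeout: Option<Duration>) {
        self.parks += 1;
        self.inner.park(timeout);
    }
}
```

Wrapping a BlockingPark in a CountingPark gives a two-level stack; further layers can be wrapped around it in the same way.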

The usual components

We definitely need a Reactor to accept external events (like network sockets being readable) from the OS. It does so by blocking on epoll, kqueue or another OS-dependent primitive, through the mio crate. This can’t delegate the waiting to anything else, so the reactor goes to the bottom of our stack.

The reactor is able to notify our futures of data coming over the network and similar events, but we need an executor to actually run them. We’ll be using the CurrentThread executor, because we’re building a single-threaded runtime. You can use any other executor that suits your needs. The executor needs a Park underneath to wait when there are no futures ready to run. It doesn’t implement Park, therefore it must go on the top of the whole stack.

While not strictly necessary, it is useful to be able to run delayed futures ‒ timeouts and similar. Therefore, we place the Timer in the middle ‒ fortunately, it can be placed on top of one Park and also implements Park. It plays a similar role for timeouts as the reactor does for IO-based futures.

In addition, any custom layer can be added. One example could be some kind of idle bookkeeping component ‒ when asked to wait, it would repeatedly do a bit of work and interleave it with letting the park below it pick up events. If there was no bookkeeping to be done, it would simply delegate the waiting.
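Such a bookkeeping layer could be sketched roughly like this. The sketch uses a made-up SimplePark trait instead of the real Park, and RecordingPark and IdleWork are hypothetical names too. RecordingPark stands in for everything below the layer and only records what it was asked to do.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical, simplified stand-in for a park abstraction (not tokio's trait).
trait SimplePark {
    fn park(&mut self, timeout: Option<Duration>);
}

/// Stand-in for the lower layers; records the timeouts it was asked to wait for.
#[derive(Default)]
struct RecordingPark {
    calls: Vec<Option<Duration>>,
}

impl SimplePark for RecordingPark {
    fn park(&mut self, timeout: Option<Duration>) {
        self.calls.push(timeout);
    }
}

/// Does one pending job per park call instead of just sleeping.
struct IdleWork<P> {
    inner: P,
    jobs: VecDeque<Box<dyn FnMut()>>,
}

impl<P: SimplePark> SimplePark for IdleWork<P> {
    fn park(&mut self, timeout: Option<Duration>) {
        match self.jobs.pop_front() {
            // Nothing to do: delegate the waiting unchanged.
            None => self.inner.park(timeout),
            Some(mut job) => {
                // Do a bit of work...
                job();
                // ...then let the layers below pick up events without blocking.
                self.inner.park(Some(Duration::from_secs(0)));
            }
        }
    }
}
```

The layer does only one job per call, so whatever sits on top gets a chance to react to new events before the next job runs. If nothing became ready, it is expected to simply park again.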

This is what the creation of the reactor, timer and executor looks like in code:

    extern crate futures;
    extern crate tokio;
    extern crate tokio_executor;
    extern crate tokio_reactor;
    extern crate tokio_timer;

    use std::io::Error as IoError;
    use std::time::{Duration, Instant};

    use futures::{future, Future};
    use tokio::executor::current_thread::{self, CurrentThread};
    use tokio_reactor::Reactor;
    use tokio_timer::timer::{self, Timer};

    fn run<F: Future<Item = (), Error = std::io::Error>>(f: F) -> Result<(), std::io::Error> {
        let reactor = Reactor::new()?;
        // The reactor itself will get consumed by timer,
        // so we keep a handle to communicate with it.
        let reactor_handle = reactor.handle();
        let timer = Timer::new(reactor);
        let timer_handle = timer.handle();
        let mut executor = CurrentThread::new_with_park(timer);
        Ok(())
    }

    fn main() {
        run(futures::future::lazy(|| Ok(()))).unwrap();
    }

This way, if there are futures to execute, they’ll get executed first. Then, once the executor runs out of ready futures, the timer looks for timeouts to fire. This may generate some more ready futures (which would get executed next). If no timeouts fire, the timer computes for how long the reactor can safely block and lets it wait for external events.
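The last step, deciding how long the layer below may block, can be illustrated with a small helper. This is only a sketch of the idea, not the Timer’s actual code. The max_block function and its parameters are invented for this example.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: how long may the layer below block?
///
/// `deadlines` are the pending timeouts and `requested` is the limit passed in
/// from the layer above. `None` means "block until an external event arrives".
fn max_block(
    now: Instant,
    deadlines: &[Instant],
    requested: Option<Duration>,
) -> Option<Duration> {
    // Time left until the earliest deadline; zero if it has already passed.
    let until_next = deadlines
        .iter()
        .min()
        .map(|deadline| deadline.saturating_duration_since(now));

    match (until_next, requested) {
        // Both limits apply: take the shorter one.
        (Some(a), Some(b)) => Some(a.min(b)),
        // At most one limit applies: use it, if any.
        (a, b) => a.or(b),
    }
}
```

With deadlines 200 ms and 500 ms away and no limit from above, the helper returns 200 ms. With no deadlines and no limit it returns None, so the layer below may block indefinitely.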

Global state

We’ve built the components that do the actual work. But we need a way to build the work and submit it to them. We could do so through the handles, but then we would have to carry them around, which would be far from ergonomic.

To avoid the tedious passing of several handles around, the built-in runtime stores them in thread-local storage. Several modules in tokio have a with_default method, which takes the corresponding handle and a closure. It stores the handle in the thread-local storage and runs the closure. It then restores the original value of the TLS after the closure finishes.
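The pattern itself (store a value in thread-local storage, run a closure, put the previous value back) can be sketched with the standard library alone. This illustrates the pattern as described above and is not tokio’s implementation. CURRENT, with_default and current are names made up for this sketch, a String stands in for a real handle type, and the enter argument taken by the real functions is left out.

```rust
use std::cell::RefCell;

thread_local! {
    /// Hypothetical per-thread "current handle".
    static CURRENT: RefCell<Option<String>> = RefCell::new(None);
}

/// Set `handle` as the thread's default while `f` runs, then restore the old value.
fn with_default<R>(handle: &str, f: impl FnOnce() -> R) -> R {
    // Restoring in `drop` means it also happens if `f` panics.
    struct Restore(Option<String>);
    impl Drop for Restore {
        fn drop(&mut self) {
            let previous = self.0.take();
            CURRENT.with(|c| *c.borrow_mut() = previous);
        }
    }

    let previous = CURRENT.with(|c| c.borrow_mut().replace(handle.to_owned()));
    let _restore = Restore(previous);
    f()
}

/// What code deep inside the closure would call to find the default.
fn current() -> Option<String> {
    CURRENT.with(|c| c.borrow().clone())
}
```

Code running inside the closure can call current() without being handed anything explicitly, which is exactly the ergonomic gain over carrying handles around.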

This way we would run a future with all the default values set, so it can freely use them:

    extern crate futures;
    extern crate tokio;
    extern crate tokio_executor;
    extern crate tokio_reactor;
    extern crate tokio_timer;

    use std::io::Error as IoError;
    use std::time::{Duration, Instant};

    use futures::{future, Future};
    use tokio::executor::current_thread::{self, CurrentThread};
    use tokio_reactor::Reactor;
    use tokio_timer::timer::{self, Timer};

    fn run<F: Future<Item = (), Error = std::io::Error>>(f: F) -> Result<(), std::io::Error> {
        let reactor = Reactor::new()?;
        let reactor_handle = reactor.handle();
        let timer = Timer::new(reactor);
        let timer_handle = timer.handle();
        let mut executor = CurrentThread::new_with_park(timer);
        // Binds an executor to this thread
        let mut enter = tokio_executor::enter()
            .expect("Multiple executors at once");
        // Set the defaults before running the closure
        let result = tokio_reactor::with_default(
            &reactor_handle,
            &mut enter,
            |enter| timer::with_default(
                &timer_handle,
                enter,
                |enter| {
                    let mut default_executor =
                        current_thread::TaskExecutor::current();
                    tokio_executor::with_default(
                        &mut default_executor,
                        enter,
                        |enter| executor.enter(enter).block_on(f)
                    )
                }
            )
        );
        Ok(())
    }

    fn main() {
        run(futures::future::lazy(|| Ok(()))).unwrap();
    }

There are a few things of note. First, the enter guard just ensures that we don’t run multiple executors on the same thread at the same time. Running multiple executors would leave one of them blocked, which would not behave in a useful way, so this is footgun prevention.
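A guard like this can be built from a thread-local flag. The following is a minimal sketch of the idea, assuming nothing about how tokio_executor implements it. ENTERED, EnterGuard and AlreadyEntered are hypothetical names.

```rust
use std::cell::Cell;

thread_local! {
    /// True while some executor is running on this thread.
    static ENTERED: Cell<bool> = Cell::new(false);
}

/// Hypothetical guard: while it lives, the thread counts as "inside an executor".
#[derive(Debug)]
struct EnterGuard(());

/// Returned when the thread is already inside an executor.
#[derive(Debug, PartialEq)]
struct AlreadyEntered;

fn enter() -> Result<EnterGuard, AlreadyEntered> {
    ENTERED.with(|entered| {
        // `replace` returns the old value; `true` means someone got here first.
        if entered.replace(true) {
            Err(AlreadyEntered)
        } else {
            Ok(EnterGuard(()))
        }
    })
}

impl Drop for EnterGuard {
    fn drop(&mut self) {
        // Leaving the executor: allow the next one to enter.
        ENTERED.with(|entered| entered.set(false));
    }
}
```

A second enter() on the same thread fails for as long as the first guard is alive, and succeeds again once that guard is dropped.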

Second, we want to use the same executor as both the default executor and the default current thread executor, and also to run the executor (not only spawn a future onto it without further waiting). To do both, we would need two mutable references to it, which is not possible. To work around that, we set the current thread executor (it actually sets itself, in the executor.block_on call, or any similar one). We use the TaskExecutor as the default one, which is a proxy to whatever current thread executor is configured at the time of its use.

Finally, block_on will execute the single future to completion (and will process any other futures spawned in the executor as well, but it will not wait for them to finish if f finishes first). The result of the future is bubbled upwards through all the with_default calls and can be returned or used in any other way. If you want to wait for all the other futures to finish too, there’s also executor.run, which can be called afterwards.
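The difference between the two calls can be seen in a toy model. The ToyExecutor below is entirely hypothetical. It has no futures and no parking, and it busy-loops. A “task” is just a closure that does one step per call and returns true once it is finished. The sketch only mirrors the behaviour described above: block_on stops as soon as the main task is done, while run keeps going until nothing is left.

```rust
use std::collections::VecDeque;

/// A toy task: each call does one step and returns `true` once finished.
type Task = Box<dyn FnMut() -> bool>;

/// Hypothetical stand-in for an executor, for illustration only.
#[derive(Default)]
struct ToyExecutor {
    spawned: VecDeque<Task>,
}

impl ToyExecutor {
    fn spawn(&mut self, task: Task) {
        self.spawned.push_back(task);
    }

    /// One pass over the spawned tasks, dropping the finished ones.
    fn turn(&mut self) {
        for _ in 0..self.spawned.len() {
            let mut task = self.spawned.pop_front().unwrap();
            if !task() {
                self.spawned.push_back(task);
            }
        }
    }

    /// Drive `main` to completion; spawned tasks progress only in the meantime.
    fn block_on(&mut self, mut main: impl FnMut() -> bool) {
        while !main() {
            self.turn();
        }
    }

    /// Keep going until every spawned task has finished.
    fn run(&mut self) {
        while !self.spawned.is_empty() {
            self.turn();
        }
    }
}
```

If the main task needs two steps and a spawned task needs five, the spawned task is still unfinished when block_on returns. Calling run afterwards finishes it.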
