Tasks

Tasks are the application’s “unit of logic”. They are similar to Go’s goroutines and Erlang’s processes, but asynchronous. In other words, tasks are asynchronous green threads.

Because a task runs an asynchronous bit of logic, it is represented by the Future trait. The task’s future implementation completes with a () value once the task is done processing.
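Here is a minimal, dependency-free sketch of a task as a future, to make this concrete. It defines simplified local stand-ins for the futures 0.1 Async type and Future trait, where poll returns Result<Async<Item>, Error>. The names CountdownTask and poll_to_completion are hypothetical and only illustrate the idea.

```rust
// Simplified, local stand-ins for the futures 0.1 `Async` type and
// `Future` trait, so this sketch compiles without external crates.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

/// Hypothetical task: it needs `remaining` polls' worth of progress,
/// then completes with `()`.
pub struct CountdownTask {
    pub remaining: u32,
}

impl Future for CountdownTask {
    // A task's future completes with `()` once processing is done.
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<()>, ()> {
        if self.remaining == 0 {
            Ok(Async::Ready(()))
        } else {
            // Make a little progress, then ask to be polled again later.
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Polls a task until it completes and returns how many polls it took.
/// (A real executor waits for a notification between polls instead of
/// spinning like this.)
pub fn poll_to_completion<F: Future<Item = ()>>(task: &mut F) -> Result<u32, F::Error> {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Async::Ready(()) = task.poll()? {
            return Ok(polls);
        }
    }
}
```

Polling CountdownTask { remaining: 3 } this way reports 4 polls. Three polls make progress and return NotReady, and a fourth returns Ready(()).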

Tasks are passed to executors, which handle scheduling the tasks. An executor usually schedules many tasks across a single thread or a small set of threads. Tasks must not perform computation-heavy logic, or they will prevent other tasks from executing. So don’t try to compute the Fibonacci sequence as a task.
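The scheduling concern can be sketched with a toy single-threaded executor. This is a hypothetical model, not Tokio's executor. It uses simplified local stand-ins for Async and Future, and it re-queues a NotReady task immediately instead of waiting for a notification. Each Work task records every unit of work in a shared log, so the log shows how the tasks interleave.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// Simplified, local stand-ins for the futures 0.1 `Async`/`Future` types.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

/// Shared log of which task performed each unit of work.
pub type Log = Rc<RefCell<Vec<&'static str>>>;

/// Hypothetical task: performs `steps` units of work, at most `per_poll`
/// units each time it is polled, recording each unit in `log`.
pub struct Work {
    pub name: &'static str,
    pub steps: u32,
    pub per_poll: u32,
    pub log: Log,
}

impl Future for Work {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<()>, ()> {
        let n = self.per_poll.min(self.steps);
        for _ in 0..n {
            self.log.borrow_mut().push(self.name);
        }
        self.steps -= n;
        if self.steps == 0 {
            Ok(Async::Ready(()))
        } else {
            // Yield so the executor can run other tasks.
            Ok(Async::NotReady)
        }
    }
}

/// Toy executor: polls tasks round-robin on the current thread until all
/// of them are done. A task that is not ready goes to the back of the queue.
pub fn run<F: Future<Item = (), Error = ()>>(tasks: Vec<F>) {
    let mut queue: VecDeque<F> = tasks.into_iter().collect();
    while let Some(mut task) = queue.pop_front() {
        if let Ok(Async::NotReady) = task.poll() {
            queue.push_back(task);
        }
        // Otherwise the task finished (or failed) and is dropped.
    }
}

/// Runs task "a" (3 steps, `a_per_poll` per poll) alongside task "b"
/// (3 steps, 1 per poll) and returns the order in which work happened.
pub fn trace(a_per_poll: u32) -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    run(vec![
        Work { name: "a", steps: 3, per_poll: a_per_poll, log: log.clone() },
        Work { name: "b", steps: 3, per_poll: 1, log: log.clone() },
    ]);
    let order = log.borrow().clone();
    order
}
```

With trace(1), the two tasks interleave as a, b, a, b, a, b. With trace(3), task a does all of its work in a single poll, so b makes no progress until a is finished (a, a, a, b, b, b). A long computation inside one poll has the same effect on every other task sharing the thread.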

Tasks are implemented either by implementing the Future trait directly or by building up a future using the various combinator functions available in the futures and tokio crates.

Here is an example that fetches the value for a URI using an HTTP GET and caches the result.

The logic is as follows:

  • Check the cache to see if there is an entry for the URI.
  • If there is no entry, perform the HTTP GET.
  • Store the response in the cache.
  • Return the response.

The entire sequence of events is also wrapped in a timeout to prevent unbounded execution time.
```rust
# #![deny(deprecated)]
# extern crate futures;
# use futures::prelude::*;
# use futures::future::{self, Either, empty};
# use std::time::Duration;
# fn docx() {
#
# pub struct Timeout;
# impl Timeout {
#     pub fn new<T>(_: T, _: Duration) -> impl Future<Item = (), Error = ()> {
#         empty()
#     }
# }
# pub struct MyExecutor;
# impl MyExecutor {
#     fn spawn<T>(&self, _: T) {
#         unimplemented!();
#     }
# }
# pub struct Error;
// The functions here all return `impl Future<...>`. This is one
// of a number of ways to return futures. For more details on
// returning futures, see the "Returning futures" section in
// "Going deeper: Futures".

/// Get the cached value for a URI from some remote cache.
fn cache_get(uri: &str)
    -> impl Future<Item = Option<String>, Error = Error>
# { empty() } /*
{ ... }
# */

/// Store the value for a URI in the remote cache.
fn cache_put(uri: &str, val: String)
    -> impl Future<Item = (), Error = Error>
# { empty() } /*
{ ... }
# */

/// Do a full HTTP GET to a remote URL.
fn http_get(uri: &str)
    -> impl Future<Item = String, Error = Error>
# { empty() } /*
{ ... }
# */
#
# let my_executor = MyExecutor;

fn fetch_and_cache(url: &str)
    -> impl Future<Item = String, Error = Error>
{
    // The URL has to be converted to a string so that it can be
    // moved into the closure. Given futures are asynchronous,
    // the stack is not around anymore by the time the closure is called.
    let url = url.to_string();

    let response = http_get(&url)
        .and_then(move |response| {
            cache_put(&url, response.clone())
                .map(|_| response)
        });

    // No boxing is needed: the combined future is returned directly
    // as the `impl Future`.
    response
}

let url = "https://example.com";

let response = cache_get(url)
    .and_then(|resp| {
        // `Either` is a utility provided by the `futures` crate
        // that enables returning different futures from a single
        // closure without boxing.
        match resp {
            Some(resp) => Either::A(future::ok(resp)),
            None => {
                Either::B(fetch_and_cache(url))
            }
        }
    });

// Only let the task run for up to 20 seconds.
//
// This uses a fictional timer API. Use the `tokio-timer` crate for
// all your actual timer needs.
let task = Timeout::new(response, Duration::from_secs(20));

my_executor.spawn(task);
# }
# fn main() {}
```

Because the steps are all necessary for the task to complete, it makes sense to group them all within the same task.

However, if instead of updating the cache on a cache miss we wanted to update the cache value on an interval, it would make sense to split that into multiple tasks, as the steps are no longer directly related.

```rust
# #![deny(deprecated)]
# extern crate futures;
# use futures::prelude::*;
# use futures::future::empty;
# use std::time::Duration;
# fn docx() {
#
# pub struct Timeout;
# impl Timeout {
#     pub fn new<T>(_: T, _: Duration) -> impl Future<Item = (), Error = ()> {
#         empty()
#     }
# }
# pub struct Interval;
# impl Interval {
#     pub fn new(_: Duration) -> Box<dyn Stream<Item = (), Error = Error>> {
#         unimplemented!();
#     }
# }
# pub struct MyExecutor;
# impl MyExecutor {
#     fn spawn<T>(&self, _: T) {
#         unimplemented!();
#     }
# }
# pub struct Error;
#
# fn cache_get(uri: &str)
#     -> impl Future<Item = Option<String>, Error = Error>
# { empty() }
# fn cache_put(uri: &str, val: String)
#     -> impl Future<Item = (), Error = Error>
# { empty() }
# fn http_get(uri: &str)
#     -> impl Future<Item = String, Error = Error>
# { empty() }
# fn fetch_and_cache(url: &str)
#     -> impl Future<Item = String, Error = Error>
# { empty() }
# let my_executor = MyExecutor;
let url = "https://example.com";

// An Interval is a stream that yields `()` on a fixed interval.
let update_cache = Interval::new(Duration::from_secs(60))
    // On each tick of the interval, update the cache. This is done
    // by using the same function from the previous snippet.
    .for_each(|_| {
        fetch_and_cache(url)
            .map(|resp| println!("updated cache with {}", resp))
    });

// Spawn the cache update task so that it runs in the background
my_executor.spawn(update_cache);

// Now, only get from the cache.
// (NB: see the next section about ensuring the cache has been primed.)
let response = cache_get(url);
let task = Timeout::new(response, Duration::from_secs(20));

my_executor.spawn(task);
# }
# fn main() {}
```

Message Passing

Just as with Go and Erlang, tasks can communicate using message passing. In fact, it is very common to use message passing to coordinate multiple tasks. This allows independent tasks to still interact.

The futures crate provides a sync module, which contains some channel types that are ideal for message passing across tasks.

  • oneshot is a channel for sending exactly one value.
  • mpsc is a channel for sending many (zero or more) values.
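The difference between the two kinds of channel can be sketched with the standard library. Note that this swaps in std::sync::mpsc, a blocking channel for OS threads, as an analogue only. The futures channels are asynchronous and are used from tasks rather than threads. OneshotSender, oneshot_channel, wait_for_primed, and collect_updates are hypothetical names. The one-shot wrapper's send takes self by value, so the type system allows at most one value to be sent.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical one-shot sender built on std's blocking mpsc channel.
/// `send` consumes `self`, so at most one value can ever be sent.
pub struct OneshotSender<T>(mpsc::Sender<T>);

impl<T> OneshotSender<T> {
    pub fn send(self, value: T) -> Result<(), T> {
        // If the receiver is gone, hand the value back to the caller.
        self.0.send(value).map_err(|err| err.0)
    }
}

pub fn oneshot_channel<T>() -> (OneshotSender<T>, mpsc::Receiver<T>) {
    let (tx, rx) = mpsc::channel();
    (OneshotSender(tx), rx)
}

/// One-shot use: a background thread "primes" a value and signals once.
pub fn wait_for_primed() -> u32 {
    let (primed_tx, primed_rx) = oneshot_channel::<u32>();
    thread::spawn(move || {
        // Stand-in for "fetch the value and store it in the cache".
        let value: u32 = 42;
        // Sending consumes `primed_tx`; a second send would not compile.
        let _ = primed_tx.send(value);
    });
    // Block until the single value arrives.
    primed_rx.recv().expect("sender dropped without sending")
}

/// Many-values use: several producers send many values to one consumer.
pub fn collect_updates(producers: u32, per_producer: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    for p in 0..producers {
        let tx = tx.clone();
        thread::spawn(move || {
            for i in 0..per_producer {
                tx.send(p * 100 + i).unwrap();
            }
        });
    }
    // Drop the original sender so the iterator ends once every clone is gone.
    drop(tx);
    let mut updates: Vec<u32> = rx.iter().collect();
    // Arrival order across threads is not deterministic, so sort it.
    updates.sort();
    updates
}
```

If the receiver has already been dropped, the one-shot send returns the value as Err(value) instead of silently losing it.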

The interval-based cache example above isn’t exactly correct. Because tasks are executed concurrently, there is no guarantee that the cache-updating task will have written the first value to the cache by the time the other task tries to read from the cache.

This is a perfect situation to use message passing. The cache-updating task can send a message notifying the other task that it has primed the cache with an initial value.

```rust
# #![deny(deprecated)]
# extern crate futures;
# use futures::prelude::*;
# use futures::future::empty;
# use futures::sync::oneshot;
# use std::time::Duration;
# fn docx() {
#
# pub struct Timeout;
# impl Timeout {
#     pub fn new<T>(_: T, _: Duration) -> impl Future<Item = (), Error = ()> {
#         empty()
#     }
# }
# pub struct Interval;
# impl Interval {
#     pub fn new(_: Duration) -> Box<dyn Stream<Item = (), Error = Error>> {
#         unimplemented!();
#     }
# }
# pub struct MyExecutor;
# impl MyExecutor {
#     fn spawn<T>(&self, _: T) {
#         unimplemented!();
#     }
# }
# pub struct Error;
#
# fn cache_get(uri: &str)
#     -> impl Future<Item = Option<String>, Error = Error>
# { empty() }
# fn cache_put(uri: &str, val: String)
#     -> impl Future<Item = (), Error = Error>
# { empty() }
# fn http_get(uri: &str)
#     -> impl Future<Item = String, Error = Error>
# { empty() }
# fn fetch_and_cache(url: &str)
#     -> impl Future<Item = String, Error = Error>
# { empty() }
# let my_executor = MyExecutor;
let url = "https://example.com";

// `primed_tx` moves into the update task. `primed_rx` is a future that
// completes once a value is sent (or errors if the sender is dropped).
let (primed_tx, primed_rx) = oneshot::channel();

let update_cache = fetch_and_cache(url)
    // Now, notify the other task that the cache is primed
    .then(|_| primed_tx.send(()))
    // Then we can start refreshing the cache on an interval
    .then(|_| {
        Interval::new(Duration::from_secs(60))
            .for_each(|_| {
                fetch_and_cache(url)
                    .map(|resp| println!("updated cache with {}", resp))
            })
    });

// Spawn the cache update task so that it runs in the background
my_executor.spawn(update_cache);

// First, wait for the cache to be primed
let response = primed_rx
    .then(|_| cache_get(url));

let task = Timeout::new(response, Duration::from_secs(20));

my_executor.spawn(task);
# }
# fn main() {}
```

Task Notification

An application built with Tokio is structured as a set of concurrently running tasks. Here is the basic structure of a server:

```rust
# #![deny(deprecated)]
# extern crate futures;
# extern crate tokio;
#
# use tokio::io;
# use tokio::net::{TcpListener, TcpStream};
# use tokio::prelude::*;
# use futures::future::empty;
#
# pub fn process(socket: TcpStream) -> impl Future<Item = (), Error = ()> + Send {
#     empty()
# }
#
# fn docx() {
# let addr = "127.0.0.1:6142".parse().unwrap();
# let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(|socket| {
    // Spawn a task to process the connection
    tokio::spawn(process(socket));

    Ok(())
})
.map_err(|_| ()); // Just drop the error

tokio::run(server);
# }
# pub fn main() {}
```

In this case, we spawn a task for each inbound server socket. However, it is also possible to implement a server future that processes all inbound connections in the same task:

```rust
# #![deny(deprecated)]
# extern crate futures;
# extern crate tokio;
# use futures::prelude::*;
# use tokio::net::*;
# use std::io;
# use futures::future::empty;
pub struct Server {
    listener: TcpListener,
    connections: Vec<Box<dyn Future<Item = (), Error = io::Error> + Send>>,
}
# pub fn process(socket: TcpStream) -> impl Future<Item = (), Error = io::Error> + Send {
#     empty()
# }

impl Future for Server {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<()>, io::Error> {
        // First, accept all new connections
        loop {
            match self.listener.poll_accept()? {
                Async::Ready((socket, _)) => {
                    let connection = process(socket);
                    self.connections.push(Box::new(connection));
                }
                Async::NotReady => break,
            }
        }

        // Now, poll all connection futures. Iterating in reverse means
        // removing a completed connection does not shift the indices
        // that are still to be visited.
        let len = self.connections.len();

        for i in (0..len).rev() {
            match self.connections[i].poll()? {
                Async::Ready(_) => {
                    self.connections.remove(i);
                }
                Async::NotReady => {}
            }
        }

        // `NotReady` is returned here because the future never actually
        // completes. The server runs until it is dropped.
        Ok(Async::NotReady)
    }
}
# pub fn main() {}
```

These two strategies are functionally equivalent, but they have significantly different runtime characteristics.

Notification happens at the task level. The task does not know which sub future triggered the notification, so whenever the task is polled, it has to try polling all of its sub futures.

[Figure: Layout of a task]

In this task, there are three sub futures that can get polled. If a resource contained by one of the sub futures transitions to “ready”, the task itself gets notified, and it will try to poll all three of its sub futures. One of them will advance, which in turn advances the internal state of the task.
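The polling behavior described above can be modeled in a few lines. This is a simplified, hypothetical model, not how any particular executor is implemented. A Task owns several SubFutures, and a notification only sets a task-level flag, so the task has no choice but to poll every sub future.

```rust
/// Hypothetical sub future that can advance only when its resource is ready.
#[derive(Default)]
pub struct SubFuture {
    resource_ready: bool,
    /// Number of times this sub future has been polled.
    pub polls: u32,
}

impl SubFuture {
    /// Polls the sub future; returns `true` if it made progress.
    fn poll(&mut self) -> bool {
        self.polls += 1;
        // Consume the readiness, if any.
        std::mem::replace(&mut self.resource_ready, false)
    }
}

/// A task that owns several sub futures.
pub struct Task {
    pub subs: Vec<SubFuture>,
    notified: bool,
}

impl Task {
    pub fn new(n: usize) -> Self {
        Task {
            subs: (0..n).map(|_| SubFuture::default()).collect(),
            notified: false,
        }
    }

    /// The resource owned by sub future `i` becomes ready. This notifies
    /// the task, but the notification only records *that* the task should
    /// run, not *which* sub future caused it.
    pub fn resource_ready(&mut self, i: usize) {
        self.subs[i].resource_ready = true;
        self.notified = true;
    }

    /// Runs the task if it was notified. Because it cannot tell which sub
    /// future is ready, it polls all of them. Returns how many advanced.
    pub fn poll(&mut self) -> usize {
        if !std::mem::replace(&mut self.notified, false) {
            return 0;
        }
        let mut advanced = 0;
        for sub in self.subs.iter_mut() {
            if sub.poll() {
                advanced += 1;
            }
        }
        advanced
    }
}
```

Suppose one resource becomes ready and the task is then polled. All three sub futures are polled once, but only one of them advances.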

The key is to keep tasks small, doing as little as possible per task. This is why servers spawn a new task for each connection instead of processing the connections in the same task as the listener.
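One rough way to see the cost difference is to count polls. The sketch below makes two assumptions: each readiness event wakes exactly one connection (identified by index), and the executor polls only notified tasks. It ignores polls of the listener itself. PollCount, single_task, and task_per_connection are hypothetical names.

```rust
/// Poll counts for one strategy: polls that advanced a connection versus
/// polls that found nothing to do.
#[derive(Debug, PartialEq)]
pub struct PollCount {
    pub useful: usize,
    pub wasted: usize,
}

/// One task owns every connection. A wakeup cannot say which connection
/// is ready, so every connection future is polled on each event.
pub fn single_task(connections: usize, events: &[usize]) -> PollCount {
    let mut count = PollCount { useful: 0, wasted: 0 };
    for &ready in events {
        for conn in 0..connections {
            if conn == ready {
                count.useful += 1;
            } else {
                count.wasted += 1;
            }
        }
    }
    count
}

/// One task per connection. Each wakeup reaches only the ready
/// connection's own task, so exactly one future is polled per event.
pub fn task_per_connection(events: &[usize]) -> PollCount {
    PollCount { useful: events.len(), wasted: 0 }
}
```

Take 1,000 connections and three events. The single task performs 3,000 polls, and 2,997 of them find nothing to do. A task per connection performs just three.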

Ok, there actually is a way for the task to know which sub future triggered the notification, using FuturesUnordered from the futures crate, but usually the right thing to do is to spawn a new task.
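The underlying idea can be sketched as a ready queue. Each sub future gets its own wake handle, and waking that handle pushes the sub future's index onto a queue that the task drains. This is a simplified model of the idea with hypothetical names (SubWaker, ReadyQueue, Task). It is not the actual FuturesUnordered implementation or API.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Queue of indices of sub futures that are ready to be polled.
type ReadyQueue = Rc<RefCell<VecDeque<usize>>>;

/// Hypothetical per-sub-future wake handle: waking it records *which*
/// sub future is ready, not just that the task should run.
#[derive(Clone)]
pub struct SubWaker {
    index: usize,
    queue: ReadyQueue,
}

impl SubWaker {
    pub fn wake(&self) {
        let mut queue = self.queue.borrow_mut();
        // Queue each sub future at most once until it is polled.
        if !queue.contains(&self.index) {
            queue.push_back(self.index);
        }
    }
}

/// A task that tracks readiness per sub future.
pub struct Task {
    /// How many times each sub future has been polled.
    pub polls: Vec<u32>,
    ready: ReadyQueue,
}

impl Task {
    pub fn new(n: usize) -> Self {
        Task {
            polls: vec![0; n],
            ready: Rc::new(RefCell::new(VecDeque::new())),
        }
    }

    /// The wake handle that sub future `i` would register with its resource.
    pub fn waker(&self, i: usize) -> SubWaker {
        SubWaker { index: i, queue: Rc::clone(&self.ready) }
    }

    /// Polls only the sub futures whose wakers fired; returns how many.
    pub fn poll(&mut self) -> usize {
        let mut polled = 0;
        loop {
            // Pop in its own statement so the RefCell borrow is released
            // before the sub future is "polled".
            let next = self.ready.borrow_mut().pop_front();
            match next {
                Some(i) => {
                    // Stand-in for polling sub future `i`.
                    self.polls[i] += 1;
                    polled += 1;
                }
                None => break,
            }
        }
        polled
    }
}
```

Waking sub future 2, even twice, and then polling the task polls only that sub future. The other two are never touched.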
