Spawning

Tokio-based applications are organized in terms of tasks. A task is a small unit of logic that executes independently from other tasks. It is similar to Go’s goroutine and Erlang’s process, but asynchronous. In other words, tasks are asynchronous green threads. Tasks are spawned for much the same reasons that threads are spawned in synchronous code, but spawning a task with Tokio is extremely lightweight.

Previous examples defined a future and passed that future to tokio::run. This resulted in a task being spawned onto Tokio’s runtime to execute the provided future. Additional tasks may be spawned by calling tokio::spawn, but only from code that is already running on a Tokio task. One way to think about it is that the future passed to tokio::run is the “main function”.

In the following example, four tasks are spawned.

    extern crate tokio;
    extern crate futures;

    use futures::future::lazy;

    fn main() {
        tokio::run(lazy(|| {
            // Spawn four independent tasks onto the runtime.
            for i in 0..4 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    Ok(())
                }));
            }

            Ok(())
        }));
    }

The tokio::run function blocks until the future passed to it, and every task spawned from it, has terminated. In this case, tokio::run blocks until all four tasks output to STDOUT and terminate.

The lazy function runs the closure the first time the future is polled. It is used here to ensure that tokio::spawn is called from a task. Without lazy, tokio::spawn would be called from outside the context of a task, which results in an error.

Communicating with tasks

Just as with Go and Erlang, tasks can communicate using message passing. In fact, it will be very common to use message passing to coordinate multiple tasks. This allows independent tasks to still interact.

The futures crate provides a sync module which contains some channel types that are ideal for message passing across tasks.

  • oneshot is a channel for sending exactly one value.
  • mpsc is a channel for sending many (zero or more) values.

A oneshot is ideal for getting the result from a spawned task:

    extern crate tokio;
    extern crate futures;

    use futures::Future;
    use futures::future::lazy;
    use futures::sync::oneshot;

    fn main() {
        tokio::run(lazy(|| {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(lazy(|| {
                // `send` consumes the sender and fails only if the
                // receiver has already been dropped.
                tx.send("hello from spawned task").unwrap();
                Ok(())
            }));

            rx.and_then(|msg| {
                println!("Got `{}`", msg);
                Ok(())
            })
            .map_err(|e| println!("error = {:?}", e))
        }));
    }

And mpsc is good for sending a stream of values to another task:

    extern crate tokio;
    extern crate futures;

    use futures::{stream, Future, Stream, Sink};
    use futures::future::lazy;
    use futures::sync::mpsc;

    fn main() {
        tokio::run(lazy(|| {
            let (tx, rx) = mpsc::channel(1_024);

            tokio::spawn({
                // `send` returns the sender once the message is accepted,
                // so `fold` threads `tx` through each iteration.
                stream::iter_ok(0..10).fold(tx, |tx, i| {
                    tx.send(format!("Message {} from spawned task", i))
                        .map_err(|e| println!("error = {:?}", e))
                })
                .map(|_| ()) // Drop tx handle
            });

            // The receive stream ends once every sender has been dropped.
            rx.for_each(|msg| {
                println!("Got `{}`", msg);
                Ok(())
            })
        }));
    }

These two message passing primitives will also be used in the examples below to coordinate and communicate between tasks.

Multi-threaded

While it is possible to introduce concurrency with futures without spawning tasks, this concurrency will be limited to running on a single thread. Spawning tasks allows the Tokio runtime to schedule these tasks on multiple threads.

The multi-threaded Tokio runtime manages multiple OS threads internally. It multiplexes many tasks across a few physical threads. When a Tokio application spawns its tasks, these tasks are submitted to the runtime and the runtime handles scheduling.
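
As a rough illustration of what multiplexing means, the sketch below uses only the standard library to run many small jobs on a few OS threads that pull from a shared queue. It is not how Tokio's scheduler is implemented, which is considerably more sophisticated; the function name run_on_pool and the doubling "work" are hypothetical and chosen only for this example.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `num_tasks` small jobs on `num_workers` OS threads and returns
/// the results in sorted order. Hypothetical helper, for illustration only.
fn run_on_pool(num_workers: usize, num_tasks: usize) -> Vec<usize> {
    // Queue of pending jobs, shared by all workers.
    let (job_tx, job_rx) = mpsc::channel::<usize>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    // Channel on which workers report their results.
    let (done_tx, done_rx) = mpsc::channel::<usize>();

    let workers: Vec<_> = (0..num_workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // Take the next job. The lock guard is released at the
                // end of this statement, before the job runs.
                let job = job_rx.lock().unwrap().recv();
                match job {
                    // The "work" here is just doubling the job index.
                    Ok(i) => done_tx.send(i * 2).unwrap(),
                    // The queue is closed and drained: stop the worker.
                    Err(_) => break,
                }
            })
        })
        .collect();
    // Only the workers hold result senders from here on.
    drop(done_tx);

    // Submit every job, then close the queue.
    for i in 0..num_tasks {
        job_tx.send(i).unwrap();
    }
    drop(job_tx);

    for worker in workers {
        worker.join().unwrap();
    }

    let mut results: Vec<usize> = done_rx.iter().collect();
    results.sort();
    results
}
```

Calling run_on_pool(2, 5) processes five jobs on two threads. Which thread handles which job is not determined by the caller, which is the same division of responsibility the runtime offers for spawned tasks.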

When to spawn tasks

As with all things software related, the answer is that it depends. Generally, the answer is to spawn a new task whenever you can. The more tasks available, the greater the ability to run them in parallel. However, keep in mind that if multiple tasks require communication, this will involve channel overhead.

The following examples will help illustrate cases for spawning new tasks.

Processing inbound sockets

The most straightforward example for spawning tasks is a network server. The primary task listens for inbound sockets on a TCP listener. When a new connection arrives, the listener task spawns a new task for processing the socket.

    extern crate tokio;
    extern crate futures;

    use tokio::io;
    use tokio::net::TcpListener;
    use futures::{Future, Stream};

    fn main() {
        let addr = "127.0.0.1:0".parse().unwrap();
        let listener = TcpListener::bind(&addr).unwrap();

        tokio::run({
            listener.incoming().for_each(|socket| {
                // An inbound socket has been received.
                //
                // Spawn a new task to process the socket
                tokio::spawn({
                    // In this example, "hello world" will be written to the
                    // socket followed by the socket being closed.
                    io::write_all(socket, "hello world")
                        // Drop the socket
                        .map(|_| ())
                        // Write any error to STDOUT
                        .map_err(|e| println!("socket error = {:?}", e))
                });

                // Receive the next inbound socket
                Ok(())
            })
            .map_err(|e| println!("listener error = {:?}", e))
        });
    }

The listener task and the tasks that process each socket are completely unrelated. They do not communicate and either can terminate without impacting the others. This is a perfect use case for spawning tasks.

Background processing

Another case is to spawn a task that runs background computations in service of other tasks. The primary tasks send data to the background task for processing but do not care whether or when the data gets processed. This also allows a single background task to coalesce data from multiple primary tasks.

This requires communication between the primary tasks and the background task. This is usually handled with an mpsc channel.

The following example is a TCP server that reads data from the remote peer and tracks the number of received bytes. It then sends the number of received bytes to a background task. This background task writes the total number of bytes read from all socket tasks every 30 seconds.

    extern crate tokio;
    extern crate futures;

    use tokio::io;
    use tokio::net::TcpListener;
    use tokio::timer::Interval;
    use futures::{future, stream, Future, Stream, Sink};
    use futures::future::lazy;
    use futures::sync::mpsc;
    use std::time::Duration;

    // Defines the background task. The `rx` argument is the channel receive
    // handle. The task will pull `usize` values (which represent number of
    // bytes read by a socket) off the channel and sum it internally. Every
    // 30 seconds, the current sum is written to STDOUT and the sum is reset
    // to zero.
    fn bg_task(rx: mpsc::Receiver<usize>)
        -> impl Future<Item = (), Error = ()>
    {
        // The stream of received `usize` values will be merged with a 30
        // second interval stream. The value types of each stream must
        // match. This enum is used to track the various values.
        #[derive(Eq, PartialEq)]
        enum Item {
            Value(usize),
            Tick,
            Done,
        }

        // Interval at which the current sum is written to STDOUT.
        let tick_dur = Duration::from_secs(30);

        let interval = Interval::new_interval(tick_dur)
            .map(|_| Item::Tick)
            .map_err(|_| ());

        // Turn the stream into a sequence of:
        // Value(num), Value(num), ... Done
        let items = rx.map(Item::Value)
            .chain(stream::once(Ok(Item::Done)))
            // Merge in the stream of intervals
            .select(interval)
            // Terminate the stream once `Done` is received. This is necessary
            // because `Interval` is an infinite stream and `select` will keep
            // selecting on it.
            .take_while(|item| future::ok(*item != Item::Done));

        // With the stream of `Item` values, start our logic.
        //
        // Using `fold` allows the state to be maintained across iterations.
        // In this case, the state is the number of bytes read between ticks.
        items.fold(0, |num, item| {
            match item {
                // Sum the number of bytes with the state.
                Item::Value(v) => future::ok(num + v),
                Item::Tick => {
                    println!("bytes read = {}", num);

                    // Reset the byte counter
                    future::ok(0)
                }
                // `take_while` ends the stream before `Done` gets here.
                _ => unreachable!(),
            }
        })
        .map(|_| ())
    }

    fn main() {
        // Start the application
        tokio::run(lazy(|| {
            let addr = "127.0.0.1:0".parse().unwrap();
            let listener = TcpListener::bind(&addr).unwrap();

            // Create the channel that is used to communicate with the
            // background task.
            let (tx, rx) = mpsc::channel(1_024);

            // Spawn the background task:
            tokio::spawn(bg_task(rx));

            listener.incoming().for_each(move |socket| {
                // An inbound socket has been received.
                //
                // Spawn a new task to process the socket
                tokio::spawn({
                    // Each spawned task will have a clone of the sender handle.
                    let tx = tx.clone();

                    // In this example, the socket is read until the peer
                    // closes it, and the data itself is discarded.
                    io::read_to_end(socket, vec![])
                        // Drop the socket and send the number of bytes
                        // read to the background task.
                        .and_then(move |(_, buf)| {
                            tx.send(buf.len())
                                .map_err(|_| io::ErrorKind::Other.into())
                        })
                        .map(|_| ())
                        // Write any error to STDOUT
                        .map_err(|e| println!("socket error = {:?}", e))
                });

                // Receive the next inbound socket
                Ok(())
            })
            .map_err(|e| println!("listener error = {:?}", e))
        }));
    }
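
The accumulate-and-reset logic inside bg_task can be examined in isolation, without timers or sockets. The sketch below is a synchronous approximation that folds over a plain iterator of events instead of a merged stream; the names Event and reported_sums are hypothetical, and the function collects the sums into a vector instead of printing them so the behavior is easy to check.

```rust
/// Events seen by the background task: a byte count or a timer tick.
/// Hypothetical stand-in for the `Item` enum used by `bg_task`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    Value(usize),
    Tick,
}

/// Folds over the events and returns the sum that would be reported at
/// each tick. The running sum is reset to zero after every tick.
fn reported_sums<I>(events: I) -> Vec<usize>
where
    I: IntoIterator<Item = Event>,
{
    let mut reports = Vec::new();

    // The fold state is the number of bytes seen since the last tick.
    let _unreported = events.into_iter().fold(0usize, |num, event| match event {
        // Add the byte count to the running sum.
        Event::Value(v) => num + v,
        Event::Tick => {
            reports.push(num);
            // Reset the byte counter.
            0
        }
    });

    reports
}
```

Bytes that arrive after the final tick stay in the fold state and are never reported, which mirrors what happens in bg_task when the channel closes between two ticks.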

Coordinating access to a resource

When working with futures, the preferred strategy for coordinating access to a shared resource (socket, data, etc…) is by using message passing. To do this, a dedicated task is spawned to manage the resource and other tasks interact with the resource by sending messages.

This pattern is very similar to the previous example, but this time the tasks want to receive a message back once the operation is complete. To implement this, both mpsc and oneshot channels are used.

The example coordinates access to a transport over a ping / pong protocol. Pings are sent into the transport and pongs are received. Primary tasks send a message to the coordinator task to initiate a ping; the coordinator task responds to the ping request with the round trip time. The message sent to the coordinator task over the mpsc contains a oneshot::Sender allowing the coordinator task to respond.

    extern crate tokio;
    extern crate futures;

    use tokio::io;
    use futures::{future, Future, Stream, Sink};
    use futures::future::lazy;
    use futures::sync::{mpsc, oneshot};
    use std::time::{Duration, Instant};

    // Each request carries the handle used to send the response back.
    type Message = oneshot::Sender<Duration>;

    struct Transport;

    impl Transport {
        fn send_ping(&self) {
            // ...
        }

        fn recv_pong(&self) -> impl Future<Item = (), Error = io::Error> {
            // ...
            future::ok(())
        }
    }

    fn coordinator_task(rx: mpsc::Receiver<Message>)
        -> impl Future<Item = (), Error = ()>
    {
        let transport = Transport;

        rx.for_each(move |pong_tx| {
            let start = Instant::now();

            transport.send_ping();

            transport.recv_pong()
                .map_err(|_| ())
                .and_then(move |_| {
                    let rtt = start.elapsed();
                    pong_tx.send(rtt).unwrap();
                    Ok(())
                })
        })
    }

    /// Request an rtt.
    fn rtt(tx: mpsc::Sender<Message>)
        -> impl Future<Item = (Duration, mpsc::Sender<Message>), Error = ()>
    {
        let (resp_tx, resp_rx) = oneshot::channel();

        tx.send(resp_tx)
            .map_err(|_| ())
            .and_then(|tx| {
                resp_rx.map(|dur| (dur, tx))
                    .map_err(|_| ())
            })
    }

    fn main() {
        // Start the application
        tokio::run(lazy(|| {
            // Create the channel that is used to communicate with the
            // coordinator task.
            let (tx, rx) = mpsc::channel(1_024);

            // Spawn the coordinator task:
            tokio::spawn(coordinator_task(rx));

            // Spawn a few tasks that use the coordinator to request RTTs.
            for _ in 0..4 {
                let tx = tx.clone();

                tokio::spawn(lazy(|| {
                    rtt(tx).and_then(|(dur, _)| {
                        println!("duration = {:?}", dur);
                        Ok(())
                    })
                }));
            }

            Ok(())
        }));
    }
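
The core of this pattern is that each request carries its own reply channel. As a hedged, synchronous analogy using only the standard library, with OS threads in place of Tokio tasks and std::sync::mpsc in place of the futures channels, it could be sketched as follows. The names Request, spawn_coordinator and request are hypothetical, and adding one to a number stands in for the real work on the resource.

```rust
use std::sync::mpsc;
use std::thread;

/// A request carries a value and the sender on which to deliver the reply.
type Request = (u32, mpsc::Sender<u32>);

/// Spawns a thread that owns the (hypothetical) resource and serves
/// requests until every request sender has been dropped. The join handle
/// yields the number of requests served.
fn spawn_coordinator() -> (mpsc::Sender<Request>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel::<Request>();

    let handle = thread::spawn(move || {
        let mut served = 0;
        for (value, reply_tx) in rx {
            served += 1;
            // Stand-in for real work. A send error only means the
            // requester stopped waiting, so it is ignored.
            let _ = reply_tx.send(value + 1);
        }
        served
    });

    (tx, handle)
}

/// Sends one request to the coordinator and blocks until the reply arrives.
fn request(tx: &mpsc::Sender<Request>, value: u32) -> u32 {
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send((value, reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

Because only the coordinator thread ever touches the resource, no lock is needed; callers serialize their access simply by queueing requests. Dropping the last request sender ends the coordinator's loop.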

When not to spawn tasks

If the amount of coordination via message passing and synchronization primitives outweighs the parallelism benefits from spawning tasks, then maintaining a single task is preferred.

For example, it is generally better to maintain reading from and writing to a single TCP socket on a single task instead of splitting up reading and writing between two tasks.
