Concurrency

Our server has a slight problem (besides only responding with errors). It processes inbound requests one at a time. When a connection is accepted, the server stays inside the accept loop block until the response is fully written to the socket.

We want our Redis server to process many concurrent requests. To do this, we need to add some concurrency.

Concurrency and parallelism is not the same thing. If you alternate between two tasks, then you are working on both tasks concurrently, but not in parallel. For it to qualify as parallel, you would need two people, one dedicated to each task.

One of the advantages of using Tokio is that asynchronous code allows you to work on many tasks concurrently, without having to work on them in parallel using ordinary threads. In fact, Tokio can run many tasks concurrently on a single thread!

To process connections concurrently, a new task is spawned for each inbound connection. The connection is processed on this task.

The accept loop becomes:

  1. use tokio::net::TcpListener;
  2. #[tokio::main]
  3. async fn main() {
  4. let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
  5. loop {
  6. let (socket, _) = listener.accept().await.unwrap();
  7. // A new task is spawned for each inbound socket. The socket is
  8. // moved to the new task and processed there.
  9. tokio::spawn(async move {
  10. process(socket).await;
  11. });
  12. }
  13. }

Tasks

A Tokio task is an asynchronous green thread. They are created by passing an async block to tokio::spawn. The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task. The async block may have a return value. The caller may obtain the return value using .await on the JoinHandle.

For example:

  1. #[tokio::main]
  2. async fn main() {
  3. let handle = tokio::spawn(async {
  4. // Do some async work
  5. "return value"
  6. });
  7. // Do some other work
  8. let out = handle.await.unwrap();
  9. println!("GOT {}", out);
  10. }

Awaiting on JoinHandle returns a Result. When a task encounters an error during execution, the JoinHandle will return an Err. This happens when the task either panics, or if the task is forcefully cancelled by the runtime shutting down.

Tasks are the unit of execution managed by the scheduler. Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do. The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread. The task can also be moved between threads after being spawned.

Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.

'static bound

When you spawn a task on the Tokio runtime, its type must be 'static. This means that the spawned task must not contain any references to data owned outside the task.

It is a common misconception that 'static always means “lives forever”, but this is not the case. Just because a value is 'static does not mean that you have a memory leak. You can read more in Common Rust Lifetime Misconceptions.

For example, the following will not compile:

  1. use tokio::task;
  2. #[tokio::main]
  3. async fn main() {
  4. let v = vec![1, 2, 3];
  5. task::spawn(async {
  6. println!("Here's a vec: {:?}", v);
  7. });
  8. }

Attempting to compile this results in the following error:

  1. error[E0373]: async block may outlive the current function, but
  2. it borrows `v`, which is owned by the current function
  3. --> src/main.rs:7:23
  4. |
  5. 7 | task::spawn(async {
  6. | _______________________^
  7. 8 | | println!("Here's a vec: {:?}", v);
  8. | | - `v` is borrowed here
  9. 9 | | });
  10. | |_____^ may outlive borrowed value `v`
  11. |
  12. note: function requires argument type to outlive `'static`
  13. --> src/main.rs:7:17
  14. |
  15. 7 | task::spawn(async {
  16. | _________________^
  17. 8 | | println!("Here's a vector: {:?}", v);
  18. 9 | | });
  19. | |_____^
  20. help: to force the async block to take ownership of `v` (and any other
  21. referenced variables), use the `move` keyword
  22. |
  23. 7 | task::spawn(async move {
  24. 8 | println!("Here's a vec: {:?}", v);
  25. 9 | });
  26. |

This happens because, by default, variables are not moved into async blocks. The v vector remains owned by the main function. The println! line borrows v. The rust compiler helpfully explains this to us and even suggests the fix! Changing line 7 to task::spawn(async move { will instruct the compiler to move v into the spawned task. Now, the task owns all of its data, making it 'static.

If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.

Note that the error message talks about the argument type outliving the 'static lifetime. This terminology can be rather confusing because the 'static lifetime lasts until the end of the program, so if it outlives it, don’t you have a memory leak? The explanation is that it is the type, not the value that must outlive the 'static lifetime, and the value may be destroyed before its type is no longer valid.

When we say that a value is 'static, all that means is that it would not be incorrect to keep that value around forever. This is important because the compiler is unable to reason about how long a newly spawned task stays around, so the only way it can be sure that the task doesn’t live too long is to make sure it may live forever.

The link in the info-box above uses the terminology “bounded by 'static“ rather than “its type outlives 'static“ or “the value is 'static“ for T: 'static. These all mean the same thing, and are different from “annotated with 'static“ as in &'static T.

Send bound

Tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.

Tasks are Send when all data that is held across .await calls is Send. This is a bit subtle. When .await is called, the task yields back to the scheduler. The next time the task is executed, it resumes from the point it last yielded. To make this work, all state that is used after .await must be saved by the task. If this state is Send, i.e. can be moved across threads, then the task itself can be moved across threads. Conversely, if the state is not Send, then neither is the task.

For example, this works:

  1. use tokio::task::yield_now;
  2. use std::rc::Rc;
  3. #[tokio::main]
  4. async fn main() {
  5. tokio::spawn(async {
  6. // The scope forces `rc` to drop before `.await`.
  7. {
  8. let rc = Rc::new("hello");
  9. println!("{}", rc);
  10. }
  11. // `rc` is no longer used. It is **not** persisted when
  12. // the task yields to the scheduler
  13. yield_now().await;
  14. });
  15. }

This does not:

  1. use tokio::task::yield_now;
  2. use std::rc::Rc;
  3. #[tokio::main]
  4. async fn main() {
  5. tokio::spawn(async {
  6. let rc = Rc::new("hello");
  7. // `rc` is used after `.await`. It must be persisted to
  8. // the task's state.
  9. yield_now().await;
  10. println!("{}", rc);
  11. });
  12. }

Attempting to compile the snippet results in:

  1. error: future cannot be sent between threads safely
  2. --> src/main.rs:6:5
  3. |
  4. 6 | tokio::spawn(async {
  5. | ^^^^^^^^^^^^ future created by async block is not `Send`
  6. |
  7. ::: [..]spawn.rs:127:21
  8. |
  9. 127 | T: Future + Send + 'static,
  10. | ---- required by this bound in
  11. | `tokio::task::spawn::spawn`
  12. |
  13. = help: within `impl std::future::Future`, the trait
  14. | `std::marker::Send` is not implemented for
  15. | `std::rc::Rc<&str>`
  16. note: future is not `Send` as this value is used across an await
  17. --> src/main.rs:10:9
  18. |
  19. 7 | let rc = Rc::new("hello");
  20. | -- has type `std::rc::Rc<&str>` which is not `Send`
  21. ...
  22. 10 | yield_now().await;
  23. | ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
  24. | used later
  25. 11 | println!("{}", rc);
  26. 12 | });
  27. | - `rc` is later dropped here

We will discuss a special case of this error in more depth in the next chapter.