Example: An Echo Server

We’re going to use what has been covered so far to build an echo server. This is aTokio application that encorporates everything we’ve learned so far. The server willsimply receive messages from the connected client and send back the same message itreceived to the client.

We’ll be able to test this echo server using the basic Tcp client we created in thehello world section.

The full code can be found here.

Setup

First, generate a new crate.

  1. $ cargo new --bin echo-server
  2. cd echo-server

Next, add the necessary dependencies:

  1. [dependencies]
  2. tokio = "0.1"

and the crates and types into scope in main.rs:

  1. # #![deny(deprecated)]
  2. extern crate tokio;
  3. extern crate futures;
  4. use tokio::io;
  5. use tokio::net::TcpListener;
  6. use tokio::prelude::*;
  7. # fn main() {}

Now, we setup the necessary structure for a server:

  • Bind a TcpListener to a local port.
  • Define a task that accepts inbound connections and processes them.
  • Spawn the server task.
  • Start the Tokio runtime
    Again, no work actually happens until the server task is spawned on theexecutor.
  1. # #![deny(deprecated)]
  2. # extern crate tokio;
  3. # extern crate futures;
  4. #
  5. # use tokio::prelude::*;
  6. # use tokio::net::TcpListener;
  7. fn main() {
  8. let addr = "127.0.0.1:6142".parse().unwrap();
  9. let listener = TcpListener::bind(&addr).unwrap();
  10. // Here we convert the `TcpListener` to a stream of incoming connections
  11. // with the `incoming` method. We then define how to process each element in
  12. // the stream with the `for_each` combinator method
  13. let server = listener.incoming().for_each(|socket| {
  14. // TODO: Process socket
  15. Ok(())
  16. })
  17. .map_err(|err| {
  18. // Handle error by printing to STDOUT.
  19. println!("accept error = {:?}", err);
  20. });
  21. println!("server running on localhost:6142");
  22. # // `select` completes when the first of the two futures completes. Since
  23. # // future::ok() completes immediately, the server won't hang waiting for
  24. # // more connections. This is just so the doc test doesn't hang.
  25. # let server = server.select(futures::future::ok(())).then(|_| Ok(()));
  26. // Start the server
  27. //
  28. // This does a few things:
  29. //
  30. // * Start the Tokio runtime
  31. // * Spawns the `server` task onto the runtime.
  32. // * Blocks the current thread until the runtime becomes idle, i.e. all
  33. // spawned tasks have completed.
  34. tokio::run(server);
  35. }

Here we’ve created a TcpListener that can listen for incoming TCP connections. On thelistener we call incoming which turns the listener into a Stream of inbound clientconnections. We then call for_each which will yield each inbound client connection.For now we’re not doing anything with this inbound connection - that’s our next step.

Once we have our server, we can give it to tokio::run. Up until this point ourserver feature has done nothing. It’s up to the Tokio runtime to drive our future tocompletion.

Note: We must call map_err on our server future because tokio::run expectsa future with Item of type () and Error of type (). This is to ensure thatwe handle all values and errors before handing off the future to the runtime.

Handling the connections

Now that we have incoming client connections, we should handle them.

We just want to copy all data read from the socket back onto the socket itself(e.g. “echo”). We can use the standard io::copy function to do precisely this.

The copy function takes two arguments, where to read from and where to write to.We only have one argument, though, with socket. Luckily there’s a method, split, which will split a readable and writeable stream into its two halves. Thisoperation allows us to work with each stream independently, such as pass them as twoarguments to the copy function.

The copy function then returns a future, and this future will be resolved when thecopying operation is complete, resolving to the amount of data that was copied.

Let’s take a look at the closure we passed to for_each again.

  1. # #![deny(deprecated)]
  2. # extern crate tokio;
  3. # extern crate futures;
  4. #
  5. # use tokio::prelude::*;
  6. # use tokio::net::TcpListener;
  7. # use tokio::io;
  8. # fn main() {
  9. # let addr = "127.0.0.1:6142".parse().unwrap();
  10. # let listener = TcpListener::bind(&addr).unwrap();
  11. let server = listener.incoming().for_each(|socket| {
  12. // split the socket stream into readable and writable parts
  13. let (reader, writer) = socket.split();
  14. // copy bytes from the reader into the writer
  15. let amount = io::copy(reader, writer);
  16. let msg = amount.then(|result| {
  17. match result {
  18. Ok((amount, _, _)) => println!("wrote {} bytes", amount),
  19. Err(e) => println!("error: {}", e),
  20. }
  21. Ok(())
  22. });
  23. // spawn the task that handles the client connection socket on to the
  24. // tokio runtime. This means each client connection will be handled
  25. // concurrently
  26. tokio::spawn(msg);
  27. Ok(())
  28. })
  29. # .map_err(|_| ());
  30. # let server = server.select(futures::future::ok(())).then(|_| Ok(()));
  31. # tokio::run(server);
  32. # }

As you can see we’ve split the socket stream into readable and writable parts. Wethen used io::copy to read from reader and write into writer. We use the thencombinator to look at the amount future’s Item and Error as a Result printingsome diagnostics.

The call to tokio::spawn is the key here. We crucially want all clients to makeprogress concurrently, rather than blocking one on completion of another. To achievethis we use the tokio::spawn function to execute the work in the background.

If we did not do this then each invocation of the block in for_each would beresolved at a time meaning we could never have two client connections processedconcurrently!

The full code can be found here.

Next up: Overview