Echo server

Let’s practice doing some asynchronous I/O. We will be writing an echo server.

The echo server binds a TcpListener and accepts inbound connections in a loop. For each inbound connection, data is read from the socket and written immediately back to the socket. The client sends data to the server and receives the exact same data back.

We will implement the echo server twice, using slightly different strategies.

Using io::copy()

To start, we will implement the echo logic using the io::copy utility.

This is a TCP server and needs an accept loop. A new task is spawned to process each accepted socket.

    use tokio::io;
    use tokio::net::TcpListener;

    #[tokio::main]
    async fn main() -> io::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:6142").await?;

        loop {
            let (mut socket, _) = listener.accept().await?;

            tokio::spawn(async move {
                // Copy data here
            });
        }
    }

As seen earlier, this utility function takes a reader and a writer and copies data from one to the other. However, we only have a single TcpStream. This single value implements both AsyncRead and AsyncWrite. Because io::copy requires &mut for both the reader and the writer, the socket cannot be used for both arguments.

    // This fails to compile
    io::copy(&mut socket, &mut socket).await

Splitting a reader + writer

To work around this problem, we must split the socket into a reader handle and a writer handle. The best way to split a reader/writer combo depends on the specific type.

Any reader + writer type can be split using the io::split utility. This function takes a single value and returns separate reader and writer handles. These two handles can be used independently, including from separate tasks.

For example, the echo client could handle concurrent reads and writes like this:

    use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    #[tokio::main]
    async fn main() -> io::Result<()> {
        let socket = TcpStream::connect("127.0.0.1:6142").await?;
        let (mut rd, mut wr) = io::split(socket);

        // Write data in the background
        tokio::spawn(async move {
            wr.write_all(b"hello\r\n").await?;
            wr.write_all(b"world\r\n").await?;

            // Sometimes, the rust type inferencer needs
            // a little help
            Ok::<_, io::Error>(())
        });

        let mut buf = vec![0; 128];

        loop {
            let n = rd.read(&mut buf).await?;

            if n == 0 {
                break;
            }

            println!("GOT {:?}", &buf[..n]);
        }

        Ok(())
    }

Because io::split supports any value that implements AsyncRead + AsyncWrite and returns independent handles, it internally uses an Arc and a Mutex. This overhead can be avoided with TcpStream, which offers two specialized split functions.

TcpStream::split takes a reference to the stream and returns a reader and writer handle. Because a reference is used, both handles must stay on the same task that split() was called from. This specialized split is zero-cost. There is no Arc or Mutex needed. TcpStream also provides into_split which supports handles that can move across tasks at the cost of only an Arc.
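As a minimal sketch of the owned variant (the address matches the tutorial's examples, but the payload and task structure here are just for illustration), into_split can be used like this:

    use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    #[tokio::main]
    async fn main() -> io::Result<()> {
        let socket = TcpStream::connect("127.0.0.1:6142").await?;

        // `into_split` consumes the stream and returns owned halves
        // that may move to different tasks.
        let (mut rd, mut wr) = socket.into_split();

        // The write half moves into its own task.
        let write_task = tokio::spawn(async move {
            wr.write_all(b"hello\r\n").await
        });

        // The read half stays on this task.
        let mut buf = vec![0; 128];
        let n = rd.read(&mut buf).await?;
        println!("GOT {:?}", &buf[..n]);

        write_task.await.unwrap()?;
        Ok(())
    }

The trade-off is a single Arc per stream, in exchange for halves that are 'static and can be moved freely between tasks.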

Because io::copy() is called on the same task that owns the TcpStream, we can use TcpStream::split. The task that processes the echo logic becomes:

    tokio::spawn(async move {
        let (mut rd, mut wr) = socket.split();

        if io::copy(&mut rd, &mut wr).await.is_err() {
            eprintln!("failed to copy");
        }
    });

You can find the entire code here.

Manual copying

Now let's look at how we would write the echo server by copying the data manually. To do this, we use AsyncReadExt::read and AsyncWriteExt::write_all.

The full echo server is as follows:

    use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    #[tokio::main]
    async fn main() -> io::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:6142").await?;

        loop {
            let (mut socket, _) = listener.accept().await?;

            tokio::spawn(async move {
                let mut buf = vec![0; 1024];

                loop {
                    match socket.read(&mut buf).await {
                        // Return value of `Ok(0)` signifies that the remote
                        // has closed
                        Ok(0) => return,
                        Ok(n) => {
                            // Copy the data back to socket
                            if socket.write_all(&buf[..n]).await.is_err() {
                                // Unexpected socket error. There isn't much
                                // we can do here so just stop processing.
                                return;
                            }
                        }
                        Err(_) => {
                            // Unexpected socket error. There isn't much we
                            // can do here so just stop processing.
                            return;
                        }
                    }
                }
            });
        }
    }

Let’s break it down. First, since the AsyncRead and AsyncWrite utilities are used, the extension traits must be brought into scope.

    use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

Allocating a buffer

The strategy is to read some data from the socket into a buffer then write the contents of the buffer back to the socket.

    let mut buf = vec![0; 1024];

A stack buffer is explicitly avoided. Recall from earlier that all task data living across calls to .await must be stored by the task. In this case, buf is used across .await calls. All task data is stored in a single allocation. You can think of it as an enum where each variant is the data that needs to be stored for a specific call to .await.

If the buffer is represented by a stack array, the internal structure for tasks spawned per accepted socket might look something like:

    struct Task {
        // internal task fields here
        task: enum {
            AwaitingRead {
                socket: TcpStream,
                buf: [BufferType],
            },
            AwaitingWriteAll {
                socket: TcpStream,
                buf: [BufferType],
            }
        }
    }

If a stack array is used as the buffer type, it will be stored inline in the task structure. This will make the task structure very big. Additionally, buffer sizes are often page sized. This will, in turn, make Task an awkward size: page-size + a-few-bytes.

The compiler optimizes the layout of async blocks further than a basic enum. In practice, variables are not moved around between variants as would be required with an enum. However, the task struct size is at least as big as the largest variable.

Because of this, it is usually more efficient to use a dedicated allocation for the buffer.
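To see the effect, here is a small self-contained sketch (not part of the echo server; the 1024-byte size and helper names are just for illustration) that compares the size of a future holding a stack array across an .await with one holding a Vec:

    use std::mem::size_of_val;

    async fn stack_buf() {
        let buf = [0u8; 1024];
        std::future::ready(()).await; // `buf` lives across this .await...
        println!("{}", buf[0]);       // ...because it is used afterwards
    }

    async fn heap_buf() {
        let buf = vec![0u8; 1024];
        std::future::ready(()).await; // only the Vec's pointer/len/cap
        println!("{}", buf[0]);       // are stored in the future
    }

    fn main() {
        // The futures are never polled; we only inspect their size.
        println!("stack: {} bytes", size_of_val(&stack_buf()));
        println!("heap:  {} bytes", size_of_val(&heap_buf()));
    }

On a typical build, the first future is a little over a kilobyte because the array is stored inline, while the second is only a few machine words of bookkeeping, since the buffer itself lives on the heap.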

Handling EOF

When the read half of the TCP stream is shut down, a call to read() returns Ok(0). It is important to exit the read loop at this point. Forgetting to break from the read loop on EOF is a common source of bugs.

    loop {
        match socket.read(&mut buf).await {
            // Return value of `Ok(0)` signifies that the remote has
            // closed
            Ok(0) => return,
            // ... other cases handled here
        }
    }

Forgetting to break from the read loop usually results in an infinite loop that consumes 100% CPU. Once the socket is closed, socket.read() returns immediately with Ok(0), and the loop repeats forever.
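As a sketch of the failure mode (this is deliberately broken code, not part of the server above), the buggy loop looks like:

    loop {
        match socket.read(&mut buf).await {
            // BUG: `Ok(0)` is never handled. Once the peer closes the
            // connection, `read()` returns `Ok(0)` immediately on every
            // iteration, writing zero bytes trivially succeeds, and the
            // loop spins forever.
            Ok(n) => {
                if socket.write_all(&buf[..n]).await.is_err() {
                    return;
                }
            }
            Err(_) => return,
        }
    }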

Full code can be found here.