Example: A Chat Server

We’re going to use what has been covered so far to build a chat server. This is a non-trivial Tokio server application.

The server is going to use a line-based protocol. Lines are terminated by \r\n. This is compatible with telnet, so we will just use telnet for the client. When a client connects, it must identify itself by sending a line containing its “nick” (i.e., some name used to identify the client amongst its peers).

Once a client is identified, all sent lines are prefixed with [nick]: and broadcasted to all other connected clients.
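
For example, once the server from this section is running, an illustrative telnet session might look like the following (the port comes from the code below). The first line typed identifies the client; every subsequent line is relayed to all other connected clients, prefixed with the sender's nick:

    $ telnet localhost 6142
    alice
    hello everyone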

The full code can be found here. Note that Tokio provides some additional abstractions that have not yet been covered that would enable the chat server to be written with less code.

Setup

First, generate a new crate.

    $ cargo new --bin line-chat
    cd line-chat

Next, add the necessary dependencies:

    [dependencies]
    tokio = "0.1"
    tokio-io = "0.1"
    futures = "0.1"
    bytes = "0.4"

and bring the crates and types into scope in main.rs:

    # #![deny(deprecated)]
    extern crate tokio;
    #[macro_use]
    extern crate futures;
    extern crate bytes;

    use tokio::io;
    use tokio::net::{TcpListener, TcpStream};
    use tokio::prelude::*;

    use futures::sync::mpsc;
    use futures::future::{self, Either};

    use bytes::{BytesMut, Bytes, BufMut};

    use std::collections::HashMap;
    use std::net::SocketAddr;
    use std::sync::{Arc, Mutex};

    /// Shorthand for the transmit half of the message channel.
    type Tx = mpsc::UnboundedSender<Bytes>;

    /// Shorthand for the receive half of the message channel.
    type Rx = mpsc::UnboundedReceiver<Bytes>;

    # fn main() {}

Now, we set up the necessary structure for a server. These are the same steps that were used as part of the Hello World! example:

  • Bind a TcpListener to a local port.
  • Define a task that accepts inbound connections and processes them.
  • Start the Tokio runtime.
  • Spawn the server task.

Again, no work actually happens until the server task is spawned on the executor.
    # #![deny(deprecated)]
    # extern crate tokio;
    # extern crate futures;
    #
    # use tokio::prelude::*;
    # use tokio::net::TcpListener;
    fn main() {
        let addr = "127.0.0.1:6142".parse().unwrap();
        let listener = TcpListener::bind(&addr).unwrap();

        let server = listener.incoming().for_each(move |socket| {
            // TODO: Process socket
            Ok(())
        })
        .map_err(|err| {
            // Handle error by printing to STDOUT.
            println!("accept error = {:?}", err);
        });

        println!("server running on localhost:6142");
    # let server = server.select(futures::future::ok(())).then(|_| Ok(()));

        // Start the server
        //
        // This does a few things:
        //
        // * Start the Tokio runtime (reactor, threadpool, etc...)
        // * Spawns the `server` task onto the runtime.
        // * Blocks the current thread until the runtime becomes idle, i.e. all
        //   spawned tasks have completed.
        tokio::run(server);
    }

Chat State

A chat server requires that messages received from one client are broadcasted to all other connected clients. This will be done using message passing over mpsc channels.

Each client socket will be managed by a task. Each task will have an associated mpsc channel that is used to receive messages from other clients. The send half of each of these channels is stored in a shared cell (the Arc<Mutex<...>> defined below) in order to make them accessible to the other tasks.

In this example, we are going to be using unbounded channels. Ideally, channels should never be unbounded, but handling backpressure in this kind of situation is a bit tricky. We will leave bounding the channels to a later section dedicated to handling backpressure.

Here is how the shared state is defined (the Tx type alias was defined above):

    # #![deny(deprecated)]
    # use std::collections::HashMap;
    # use std::net::SocketAddr;
    # struct Tx;
    struct Shared {
        peers: HashMap<SocketAddr, Tx>,
    }
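
The snippets here never show the Shared::new() constructor that the next snippet calls. A minimal sketch, matching how it is used below, simply starts with an empty peer map:

    impl Shared {
        /// Create a new, empty, instance of `Shared`.
        fn new() -> Self {
            Shared {
                peers: HashMap::new(),
            }
        }
    }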

Then, at the very top of the main function, the state instance is created. This state instance will be moved into the task that accepts incoming connections.

    # #![deny(deprecated)]
    # use std::sync::{Arc, Mutex};
    # type Shared = String;
    let state = Arc::new(Mutex::new(Shared::new()));

Now we can handle processing incoming connections. The server task is updated to this:

    # #![deny(deprecated)]
    # extern crate tokio;
    # extern crate futures;
    # use tokio::net::{TcpListener, TcpStream};
    # use futures::prelude::*;
    # fn dox() {
    # let addr = "127.0.0.1:6142".parse().unwrap();
    # let listener = TcpListener::bind(&addr).unwrap();
    # fn process(_: TcpStream, _: String) {}
    # let state = String::new();
    listener.incoming().for_each(move |socket| {
        process(socket, state.clone());
        Ok(())
    })
    # ;
    # }
    # fn main() {}

The server task passes all sockets along with a clone of the server state to a process function. Let’s define that function. It will have a structure like this:

    # #![deny(deprecated)]
    # extern crate tokio;
    # extern crate futures;
    # use futures::future;
    # use tokio::net::TcpStream;
    # use std::sync::{Arc, Mutex};
    # type Shared = String;
    fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
        // Define the task that processes the connection.
    # /*
        let task = unimplemented!();
    # */ let task = future::ok(());

        // Spawn the task
        tokio::spawn(task);
    }
    # fn main() {}

The call to tokio::spawn will spawn a new task onto the current Tokio runtime. All the worker threads keep a reference to the current runtime stored in a thread-local variable. Note that attempting to call tokio::spawn from outside of the Tokio runtime will result in a panic.
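
As a rough, self-contained sketch (not part of the chat server code), the following illustrates that constraint: spawning from a future that is already running on the runtime works, while spawning from plain synchronous code outside tokio::run would panic:

    extern crate futures;
    extern crate tokio;

    use futures::future;

    fn main() {
        tokio::run(future::lazy(|| {
            // We are inside the runtime here, so `tokio::spawn` is fine.
            tokio::spawn(future::ok(()));
            Ok(())
        }));

        // Calling `tokio::spawn(future::ok(()))` at this point, outside of
        // `tokio::run`, would panic at runtime.
    }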

All the connection processing logic has to do is understand the protocol. The protocol is line-based, terminated by \r\n. Instead of working at the byte stream level, it is much easier to work at the frame level, i.e. working with values that represent atomic messages.

We will implement a codec that holds the socket and exposes an API for reading and writing lines.

Line Codec

A codec is a loose term for a type that takes a byte stream type (AsyncRead + AsyncWrite) and exposes a read and write API at the frame level. The tokio-io crate provides additional helpers for writing codecs; in this example, we are going to do it by hand.
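
For comparison only (the rest of this section sticks with the hand-written approach), here is a rough sketch of how similar line framing could be expressed with the Decoder and Encoder traits from tokio-io 0.1. The LineCodec name is made up for illustration, and this sketch is an assumption about the helper API rather than part of the guide:

    extern crate bytes;
    extern crate tokio_io;

    use bytes::{BufMut, BytesMut};
    use std::io;
    use tokio_io::codec::{Decoder, Encoder};

    /// A hypothetical line codec. Pairing it with a socket via
    /// `tokio_io::codec::Framed` would yield a `Stream`/`Sink` of lines.
    struct LineCodec;

    impl Decoder for LineCodec {
        type Item = BytesMut;
        type Error = io::Error;

        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
            // Look for the `\r\n` delimiter and split off one full line if present.
            if let Some(pos) = buf.windows(2).position(|bytes| bytes == b"\r\n") {
                let mut line = buf.split_to(pos + 2);
                // Drop the trailing `\r\n`.
                line.split_off(pos);
                return Ok(Some(line));
            }

            // Not enough buffered data for a full line yet.
            Ok(None)
        }
    }

    impl Encoder for LineCodec {
        type Item = BytesMut;
        type Error = io::Error;

        fn encode(&mut self, line: BytesMut, buf: &mut BytesMut) -> io::Result<()> {
            // Append the line followed by the `\r\n` terminator.
            buf.reserve(line.len() + 2);
            buf.put(&line[..]);
            buf.put("\r\n");
            Ok(())
        }
    }

    fn main() {}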

The Lines codec is defined as such:

    # #![deny(deprecated)]
    # extern crate tokio;
    # extern crate bytes;
    # use tokio::net::TcpStream;
    # use bytes::BytesMut;
    struct Lines {
        socket: TcpStream,
        rd: BytesMut,
        wr: BytesMut,
    }

    impl Lines {
        /// Create a new `Lines` codec backed by the socket
        fn new(socket: TcpStream) -> Self {
            Lines {
                socket,
                rd: BytesMut::new(),
                wr: BytesMut::new(),
            }
        }
    }
    # fn main() {}

Data read from the socket is buffered into rd. When a full line is read, it is returned to the caller. Lines submitted by the caller to write to the socket are buffered into wr, then flushed.

This is how the read half is implemented:

    # #![deny(deprecated)]
    # extern crate bytes;
    # extern crate tokio;
    # #[macro_use]
    # extern crate futures;
    # use bytes::BytesMut;
    # use tokio::io;
    # use tokio::net::TcpStream;
    # use tokio::prelude::*;
    # struct Lines {
    #     socket: TcpStream,
    #     rd: BytesMut,
    #     wr: BytesMut,
    # }
    impl Stream for Lines {
        type Item = BytesMut;
        type Error = io::Error;

        fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
            // First, read any new data that might have been received
            // off the socket
            //
            // We track if the socket is closed here and will be used
            // to inform the return value below.
            let sock_closed = self.fill_read_buf()?.is_ready();

            // Now, try finding lines
            let pos = self.rd.windows(2)
                .position(|bytes| bytes == b"\r\n");

            if let Some(pos) = pos {
                // Remove the line from the read buffer and set it
                // to `line`.
                let mut line = self.rd.split_to(pos + 2);

                // Drop the trailing \r\n
                line.split_off(pos);

                // Return the line
                return Ok(Async::Ready(Some(line)));
            }

            if sock_closed {
                Ok(Async::Ready(None))
            } else {
                Ok(Async::NotReady)
            }
        }
    }

    impl Lines {
        fn fill_read_buf(&mut self) -> Result<Async<()>, io::Error> {
            loop {
                // Ensure the read buffer has capacity.
                //
                // This might result in an internal allocation.
                self.rd.reserve(1024);

                // Read data into the buffer.
                //
                // The `read_buf` fn is provided by `AsyncRead`.
                let n = try_ready!(self.socket.read_buf(&mut self.rd));

                if n == 0 {
                    return Ok(Async::Ready(()));
                }
            }
        }
    }
    # fn main() {}

The example uses BytesMut from the bytes crate. This provides some nice utilities for working with byte sequences in a networking context. The Stream implementation yields BytesMut values which contain exactly one line.

As always, the key to implementing a function that returns Async is to never return Async::NotReady unless the function implementation received an Async::NotReady itself. In this example, NotReady is only returned if fill_read_buf returns NotReady, and fill_read_buf only returns NotReady if TcpStream::read_buf returns NotReady.

Now, for the write half.

    # #![deny(deprecated)]
    # extern crate tokio;
    # extern crate bytes;
    # #[macro_use]
    # extern crate futures;
    # use tokio::io;
    # use tokio::net::TcpStream;
    # use tokio::prelude::*;
    # use bytes::{BytesMut, BufMut};
    struct Lines {
        socket: TcpStream,
        rd: BytesMut,
        wr: BytesMut,
    }

    impl Lines {
        fn buffer(&mut self, line: &[u8]) {
            // Push the line onto the end of the write buffer.
            //
            // The `put` function is from the `BufMut` trait.
            self.wr.put(line);
        }

        fn poll_flush(&mut self) -> Poll<(), io::Error> {
            // As long as there is buffered data to write, try to write it.
            while !self.wr.is_empty() {
                // Try to write some bytes to the socket
                let n = try_ready!(self.socket.poll_write(&self.wr));

                // As long as the wr is not empty, a successful write should
                // never write 0 bytes.
                assert!(n > 0);

                // This discards the first `n` bytes of the buffer.
                let _ = self.wr.split_to(n);
            }

            Ok(Async::Ready(()))
        }
    }
    # fn main() {}

The caller queues up all lines by calling buffer. This appends the line to the internal wr buffer. Then, once all data is queued up, the caller calls poll_flush, which does the actual writing to the socket. poll_flush only returns Ready once all the queued data has been successfully written to the socket.

Similar to the read half, NotReady is only returned when the function implementation received NotReady itself.

And the Lines codec is used in the process function as such:

    # #![deny(deprecated)]
    # extern crate tokio;
    # extern crate bytes;
    # use tokio::net::TcpStream;
    # use tokio::prelude::*;
    # use bytes::BytesMut;
    # use std::io;
    # use std::sync::{Arc, Mutex};
    # type Shared = String;
    # struct Lines;
    # impl Lines {
    #     fn new(_: TcpStream) -> Self { unimplemented!() }
    # }
    # impl Stream for Lines {
    #     type Item = BytesMut;
    #     type Error = io::Error;
    #     fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { unimplemented!() }
    # }
    fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
        // Wrap the socket with the `Lines` codec that we wrote above.
        let lines = Lines::new(socket);

        // The first line is treated as the client's name. The client
        // is not added to the set of connected peers until this line
        // is received.
        //
        // We use the `into_future` combinator to extract the first
        // item from the lines stream. `into_future` takes a `Stream`
        // and converts it to a future of `(first, rest)` where `rest`
        // is the original stream instance.
        let connection = lines.into_future()
            // `into_future` doesn't have the right error type, so map
            // the error to make it work.
            .map_err(|(e, _)| e)
            // Process the first received line as the client's name.
            .and_then(|(name, lines)| {
                let name = match name {
                    Some(name) => name,
                    None => {
                        // TODO: Handle a client that disconnects
                        // early.
                        unimplemented!();
                    }
                };

                // TODO: Rest of the process function
    # Ok(())
            });
    }
    # fn main() {}

Broadcasting Messages

The next step is to implement the connection processing logic that handles the actual chat functionality, i.e. broadcasting messages from one client to all the others.

To implement this, we will explicitly implement a Future that takes the Lines codec instance and handles the broadcasting logic. This logic handles:

  • Receive messages on its message channel and write them to the socket.
  • Receive messages from the socket and broadcast them to all peers.

Implementing this logic entirely with combinators is also possible, but requires using split, which hasn’t been covered yet. Also, this provides an opportunity to see how to implement a non-trivial Future by hand.

Here is the definition of the future that processes the broadcast logic for a connection:

    # use std::net::SocketAddr;
    # use std::sync::{Arc, Mutex};
    # type BytesMut = ();
    # type Lines = ();
    # type Shared = ();
    # type Rx = ();
    struct Peer {
        /// Name of the peer. This is the first line received from the client.
        name: BytesMut,

        /// The TCP socket wrapped with the `Lines` codec.
        lines: Lines,

        /// Handle to the shared chat state.
        state: Arc<Mutex<Shared>>,

        /// Receive half of the message channel.
        ///
        /// This is used to receive messages from peers. When a message is received
        /// off of this `Rx`, it will be written to the socket.
        rx: Rx,

        /// Client socket address.
        ///
        /// The socket address is used as the key in the `peers` HashMap. The
        /// address is saved so that the `Peer` drop implementation can clean up its
        /// entry.
        addr: SocketAddr,
    }
    # fn main() {}

And a Peer instance is created as such:

    # extern crate bytes;
    # extern crate futures;
    # extern crate tokio;
    # use bytes::{BytesMut, Bytes};
    # use futures::sync::mpsc;
    # use tokio::net::TcpStream;
    # use tokio::prelude::*;
    # use std::net::SocketAddr;
    # use std::collections::HashMap;
    # use std::sync::{Arc, Mutex};
    # struct Peer {
    #     name: BytesMut,
    #     lines: Lines,
    #     state: Arc<Mutex<Shared>>,
    #     rx: Rx,
    #     addr: SocketAddr,
    # }
    # struct Shared {
    #     peers: HashMap<SocketAddr, Tx>,
    # }
    # struct Lines {
    #     socket: TcpStream,
    # }
    # type Tx = mpsc::UnboundedSender<Bytes>;
    # type Rx = mpsc::UnboundedReceiver<Bytes>;
    impl Peer {
        fn new(name: BytesMut,
               state: Arc<Mutex<Shared>>,
               lines: Lines) -> Peer
        {
            // Get the client socket address
            let addr = lines.socket.peer_addr().unwrap();

            // Create a channel for this peer
            let (tx, rx) = mpsc::unbounded();

            // Add an entry for this `Peer` in the shared state map.
            state.lock().unwrap()
                .peers.insert(addr, tx);

            Peer {
                name,
                lines,
                state,
                rx,
                addr,
            }
        }
    }
    # fn main() {}

An mpsc channel is created for other peers to send their messages to this newly created peer. After creating the channel, the transmit half is inserted into the peers map. This entry is removed in the drop implementation for Peer.

    # use std::net::SocketAddr;
    # use std::collections::HashMap;
    # use std::sync::{Arc, Mutex};
    # struct Peer {
    #     state: Arc<Mutex<Shared>>,
    #     addr: SocketAddr,
    # }
    # struct Shared {
    #     peers: HashMap<SocketAddr, ()>,
    # }
    impl Drop for Peer {
        fn drop(&mut self) {
            self.state.lock().unwrap().peers
                .remove(&self.addr);
        }
    }
    # fn main() {}

And here is the implementation.

    # extern crate tokio;
    # extern crate futures;
    # extern crate bytes;
    # use tokio::io;
    # use tokio::prelude::*;
    # use futures::sync::mpsc;
    # use bytes::{Bytes, BytesMut, BufMut};
    # use std::net::SocketAddr;
    # use std::collections::HashMap;
    # use std::sync::{Arc, Mutex};
    # struct Peer {
    #     name: BytesMut,
    #     lines: Lines,
    #     state: Arc<Mutex<Shared>>,
    #     rx: Rx,
    #     addr: SocketAddr,
    # }
    # struct Shared {
    #     peers: HashMap<SocketAddr, Tx>,
    # }
    # struct Lines;
    # type Tx = mpsc::UnboundedSender<Bytes>;
    # type Rx = mpsc::UnboundedReceiver<Bytes>;
    # impl Lines {
    #     fn buffer(&mut self, _: &[u8]) { unimplemented!() }
    #     fn poll_flush(&mut self) -> Poll<(), io::Error> { unimplemented!() }
    # }
    # impl Stream for Lines {
    #     type Item = BytesMut;
    #     type Error = io::Error;
    #     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
    #         unimplemented!();
    #     }
    # }
    impl Future for Peer {
        type Item = ();
        type Error = io::Error;

        fn poll(&mut self) -> Poll<(), io::Error> {
            // Receive all messages from peers.
            loop {
                // Polling an `UnboundedReceiver` cannot fail, so `unwrap`
                // here is safe.
                match self.rx.poll().unwrap() {
                    Async::Ready(Some(v)) => {
                        // Buffer the line. Once all lines are buffered,
                        // they will be flushed to the socket (right
                        // below).
                        self.lines.buffer(&v);
                    }
                    _ => break,
                }
            }

            // Flush the write buffer to the socket
            let _ = self.lines.poll_flush()?;

            // Read new lines from the socket
            while let Async::Ready(line) = self.lines.poll()? {
                println!("Received line ({:?}) : {:?}", self.name, line);

                if let Some(message) = line {
                    // Append the peer's name to the front of the line:
                    let mut line = self.name.clone();
                    line.put(": ");
                    line.put(&message);
                    line.put("\r\n");

                    // We're using `Bytes`, which allows zero-copy clones
                    // (by storing the data in an Arc internally).
                    //
                    // However, before cloning, we must freeze the data.
                    // This converts it from mutable -> immutable,
                    // allowing zero copy cloning.
                    let line = line.freeze();

                    // Now, send the line to all other peers
                    for (addr, tx) in &self.state.lock().unwrap().peers {
                        // Don't send the message to ourselves
                        if *addr != self.addr {
                            // The send only fails if the rx half has been
                            // dropped, however this is impossible as the
                            // `tx` half will be removed from the map
                            // before the `rx` is dropped.
                            tx.unbounded_send(line.clone()).unwrap();
                        }
                    }
                } else {
                    // EOF was reached. The remote client has disconnected.
                    // There is nothing more to do.
                    return Ok(Async::Ready(()));
                }
            }

            // As always, it is important to not just return `NotReady`
            // without ensuring an inner future also returned `NotReady`.
            //
            // We know we got a `NotReady` from either `self.rx` or
            // `self.lines`, so the contract is respected.
            Ok(Async::NotReady)
        }
    }
    # fn main() {}

Final Touches

All that remains is wiring up the Peer future that was just implemented. To do this, the client connection task (defined in the process function) is extended to use Peer.

    # extern crate tokio;
    # extern crate futures;
    # use tokio::io;
    # use tokio::prelude::*;
    # use futures::future::{self, Either, empty};
    # type Lines = Box<Stream<Item = (), Error = io::Error>>;
    # struct Peer;
    # impl Peer {
    #     fn new(_: (), state: (), lines: Lines) -> impl Future<Item = (), Error = io::Error> {
    #         empty()
    #     }
    # }
    # fn dox(lines: Lines) {
    # let state = ();
    let connection = lines.into_future()
        .map_err(|(e, _)| e)
        .and_then(|(name, lines)| {
            // If `name` is `None`, then the client disconnected without
            // actually sending a line of data.
            //
            // Since the connection is closed, there is no further work
            // that we need to do. So, we just terminate processing by
            // returning `future::ok()`.
            //
            // The problem is that only a single future type can be
            // returned from a combinator closure, but we want to
            // return both `future::ok()` and `Peer` (below).
            //
            // This is a common problem, so the `futures` crate solves
            // this by providing the `Either` helper enum that allows
            // creating a single return type that covers two concrete
            // future types.
            let name = match name {
                Some(name) => name,
                None => {
                    // The remote client closed the connection without
                    // sending any data.
                    return Either::A(future::ok(()));
                }
            };

            println!("`{:?}` is joining the chat", name);

            // Create the peer.
            //
            // This is also a future that processes the connection, only
            // completing when the socket closes.
            let peer = Peer::new(
                name,
                state,
                lines);

            // Wrap `peer` with `Either::B` to make the return type fit.
            Either::B(peer)
        })
        // Task futures have an error of type `()`, this ensures we handle
        // the error. We do this by printing the error to STDOUT.
        .map_err(|e| {
            println!("connection error = {:?}", e);
        });
    # }
    # fn main() {}

Besides just adding Peer, name == None is also handled. In this case, the remote client terminated before identifying itself.

Returning multiple futures (the name == None handler and Peer) is handled by wrapping the returned futures in Either. Either is an enum that accepts a different future type for each variant. This allows returning multiple future types without reaching for trait objects.
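
One last detail not shown in the snippet above: as in the earlier process skeleton, the completed connection future still has to be spawned onto the runtime at the end of process, e.g.:

    // Spawn the connection-processing future onto the Tokio runtime.
    tokio::spawn(connection);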

The full code can be found here.

Next up: Timers