Solutions
Dining Philosophers — Async
use std::sync::Arc;use tokio::sync::{mpsc, Mutex};use tokio::time;struct Fork;struct Philosopher {name: String,left_fork: Arc<Mutex<Fork>>,right_fork: Arc<Mutex<Fork>>,thoughts: mpsc::Sender<String>,}impl Philosopher {async fn think(&self) {self.thoughts.send(format!("Eureka! {} has a new idea!", &self.name)).await.unwrap();}async fn eat(&self) {// Keep trying until we have both forks// Pick up forks...let _left_fork = self.left_fork.lock().await;let _right_fork = self.right_fork.lock().await;println!("{} is eating...", &self.name);time::sleep(time::Duration::from_millis(5)).await;// The locks are dropped here}}static PHILOSOPHERS: &[&str] =&["Socrates", "Hypatia", "Plato", "Aristotle", "Pythagoras"];#[tokio::main]async fn main() {// Create forkslet mut forks = vec![];(0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));// Create philosopherslet (philosophers, mut rx) = {let mut philosophers = vec![];let (tx, rx) = mpsc::channel(10);for (i, name) in PHILOSOPHERS.iter().enumerate() {let mut left_fork = Arc::clone(&forks[i]);let mut right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);if i == PHILOSOPHERS.len() - 1 {std::mem::swap(&mut left_fork, &mut right_fork);}philosophers.push(Philosopher {name: name.to_string(),left_fork,right_fork,thoughts: tx.clone(),});}(philosophers, rx)// tx is dropped here, so we don't need to explicitly drop it later};// Make them think and eatfor phil in philosophers {tokio::spawn(async move {for _ in 0..100 {phil.think().await;phil.eat().await;}});}// Output their thoughtswhile let Some(thought) = rx.recv().await {println!("Here is a thought: {thought}");}}
Broadcast Chat Application
src/bin/server.rs:
use futures_util::sink::SinkExt;use futures_util::stream::StreamExt;use std::error::Error;use std::net::SocketAddr;use tokio::net::{TcpListener, TcpStream};use tokio::sync::broadcast::{channel, Sender};use tokio_websockets::{Message, ServerBuilder, WebSocketStream};async fn handle_connection(addr: SocketAddr,mut ws_stream: WebSocketStream<TcpStream>,bcast_tx: Sender<String>,) -> Result<(), Box<dyn Error + Send + Sync>> {ws_stream.send(Message::text("Welcome to chat! Type a message".to_string())).await?;let mut bcast_rx = bcast_tx.subscribe();// A continuous loop for concurrently performing two tasks: (1) receiving// messages from `ws_stream` and broadcasting them, and (2) receiving// messages on `bcast_rx` and sending them to the client.loop {tokio::select! {incoming = ws_stream.next() => {match incoming {Some(Ok(msg)) => {if let Some(text) = msg.as_text() {println!("From client {addr:?} {text:?}");bcast_tx.send(text.into())?;}}Some(Err(err)) => return Err(err.into()),None => return Ok(()),}}msg = bcast_rx.recv() => {ws_stream.send(Message::text(msg?)).await?;}}}}#[tokio::main]async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {let (bcast_tx, _) = channel(16);let listener = TcpListener::bind("127.0.0.1:2000").await?;println!("listening on port 2000");loop {let (socket, addr) = listener.accept().await?;println!("New connection from {addr:?}");let bcast_tx = bcast_tx.clone();tokio::spawn(async move {// Wrap the raw TCP stream into a websocket.let ws_stream = ServerBuilder::new().accept(socket).await?;handle_connection(addr, ws_stream, bcast_tx).await});}}
src/bin/client.rs:
use futures_util::stream::StreamExt;use futures_util::SinkExt;use http::Uri;use tokio::io::{AsyncBufReadExt, BufReader};use tokio_websockets::{ClientBuilder, Message};#[tokio::main]async fn main() -> Result<(), tokio_websockets::Error> {let (mut ws_stream, _) =ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")).connect().await?;let stdin = tokio::io::stdin();let mut stdin = BufReader::new(stdin).lines();// Continuous loop for concurrently sending and receiving messages.loop {tokio::select! {incoming = ws_stream.next() => {match incoming {Some(Ok(msg)) => {if let Some(text) = msg.as_text() {println!("From server: {}", text);}},Some(Err(err)) => return Err(err.into()),None => return Ok(()),}}res = stdin.next_line() => {match res {Ok(None) => return Ok(()),Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,Err(err) => return Err(err.into()),}}}}}