Receive responses

The final step is to receive the response back from the manager task. The GET command needs to get the value and the SET command needs to know if the operation completed successfully.

To pass the response, an oneshot channel is used. The oneshot channel is a single-producer, single-consumer channel optimized for sending a single value. In our case, the single value is the response.

Similar to mpsc, oneshot::channel() returns a sender and receiver handle.

  1. use tokio::sync::oneshot;
  2. let (tx, rx) = oneshot::channel();

Unlike mpsc, no capacity is specified as the capacity is always one. Additionally, neither handle can be cloned.

To receive responses from the manager task, before sending a command, a oneshot channel is created. The Sender half of the channel is included in the command to the manager task. The receive half is used to receive the response.

First, update Command to include the Sender. For convenience, a type alias is used to reference the Sender.

  1. use tokio::sync::oneshot;
  2. use bytes::Bytes;
  3. /// Multiple different commands are multiplexed over a single channel.
  4. #[derive(Debug)]
  5. enum Command {
  6. Get {
  7. key: String,
  8. resp: Responder<Option<Bytes>>,
  9. },
  10. Set {
  11. key: String,
  12. val: Vec<u8>,
  13. resp: Responder<()>,
  14. },
  15. }
  16. /// Provided by the requester and used by the manager task to send
  17. /// the command response back to the requester.
  18. type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

Now, update the tasks issuing the commands to include the oneshot::Sender.

  1. let t1 = tokio::spawn(async move {
  2. let (resp_tx, resp_rx) = oneshot::channel();
  3. let cmd = Command::Get {
  4. key: "hello".to_string(),
  5. resp: resp_tx,
  6. };
  7. // Send the GET request
  8. tx.send(cmd).await.unwrap();
  9. // Await the response
  10. let res = resp_rx.await;
  11. println!("GOT = {:?}", res);
  12. });
  13. let t2 = tokio::spawn(async move {
  14. let (resp_tx, resp_rx) = oneshot::channel();
  15. let cmd = Command::Set {
  16. key: "foo".to_string(),
  17. val: b"bar".to_vec(),
  18. resp: resp_tx,
  19. };
  20. // Send the SET request
  21. tx2.send(cmd).await.unwrap();
  22. // Await the response
  23. let res = resp_rx.await;
  24. println!("GOT = {:?}", res);
  25. });

Finally, update the manager task to send the response over the oneshot channel.

  1. while let Some(cmd) = rx.recv().await {
  2. match cmd {
  3. Command::Get { key, resp } => {
  4. let res = client.get(&key).await;
  5. // Ignore errors
  6. let _ = resp.send(res);
  7. }
  8. Command::Set { key, val, resp } => {
  9. let res = client.set(&key, val.into()).await;
  10. // Ignore errors
  11. let _ = resp.send(res);
  12. }
  13. }
  14. }

Calling send on oneshot::Sender completes immediately and does not require an .await. This is because send on an oneshot channel will always fail or succeed immediately without any form of waiting.

Sending a value on a oneshot channel returns Err when the receiver half has dropped. This indicates the receiver is no longer interested in the response. In our scenario, the receiver cancelling interest is an acceptable event. The Err returned by resp.send(...) does not need to be handled.

You can find the entire code here.