示例:Echo服务器

我们将使用到目前为止所覆盖的内容来构建echo服务器。这是一个Tokio应用程序,它包含了我们迄今为止学到的所有内容。服务器将简单地从连接的客户端接收消息,并将收到的相同消息发送回客户端。

我们将能够使用我们在hello world部分中创建的基本Tcp客户端来测试此echo服务器 。

完整的代码可以在这里找到。

创建

首先,生成一个新的箱子。

  1. $ cargo new --bin echo-server
  2. cd echo-server

接下来,添加必要的依赖项:

  1. [dependencies]
  2. tokio = "0.1"

main.rs

  1. extern crate tokio;
  2. extern crate futures;
  3. use tokio::io;
  4. use tokio::net::TcpListener;
  5. use tokio::prelude::*;

现在,我们为服务器设置必要的结构:

  • 绑定TcpListener到本地端口。
  • 定义接受入站连接并处理它们的任务。
  • 生成服务器任务。
  • 启动Tokio运行时
    同样,在执行者上生成服务器任务之前,实际上不会执行任何工作。
  1. fn main() {
  2. let addr = "127.0.0.1:6142".parse().unwrap();
  3. let listener = TcpListener::bind(&addr).unwrap();
  4. // Here we convert the `TcpListener` to a stream of incoming connections
  5. // with the `incoming` method. We then define how to process each element in
  6. // the stream with the `for_each` combinator method
  7. let server = listener.incoming().for_each(|socket| {
  8. // TODO: Process socket
  9. Ok(())
  10. })
  11. .map_err(|err| {
  12. // Handle error by printing to STDOUT.
  13. println!("accept error = {:?}", err);
  14. });
  15. println!("server running on localhost:6142");
  16. # // `select` completes when the first of the two futures completes. Since
  17. # // future::ok() completes immediately, the server won't hang waiting for
  18. # // more connections. This is just so the doc test doesn't hang.
  19. # let server = server.select(futures::future::ok(())).then(|_| Ok(()));
  20. // Start the server
  21. //
  22. // This does a few things:
  23. //
  24. // * Start the Tokio runtime
  25. // * Spawns the `server` task onto the runtime.
  26. // * Blocks the current thread until the runtime becomes idle, i.e. all
  27. // spawned tasks have completed.
  28. tokio::run(server);
  29. }

在这里,我们创建了一个可以侦听传入TCP连接的TcpListener。在监听器上, 我们调用incoming,将监听器转换为入站客户端连接流。然后我们调用for_each,它将产生每个入站客户端连接。 目前我们没有对此入站连接做任何事情 - 这是我们的下一步。

一旦我们拥有了我们的服务器,我们就可以将它交给tokio::run。到目前为止,我们的服务器功能一无所获。由Tokio运行时驱动我们的Future完成。

注意:我们必须在服务器上调用map_err,因为tokio :: run需要一个Item为type()和Error为type()的Future。 这是为了确保在将Future交付给运行时之前处理所有值和错误。

处理连接

既然我们有传入的客户端连接,我们应该处理它们。

我们只想将从套接字读取的所有数据复制回套接字本身(例如“echo”)。 我们可以使用标准的io :: copy函数来做到这一点。

该copy函数有两个参数,从哪里读取以及在哪里写入。 但是,我们只有一个参数,使用socket。 幸运的是,有一个方法split,它将可读和可写的流分成两半。 此操作允许我们独立地处理每个流,例如将它们作为copy函数的两个参数传递。

然后,copy函数返回一个Future,当复制操作完成时,将接收此Future,解析为复制的数据量。

让我们来看看我们再次传递给for_each的闭包。

  1. let server = listener.incoming().for_each(|socket| {
  2. // split the socket stream into readable and writable parts
  3. let (reader, writer) = socket.split();
  4. // copy bytes from the reader into the writer
  5. let amount = io::copy(reader, writer);
  6. let msg = amount.then(|result| {
  7. match result {
  8. Ok((amount, _, _)) => println!("wrote {} bytes", amount),
  9. Err(e) => println!("error: {}", e),
  10. }
  11. Ok(())
  12. });
  13. // spawn the task that handles the client connection socket on to the
  14. // tokio runtime. This means each client connection will be handled
  15. // concurrently
  16. tokio::spawn(msg);
  17. Ok(())
  18. })

如您所见,我们已将socket流拆分为可读写部分。 然后我们使用io :: copyreader读取并写入writer。 我们使用then 组合器来查看amount未来的ItemError作为Result打印一些诊断。

对tokio::spawn](https://docs.rs/tokio-executor/0.1/tokio_executor/fn.spawn.html)的调用是关键所在。 至关重要的是我们希望所有clients同时取得进展,而不是在完成另一个client时阻止其中一个。 为此,我们使用tokio :: spawn函数在后台执行工作。

如果我们没有这样做,那么for_each中块的每次调用都会在一次解决,这意味着我们永远不会同时处理两个客户端连接!