基于多线程的非阻塞 socket 编程

本文的目的和结构

本文的目的和背景

随着物联网的发展,越来越多产品需要基于网络进行数据传输。在实际开发中,往往要求网络传输时不能阻塞当前线程,以致无法及时处理其他消息。在用户无法直接套用简单的 socket demo 时,RT-Thread 提供基于多线程的非阻塞 socket 编程示例,方便用户进行应用程序开发。

本文的结构

本文首先介绍 socket 常规使用以及阐述存在的问题。然后提出针对该问题的解决方案,并在 QEMU 平台上验证。接着分析了具体的实现代码,最后总结这种方案的优点。

问题阐述

在 RT-Thread 使用 socket 网络编程时,当一个任务调用 socket的 recv()函数接收数据时,如果 socket 上并没有接收到数据,这个任务将阻塞在这个 recv() 函数里。这个时候,这个任务想要处理一些其他事情,例如进行一些数据采集,发送一些额外数据到网络上等,将变得不可能了。

与此同时,其他线程也需要将数据上传同一个服务器,如果直接多个线程共同使用一个 socket 操作,这将会破坏底层 lwip 的消息事件模型。

常规的socket编程模型及其存在的问题

首先根据下图简单介绍一下 socket 的使用流程

1客户端:

  • socket() 创建一个 socket,返回套接字的描述符,并为其分配系统资源
  • connect() 向服务器发出连接请求
  • send()/recv() 与服务器进行通信
  • closesocket() 关闭 socket,回收资源
    服务器:

  • socket() 创建一个 socket,返回套接字的描述符,并为其分配系统资源

  • bind() 将套接字绑定到一个本地地址和端口上
  • listen() 将套接字设为监听模式并设置监听数量,准备接收客户端请求
  • accept() 等待监听的客户端发起连接,并返回已接受连接的新套接字描述符
  • recv()/send() 用新套接字与客户端进行通信
  • closesocket() 关闭 socket,回收资源
    例如在上面网络客户端操作过程中,当进行 recv 操作时,如果对应的通道数据没有准备好,那系统就会让当前任务进入阻塞状态,当前任务不能再进行其他的操作。

问题的解决

在 RT-Thread 中,自 v3.0.0 以来更标准化,支持更多的 POSIX API。这其中就包括 poll / select 接口实现,并且可以进行 socket 和设备文件的联合 poll / select。select、poll的内部实现机制相似,由于本文选用 select 方式,故在此不对 poll 展开介绍。

下面结合框图进一步说明如何使用 select 和 pipe 来解决这类问题:2图中存在有三个线程:应用线程 thread1thread2 和客户端线程 thread client,其中 thread client 完成 select 功能

  • 数据发送过程:
  • 应用线程通过 pipe 往 thread client 发送数据 data1,select 探测到 pipe 有数据可读,thread client 被唤醒,然后读取 pipe 中的数据并通过 TCP socket 发送到 server
  • 数据接收过程:
  • server 通过 TCP socket 发送数据 data2 到 thread client,select 探测到 socket 有数据可读,thread client 被唤醒,thread client 可以获得接收到的数据
    下面将详细介绍 select 和 pipe 的使用方法。

select

select 的介绍

select() 可以阻塞地同时探测一组支持非阻塞的 I / O 设备是否有事件发生(如可读,可写,出现异常等等),直至某一个设备触发了事件或者超过了指定的等待时间。此时我们可以把需要的数据源通道放到 select 的探测范围内,只要相应的数据源准备好 select 就会返回,这时就能无阻塞地读取到数据。

select 的适用场合

select() 主要用来处理 I / O 多路复用的情况,适用如下场合:

  • 客户端处理多个描述符时(一般是交互式输入和网络套接口)
  • 服务器既要处理监听套接口,又要处理已连接套接口
  • 服务器既要处理 TCP,又要处理 UDP
  • 服务器要处理多个服务或多个协议

    select() API 说明

int select(int nfds, fd_set readfds, fd_set writefds, fd_set errorfds, struct timeval timeout);

参数 描述
nfds 集合中所有文件描述符的范围,即所有文件描述符的最大值加1
readfds 需要监视读变化的文件描述符集合
writefds 需要监视写变化的文件描述符集合
errorfds 需要监视出现异常的文件描述符集合
timeout select 的超时时间
返回 描述
正值 监视的文件集合出现可读写事件或异常事件
0 等待超时,没有可读写或异常的事件
负值 select 出现错误

pipe

pipe 的介绍

pipe 是一个基于文件描述符的单向数据通道,可用于线程间的通信。

选择 pipe的原因

在 RT-Thread 里面,pipe 支持文件描述符的形式操作,而且 pipe 不需要控制协议,操作简单。

在 msh />中,输入 list_fd 可查看当前打开的文件描述符,详情如下:
  1. msh />list_fd
  2. fd type ref magic path
  3.  
  4. -- ------ --- ----- ------
  5.  
  6. 0 file 1 fdfd /uart0
  7. 1 socket 1 fdfd
  8. 2 file 1 fdfd /pipe0
  9. 3 file 1 fdfd /pipe0
  10. msh />

下面将详细介绍代码的实现情况。

tcpclient 整体实现情况

tcpclient.c 是上文提出的 select、pipe 方案的具体实现代码,该源码采用面向对象的思想实现,提供 TCP 连接、发送、关闭以及注册接收回调四个 API 提供用户使用

tcpclient 序列图介绍

序列图反映了 tcpclient.c 整个流程

tcpclient_uml

下面来详细说明:

  • 调用 tcpclient_start() 设置服务器 ip 地址 & 端口号,以及完成 pipe、socket 初始化和 TCP 连接、select 配置等工作
  • 注册接收回调函数 rx_callback()
  • 调用 tcpclientsend() 通过 pipe 发送数据(图中绿线表示 select 探测到 pipe 可读事件_)
  • 图中绿线表示 select 探测到 pipe 可读事件, tcpclient 被唤醒并读取 pipe 的数据
  • tcpclient 通过 socket 发送数据给 server
  • server 通过 socket 发送数据给 tcpclient
  • 图中蓝线表示 select 探测到 socket 可读事件 ,tcpclient 被唤醒并读取 socket 的数据
  • app 通过 rx_callback() 获得 tcpclient 读取到的数据
  • 通信完毕,app 调用 tcpclient_close() 关闭 pipe、socket,并清理相关资源

    tcpclient 代码实现

节选核心代码
  1. static void select_handle(rt_tcpclient_t *thiz, char *pipe_buff, char *sock_buff)
  2. {
  3.  
  4. fd_set fds;
  5.  
  6. rt_int32_t max_fd = 0, res = 0;
  7. max_fd = MAX_VAL(thiz->sock_fd, thiz->pipe_read_fd) + 1;
  8.  
  9. /* 清空可读事件描述符列表 */
  10. FD_ZERO(&fds);
  11.  
  12. while (1)
  13. {
  14. /* 将需要监听可读事件的描述符加入列表 */
  15. FD_SET(thiz->sock_fd, &fds);
  16. FD_SET(thiz->pipe_read_fd, &fds);
  17.  
  18. /* 等待设定的网络描述符有事件发生 */
  19. res = select(max_fd, &fds, RT_NULL, RT_NULL, RT_NULL);
  20.  
  21. /* select 返回错误及超时处理 */
  22. EXCEPTION_HANDLE(res, "select handle", "error", "timeout");
  23.  
  24. /* 查看 sock 描述符上有没有发生可读事件 */
  25. if (FD_ISSET(thiz->sock_fd, &fds))
  26. {
  27. /* 从 sock 连接中接收最大BUFSZ - 1字节数据 */
  28. res = recv(thiz->sock_fd, sock_buff, BUFF_SIZE, 0);
  29.  
  30. /* recv 返回异常 */
  31. EXCEPTION_HANDLE(res, "socket recv handle", "error", "TCP disconnected");
  32.  
  33. /* 有接收到数据,把末端清零 */
  34. sock_buff[res] = '\0';
  35.  
  36. /* 通过回调函数的方式,数据发给 thread1 */
  37. RX_CB_HANDLE(sock_buff, res);
  38.  
  39. /* 如果接收的是exit,关闭这个连接 */
  40. EXIT_HANDLE(sock_buff);
  41. }
  42.  
  43. /* 查看 pipe 描述符上有没有发生可读事件 */
  44. if (FD_ISSET(thiz->pipe_read_fd, &fds))
  45. {
  46. /* 从 pipe 连接中接收最大BUFSZ - 1字节数据 */
  47. res = read(thiz->pipe_read_fd, pipe_buff, BUFF_SIZE);
  48.  
  49. /* recv 返回异常 */
  50. EXCEPTION_HANDLE(res, "pipe recv handle", "error", RT_NULL);
  51.  
  52. /* 有接收到数据,把末端清零 */
  53. pipe_buff[res] = '\0';
  54.  
  55. /* 读取 pipe 的数据,转发给 server */
  56. send(thiz->sock_fd, pipe_buff, res, 0);
  57.  
  58. /* recv 返回异常 */
  59. EXCEPTION_HANDLE(res, "socket write handle", "error", "warning");
  60.  
  61. /* 如果接收的是 exit,关闭这个连接 */
  62. EXIT_HANDLE(pipe_buff);
  63. }
  64. }
  65.  
  66. exit:
  67. /* 释放接收缓冲 */
  68. free(pipe_buff);
  69. free(sock_buff);
  70. }

这段代码是 tcpclient 线程的核心部分,按照例程配置 select,根据 FD_ISSET() 宏检查描述符。

  • 假如 socket 有数据可读,采用回调函数的方式把数据发送给应用线程。
  • 假如 pipe 有数据可读,处理数据,通过 socket 发送到服务器。

    参考

  • README.md

  • tcpclient 源码
  • example 源码
    下面将介绍 example.c 源码运行情况以及 tcpclient.c 的使用方法。

example 运行情况

运行前的准备工作

首先在 github 上拉取 tcpclient.c 的源码

  • tcpclient 文件夹 放置 rt-thread\bsp\qemu-vexpress-a9目录下,详情如下:添加到QEMU工程

  • 在 env 里使用 scons 命令编译 QEMU 工程,详情如下:

QEMU工程编译

  • 在 env 里使用 .\qemu.bat 命令启动,详情如下:
    QEMU启动

QEMU 成功启动,下面来介绍代码运行情况。

TCP 连接前的准备工作

  • TCP 测试前的准备,设置网络调试助手端口号,详情如下:
    网络助手设置

  • 在 env 里输入 ipconfig 查看本机 ip 地址,详情如下:

  1. > ipconfig
  2. ...
  3. IPv4 Address. . . . . . . . . . . : 192.168.12.53
  4. ...
  • 通过 rt_tcpclient_start() API 设置服务器 IP 地址和端口号,详情如下:
  1. rt_tcpclient_start("192.168.12.53", 9008);
  • 在 msh /> 里,输入 rt_tc_test_init 详情如下:
  1. msh />rt_tc_test_init

example 基于 QEMU 的运行情况

  • 运行效果图及其描述
    在 example.c 里建立两个线程,一个是 thread1,另一个是 thread2,两个线程交替给服务端发送数据。

服务端每秒钟往客户端发送数据

运行效果图

  • 网络助手发送 i am server ,thread1 接收并且打印出来,详情如下:
  1. msh />D/tc_rx_cb [-30-01-01 00:00:00 tcpc] (packages\tcpclient\examples\tcpclient_example.c:52)recv data: i am server

总结

  • select() 也是阻塞模式,它的好处在于可以同时选择多个数据源通道:只要通道里数据有效时,就可以进行操作;而在没有数据需要处理时,则操作线程会被挂起。
  • 通过使用 pipe / select 的方式,让 tcpclient 网络任务实现了在等待网络数据的同时额外处理其他消息的目的。

原文: https://www.rt-thread.org/document/site/rtthread-application-note/components/network/an0019-rtthread-system-tcpclient-socket/