消息队列

消息队列是另一种常用的线程间通讯方式,它能够接收来自线程或中断服务例程中不固定长度的消息,并把消息缓存在自己的内存空间中。其他线程也能够从消息队列中读取相应的消息,而当消息队列是空的时候,可以挂起读取线程。当有新的消息到达时,挂起的线程将被唤醒以接收并处理消息。消息队列是一种异步的通信方式。

如图 消息队列的工作示意图 所示,通过消息队列服务,线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常应将先进入消息队列的消息先传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出原则(FIFO)。

消息队列的工作示意图

RT-Thread操作系统的消息队列对象由多个元素组成,当消息队列被创建时,它就被分配了消息队列控制块:消息队列名称、内存缓冲区、消息大小以及队列长度等。同时每个消息队列对象中包含着多个消息框,每个消息框可以存放一条消息;消息队列中的第一个和最后一个消息框被分别称为消息链表头和消息链表尾,对应于消息队列控制块中的msg_queue_head和msg_queue_tail;有些消息框可能是空的,它们通过msg_queue_free形成一个空闲消息框链表。所有消息队列中的消息框总数即是消息队列的长度,这个长度可在消息队列创建时指定。

消息队列控制块

  1. struct rt_messagequeue
  2. {
  3. struct rt_ipc_object parent;
  4.  
  5. void* msg_pool; /* 存放消息的消息池开始地址 */
  6.  
  7. rt_uint16_t msg_size; /* 每个消息的长度*/
  8. rt_uint16_t max_msgs; /* 最大能够容纳的消息数*/
  9.  
  10. rt_uint16_t entry; /* 队列中已有的消息数*/
  11.  
  12. void* msg_queue_head; /* 消息链表头*/
  13. void* msg_queue_tail; /* 消息链表尾*/
  14. void* msg_queue_free; /* 空闲消息链表*/
  15. };
  16. typedef struct rt_messagequeue* rt_mq_t;

rt_messagequeue对象从rt_ipc_object中派生,由IPC容器管理。

消息队列相关接口

创建消息队列

消息队列在使用前,应该被创建出来,或对已有的静态消息队列对象进行初始化,创建消息队列的函数接口如下所示:

  1. rt_mq_t rt_mq_create(const char* name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag);

创建消息队列时先创建一个消息队列对象控制块,然后给消息队列分配一块内存空间,组织成空闲消息链表,这块内存的大小等于[消息大小+消息头(用于链表连接)]与消息队列容量的乘积,接着再初始化消息队列,此时消息队列为空。

函数参数


  1. 参数 描述

  1. name 消息队列的名称;
  2.  
  3. msg_size 消息队列中一条消息的最大长度;
  4.  
  5. max_msgs 消息队列的最大容量;
  6.  
  7. flag 消息队列采用的等待方式,可以取值:

  1. #define RT_IPC_FLAG_FIFO 0x00 /* IPC参数采用FIFO方式*/
  2. #define RT_IPC_FLAG_PRIO 0x01 /* IPC参数采用优先级方式*/

函数返回

成功创建返回消息队列对象的句柄;否则返回-RT_ERROR。

删除消息队列

当消息队列不再被使用时,应该删除它以释放系统资源,一旦操作完成,消息队列将被永久性的删除。删除消息队列的函数接口如下:

  1. rt_err_t rt_mq_delete(rt_mq_t mq);

删除消息队列时,如果有线程被挂起在该消息队列等待队列上,则内核先唤醒挂起在该消息等待队列上的所有线程(返回值是 -RT_ERROR),然后再释放消息队列使用的内存,最后删除消息队列对象。

函数参数


  1. 参数 描述

  1. mq 消息队列对象的句柄;

函数返回

RT_EOK

初始化消息队列

初始化静态消息队列对象跟创建消息队列对象类似,只是静态消息队列对象的内存是在系统编译时由编译器分配的,一般放于数据段或ZI段中。在使用这类静态消息队列对象前,需要进行初始化。初始化消息队列对象的函数接口如下:

  1. rt_err_t rt_mq_init(rt_mq_t mq, const char* name, void *msgpool, rt_size_t msg_size, rt_size_t pool_size,
  2. rt_uint8_t flag);

初始化消息队列时,该接口需要获得消息队列对象的句柄(即指向消息队列对象控制块的指针)、消息队列名、消息缓冲区指针、消息大小以及消息队列容量。如图 消息队列的工作示意图 所示,消息队列初始化后所有消息都挂在空闲消息链表上,消息队列为空。

函数参数


  1. 参数 描述

  1. mq 指向静态消息队列对象的句柄;
  2.  
  3. name 消息队列的名称;
  4.  
  5. msgpool 用于存放消息的缓冲区;
  6.  
  7. msg_size 消息队列中一条消息的最大长度;
  8.  
  9. pool_size 存放消息的缓冲区大小;
  10.  
  11. flag 消息队列采用的等待方式,可以取值:

  1. #define RT_IPC_FLAG_FIFO 0x00 /* IPC参数采用FIFO方式*/
  2. #define RT_IPC_FLAG_PRIO 0x01 /* IPC参数采用优先级方式*/

函数返回

RT_EOK

脱离消息队列

脱离消息队列将使消息队列对象被从内核对象管理器中删除。脱离消息队列使用下面的接口:

  1. rt_err_t rt_mq_detach(rt_mq_t mq);

使用该函数接口后,内核先唤醒所有挂在该消息等待队列对象上的线程(返回值是- RT_ERROR ),然后将该消息队列对象从内核对象管理器中删除。

函数参数


  1. 参数 描述

  1. mq 指向静态消息队列对象的句柄;

函数返回

RT_EOK

发送消息

线程或者中断服务程序都可以给消息队列发送消息。当发送消息时,消息队列对象先从空闲消息链表上取下一个空闲消息块,把线程或者中断服务程序发送的消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。当且仅当空闲消息链表上有可用的空闲消息块时,发送者才能成功发送消息;当空闲消息链表上无可用消息块,说明消息队列已满,此时,发送消息的的线程或者中断程序会收到一个错误码(-RT_EFULL)。发送消息的函数接口如下:

  1. rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size);

发送消息时,发送者需指定发送到的消息队列的对象句柄(即指向消息队列控制块的指针),并且指定发送的消息内容以及消息大小。如图 消息队列的工作示意图 所示,在发送一个普通消息之后,空闲消息链表上的队首消息被转移到了消息队列尾。

函数参数


  1. 参数 描述

  1. mq 消息队列对象的句柄;
  2.  
  3. buffer 消息内容;
  4.  
  5. size 消息大小。

函数返回

发送成功返回RT_EOK,如果消息队列已满返回-RT_EFULL。

发送紧急消息

发送紧急消息的过程与发送消息几乎一样,唯一的不同是,当发送紧急消息时,从空闲消息链表上取下来的消息块不是挂到消息队列的队尾,而是挂到队首,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理。发送紧急消息的函数接口如下:

  1. rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size);

参数:

函数参数


  1. 参数 描述

  1. mq 消息队列对象的句柄;
  2.  
  3. buffer 消息内容;
  4.  
  5. size 消息大小。

函数返回

发送成功返回RT_EOK,如果消息队列已满返回-RT_EFULL。

接收消息

当消息队列中有消息时,接收者才能接收消息,否则接收者会根据超时时间设置或挂起在消息队列的等待线程队列上,或直接返回。接收消息函数接口如下:

  1. rt_err_t rt_mq_recv (rt_mq_t mq, void* buffer, rt_size_t size, rt_int32_t timeout);

接收消息时,接收者需指定存储消息的消息队列对象句柄,并且指定一个内存缓冲区,接收到的消息内容将被复制到该缓冲区里。此外,还需指定未能及时取到消息时的超时时间。如图 消息队列的工作示意图 所示,接收一个消息后消息队列上的队首消息被转移到了空闲消息链表的尾部。

函数参数


  1. 参数 描述

  1. mq 消息队列对象的句柄;
  2.  
  3. buffer 用于接收消息的数据块;
  4.  
  5. size 消息大小;
  6.  
  7. timeout 指定的超时时间。

函数返回

成功收到返回RT_EOK,超时返回-RT_ETIMEOUT,其他返回-RT_ERROR。

使用消息队列的例程如下例所示:

  1. /*
  2. * 程序清单:消息队列例程
  3. *
  4. * 这个程序会创建3个动态线程:
  5. * 一个线程会从消息队列中收取消息;
  6. * 一个线程会定时给消息队列发送消息;
  7. * 一个线程会定时给消息队列发送紧急消息。
  8. */
  9. #include <rtthread.h>
  10. #include "tc_comm.h"
  11.  
  12. /* 指向线程控制块的指针 */
  13. static rt_thread_t tid1 = RT_NULL;
  14. static rt_thread_t tid2 = RT_NULL;
  15. static rt_thread_t tid3 = RT_NULL;
  16.  
  17. /* 消息队列控制块 */
  18. static struct rt_messagequeue mq;
  19. /* 消息队列中用到的放置消息的内存池 */
  20. static char msg_pool[2048];
  21.  
  22. /* 线程1入口函数 */
  23. static void thread1_entry(void* parameter)
  24. {
  25. char buf[128];
  26.  
  27. while (1)
  28. {
  29. rt_memset(&buf[0], 0, sizeof(buf));
  30.  
  31. /* 从消息队列中接收消息 */
  32. if (rt_mq_recv(&mq, &buf[0], sizeof(buf), RT_WAITING_FOREVER)
  33. == RT_EOK)
  34. {
  35. /* 输出内容 */
  36. rt_kprintf("thread1: recv a msg, the content:%s\n", buf);
  37. }
  38.  
  39. /* 延迟10个OS Tick */
  40. rt_thread_delay(10);
  41. }
  42. }
  43.  
  44. /* 线程2入口函数 */
  45. static void thread2_entry(void* parameter)
  46. {
  47. int i, result;
  48. char buf[] = "this is message No.x";
  49.  
  50. while (1)
  51. {
  52. for (i = 0; i < 10; i++)
  53. {
  54. buf[sizeof(buf) - 2] = '0' + i;
  55.  
  56. rt_kprintf("thread2: send message - %s\n", buf);
  57. /* 发送消息到消息队列中 */
  58. result = rt_mq_send(&mq, &buf[0], sizeof(buf));
  59. if ( result == -RT_EFULL)
  60. {
  61. /* 消息队列满, 延迟1s时间 */
  62. rt_kprintf("message queue full, delay 1s\n");
  63. rt_thread_delay(100);
  64. }
  65. }
  66.  
  67. /* 延时10个OS Tick */
  68. rt_thread_delay(10);
  69. }
  70. }
  71.  
  72. /* 线程3入口函数 */
  73. static void thread3_entry(void* parameter)
  74. {
  75. char buf[] = "this is an urgent message!";
  76.  
  77. while (1)
  78. {
  79. rt_kprintf("thread3: send an urgent message\n");
  80.  
  81. /* 发送紧急消息到消息队列中 */
  82. rt_mq_urgent(&mq, &buf[0], sizeof(buf));
  83.  
  84. /* 延时25个OS Tick */
  85. rt_thread_delay(25);
  86. }
  87. }
  88.  
  89. int messageq_simple_init()
  90. {
  91. /* 初始化消息队列 */
  92. rt_mq_init(&mq, "mqt",
  93. &msg_pool[0], /* 内存池指向msg_pool */
  94. 128 - sizeof(void*), /* 每个消息的大小是 128 - void* */
  95. sizeof(msg_pool), /* 内存池的大小是msg_pool的大小 */
  96. RT_IPC_FLAG_FIFO); /* 如果有多个线程等待,按照FIFO的方法分配消息 */
  97.  
  98. /* 创建线程1 */
  99. tid1 = rt_thread_create("t1",
  100. thread1_entry, /* 线程入口是thread1_entry */
  101. RT_NULL, /* 入口参数是RT_NULL */
  102. THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
  103. if (tid1 != RT_NULL)
  104. rt_thread_startup(tid1);
  105. else
  106. tc_stat(TC_STAT_END | TC_STAT_FAILED);
  107.  
  108. /* 创建线程2 */
  109. tid2 = rt_thread_create("t2",
  110. thread2_entry, /* 线程入口是thread2_entry */
  111. RT_NULL, /* 入口参数是RT_NULL */
  112. THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
  113. if (tid2 != RT_NULL)
  114. rt_thread_startup(tid2);
  115. else
  116. tc_stat(TC_STAT_END | TC_STAT_FAILED);
  117.  
  118. /* 创建线程3 */
  119. tid3 = rt_thread_create("t3",
  120. thread3_entry, /* 线程入口是thread3_entry */
  121. RT_NULL, /* 入口参数是RT_NULL */
  122. THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
  123. if (tid3 != RT_NULL)
  124. rt_thread_startup(tid3);
  125. else
  126. tc_stat(TC_STAT_END | TC_STAT_FAILED);
  127.  
  128. return 0;
  129. }
  130.  
  131. #ifdef RT_USING_TC
  132. static void _tc_cleanup()
  133. {
  134. /* 调度器上锁,上锁后,将不再切换到其他线程,仅响应中断 */
  135. rt_enter_critical();
  136.  
  137. /* 删除线程 */
  138. if (tid1 != RT_NULL && tid1->stat != RT_THREAD_CLOSE)
  139. rt_thread_delete(tid1);
  140. if (tid2 != RT_NULL && tid2->stat != RT_THREAD_CLOSE)
  141. rt_thread_delete(tid2);
  142. if (tid3 != RT_NULL && tid3->stat != RT_THREAD_CLOSE)
  143. rt_thread_delete(tid3);
  144.  
  145. /* 执行消息队列对象脱离 */
  146. rt_mq_detach(&mq);
  147.  
  148. /* 调度器解锁 */
  149. rt_exit_critical();
  150.  
  151. /* 设置TestCase状态 */
  152. tc_done(TC_STAT_PASSED);
  153. }
  154.  
  155. int _tc_messageq_simple()
  156. {
  157. /* 设置TestCase清理回调函数 */
  158. tc_cleanup(_tc_cleanup);
  159. messageq_simple_init();
  160.  
  161. /* 返回TestCase运行的最长时间 */
  162. return 100;
  163. }
  164. /* 输出函数命令到finsh shell中 */
  165. FINSH_FUNCTION_EXPORT(_tc_messageq_simple, a message queue example);
  166. #else
  167. /* 用户应用入口 */
  168. int rt_application_init()
  169. {
  170. messageq_simple_init();
  171.  
  172. return 0;
  173. }
  174. #endif

使用场合

消息队列可以应用于发送不定长消息的场合,包括线程与线程间的消息交换,以及中断服务例程中发送给线程的消息(中断服务例程不可能接收消息)。

典型使用

消息队列和邮箱的明显不同是消息的长度并不限定在4个字节以内,另外消息队列也包括了一个发送紧急消息的函数接口。但是当创建的是一个所有消息的最大长度是4字节的消息队列时,消息队列对象将蜕化成邮箱。这个不限定长度的消息,也及时的反应到了代码编写的场合上,同样是类似邮箱的代码:

  1. struct msg
  2. {
  3. rt_uint8_t *data_ptr; /* 数据块首地址 */
  4. rt_uint32_t data_size; /* 数据块大小 */
  5. };

和邮箱例子相同的消息结构定义,假设依然需要发送这么一个消息给接收线程。在邮箱例子中,这个结构只能够发送指向这个结构的指针(在函数指针被发送过去后,接收线程能够正确的访问指向这个地址的内容,通常这块数据需要留给接收线程来释放)。而使用消息队列的方式则大不相同:

  1. void send_op(void *data, rt_size_t length)
  2. {
  3. struct msg msg_ptr;
  4.  
  5. msg_ptr.data_ptr = data; /* 指向相应的数据块地址 */
  6. msg_ptr.data_size = length; /* 数据块的长度 */
  7.  
  8. /* 发送这个消息指针给mq消息队列 */
  9. rt_mq_send(mq, (void*)&msg_ptr, sizeof(struct msg));
  10. }

注意,上面的代码中,是把一个局部变量的数据内容发送到了消息队列中。在接收线程中,同样也采用局部变量进行消息接收的结构体:

  1. void message_handler()
  2. {
  3. struct msg msg_ptr; /* 用于放置消息的局部变量 */
  4.  
  5. /* 从消息队列中接收消息到msg_ptr中 */
  6. if (rt_mq_recv(mq, (void*)&msg_ptr, sizeof(struct msg)) == RT_EOK)
  7. {
  8. /* 成功接收到消息,进行相应的数据处理 */
  9. }
  10. }

因为消息队列是直接的数据内容复制,所以在上面的例子中,都采用了局部变量的方式保存消息结构体,这样也就免去动态内存分配的烦恼了(也就不用担心,接收线程在接收到消息时,消息内存空间已经被释放)。

同步消息

在一般的系统设计中会经常遇到要发送同步消息的问题,这个时候就可以根据当时的状态选择相应的实现:两个线程间可以采用[消息队列+信号量或邮箱]的形式实现。发送线程通过消息发送的形式发送相应的消息给消息队列,发送完毕后希望获得接收线程的收到确认,工作示意图如图 同步消息发送 所示:

同步消息发送

根据消息确认的不同,可以把消息结构体定义成:

  1. struct msg
  2. {
  3. /* 消息结构其他成员 */
  4. struct rt_mailbox ack;
  5. };
  6. /* 或者 */
  7. struct msg
  8. {
  9. /* 消息结构其他成员 */
  10. struct rt_semaphore ack;
  11. };

第一种类型的消息使用了邮箱来作为确认标志,而第二种类型的消息采用了信号量来作为确认标志。邮箱做为确认标志,代表着接收线程能够通知一些状态值给发送线程;而信号量作为确认标志只能够单一的通知发送线程,消息已经确认接收。