消息队列

消息队列是另一种常用的线程间通讯方式,它能够接收来自线程或中断服务例程中不固定长度的消息,并把消息缓存在自己的内存空间中。其他线程也能够从消息队列中读取相应的消息,而当消息队列是空的时候,可以挂起读取线程。当有新的消息到达时,挂起的线程将被唤醒以接收并处理消息。

消息队列主要操作包括:通过函数mq_open()创建或者打开,调用mq_send()发送一条消息到消息队列,调用mq_receive()从消息队列获取一条消息,当消息队列不在使用时,可以调用mq_unlink()删除消息队列。

POSIX消息队列主要用于进程间通信,RT-Thread操作系统的POSIX消息队列主要是基于RT-Thread内核消息队列的一个封装,主要还是用于系统内线程间的通讯。使用方式和RT-Thread内核的消息队列差不多。

消息队列控制块

每个消息队列对应一个消息队列控制块,创建消息队列前需要先定义一个消息队列控制块。消息队列控制块定义在mqueue.h头文件里。

  1. struct mqdes
  2. {
  3. rt_uint16_t refcount; /* 引用计数 */
  4. rt_uint16_t unlinked; /* 消息队列的分离状态,值为1表示消息队列已经分离 */
  5. rt_mq_t mq; /* RT-Thread 消息队列控制块 */
  6. struct mqdes* next; /* 指向下一个消息队列控制块 */
  7. };
  8. typedef struct mqdes* mqd_t; /* 消息队列控制块指针类型重定义 */

创建或打开消息队列

函数原型

  1. mqd_t mq_open(const char *name, int oflag, ...);

  1. 参数 描述

  1. name 消息队列名称
  2.  
  3. oflag 消息队列打开方式

函数返回

成功则返回消息队列句柄,否则返回NULL。

此函数会根据消息队列的名字name创建一个新的消息队列或者打开一个已经存在的消息队列。Oflag的可选值有0、O_CREAT或O_CREAT|O_EXCL。如果Oflag设置为O_CREAT则会创建一个新的消息队列。如果Oflag设置O_CREAT|O_EXCL,如果消息队列已经存在则会返回NULL,如果不存在则会创建一个新的消息队列。如果Oflag设置为0,消息队列不存在则会返回NULL。

分离消息队列

函数原型

  1. int mq_unlink(const char *name);

  1. 参数 描述

  1. name 消息队列名称

函数返回

成功返回0,若消息队列不存在则返回-1。

此函数会根据消息队列名称name查找消息队列,若找到,则将消息队列置为分离状态,之后若持有计数为0,则删除消息队列,并释放消息队列占有的资源。

关闭消息队列

函数原型

  1. int mq_close(mqd_t mqdes);

  1. 参数 描述

  1. mqdes 消息队列句柄

函数返回

成功返回0,否则返回-1。

当一个线程终止时,会对其占用的消息队列执行此关闭操作。不论线程是自愿终止还是非自愿终止都会执行这个关闭操作,相当于是消息队列的持有计数减1,若减1后持有计数为0,且消息队列处于分离状态,则会删除mqdes消息队列并释放其占有的资源。

阻塞方式发送消息

函数原型

  1. int mq_send(mqd_t mqdes,
  2. const char *msg_ptr,
  3. size_t msg_len,
  4. unsigned msg_prio);

  1. 参数 描述

  1. mqdes 消息队列句柄,不能为NULL
  2.  
  3. sg_ptr 指向要发送的消息的指针,不能为NULL
  4.  
  5. msg_len 发送的消息的长度
  6.  
  7. msg_prio Rt-thread未实现参数

函数返回

成功返回0,否则返回-1。

此函数用来向mqdes消息队列发送一条消息,是rt_mq_send()函数的封装。此函数把msg_ptr指向的消息添加到mqdes消息队列中,发送的消息长度msg_len必须小于或者等于创建消息队列时设置的最大消息长度。

如果消息队列已经满,即消息队列中的消息数量等于最大消息数,发送消息的的线程或者中断程序会收到一个错误码(-RT_EFULL)。

指定阻塞时间发送消息

函数原型

  1. int mq_timedsend(mqd_t mqdes,
  2. const char *msg_ptr,
  3. size_t msg_len,
  4. unsigned msg_prio,
  5. const struct timespec *abs_timeout);

  1. 参数 描述

  1. mqdes 消息队列句柄,不能为NULL
  2.  
  3. msg_ptr 指向要发送的消息的指针,不能为NULL
  4.  
  5. msg_len 发送的消息的长度
  6.  
  7. msg_prio Rt-thread未实现参数
  8.  
  9. abs_timeout 指定的等待时间,单位是操作系统时钟节拍(OS Tick

函数返回

成功返回0,否则返回-1。

目前RT-Thread不支持指定阻塞时间发送消息,但是函数接口已经实现,相当于调用mq_send()。

阻塞方式接受消息

函数原型

  1. ssize_t mq_receive(mqd_t mqdes,
  2. char *msg_ptr,
  3. size_t msg_len,
  4. unsigned *msg_prio);

  1. 参数 描述

  1. mqdes 消息队列句柄,不能为NULL
  2.  
  3. msg_ptr 指向要发送的消息的指针,不能为NULL
  4.  
  5. msg_len 发送的消息的长度
  6.  
  7. msg_prio Rt-thread未实现参数

函数返回

成功则返回消息长度,否则返回-1。

此函数会把mqdes消息队列里面最老的消息移除消息队列,并把消息放到msg_ptr指向的内存里。如果消息队列为空,调用mq_receive()函数的线程将会阻塞,直到消息队列中消息可用。

指定阻塞时间接受消息

函数原型

  1. ssize_t mq_timedreceive(mqd_t mqdes,
  2. char *msg_ptr,
  3. size_t msg_len,
  4. unsigned *msg_prio,
  5. const struct timespec *abs_timeout);

  1. 参数 描述

  1. mqdes 消息队列句柄,不能为NULL
  2.  
  3. msg_ptr 指向要发送的消息的指针,不能为NULL
  4.  
  5. msg_len 发送的消息的长度
  6.  
  7. msg_prio Rt-thread未实现参数
  8.  
  9. abs_timeout 指定的等待时间,单位是操作系统时钟节拍(OS Tick

函数返回

成功则返回消息长度,否则返回-1。

此函数和mq_receive()函数的区别在于,若消息队列为空,线程将阻塞abs_timeout时长,超时后函数直接返回-1,线程将被唤醒由阻塞态进入就绪态。

消息队列示例代码

这个程序会创建3个线程,线程2从消息队列接受消息,线程2和线程3往消息队列发送消息。

  1. #include <mqueue.h>
  2. #include <stdio.h>
  3.  
  4. /* 线程控制块 */
  5. static pthread_t tid1;
  6. static pthread_t tid2;
  7. static pthread_t tid3;
  8. /* 消息队列句柄 */
  9. static mqd_t mqueue;
  10.  
  11. /* 函数返回值检查函数 */
  12. static void check_result(char* str,int result)
  13. {
  14. if (0 == result)
  15. {
  16. printf("%s successfully!\n",str);
  17. }
  18. else
  19. {
  20. printf("%s failed! error code is %d\n",str,result);
  21. }
  22. }
  23. /*线程1入口函数*/
  24. static void* thread1_entry(void* parameter)
  25. {
  26. char buf[128];
  27. int result;
  28.  
  29. while (1)
  30. {
  31. /* 从消息队列中接收消息 */
  32. result = mq_receive(mqueue, &buf[0], sizeof(buf), 0);
  33. if (result != -1)
  34. {
  35. /* 输出内容 */
  36. printf("thread1 recv [%s]\n", buf);
  37. }
  38.  
  39. /* 休眠1秒*/
  40. // sleep(1);
  41. }
  42. }
  43. /*线程2入口函数*/
  44. static void* thread2_entry(void* parameter)
  45. {
  46. int i, result;
  47. char buf[] = "message2 No.x";
  48.  
  49. while (1)
  50. {
  51. for (i = 0; i < 10; i++)
  52. {
  53. buf[sizeof(buf) - 2] = '0' + i;
  54.  
  55. printf("thread2 send [%s]\n", buf);
  56. /* 发送消息到消息队列中 */
  57. result = mq_send(mqueue, &buf[0], sizeof(buf), 0);
  58. if ( result == -1)
  59. {
  60. /* 消息队列满, 延迟1s时间 */
  61. printf("thread2:message queue is full, delay 1s\n");
  62. sleep(1);
  63. }
  64. }
  65.  
  66. /* 休眠2秒*/
  67. sleep(2);
  68. }
  69. }
  70. /* 线程3入口函数 */
  71. static void* thread3_entry(void* parameter)
  72. {
  73. int i, result;
  74. char buf[] = "message3 No.x";
  75.  
  76. while (1)
  77. {
  78. for (i = 0; i < 10; i++)
  79. {
  80. buf[sizeof(buf) - 2] = '0' + i;
  81.  
  82. printf("thread3 send [%s]\n", buf);
  83. /* 发送消息到消息队列中 */
  84. result = mq_send(mqueue, &buf[0], sizeof(buf), 0);
  85. if ( result == -1)
  86. {
  87. /* 消息队列满, 延迟1s时间 */
  88. printf("thread3:message queue is full, delay 1s\n");
  89. sleep(1);
  90. }
  91. }
  92.  
  93. /* 休眠2秒*/
  94. sleep(2);
  95. }
  96. }
  97. /* 用户应用入口 */
  98. int rt_application_init()
  99. {
  100. int result;
  101. struct mq_attr mqstat;
  102. int oflag = O_CREAT|O_RDWR;
  103. #define MSG_SIZE 128
  104. #define MAX_MSG 128
  105. memset(&mqstat, 0, sizeof(mqstat));
  106. mqstat.mq_maxmsg = MAX_MSG;
  107. mqstat.mq_msgsize = MSG_SIZE;
  108. mqstat.mq_flags = 0;
  109. mqueue = mq_open("mqueue1",O_CREAT,0777,&mqstat);
  110.  
  111. /*创建线程1,线程入口是thread1_entry, 属性参数设为NULL选择默认值,入口参数为NULL*/
  112. result = pthread_create(&tid1,NULL,thread1_entry,NULL);
  113. check_result("thread1 created",result);
  114.  
  115. /*创建线程2,线程入口是thread2_entry, 属性参数设为NULL选择默认值,入口参数为NULL*/
  116. result = pthread_create(&tid2,NULL,thread2_entry,NULL);
  117. check_result("thread2 created",result);
  118.  
  119. /*创建线程3,线程入口是thread3_entry, 属性参数设为NULL选择默认值,入口参数为NULL*/
  120. result = pthread_create(&tid3,NULL,thread3_entry,NULL);
  121. check_result("thread3 created",result);
  122.  
  123.  
  124. return 0;
  125. }