3 延时消息样例

3.1 启动消费者等待传入订阅消息

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.common.message.MessageExt;
  6. import java.util.List;
  7. public class ScheduledMessageConsumer {
  8. public static void main(String[] args) throws Exception {
  9. // 实例化消费者
  10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
  11. // 订阅Topics
  12. consumer.subscribe("TestTopic", "*");
  13. // 注册消息监听者
  14. consumer.registerMessageListener(new MessageListenerConcurrently() {
  15. @Override
  16. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
  17. for (MessageExt message : messages) {
  18. // Print approximate delay time period
  19. System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
  20. }
  21. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  22. }
  23. });
  24. // 启动消费者
  25. consumer.start();
  26. }
  27. }

3.2 发送延时消息

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.common.message.Message;
  3. public class ScheduledMessageProducer {
  4. public static void main(String[] args) throws Exception {
  5. // 实例化一个生产者来产生延时消息
  6. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  7. // 启动生产者
  8. producer.start();
  9. int totalMessagesToSend = 100;
  10. for (int i = 0; i < totalMessagesToSend; i++) {
  11. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  12. // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
  13. message.setDelayTimeLevel(3);
  14. // 发送消息
  15. producer.send(message);
  16. }
  17. // 关闭生产者
  18. producer.shutdown();
  19. }
  20. }

3.3 验证

您将会看到消息的消费比存储时间晚10秒。

3.4 延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

3.5 延时消息的使用限制

  1. // org/apache/rocketmq/store/config/MessageStoreConfig.java
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java