5 过滤消息样例

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
  2. consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

  1. ------------
  2. | message |
  3. |----------| a > 5 AND b = 'abc'
  4. | a = 10 | --------------------> Gotten
  5. | b = 'abc'|
  6. | c = true |
  7. ------------
  8. ------------
  9. | message |
  10. |----------| a > 5 AND b = 'abc'
  11. | a = 1 | --------------------> Missed
  12. | b = 'abc'|
  13. | c = true |
  14. ------------

5.1 基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:\=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

  1. public void subscribe(finalString topic, final MessageSelector messageSelector)

5.2 使用样例

1、生产者样例

发送消息时,你能通过putUserProperty来设置消息的属性

  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  2. producer.start();
  3. Message msg = new Message("TopicTest",
  4. tag,
  5. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
  6. );
  7. // 设置一些属性
  8. msg.putUserProperty("a", String.valueOf(i));
  9. SendResult sendResult = producer.send(msg);
  10. producer.shutdown();

2、消费者样例

用MessageSelector.bySql来使用sql筛选消息

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
  2. // 只有订阅的消息有这个属性a, a >=0 and a <= 3
  3. consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
  4. consumer.registerMessageListener(new MessageListenerConcurrently() {
  5. @Override
  6. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  7. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  8. }
  9. });
  10. consumer.start();