Bridge Data into RocketMQ

EMQX supports bridging data into RocketMQ, so you can forward MQTT messages and client events to RocketMQ. For example, you can use client events to trigger updates that record whether each client is online or offline.

TIP

EMQX Enterprise Edition features. EMQX Enterprise Edition provides comprehensive coverage of key business scenarios, rich data integration, product-level reliability, and 24/7 global technical support. Experience the benefits of this enterprise-ready MQTT messaging platform today.

Prerequisites

Features List

Quick Start Tutorial

This section describes how to configure the RocketMQ data bridge. It covers how to set up the RocketMQ server, create the data bridge and the rules that forward data to RocketMQ, and test the data bridge and rules.

This tutorial assumes that you run both EMQX and RocketMQ on the local machine. If you have RocketMQ and EMQX running remotely, adjust the settings accordingly.

Install RocketMQ

  1. Prepare a docker-compose file, rocketmq.yaml, to set up RocketMQ.

      version: '3.9'
      services:
        mqnamesrv:
          image: apache/rocketmq:4.9.4
          container_name: rocketmq_namesrv
          ports:
            - 9876:9876
          volumes:
            - ./rocketmq/logs:/opt/logs
            - ./rocketmq/store:/opt/store
          command: ./mqnamesrv
        mqbroker:
          image: apache/rocketmq:4.9.4
          container_name: rocketmq_broker
          ports:
            - 10909:10909
            - 10911:10911
          volumes:
            - ./rocketmq/logs:/opt/logs
            - ./rocketmq/store:/opt/store
            - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
          environment:
            NAMESRV_ADDR: "rocketmq_namesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms1024m -Xmx1024m -Xmn1024m"
          command: ./mqbroker -c /etc/rocketmq/broker.conf
          depends_on:
            - mqnamesrv

  2. Prepare the folders and configuration required for running RocketMQ.

      mkdir rocketmq
      mkdir rocketmq/logs
      mkdir rocketmq/store
      mkdir rocketmq/conf

  3. Save the content below into rocketmq/conf/broker.conf.

      brokerClusterName=DefaultCluster
      brokerName=broker-a
      brokerId=0
      brokerIP1=change me to your real IP address
      defaultTopicQueueNums=4
      autoCreateTopicEnable=true
      autoCreateSubscriptionGroup=true
      listenPort=10911
      deleteWhen=04
      fileReservedTime=120
      mapedFileSizeCommitLog=1073741824
      mapedFileSizeConsumeQueue=300000
      diskMaxUsedSpaceRatio=100
      maxMessageSize=65536
      brokerRole=ASYNC_MASTER
      flushDiskType=ASYNC_FLUSH

  4. Start the server.

      docker-compose -f rocketmq.yaml up

  5. Start a consumer.

      docker run --rm -e NAMESRV_ADDR=host.docker.internal:9876 apache/rocketmq:4.9.4 ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

TIP

On Linux, replace host.docker.internal with your machine's real IP address.
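
Before creating the data bridge, it can help to confirm that the ports published by the compose file are reachable. The following is a minimal Python sketch, not part of EMQX or RocketMQ. The helper name port_open is hypothetical, and the host and ports assume the local setup in this tutorial (127.0.0.1, with 9876 for the name server and 10911 for the broker).

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports assumed from the docker-compose file in this tutorial.
    for name, port in (("name server", 9876), ("broker", 10911)):
        state = "reachable" if port_open("127.0.0.1", port) else "NOT reachable"
        print(f"{name} on 127.0.0.1:{port} is {state}")
```

A successful TCP connection only shows that something is listening on the port. The Test Connectivity button in the Dashboard remains the real check that the bridge can talk to RocketMQ.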

Create RocketMQ Data Bridge

  1. Go to EMQX Dashboard, and click Integration -> Data Bridge.

  2. Click Create on the top right corner of the page.

  3. In the Create Data Bridge page, click to select RocketMQ, and then click Next.

  4. Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.

  5. Input the connection information. Input 127.0.0.1:9876 as the Server, TopicTest as the Topic, and leave others as default.

  6. Leave the Template empty by default.

    TIP

    When this value is empty, the whole message is forwarded to RocketMQ. If you set a template, its value is JSON template data.

  7. Advanced settings (optional): Choose whether to use sync or async query mode as needed. For details, see Integration.

  8. Before clicking Create, you can click Test Connectivity to test that the bridge can connect to the RocketMQ server.

  9. Then click Create to finish the creation of the data bridge.

    A confirmation dialog appears and asks whether you would like to create a rule using this data bridge. You can click Create Rule to continue creating rules that specify the data to be saved into RocketMQ. You can also create rules by following the steps in Create Rules for RocketMQ Data Bridge.

Now the RocketMQ data bridge should appear in the data bridge list (Integration -> Data Bridge) with Resource Status as Connected.
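
To illustrate what a non-empty Template in step 6 does, here is a small Python sketch that fills ${...} placeholders from a message. It assumes the ${field} placeholder syntax used by EMQX templates. The render_template helper, the sample fields, and the choice to render missing fields as "undefined" are all hypothetical, and this is not EMQX's actual implementation.

```python
import json
import re

# Matches placeholders such as ${clientid} or ${payload}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z0-9_.]+)\}")


def render_template(template: str, message: dict) -> str:
    """Replace each ${name} in template with the matching message field.

    An empty template stands for "forward the whole message", mirroring
    the behaviour described in the tip for step 6.
    """
    if not template:
        return json.dumps(message, sort_keys=True)

    def substitute(match):
        # Missing fields become "undefined" (an arbitrary choice for this sketch).
        value = message.get(match.group(1), "undefined")
        return value if isinstance(value, str) else json.dumps(value)

    return _PLACEHOLDER.sub(substitute, template)


# Hypothetical message fields, for illustration only.
sample = {"clientid": "emqx_c", "topic": "t/1", "qos": 0}
rendered = render_template('{"client": "${clientid}", "topic": "${topic}"}', sample)
print(rendered)
```

With the sample above, only the clientid and topic fields end up in the RocketMQ message body, whereas an empty template would forward every field.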

Create Rules for RocketMQ Data Bridge

Now that you have successfully created the data bridge to RocketMQ, you can continue to create rules to specify the data to be saved into RocketMQ. You need to create two different rules, one for message forwarding and one for event recording.

  1. Go to EMQX Dashboard, and click Integration -> Rules.

  2. Click Create on the top right corner of the page.

  3. Input my_rule as the rule ID, and set the rules in the SQL Editor based on the feature to use:

    • To create a rule for message storage, input the following statement, which means the MQTT messages under topic t/# will be saved to RocketMQ.

      Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the data bridge in the SELECT part.

      SELECT
        *
      FROM
        "t/#"

    • To create a rule for online/offline status recording, input the following statement:

      SELECT
        *
      FROM
        "$events/client_connected", "$events/client_disconnected"

      TIP

      For convenience, the TopicTest topic will be reused to receive online/offline events.

  4. Click the Add Action button, select Forwarding with Data Bridge from the dropdown list, and then select the data bridge you just created under Data Bridge. Click the Add button.

  5. Click the Create button to finish the setup.

Now you have successfully created the rules for the RocketMQ data bridge. You can click Integration -> Flows to view the topology. It shows that the messages under topic t/# are parsed by rule my_rule and then sent and saved to RocketMQ.
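
To make clear which topics the FROM "t/#" clause selects, the sketch below implements standard MQTT topic filter matching in Python. The function name topic_matches is hypothetical, and the sketch ignores the special handling of topics that start with $, such as the $events/... topics, which the rule engine treats as event sources rather than ordinary MQTT topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check an MQTT topic against a filter that may use + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; + matches any single level.
            return False
    # Without a trailing #, both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for candidate in ("t/1", "t/a/b", "t", "a/1"):
    print(candidate, topic_matches("t/#", candidate))
```

In this tutorial, a message published to t/1 therefore matches the rule, while a message published to a/1 would not.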

Test Data Bridge and Rule

Use MQTTX to send a message to topic t/1. The client's connection and disconnection also trigger the online/offline events.

  mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello RocketMQ" }'

Check the running status of the data bridge. There should be one new incoming and one new outgoing message.

Check whether the data is forwarded to the TopicTest topic.

The following data will be printed by the consumer.

  ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=581, queueOffset=0, sysFlag=0, bornTimestamp=1679037578889, bornHost=/172.26.83.106:43920, storeTimestamp=1679037578891, storeHost=/172.26.83.106:10911, msgId=AC1A536A00002A9F000000000000060E, commitLogOffset=1550, bodyCRC=7414108, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1679037605342, CLUSTER=DefaultCluster}, body=[...], transactionId='null'}]]
  ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=511, queueOffset=1, sysFlag=0, bornTimestamp=1679037580174, bornHost=/172.26.83.106:43920, storeTimestamp=1679037580176, storeHost=/172.26.83.106:10911, msgId=AC1A536A00002A9F0000000000000E61, commitLogOffset=3681, bodyCRC=1604860416, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1679037605342, CLUSTER=DefaultCluster}, body=[...], transactionId='null'}]]
  ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=458, queueOffset=2, sysFlag=0, bornTimestamp=1679037584933, bornHost=/172.26.83.106:43920, storeTimestamp=1679037584934, storeHost=/172.26.83.106:10911, msgId=AC1A536A00002A9F000000000000166E, commitLogOffset=5742, bodyCRC=383397630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1679037605342, CLUSTER=DefaultCluster}, body=[...], transactionId='null'}]]
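
The consumer output is verbose, so a short script can make it easier to confirm that each message landed on TopicTest. The Python sketch below pulls the queue ID, queue offset, and topic out of one output line. The summarize helper and the regular expression are hypothetical and assume the line layout shown above; the sample line is abbreviated from that output.

```python
import re

# Assumes the field order seen in the consumer output of this tutorial.
_FIELDS = re.compile(r"queueId=(\d+), .*?queueOffset=(\d+), .*?topic='([^']*)'")


def summarize(line: str):
    """Return (queue_id, queue_offset, topic) for a consumer line, or None."""
    match = _FIELDS.search(line)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2)), match.group(3)


# Abbreviated copy of the first line printed by the consumer.
sample = (
    "Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, "
    "storeSize=581, queueOffset=0, sysFlag=0, "
    "toString()=Message{topic='TopicTest', flag=0}]]"
)
print(summarize(sample))
```

Seeing TopicTest with increasing queue offsets for the three lines indicates that the published message and the two client events were all written to the same topic.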