Bridge data to RabbitMQ

Setup a RabbitMQ, taking Mac OSX for instance:

  1. $ brew install rabbitmq
  2. # start rabbitmq
  3. $ rabbitmq-server

Create a rule:

Go to EMQX DashboardBridge data to RabbitMQ - 图1 (opens new window), select the “rule” tab on the menu to the left.

Select “message.publish”, then type in the following SQL:

  1. SELECT
  2. *
  3. FROM
  4. "message.publish"

image

Bind an action:

  1. Click on the "+ Add" button under "Action Handler", and then select
  2. "Data bridge to RabbitMQ" in the pop-up dialog window.

image

Fill in the parameters required by the action:

Two parameters is required by action “Data bridge to RabbitMQ”:

1). RabbitMQ Exchange. Here set it to “messages”

2). RabbitMQ Exchange Type. Here set it to “topic”

3). RabbitMQ Routing Key. Here set it to “test”

image

4). Bind a resource to the action. Since the dropdown list “Resource” is empty for now, we create a new resource by clicking on the “New Resource” to the top right, and then select “RabbitMQ”:

image

Configure the resource:

Set “RabbitMQ Server” to “127.0.0.1:5672”, and keep all other configs as default, and click on the “Testing Connection” button to make sure the connection can be created successfully, and then click on the “Create” button.

image

Back to the “Actions” dialog, and then click on the “Confirm” button.

image

Back to the creating rule page, then click on “Create” button. The rule we created will be show in the rule list:

image

We have finished, testing the rule by sending an MQTT message to emqx:

  1. Topic: "t/1"
  2. QoS: 0
  3. Retained: false
  4. Payload: "Hello, World\!"

Write an AMQP Client to consume the messages, following is the one written in python:

  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(
  4. pika.ConnectionParameters(host='localhost'))
  5. channel = connection.channel()
  6. channel.exchange_declare(exchange='messages', exchange_type='topic')
  7. result = channel.queue_declare(queue='', exclusive=True)
  8. queue_name = result.method.queue
  9. channel.queue_bind(exchange='messages', queue=queue_name, routing_key='test')
  10. print('[*] Waiting for messages. To exit press CTRL+C')
  11. def callback(ch, method, properties, body):
  12. print(" [x] %r" % body)
  13. channel.basic_consume(
  14. queue=queue_name, on_message_callback=callback, auto_ack=True)
  15. channel.start_consuming()

image

And from the rule list, verify that the “Matched” column has increased to 1:

image