Pulsar Bridge

TIP

Since EMQX 3.1, a powerful rule engine has been available as a replacement for plugins, and we recommend using it. See Bridge data to Pulsar to set up Pulsar bridges in the rule engine.

EMQX bridges and forwards MQTT messages to a Pulsar cluster.

Config file for the Pulsar bridge plugin: etc/plugins/emqx_bridge_pulsar.conf

Configure Pulsar Cluster

    ## Pulsar Server
    bridge.pulsar.servers = 127.0.0.1:6650
    ## Pick a partition producer and sync/async
    bridge.pulsar.produce = sync
    ## bridge.pulsar.produce.sync_timeout = 3s
    ## bridge.pulsar.producer.batch_size = 1000
    ## by default, no compression
    ## bridge.pulsar.producer.compression = no_compression
    ## bridge.pulsar.encode_payload_type = base64
    ## bridge.pulsar.sock.buffer = 32KB
    ## bridge.pulsar.sock.recbuf = 32KB
    bridge.pulsar.sock.sndbuf = 1MB
    ## bridge.pulsar.sock.read_packets = 20
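To check which settings are actually active (as opposed to commented out with `##`), the file can be read with a few lines of Python. This is a minimal sketch that assumes the file uses plain `key = value` lines with `#`-prefixed comments, as in the listing above. `parse_bridge_conf` is a hypothetical helper, not part of EMQX, and it does not reproduce how EMQX itself loads the file.

```python
def parse_bridge_conf(text):
    """Collect active `key = value` settings; skip blank lines and comments.

    Assumption: comment lines start with `#` (the listing uses `##`).
    """
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            continue  # not a key/value line; ignore it
        settings[key.strip()] = value.strip()
    return settings


# A shortened copy of the listing above, used as sample input.
sample = """\
## Pulsar Server
bridge.pulsar.servers = 127.0.0.1:6650
bridge.pulsar.produce = sync
## bridge.pulsar.produce.sync_timeout = 3s
bridge.pulsar.sock.sndbuf = 1MB
"""

settings = parse_bridge_conf(sample)
host, _, port = settings["bridge.pulsar.servers"].partition(":")
print(host, int(port))
print(sorted(settings))
```

Commented-out keys such as `bridge.pulsar.produce.sync_timeout` do not appear in the result, which makes it easy to see which values are explicitly set.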

Configure Pulsar Bridge Hooks

    ## Bridge Pulsar Hooks
    ## ${topic}: the Pulsar topic to which the messages will be published.
    ## ${filter}: the MQTT topic (may contain wildcards) on which the action will be performed.
    ## Client Connected Record Hook
    bridge.pulsar.hook.client.connected.1 = {"topic": "client_connected"}
    ## Client Disconnected Record Hook
    bridge.pulsar.hook.client.disconnected.1 = {"topic": "client_disconnected"}
    ## Session Subscribed Record Hook
    bridge.pulsar.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
    ## Session Unsubscribed Record Hook
    bridge.pulsar.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
    ## Message Publish Record Hook
    bridge.pulsar.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
    ## Message Delivered Record Hook
    bridge.pulsar.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
    ## Message Acked Record Hook
    bridge.pulsar.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
    ## More configuration options
    ## partitioner strategy:
    ## Option: random | roundrobin | first_key_dispatch
    ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}
    ## key:
    ## Option: ${clientid} | ${username}
    ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}
    ## format:
    ## Option: json | json
    ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}
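The `filter` field accepts MQTT topic filters that may contain wildcards. The sketch below shows the standard MQTT matching rules (`+` for exactly one level, `#` for all remaining levels) so that you can predict which topics a filter would select. `topic_matches` is a hypothetical helper written for illustration. It assumes the plugin follows the standard MQTT rules and is not the plugin's own code.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches `topic_filter` under MQTT wildcard rules."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Per the MQTT specification, a filter that starts with a wildcard
    # does not match topics beginning with "$" (for example "$SYS/...").
    if topic.startswith("$") and filter_levels[0] in ("#", "+"):
        return False

    for i, level in enumerate(filter_levels):
        if level == "#":
            # "#" matches everything from here on, but only as the last level.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level that does not match
    # No "#" was seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("#", "sensor/1/temp"))
print(topic_matches("sensor/+/temp", "sensor/1/temp"))
print(topic_matches("sensor/+", "sensor/1/temp"))
```

With the default `"filter": "#"` used in the listing, every regular topic matches, so all events of that type are forwarded.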

Description of Pulsar Bridge Hooks

| Event | Description |
| --- | --- |
| bridge.pulsar.hook.client.connected.1 | Client connected |
| bridge.pulsar.hook.client.disconnected.1 | Client disconnected |
| bridge.pulsar.hook.session.subscribed.1 | Topics subscribed |
| bridge.pulsar.hook.session.unsubscribed.1 | Topics unsubscribed |
| bridge.pulsar.hook.message.publish.1 | Messages published |
| bridge.pulsar.hook.message.delivered.1 | Messages delivered |
| bridge.pulsar.hook.message.acked.1 | Messages acknowledged |
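When several hooks have to be written, generating the lines can help avoid JSON typos. The following sketch builds a hook line from the hook names listed above and the documented `strategy` and `key` options. `hook_line` and its parameters are hypothetical names introduced for this example. The `index` parameter mirrors the trailing `.1` in the keys, and whether indices other than 1 are accepted is an assumption.

```python
import json

# Option values taken from the comments in the configuration file.
STRATEGIES = {"random", "roundrobin", "first_key_dispatch"}
KEYS = {"${clientid}", "${username}"}


def hook_line(event, pulsar_topic, topic_filter=None, strategy=None, key=None, index=1):
    """Build one `bridge.pulsar.hook.<event>.<index> = {...}` line."""
    if strategy is not None and strategy not in STRATEGIES:
        raise ValueError(f"unknown strategy: {strategy}")
    if key is not None and key not in KEYS:
        raise ValueError(f"unknown key: {key}")

    spec = {}
    if topic_filter is not None:
        spec["filter"] = topic_filter
    spec["topic"] = pulsar_topic
    if strategy is not None:
        spec["strategy"] = strategy
    if key is not None:
        spec["key"] = key
    return f"bridge.pulsar.hook.{event}.{index} = {json.dumps(spec)}"


print(hook_line("client.connected", "client_connected"))
print(hook_line("message.publish", "message_publish", topic_filter="#", key="${clientid}"))
```

Using `json.dumps` keeps quoting consistent, and the option sets reject values that the configuration comments do not list.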

Forward Client Connected / Disconnected Events to Pulsar

When a client goes online, EMQX forwards a ‘client_connected’ event message to Pulsar:

    topic = "client_connected",
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "node": ${node},
        "ts": ${ts}
    }

When a client goes offline, EMQX forwards a ‘client_disconnected’ event message to Pulsar:

    topic = "client_disconnected",
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "reason": ${reason},
        "node": ${node},
        "ts": ${ts}
    }
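On the consuming side, both event types can be mapped onto one record type. The sketch below assumes the value arrives as a JSON object with the field names shown above. The sample values, the `ClientEvent` class, and `parse_client_event` are hypothetical. The unit of `ts` is not stated here, so it is kept as a raw value.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClientEvent:
    kind: str                 # "client_connected" or "client_disconnected"
    client_id: str
    username: Optional[str]
    node: str
    ts: int                   # raw timestamp; unit not specified in this document
    reason: Optional[str] = None  # only present on disconnect events


def parse_client_event(pulsar_topic: str, value: str) -> ClientEvent:
    """Turn a forwarded event value (assumed to be JSON) into a ClientEvent."""
    data = json.loads(value)
    return ClientEvent(
        kind=pulsar_topic,
        client_id=data["client_id"],
        username=data.get("username"),
        node=data["node"],
        ts=data["ts"],
        reason=data.get("reason"),
    )


# Hypothetical sample values for illustration only.
connected = parse_client_event(
    "client_connected",
    '{"client_id": "c1", "username": "alice", "node": "emqx@127.0.0.1", "ts": 1}',
)
disconnected = parse_client_event(
    "client_disconnected",
    '{"client_id": "c1", "username": "alice", "reason": "normal", '
    '"node": "emqx@127.0.0.1", "ts": 2}',
)
print(connected)
print(disconnected.reason)
```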

Forward Subscription Event to Pulsar

    topic = session_subscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward Unsubscription Event to Pulsar

    topic = session_unsubscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }
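One possible use of the subscription and unsubscription events is to keep a running view of which topics each client is subscribed to. The sketch below assumes the values are JSON objects with the fields shown above and that events are applied in the order they were produced. `apply_subscription_event` and the sample events are hypothetical.

```python
import json
from collections import defaultdict


def apply_subscription_event(subscriptions, pulsar_topic, value):
    """Update `subscriptions` (client_id -> {mqtt_topic: qos}) with one event."""
    event = json.loads(value)
    client_topics = subscriptions[event["client_id"]]
    if pulsar_topic == "session_subscribed":
        client_topics[event["topic"]] = event["qos"]
    elif pulsar_topic == "session_unsubscribed":
        client_topics.pop(event["topic"], None)
    return subscriptions


subscriptions = defaultdict(dict)

# Hypothetical events for illustration only.
events = [
    ("session_subscribed", '{"client_id": "c1", "topic": "a/b", "qos": 1, "node": "n1", "ts": 1}'),
    ("session_subscribed", '{"client_id": "c1", "topic": "x/#", "qos": 0, "node": "n1", "ts": 2}'),
    ("session_unsubscribed", '{"client_id": "c1", "topic": "a/b", "qos": 1, "node": "n1", "ts": 3}'),
]
for pulsar_topic, value in events:
    apply_subscription_event(subscriptions, pulsar_topic, value)

print(dict(subscriptions))
```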

Forward MQTT Messages to Pulsar

    topic = message_publish
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward MQTT Message Delivered Events to Pulsar

    topic = message_delivered
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward MQTT Message Acked Events to Pulsar

    topic = message_acked
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Examples of Pulsar Message Consumption

Consume MQTT client connected / disconnected event messages from Pulsar:

    sh pulsar-client consume client_connected -s "client_connected" -n 1000
    sh pulsar-client consume client_disconnected -s "client_disconnected" -n 1000

Consume MQTT subscription and unsubscription event messages from Pulsar:

    sh pulsar-client consume session_subscribed -s "session_subscribed" -n 1000
    sh pulsar-client consume session_unsubscribed -s "session_unsubscribed" -n 1000

Consume MQTT published messages from Pulsar:

    sh pulsar-client consume message_publish -s "message_publish" -n 1000

Consume MQTT message delivered and acked event messages from Pulsar:

    sh pulsar-client consume message_delivered -s "message_delivered" -n 1000
    sh pulsar-client consume message_acked -s "message_acked" -n 1000

TIP

The payload is base64-encoded by default.
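Because of this default, a consumer has to decode the `payload` field before using it. The sketch below assumes the consumed value is a JSON object with the message fields shown earlier and that the base64 default has not been changed. `decode_message` and the sample message are hypothetical.

```python
import base64
import json


def decode_message(value: str) -> dict:
    """Parse a forwarded message value and base64-decode its payload to bytes."""
    event = json.loads(value)
    event["payload"] = base64.b64decode(event["payload"])
    return event


# Build a hypothetical message the way the bridge is assumed to encode it.
sample_value = json.dumps({
    "client_id": "c1",
    "username": "alice",
    "topic": "sensor/1/temp",
    "payload": base64.b64encode(b"21.5").decode("ascii"),
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "ts": 1,
})

message = decode_message(sample_value)
print(message["topic"], message["payload"])
```

The payload is returned as bytes, since MQTT payloads are not guaranteed to be text. Call `.decode("utf-8")` only when the publisher is known to send UTF-8.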

Enable Pulsar Bridge

    ./bin/emqx_ctl plugins load emqx_bridge_pulsar