Kafka Bridge

TIP

Since EMQX version 3.1, a powerful rule engine has been introduced to replace plug-ins, and it is the recommended approach. See Bridge data to Kafka to set up Kafka bridges in the rule engine.

EMQX bridges and forwards MQTT messages to a Kafka cluster:


Config file for Kafka bridge plugin: etc/plugins/emqx_bridge_kafka.conf

Configure Kafka Cluster

    ## Kafka Server
    ## bridge.kafka.servers = 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
    bridge.kafka.servers = 127.0.0.1:9092
    ## Kafka connection strategy. Option value: per_partition | per_broker
    bridge.kafka.connection_strategy = per_partition
    bridge.kafka.min_metadata_refresh_interval = 5S
    ## Produce write type. Option value: sync | async
    bridge.kafka.produce = sync
    bridge.kafka.produce.sync_timeout = 3S
    ## Base directory for replayq to store messages on disk.
    ## If this config entry is missing or set to undefined,
    ## replayq works in a mem-only manner,
    ## i.e. messages are not queued on disk -- in such a case,
    ## the send or send_sync API callers are responsible for
    ## possible message loss in case of application,
    ## network or Kafka disturbances. For instance,
    ## a wolff:send API caller may trap_exit and then
    ## react on the partition-producer worker pid's 'EXIT'
    ## message to issue a retry after restarting the producer.
    ## bridge.kafka.replayq_dir = /tmp/emqx_bridge_kafka/
    ## default=10MB, replayq segment size.
    ## bridge.kafka.producer.replayq_seg_bytes = 10MB
    ## Producer required_acks. Option value: all_isr | leader_only | none.
    bridge.kafka.producer.required_acks = none
    ## default=10000. Timeout for the leader to wait for replicas before replying to the producer.
    ## bridge.kafka.producer.ack_timeout = 10S
    ## Maximum bytes per message set sent on the wire before blocking to wait for acks.
    ## By default, send at most 1 MB of data in one batch (message set).
    ## bridge.kafka.producer.max_batch_bytes = 1024KB
    ## bridge.kafka.producer.min_batch_bytes = 0
    ## Number of batches to be sent ahead without receiving ack for the last request.
    ## Must be 0 if messages must be delivered in strict order.
    ## bridge.kafka.producer.max_send_ahead = 0
    ## By default, no compression.
    # bridge.kafka.producer.compression = no_compression
    # bridge.kafka.encode_payload_type = base64
    # bridge.kafka.sock.buffer = 32KB
    # bridge.kafka.sock.recbuf = 32KB
    bridge.kafka.sock.sndbuf = 1MB
    # bridge.kafka.sock.read_packets = 20

Configure Kafka Bridge Hooks

    ## Bridge Kafka Hooks
    ## ${topic}: the Kafka topic to which the messages will be published.
    ## ${filter}: the MQTT topic (may contain wildcards) on which the action will be performed.
    bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
    bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
    bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
    bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
    bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
    bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
    bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
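The filter values above are MQTT topic filters: + matches exactly one topic level and # matches any number of trailing levels, so "#" subscribes the hook to every topic. A minimal Python sketch of this matching logic (simplified; it ignores edge cases such as # also matching its parent level):

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching:
    '+' matches one level, '#' matches all remaining levels."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            return True               # multi-level wildcard: match everything below
        if i >= len(topic_parts):
            return False              # filter has more levels than the topic
        if part != "+" and part != topic_parts[i]:
            return False              # literal level mismatch
    return len(filter_parts) == len(topic_parts)

# A filter of "#", as used in the hook config above, matches every topic:
assert topic_matches("#", "sensors/1/temperature")
assert topic_matches("sensors/+/temperature", "sensors/1/temperature")
assert not topic_matches("sensors/+", "sensors/1/temperature")
```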

Description of Kafka Bridge Hooks

| Event | Description |
| --- | --- |
| bridge.kafka.hook.client.connected.1 | Client connected |
| bridge.kafka.hook.client.disconnected.1 | Client disconnected |
| bridge.kafka.hook.session.subscribed.1 | Topics subscribed |
| bridge.kafka.hook.session.unsubscribed.1 | Topics unsubscribed |
| bridge.kafka.hook.message.publish.1 | Messages published |
| bridge.kafka.hook.message.delivered.1 | Messages delivered |
| bridge.kafka.hook.message.acked.1 | Messages acknowledged |

Forward Client Connected / Disconnected Events to Kafka

When a client goes online, EMQX forwards a 'client_connected' event message to Kafka:

    topic = "client_connected",
    value = {
        "client_id": ${clientid},
        "node": ${node},
        "ts": ${ts}
    }

When a client goes offline, EMQX forwards a 'client_disconnected' event message to Kafka:

    topic = "client_disconnected",
    value = {
        "client_id": ${clientid},
        "reason": ${reason},
        "node": ${node},
        "ts": ${ts}
    }
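The record value arrives on the Kafka side as a JSON document, so a consumer would typically deserialize it before use. A minimal Python sketch, where the sample field values are hypothetical:

```python
import json

# A hypothetical 'client_disconnected' record value as consumed from Kafka
raw_value = (
    '{"client_id": "client_1", "reason": "normal", '
    '"node": "emqx@127.0.0.1", "ts": 1614764800}'
)

# Deserialize the JSON value and read the event fields
event = json.loads(raw_value)
print(event["client_id"], event["reason"])  # -> client_1 normal
```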

Forward Subscription Event to Kafka

    topic = session_subscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward Unsubscription Event to Kafka

    topic = session_unsubscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward MQTT Messages to Kafka

    topic = message_publish
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward MQTT Message Delivered Event to Kafka

    topic = message_delivered
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forward MQTT Message Acked Event to Kafka

    topic = message_acked
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Examples of Kafka Message Consumption

Kafka consumes MQTT client connected / disconnected event messages:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_connected --from-beginning
    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_disconnected --from-beginning

On newer Kafka versions, where the --zookeeper option has been removed, use --bootstrap-server localhost:9092 instead in these commands.

Kafka consumes MQTT subscription messages:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_subscribed --from-beginning
    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_unsubscribed --from-beginning

Kafka consumes MQTT published messages:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_publish --from-beginning

Kafka consumes MQTT message Deliver and Ack event messages:

    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_delivered --from-beginning
    sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_acked --from-beginning

TIP

The payload is base64-encoded (see the bridge.kafka.encode_payload_type = base64 setting above).
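A consumer can therefore restore the original MQTT payload by base64-decoding the payload field of the record value. A minimal Python sketch, using a hypothetical message_publish record:

```python
import base64
import json

# Hypothetical 'message_publish' record value as consumed from Kafka;
# the payload field is base64-encoded, as the bridge would produce it.
raw_value = json.dumps({
    "client_id": "client_1",
    "username": "user_1",
    "topic": "sensors/1/temperature",
    "payload": base64.b64encode(b"23.5").decode(),
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "ts": 1614764800,
})

# Parse the JSON value, then decode the payload back to the original bytes
event = json.loads(raw_value)
payload = base64.b64decode(event["payload"])
print(payload.decode())  # -> 23.5
```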

Enable Kafka Bridge

    ./bin/emqx_ctl plugins load emqx_bridge_kafka