Kafka Bridge

EMQ X can bridge and forward MQTT messages to a Kafka cluster. Apache Kafka is a fast, highly scalable, high-throughput distributed log system; combined with Kafka Streams, it is widely used in stream processing.


Kafka bridge plugin configuration file: etc/plugins/emqx_bridge_kafka.conf.

Configuring the Kafka Cluster Address

    ## Kafka servers
    ## bridge.kafka.servers = 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
    bridge.kafka.servers = 127.0.0.1:9092
    ## Kafka connection strategy. Options: per_partition | per_broker
    bridge.kafka.connection_strategy = per_partition
    bridge.kafka.min_metadata_refresh_interval = 5S
    ## Produce write mode. Options: sync | async
    bridge.kafka.produce = sync
    bridge.kafka.produce.sync_timeout = 3S
    ## Base directory for replayq to store messages on disk.
    ## If this entry is missing or set to undefined, replayq works in
    ## memory-only mode, i.e. messages are not queued on disk -- in that
    ## case the callers of the send and send_async APIs are responsible
    ## for handling messages that may be lost during application, network,
    ## or Kafka disturbances.
    ## bridge.kafka.replayq_dir = /tmp/emqx_bridge_kafka/
    ## default=10MB, replayq segment size.
    ## bridge.kafka.producer.replayq_seg_bytes = 10MB
    ## producer required_acks. Options: all_isr | leader_only | none.
    bridge.kafka.producer.required_acks = none
    ## default=10000. Timeout for the leader to wait for replicas before replying to the producer.
    bridge.kafka.producer.ack_timeout = 10S
    ## Maximum number of bytes collected into one produce request.
    bridge.kafka.producer.max_batch_bytes = 1024KB
    ## Minimum number of bytes collected into one produce request.
    bridge.kafka.producer.min_batch_bytes = 0
    ## Number of batches that may be sent ahead without receiving the ack
    ## of the previous request. Must be 0 if messages must be delivered
    ## strictly in order.
    bridge.kafka.producer.max_send_ahead = 0
    ## Defaults to no compression.
    ## bridge.kafka.producer.compression = no_compression
    ## Default: base64. Options: base64 | plain
    ## bridge.kafka.encode_payload_type = base64
    ## bridge.kafka.sock.buffer = 32KB
    ## bridge.kafka.sock.recbuf = 32KB
    bridge.kafka.sock.sndbuf = 1MB
    ## bridge.kafka.sock.read_packets = 20

Configuring Kafka Bridge Rules

    ## Bridge Kafka Hooks
    ## ${topic}: the kafka topics to which the messages will be published.
    ## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed.
    ## Client Connected Record Hook
    bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
    ## Client Disconnected Record Hook
    bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
    ## Session Subscribed Record Hook
    bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
    ## Session Unsubscribed Record Hook
    bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
    ## Message Publish Record Hook
    bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
    ## Message Delivered Record Hook
    bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
    ## Message Acked Record Hook
    bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
    ## More Configurations
    ## partitioner strategy:
    ## Options: random | roundrobin | first_key_dispatch
    ## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}
    ## key:
    ## Options: ${clientid} | ${username}
    ## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}
    ## format:
    ## Options: json
    ## Example: bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}

Kafka Bridge Rule Reference

Hook                                        Event
bridge.kafka.hook.client.connected.1        client connected
bridge.kafka.hook.client.disconnected.1     client disconnected
bridge.kafka.hook.session.subscribed.1      topic subscribed
bridge.kafka.hook.session.unsubscribed.1    topic unsubscribed
bridge.kafka.hook.message.publish.1         message published
bridge.kafka.hook.message.delivered.1       message delivered
bridge.kafka.hook.message.acked.1           message acknowledged (ACK)

Forwarding Client Connect/Disconnect Events to Kafka

When a device comes online, EMQ X forwards a connected event message to Kafka:

    topic = "client_connected",
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "node": ${node},
        "ts": ${ts}
    }
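Records like the connected event above can be read with any Kafka client. The sketch below is a minimal consumer loop using the third-party kafka-python package (the package choice is an assumption, not part of EMQ X); the field names follow the template above:

```python
import json

def parse_event(raw: bytes) -> dict:
    """Deserialize one bridged event record (a JSON document)."""
    return json.loads(raw.decode("utf-8"))

if __name__ == "__main__":
    # Hypothetical consumer loop; requires `pip install kafka-python`
    # and a reachable broker at 127.0.0.1:9092.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "client_connected",
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset="earliest",
    )
    for record in consumer:
        event = parse_event(record.value)
        print(event["client_id"], event["ts"])
```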

When a device goes offline, EMQ X forwards a disconnected event message to Kafka:

    topic = "client_disconnected",
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "reason": ${reason},
        "node": ${node},
        "ts": ${ts}
    }

Forwarding Subscription Events to Kafka

    topic = session_subscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forwarding Unsubscription Events to Kafka

    topic = session_unsubscribed
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forwarding MQTT Messages to Kafka

    topic = message_publish
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forwarding MQTT Message Delivered Events to Kafka

    topic = message_delivered
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Forwarding MQTT Message Acknowledged (Ack) Events to Kafka

    topic = message_acked
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

Kafka Consumption Examples

Consume the MQTT client connect/disconnect event messages from Kafka:

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic client_connected --from-beginning
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic client_disconnected --from-beginning

Consume the MQTT topic subscription event messages from Kafka:

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic session_subscribed --from-beginning
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic session_unsubscribed --from-beginning

Consume the MQTT published messages from Kafka:

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_publish --from-beginning

Consume the MQTT message delivered and acknowledged (Ack) events from Kafka:

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_delivered --from-beginning
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic message_acked --from-beginning

By default the payload is base64-encoded; the payload data format can be changed by setting the bridge.kafka.encode_payload_type configuration entry.
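With the default encode_payload_type = base64, a consumer must decode the payload field before using it. A minimal sketch in Python, assuming the message_publish record shape shown earlier:

```python
import base64
import json

def decode_publish_record(raw: bytes) -> dict:
    """Parse a message_publish record and base64-decode its payload.

    Assumes bridge.kafka.encode_payload_type = base64 (the default);
    with encode_payload_type = plain the payload field can be used as-is.
    """
    record = json.loads(raw.decode("utf-8"))
    record["payload"] = base64.b64decode(record["payload"])
    return record
```

For example, a record whose payload field is "aGVsbG8=" decodes back to the original bytes b"hello".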

Enabling the Kafka Bridge Plugin

    ./bin/emqx_ctl plugins load emqx_bridge_kafka