EMQ X 到 Kafka 的桥接(企业版)

EMQ X 节点可以桥接到流式数据处理。Apache Kafka是一个快速、高可扩展、高吞吐的分布式日志系统,配合kafka Stream,在流式数据处理中非常常用。在这个章节我们以一个配置例来说明如何配置 EMQ X 到 Kafka 的桥接。

场景描述

假设我们有一台 EMQ X 服务器'emqx1',和一台 Kafka 服务器,我们需要在'emqx1'上创建一条桥接把一些主题消息转发至 Kafka 服务器。在实际应用中,考虑到容错、扩展性和性能等方面,Kafka经常作为集群出现,但是在本例中Kafka的配置不是我们关注的重点,为了演示方便,我们只使用一台kafka服务器。为了使用Kafka服务,我们还需要Zookeeper服务器来管理Kafka的动态集群。关于Kafka的更多信息,请参阅官方文档

EMQ X

节点节点名监听端口
emqx1emqx1@192.168.1.1001883

Kafka

地址监听端口
192.168.1.1019092

Zookeeper

地址监听端口
192.168.1.1012181

配置 Zookeeper

因为仅使用一台本地zookeeper服务器,我们可以使用Kafka安装自带的zookeeper和它的默认配置。Kafka的安装目录下的config/zookeeper.properties为配置文件。打开配置文件,确认一下配置项:

  1. dataDir=/tmp/zookeeper # 数据文件保存路径
  2. clientPort=2181 # zookeeper客户端监听端口
  3. maxClientCnxns=0 # 客户端和zookeeper的最大连接数

启动 zookeeper:

  1. $ bin/zookeeper-server-start.sh config/zookeeper.properties

配置Kafka

打开Kafka配置文件config/server.properties,确认以下各项:

  1. broker.id=0 # Kafka服务器id,集群范围内唯一
  2. listeners=PLAINTEXT://:9092 # Kafka监听地址
  3. log.dirs=/tmp/kafka-logs # 日志文件保存路径
  4. zookeeper.connect=localhost:2181 # zookeeper地址和端口
  5. zookeeper.connection.timeout.ms=6000 # zookeeper连接超时
  6. `

启动 Kafka:

  1. $ bin/kafka-server-start.sh config/server.properties

创建topic

创建一个名为"message"的topic。该topic有一个分区和一个副本:

  1. $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic message

创建一个名为"session"的topic。该topic有一个分区和一个副本:

  1. $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic session

检查刚才创建的topic:

  1. $ bin/kafka-topics.sh --list --zookeeper localhost:2181
  2. message
  3. session

配置 EMQ X 企业版的Kafka Bridge

在 EMQ X 企业版中,Kafka桥接以插件的方式实现。配置文件为etc/plugins/emqx_bridge_kafka。打开该文件并配置以下行:

  1. # Kafka服务器的地址和端口。如果有多个kafka服务器,中间以逗号隔开(",")。
  2. bridge.kafka.servers = 192.168.1.101:9092
  3. # 配置kafka桥接的hook
  4. # filter 是EMQ X 上消息的过滤器,满足filter定于的消息会触发相应的动作。
  5. # topic是定义在kafka上的主题,它决定了消息将被送往哪个主题。
  6. bridge.kafka.hook.session.subscribed.1 = {"filter":"#", "topic":"session"}
  7. bridge.kafka.hook.session.unsubscribed.1 = {"filter":"#", "topic":"session"}
  8. bridge.kafka.hook.message.publish.1 = {"filter":"#", "topic":"message"}
  9. bridge.kafka.hook.message.delivered.1 = {"filter":"#", "topic":"message"}
  10. bridge.kafka.hook.message.acked.1 = {"filter":"#", "topic":"message"}

加载emqx_bridge_kafka插件:

  1. $ bin/emqx_ctl plugins load emqx_bridge_kafka
  2. Start apps: [emqx_bridge_kafka]
  3. Plugin emqx_bridge_kafka loaded successfully.

测试

我们使用mosquitto_pubmosquitto_sub 订阅消息,并在kafka服务器上使用自带的命令行consumer来观察收到的消息:

测试kafka的message主题

对 EMQ X 发布一条消息:

  1. $ mosquitto_pub -h 192.168.1.100 -t sensor/1/temperature -m 37.5 -d -u user1 -P 123456
  2. Client mosqpub|25082-Zeus- sending CONNECT
  3. Client mosqpub|25082-Zeus- received CONNACK
  4. Client mosqpub|25082-Zeus- sending PUBLISH (d0, q0, r0, m1, 'sensor/1/temperature', ... (4 bytes))
  5. Client mosqpub|25082-Zeus- sending DISCONNECT

在kafka上使用命令行comsumer:

  1. $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 test --from-beginning --topic message
  2. {"client_id":"mosqpub|25093-Zeus-","username":"user1","topic":"sensor/1/temperature","payload":"MzcuNQ==","qos":0,"node":"emqx1@192.168.1.100","ts":1542113746}

测试kafka的session主题

在EMQ X 上订阅一条主题:

  1. $ mosquitto_sub -h 192.168.1.100 -t control/1 -d -u user1 -P 123456
  2. Client mosqsub|25095-Zeus- sending CONNECT
  3. Client mosqsub|25095-Zeus- received CONNACK
  4. Client mosqsub|25095-Zeus- sending SUBSCRIBE (Mid: 1, Topic: control/1, QoS: 0)

在kafka上使用命令行comsumer:bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 test —from-beginning —topic session

{"client_id":"mosqsub|25095-Zeus-","topic":"control/1","qos":0,"node":"emqx1@192.168.1.100","ts":1542113850}```