从 Kafka 消费消息到 EMQ X

搭建 Kafka 环境,以 MaxOS X 为例:

  1. $ wget http://apache.claz.org/kafka/2.3.1/kafka_2.12-2.3.1.tgz
  2. $ tar -xzf kafka_2.12-2.3.1.tgz
  3. $ cd kafka_2.12-2.3.1
  4. # 启动 Zookeeper
  5. $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
  6. # 启动 Kafka
  7. $ ./bin/kafka-server-start.sh config/server.properties

Kafka消费组不支持Kafka0.9以下版本

创建资源之前,需要提前创建Kafka主题,不然会提示错误

创建 Kafka 的主题:

  1. $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic --create

创建资源:

打开 EMQ X Dashboard,选择左侧的 “资源” 选项卡。

点击 “新建” 按钮:

从 Kafka 消费消息到 EMQ X - 图1

选择资源类型 “Kafka 消费组”:

从 Kafka 消费消息到 EMQ X - 图2

填写资源参数:

从 Kafka 消费消息到 EMQ X - 图3

1). Kafka 服务器地址

2). Kafka consumer 连接池大小

3). Kafka 的订阅主题

4). Kafka 的消息主题

5). Kafka Max Bytes (每次从 Kafka 里消费消息的最大字节数)

6). Kafka Offset Reset Policy (重置Offset策略,reset_to_latest | reset_by_subdcriber)

7). Kafka consumer 是否重连

最后点击 “确认”,资源创建完成:

从 Kafka 消费消息到 EMQ X - 图4

资源已经创建完成,现在用Dashboard的websocket工具订阅MQTT的主题 “TestTopic”:

从 Kafka 消费消息到 EMQ X - 图5

使用kafka 命令行 生产一条消息:

  1. ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic

从 Kafka 消费消息到 EMQ X - 图6

Dashboard的websocket工具接收到了Kafka 生产的消息”hello-kafka”:

从 Kafka 消费消息到 EMQ X - 图7