从 Pulsar 消费消息到 EMQ X

搭建 Pulsar 环境,以 MaxOS X 为例:

  1. $ wget http://apache.mirrors.hoobly.com/pulsar/pulsar-2.3.2/apache-pulsar-2.3.2-bin.tar.gz
  2. $ tar xvfz apache-pulsar-2.3.2-bin.tar.gz
  3. $ cd apache-pulsar-2.3.2
  4. # 启动 Pulsar
  5. $ ./bin/pulsar standalone

创建 Pulsar 的主题:

  1. $ ./bin/pulsar-admin topics create-partitioned-topic -p 5 testTopic

创建资源:

打开 EMQ X Dashboard,选择左侧的 “资源” 选项卡。

点击 “新建” 按钮:

从 Pulsar 消费消息到 EMQ X - 图1

选择资源类型 “pulsar 消费组”:

从 Pulsar 消费消息到 EMQ X - 图2

填写资源参数:

从 Pulsar 消费消息到 EMQ X - 图3

1). Pulsar 服务器地址

2). Pulsar consumer 进程数量

3). Pulsar 的订阅主题

4). EMQ X 的消息主题

5). Pulsar 流控阈值 (Pulsar 流控阈值,配置 Pulsar 向消费者发送多少条消息后阻塞 Pulsar Consumer)

6). EMQ X 重置流控阈值百分比 (Pulsar 流控阈值重置百分比。此配置让消费者处理完成一定数量的消息之后,提前重置 Pulsar 流控阈值。 比如,Pulsar 流控阈值 为 1000,阈值重置百分比 为 80%,则重置)

最后点击 “确认”,资源创建完成:

从 Pulsar 消费消息到 EMQ X - 图4

资源已经创建完成,现在用Dashboard的websocket工具订阅MQTT的主题 “TestTopic”:

从 Pulsar 消费消息到 EMQ X - 图5

使用pulsar-cli 生产一条消息:

  1. ./bin/pulsar-client produce TestTopic --messages "hello-pulsar"

从 Pulsar 消费消息到 EMQ X - 图6

Dashboard的websocket工具接收到了pulsar 生产的消息”hello-pulsar”:

从 Pulsar 消费消息到 EMQ X - 图7