Kafka Consumer Group

The Kafka consumer group module uses an external Kafka cluster as a message queue: it consumes messages from Kafka, converts them into MQTT messages, and publishes them in EMQX.

Set up the Kafka environment, taking macOS as an example:

  wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
  tar -xzf kafka_2.13-2.8.0.tgz
  cd kafka_2.13-2.8.0
  # Start ZooKeeper (runs in the foreground, so keep this terminal open)
  ./bin/zookeeper-server-start.sh config/zookeeper.properties
  # Start Kafka (in a second terminal, from the same directory)
  ./bin/kafka-server-start.sh config/server.properties

TIP

The Kafka consumer group module does not support Kafka versions earlier than 0.9.

Before creating the module, create the Kafka topic in advance; otherwise an error is reported.

Create Kafka topic:

  ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic --create
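Because a missing topic causes an error later, it can help to confirm that the topic exists before continuing. Kafka topic names are case-sensitive, so the name must match exactly what the module subscribes to. The following is a minimal sketch, not part of EMQX or Kafka: `topic_exists` is a hypothetical helper that checks a topic list read from standard input.

```shell
# topic_exists NAME
# Hypothetical helper: succeeds if NAME appears as a whole line in the
# topic list read from standard input. -F treats NAME as a literal string
# (topic names may contain dots), -x requires the whole line to match.
topic_exists() {
  grep -qxF -- "$1"
}

# Usage, assuming the current directory is the unpacked Kafka directory
# and ZooKeeper is reachable on localhost:2181 (replace TestTopic with the
# name you created):
#   ./bin/kafka-topics.sh --zookeeper localhost:2181 --list | topic_exists TestTopic && echo "topic found"
```

The `--list` option prints one topic name per line, which is why a whole-line match is enough here.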

Create module

Open the EMQX Dashboard, click the “Modules” tab on the left, and choose to add a module:

Select the Kafka consumer group module:

img

Click “Select”, and then select “Kafka Consumer Group”:

img

Fill in the relevant parameters:

img

  • Kafka server address
  • Kafka consumer connection pool size
  • Kafka topic to subscribe to
  • MQTT message topic
  • MQTT message QoS
  • MQTT Payload: use the Kafka message.value or the entire message
  • Binary Key encode mode: force UTF-8 or base64. This is the encoding applied to the key in the message; if the key is not a string, or is a value that may trigger a character set encoding exception, base64 mode is recommended
  • Binary Value encode mode: force UTF-8 or base64. This is the encoding applied to the value in the message; if the value is not a string, or is a value that may trigger a character set encoding exception, base64 mode is recommended
  • Kafka Max Bytes: the maximum number of bytes of messages consumed from Kafka each time
  • Kafka Offset Reset Policy: how the offset is reset, reset_to_latest | reset_by_subscriber
  • Whether the Kafka consumer reconnects
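To illustrate why base64 mode is recommended for binary keys and values, the sketch below encodes two bytes that are not valid UTF-8 and then decodes them again. It only demonstrates the encoding itself using the standard `base64` and `od` tools; it makes no claim about how EMQX implements these modes, and the sample bytes are an assumption chosen for illustration.

```shell
# Hypothetical binary Kafka value: the bytes 0xFF 0xFE (octal 377 376),
# which do not form a valid UTF-8 sequence.
encoded=$(printf '\377\376' | base64)

# base64 maps every byte to printable ASCII, so the value can be carried
# safely inside a text payload.
echo "$encoded"        # prints //4=

# Decoding restores the original bytes exactly; od shows them as hex.
decoded_hex=$(printf '%s' "$encoded" | base64 --decode | od -An -tx1 | tr -d ' \n')
echo "$decoded_hex"    # prints fffe
```

An MQTT subscriber that receives a base64-encoded key or value would apply the same decoding step to recover the original bytes.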

After clicking Add, the module is added:

img

The module has been created. Now use the Dashboard’s WebSocket tool to subscribe to the MQTT topic “TestTopic”:

img

Use the Kafka command line to produce a message:

  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic

img
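The console producer reads messages from standard input, one per line, so a message can also be sent without typing it interactively. The following is a minimal sketch under stated assumptions: `produce_message` is a hypothetical helper and `KAFKA_HOME` is a hypothetical variable pointing at the unpacked Kafka directory (it defaults to the current directory).

```shell
# produce_message TEXT
# Hypothetical helper: sends TEXT as a single message to the Kafka topic
# "TestTopic" on the broker at localhost:9092.
# KAFKA_HOME is an assumed variable naming the unpacked Kafka directory.
produce_message() {
  # The console producer sends each line read from standard input as one message.
  printf '%s\n' "$1" | "${KAFKA_HOME:-.}/bin/kafka-console-producer.sh" \
    --broker-list localhost:9092 --topic TestTopic
}

# Usage (requires the running broker from the setup step):
#   produce_message "hello-kafka"
```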

The Dashboard’s WebSocket tool receives the message “hello-kafka” produced by Kafka:

img