Stream Data into Apache Kafka

Apache Kafka is a popular open-source distributed event streaming platform. EMQX's integration with Apache Kafka/Confluent offers users reliable bi-directional data transport and processing capabilities in high-throughput scenarios. EMQX also supports authentication with Apache Kafka/Confluent using SASL/SCRAM or SASL/GSSAPI.

Streaming data into or from Apache Kafka involves creating data bridges to Kafka in one of two roles: producer (sends messages to Kafka) or consumer (receives messages from Kafka). EMQX enables you to create data bridges in either role.

TIP

EMQX Enterprise Edition features. EMQX Enterprise Edition provides comprehensive coverage of key business scenarios, rich data integration, product-level reliability, and 24/7 global technical support. Experience the benefits of this enterprise-ready MQTT messaging platform today.

Prerequisites

Feature List

Quick Start Tutorial

This section introduces how to stream data into or from Kafka, covering how to set up a Kafka server, how to create a data bridge and a rule that forwards data to it, and how to test the data bridge and rule.

This tutorial assumes that you run both EMQX and Kafka on the local machine. If you have Kafka and EMQX running remotely, please adjust the settings accordingly.

Install Kafka

This section takes macOS as an example to illustrate the process. You can install and run Kafka with the commands below:

```shell
wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1

# Use KRaft to run Kafka (optional)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
```

For detailed operation steps, you may refer to the Quick Start section in the Kafka Documentation.
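Before moving on, you can optionally confirm that the broker is reachable by listing its topics (this assumes Kafka is listening on the default localhost:9092):

```shell
# List topics to confirm the broker is up; the list will be empty on a fresh install
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```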

Create Kafka Topics

Relevant Kafka topics should be created before creating the data bridge in EMQX. Use the commands below to create two topics in Kafka: testtopic-in (for the producer role) and testtopic-out (for the consumer role).

```shell
bin/kafka-topics.sh --create --topic testtopic-in --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic testtopic-out --bootstrap-server localhost:9092
```
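You can then verify that both topics exist; `--describe` prints their partition and replica details (again assuming the broker runs on localhost:9092):

```shell
# Describe the newly created topics to confirm they exist
bin/kafka-topics.sh --describe --topic testtopic-in --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic testtopic-out --bootstrap-server localhost:9092
```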

Create Kafka Data Bridge

This section demonstrates how to create Kafka producer or consumer data bridges via Dashboard.

  1. Go to EMQX Dashboard, and click Integration -> Data Bridge.

  2. Click Create on the top right corner of the page.

  3. In the Create Data Bridge page, click to select Kafka, and then click Next.

  4. In the Bridge Role field, select Producer or Consumer. Click the corresponding tab below for the configuration of each role.

    In the Producer tab:

    • Fill in the required fields (marked with an asterisk).

    • Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.

    • Input the connection information. Input 127.0.0.1:9092 for the Bootstrap Hosts. Set the other fields according to your actual deployment.

    • Source MQTT Topic: Set the MQTT topics to create the data bridge. In this example, it is set to t/#, indicating all MQTT messages matching this topic will be sent to Kafka. You can also leave it blank, and create a rule to specify data to be sent to Kafka.

    • Kafka Topic Name: Input testtopic-in (the Kafka topic created before). Note: Variables are not supported here.

    • Message Key: Kafka message key. Insert a string here, either a plain string or a string containing placeholders (${var}).

    • Message Value: Kafka message value. Insert a string here, either a plain string or a string containing placeholders (${var}).

    • Advanced settings (optional): Set the Max Batch Bytes, Compression, and Partition Strategy as your business needs.

    In the Consumer tab:

    • Fill in the required fields (marked with an asterisk).

    • Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.

    • Input the connection information. Input 127.0.0.1:9092 for the Bootstrap Hosts. Set the other fields according to your actual deployment.

    • The Topic Mapping field must contain at least one Kafka-to-MQTT topic mapping. The MQTT Payload Template subfield specifies the MQTT payload that should be used, and has the following Kafka message fields available for templating:

      | Field Name | Description |
      | --- | --- |
      | headers | An object containing string key-value pairs |
      | key | Kafka message key (uses the same encoding method as the selected key) |
      | offset | Offset for the message in Kafka's topic partition |
      | topic | Originating Kafka topic |
      | ts | Message timestamp |
      | ts_type | Message timestamp type, which is one of create, append or undefined |
      | value | Kafka message value (uses the same encoding method as the selected value) |

      The default value for MQTT Payload Template is ${.}, which includes all available data encoded as a JSON object. For example, choosing ${.} as a template will produce the following for a Kafka message:

      ```json
      {
        "value": "value",
        "ts_type": "create",
        "ts": 1679665968238,
        "topic": "my-kafka-topic",
        "offset": 2,
        "key": "key",
        "headers": {"header_key": "header_value"}
      }
      ```

      Subfields from the Kafka message may be accessed with dot notation. For example, ${.value} will resolve to the Kafka message value, and ${.headers.h1} will resolve to the value of the h1 Kafka header if such a subfield exists. Absent values will be replaced by empty strings.

      Note: Each Kafka-to-MQTT topic mapping must contain a unique Kafka topic name. That is, the Kafka topic must not be present in more than one mapping.
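      As an illustration of the dot notation, a payload template that forwards only a few selected fields might look like the following sketch (the field names after the dots come from the table above; whether `${.value}` should be quoted depends on your value encoding):

      ```
      {"kafka_topic": "${.topic}", "kafka_key": "${.key}", "data": "${.value}"}
      ```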

  5. Before clicking Create, you can click Test Connection to test that the bridge can connect to the Kafka server.

  6. Click Create, you’ll be offered the option of creating an associated rule.

    • For the Kafka producer data bridge, click Create Rule to create an associated rule. For detailed operating steps, see Create Rule for Kafka Producer Data Bridge.
    • For the Kafka consumer data bridge, it’s not strictly necessary to create a rule.

TIP

Creating a rule allows Kafka messages matching the rule to be further transformed and filtered if needed, and then forwarded to other rule actions, like different bridges. Refer to the Rules for more information on creating rules. The MQTT topics defined in Topic Mapping will start having messages published to them without further configuration.

Now the Kafka data bridge should appear in the data bridge list (Integration -> Data Bridge) with Resource Status as Connected.
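Besides the Dashboard, the bridge list can also be queried through EMQX's REST API. The following is a sketch that assumes the default API port 18083 and that you have already created an API key and secret:

```shell
# Query the list of data bridges; replace <api-key>:<api-secret> with your own credentials
curl -s -u '<api-key>:<api-secret>' http://localhost:18083/api/v5/bridges
```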

Create Rule for Kafka Producer Data Bridge

  1. Go to EMQX Dashboard, and click Integration -> Rules.

  2. Click Create on the top right corner of the page.

  3. Input, for example, my_rule as the rule ID.

  4. Input the following statement in the SQL Editor if you want to save the MQTT messages under topic t/# to Kafka.

Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the data bridge in the SELECT part.

```sql
SELECT
  *
FROM
  "t/#"
```
  5. Click the Add Action button, select Forwarding with Data Bridge from the dropdown list, and then select the data bridge you just created under Data Bridge. Then click the Add button.

  6. Click Create at the bottom of the page to finish the creation.

Now you have successfully created the rule for the Kafka producer data bridge. You can click Integration -> Flows to view the topology. It shows that messages under topic t/# are sent and saved to Kafka after being parsed by rule my_rule.

Test the Data Bridge and Rule

Use MQTTX to send messages to topic t/1:

```shell
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Kafka" }'
```

Check the running status of the two data bridges: there should be one new incoming message and one new outgoing message.

Check whether messages are written into the topic testtopic-in with the following Kafka command:

```shell
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testtopic-in --from-beginning
```
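To exercise the consumer direction as well, you can subscribe to the MQTT topic configured in Topic Mapping and then produce a message to testtopic-out. This is a sketch; the MQTT topic below is an assumed mapping target, so replace it with your actual mapping:

```shell
# In one terminal: subscribe to the mapped MQTT topic (replace with your actual mapping)
mqttx sub -t 'kafka/reply' -v

# In another terminal: produce a test message to the Kafka topic consumed by the bridge
echo '{ "msg": "Hello EMQX" }' | bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic testtopic-out
```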