Kafka Example

This example shows how to use Apache InLong to build a MySQL -> Kafka -> ClickHouse data ingestion pipeline.

Deployment

Install InLong

Before we begin, we need to install InLong. Here we provide two ways:

Install ClickHouse

docker run -d --rm --net=host --name clickhouse -e CLICKHOUSE_USER=admin -e CLICKHOUSE_PASSWORD=inlong -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 clickhouse/clickhouse-server:22.8

Install Kafka

# Prepare ZooKeeper for Kafka
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# Install Kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
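
Before moving on, it can help to confirm that the three containers started above are actually running. The following is a minimal sketch, not part of the InLong tooling: `containers_missing` is a hypothetical helper that reads running container names from stdin and prints any expected name that is absent. The names `clickhouse`, `zookeeper`, and `kafka` come from the `--name` flags in the commands above.

```shell
# Hypothetical helper: print every expected container name that is absent
# from the list on stdin (one running container name per line).
containers_missing() {
  running=$(cat)
  for name in "$@"; do
    # -x matches the whole line, -F treats the name as a fixed string
    if ! printf '%s\n' "$running" | grep -qxF -- "$name"; then
      printf '%s\n' "$name"
    fi
  done
}

# Usage (requires a local Docker daemon):
#   docker ps --format '{{.Names}}' | containers_missing clickhouse zookeeper kafka
```

If the command prints nothing, all three containers are up. Any name it prints has not started or has already exited.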

Cluster Initialization

Once all containers have started successfully, open the InLong dashboard at http://localhost and log in with the following default account.

User: admin
Password: inlong

Create Cluster Tag

Click [Clusters] -> [ClusterTags] -> [Create] on the page, then specify the cluster tag name and the person in charge.

Caution

Each component reports its ClusterTag as default_cluster by default, so name the tag default_cluster and do not use another name.

Register Kafka Cluster

Click [Clusters] -> [Create] on the page to register the Kafka cluster.

Register ClickHouse DataNodes

Click [DataNodes] -> [Create] on the page to register the ClickHouse DataNodes.

Create Task

Create Data Stream Group

Click [Ingestion] -> [Create], enter an ID, and select Kafka as the MQ type.

Create Data Stream

Click [Next] -> [Create] and configure the data stream.

Create Data Source

Click [Sources create] -> [MySQL] and configure the data source information.

Note

Create the test_mysql_db database and the test_mysql_table table beforehand. The table schema is:

CREATE TABLE test_mysql_db.test_mysql_table (
  id INT PRIMARY KEY,
  name VARCHAR(50)
);
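
One way to create the database and table in a single step is to pipe the DDL into the `mysql` client. This is a sketch under assumptions: `schema_sql` is a hypothetical helper, the `IF NOT EXISTS` clauses are added here so the command can be re-run safely, and the host `mysql`, user `root`, and password `inlong` are taken from the data-sending script at the end of this page and may differ in your deployment.

```shell
# Hypothetical helper: print the DDL for the source database and table.
# The IF NOT EXISTS clauses are an addition so that re-running is harmless.
schema_sql() {
  cat <<'SQL'
CREATE DATABASE IF NOT EXISTS test_mysql_db;
CREATE TABLE IF NOT EXISTS test_mysql_db.test_mysql_table (
  id INT PRIMARY KEY,
  name VARCHAR(50)
);
SQL
}

# Usage (assumes the mysql client can reach the MySQL server):
#   schema_sql | mysql -h mysql -u root -pinlong
```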

Create Data Sink

Click [Sinks create] -> [ClickHouse], enter the Name, DbName, and TableName, select the ClickHouse DataNode created earlier, fill in the remaining fields, then click [save].

Approve Data Stream

Click [Approval] -> [MyApproval] -> [Detail] -> [Select Cluster tag] -> [Ok].

Go back to the [Ingestion] page and wait for [configuration success].

Test Data

Send Data

Add 1000 rows to the MySQL table, for example with the script in "The script for sending data" at the end of this page.

Verify Data

Check the data on the [Audit] page.

Then enter the ClickHouse container and check the data in the table.
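
The ClickHouse check can also be run from the host without opening a shell in the container. The sketch below assumes the container name `clickhouse` and the `admin` / `inlong` credentials from the install step. `ck_count_query` is a hypothetical helper, and `my_db` and `my_table` are placeholders for the DbName and TableName you entered when creating the sink.

```shell
# Hypothetical helper: build a row-count query for a ClickHouse table.
# $1 = database name, $2 = table name (both are whatever the sink was given).
ck_count_query() {
  printf 'SELECT count() FROM %s.%s' "$1" "$2"
}

# Usage (my_db and my_table are placeholders, not names from this guide):
#   docker exec clickhouse clickhouse-client --user admin --password inlong \
#     --query "$(ck_count_query my_db my_table)"
```

If the table was empty before the test and all 1000 rows have been delivered, the count should match the number of rows inserted into MySQL.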

Questions

Task Configuration Failed

This usually means the MQ or Flink group configuration is incorrect. Check the error message on the page, or enter the Manager container to view the detailed logs.
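
When scanning Manager output for the cause, filtering for error lines can narrow the search. This is only a sketch: `filter_errors` is a hypothetical helper, `<manager-container>` is a placeholder for whatever name your deployment gives the Manager container, and it assumes the relevant messages appear in the container's standard output. Detailed logs may instead live in files inside the container.

```shell
# Hypothetical helper: keep only lines that mention an error or exception.
# "|| true" keeps the exit status zero when nothing matches.
filter_errors() {
  grep -iE 'error|exception' || true
}

# Usage (<manager-container> is a placeholder, not a name from this guide):
#   docker logs --tail 500 <manager-container> 2>&1 | filter_errors
```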

The script for sending data

#!/bin/bash
# MySQL connection information
DB_HOST="mysql"
DB_USER="root"
DB_PASS="inlong"
DB_NAME="test_mysql_db"
DB_TABLE="test_mysql_table"
# Insert 1000 rows in a loop
for ((i=1; i<=1000; i++))
do
  # Generate the row values
  id=$i
  name="name_$i"
  # Build the INSERT statement
  query="INSERT INTO $DB_TABLE (id, name) VALUES ($id, '$name');"
  # Execute the INSERT
  mysql -h "$DB_HOST" -u "$DB_USER" -p"$DB_PASS" "$DB_NAME" -e "$query"
done
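
The loop above starts a new `mysql` client for every row. As an alternative sketch, the statements can be generated first and sent over a single connection. `generate_inserts` is a hypothetical helper. The table, host, user, and password are the same values used in the script above.

```shell
# Hypothetical helper: print one INSERT statement per row, ids 1..$1.
generate_inserts() {
  count=$1
  i=1
  while [ "$i" -le "$count" ]; do
    printf "INSERT INTO test_mysql_table (id, name) VALUES (%d, 'name_%d');\n" "$i" "$i"
    i=$((i + 1))
  done
}

# Usage (one client process and one connection for all rows):
#   generate_inserts 1000 | mysql -h mysql -u root -pinlong test_mysql_db
```

Because `id` is the primary key, running either version twice against the same table fails on duplicate keys, so clear the table before repeating the test.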