Example

To make it easier for you to create InLong Sort jobs, here we list some data stream configuration examples. The following will introduce SQL, Dashboard, Manager Client Tools methods to use Inlong Sort.

Environment Requirements

  • Apache Flink 1.13.5
  • MySQL
  • Apache Kafka
  • Apache Hadoop
  • Apache Hive 3.x

Prepare InLong Sort And Connectors

You can prepare InLong Sort and Data Node Connectors by referring to Deployment Guide.

Usage for SQL API

This example defines the data flow for a single table(mysql—>kafka—>hive).

MySQL to Kafka

Single table sync example:

  1. ./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
  2. --sql.script.file mysql-to-kafka.sql
  • mysql-to-kafka.sql
  1. CREATE TABLE `table_1`(
  2. PRIMARY KEY (`id`) NOT ENFORCED,
  3. `id` BIGINT,
  4. `name` STRING,
  5. `age` INT,
  6. `salary` FLOAT,
  7. `ts` TIMESTAMP(2),
  8. `event_type` STRING)
  9. WITH (
  10. 'append-mode' = 'true',
  11. 'connector' = 'mysql-cdc-inlong',
  12. 'hostname' = 'localhost',
  13. 'username' = 'root',
  14. 'password' = 'password',
  15. 'database-name' = 'dbName',
  16. 'table-name' = 'tableName'
  17. );
  18. CREATE TABLE `table_2`(
  19. `id` BIGINT,
  20. `name` STRING,
  21. `age` INT,
  22. `salary` FLOAT,
  23. `ts` TIMESTAMP(2))
  24. WITH (
  25. 'topic' = 'topicName',-- Your kafka topic
  26. 'properties.bootstrap.servers' = 'localhost:9092',
  27. 'connector' = 'kafka',
  28. 'json.timestamp-format.standard' = 'SQL',
  29. 'json.encode.decimal-as-plain-number' = 'true',
  30. 'json.map-null-key.literal' = 'null',
  31. 'json.ignore-parse-errors' = 'true',
  32. 'json.map-null-key.mode' = 'DROP',
  33. 'format' = 'json',
  34. 'json.fail-on-missing-field' = 'false'
  35. );
  36. INSERT INTO `table_2`
  37. SELECT
  38. `id` AS `id`,
  39. `name` AS `name`,
  40. `age` AS `age`,
  41. CAST(NULL as FLOAT) AS `salary`,
  42. `ts` AS `ts`
  43. FROM `table_1`;

Kafka to Hive

Example - 图1caution

First you need to create user table in Hive.

  1. ./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
  2. --sql.script.file kafka-to-hive.sql
  • kafka-to-hive.sql
  1. CREATE TABLE `table_1`(
  2. `id` BIGINT,
  3. `name` STRING,
  4. `age` INT,
  5. `salary` FLOAT,
  6. `ts` TIMESTAMP(2)
  7. WITH (
  8. 'topic' = 'topicName',-- Your kafka topic
  9. 'properties.bootstrap.servers' = 'localhost:9092',
  10. 'connector' = 'kafka',
  11. 'scan.startup.mode' = 'earliest-offset',
  12. 'json.timestamp-format.standard' = 'SQL',
  13. 'json.encode.decimal-as-plain-number' = 'true',
  14. 'json.map-null-key.literal' = 'null',
  15. 'json.ignore-parse-errors' = 'true',
  16. 'json.map-null-key.mode' = 'DROP',
  17. 'format' = 'json',
  18. 'json.fail-on-missing-field' = 'false',
  19. 'properties.group.id' = 'groupId'-- Your group id
  20. );
  21. CREATE TABLE `user`(
  22. `id` BIGINT,
  23. `name` STRING,
  24. `age` INT,
  25. `salary` FLOAT,
  26. `ts` TIMESTAMP(9))
  27. WITH (
  28. 'connector' = 'hive',
  29. 'default-database' = 'default',
  30. 'hive-version' = '3.1.2',
  31. 'hive-conf-dir' = 'hdfs://ip:9000/.../hive-site.xml' -- Put your hive-site.xml into HDFS
  32. );
  33. INSERT INTO `user`
  34. SELECT
  35. `id` AS `id`,
  36. `name` AS `name`,
  37. `age` AS `age`,
  38. CAST(NULL as FLOAT) AS `salary`,
  39. `ts` AS `ts`
  40. FROM `table_1`;

Other Connectors

there are lots of supported Extract Node and Load Node , you can use them directly.