StarRocks

Overview

  • The StarRocks Load node supports writing data to the StarRocks database.
  • Two sink modes are supported: Single-sink writes to a fixed database name and table name, while Multi-sink derives the database name and table name from the source data format, which suits scenarios such as multi-table writing or whole-database synchronization.
  • This document describes how to set up a StarRocks Load node to sink data to StarRocks.

Supported Version

| Load Node | StarRocks version |
|-----------|-------------------|
| StarRocks | 2.0+              |

Dependencies

In order to set up the StarRocks Load node, the dependency information below is needed for projects using a build automation tool such as Maven or SBT.

Maven dependency

```xml
<dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>sort-connector-starrocks</artifactId>
    <version>1.5.0-SNAPSHOT</version>
</dependency>
```

Prepare

Create MySQL Extract table

  • For Single-sink: Create a table `cdc.cdc_mysql_source` in the MySQL database. The command is as follows:

```sql
[root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
mysql> use cdc;
Database changed
mysql> CREATE TABLE `cdc_mysql_source` (
       `id` int(11) NOT NULL AUTO_INCREMENT,
       `name` varchar(64) DEFAULT NULL,
       `dr` tinyint(3) DEFAULT 0,
       PRIMARY KEY (`id`)
       );
Query OK, 0 rows affected (0.02 sec)

mysql> insert into cdc_mysql_source values(1, 'zhangsan', 0),(2, 'lisi', 0),(3, 'wangwu', 0);
Query OK, 3 rows affected (0.01 sec)
Records: 3  Duplicates: 0  Warnings: 0

mysql> select * from cdc_mysql_source;
+----+----------+----+
| id | name     | dr |
+----+----------+----+
|  1 | zhangsan |  0 |
|  2 | lisi     |  0 |
|  3 | wangwu   |  0 |
+----+----------+----+
3 rows in set (0.07 sec)
```
  • For Multi-sink: Create tables `user_db.user_id_name` and `user_db.user_id_score` in the MySQL database. The command is as follows:
```sql
[root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
mysql> use user_db;
Database changed
mysql> CREATE TABLE `user_id_name` (
       `id` int(11) NOT NULL AUTO_INCREMENT,
       `name` varchar(64) DEFAULT NULL,
       PRIMARY KEY (`id`)
       );
Query OK, 0 rows affected (0.02 sec)

mysql> CREATE TABLE `user_id_score` (
       `id` int(11) NOT NULL AUTO_INCREMENT,
       `score` double default 0,
       PRIMARY KEY (`id`)
       );
Query OK, 0 rows affected (0.02 sec)

mysql> insert into user_id_name values(1001, 'lily'),(1002, 'tom'),(1003, 'alan');
Query OK, 3 rows affected (0.01 sec)
Records: 3  Duplicates: 0  Warnings: 0

mysql> insert into user_id_score values(1001, 99),(1002, 96),(1003, 98);
Query OK, 3 rows affected (0.01 sec)
Records: 3  Duplicates: 0  Warnings: 0

mysql> select * from user_id_name;
+------+------+
| id   | name |
+------+------+
| 1001 | lily |
| 1002 | tom  |
| 1003 | alan |
+------+------+
3 rows in set (0.07 sec)

mysql> select * from user_id_score;
+------+-------+
| id   | score |
+------+-------+
| 1001 |    99 |
| 1002 |    96 |
| 1003 |    98 |
+------+-------+
3 rows in set (0.07 sec)
```

Create StarRocks Load table

  • For Single-sink: Create a table `cdc.cdc_starrocks_sink` in the StarRocks database. The command is as follows:
```sql
[root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
mysql> use cdc;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> CREATE TABLE `cdc_starrocks_sink` (
       `id` int(11) NOT NULL COMMENT "user id",
       `name` varchar(50) NOT NULL COMMENT "user name",
       `dr` tinyint(4) NULL COMMENT "delete tag"
       ) ENGINE=OLAP
       PRIMARY KEY(`id`)
       COMMENT "OLAP"
       DISTRIBUTED BY HASH(`id`) BUCKETS 1
       PROPERTIES (
       "replication_num" = "1"
       );
Query OK, 0 rows affected (0.06 sec)
```
  • For Multi-sink: Create tables `user_db.starrocks_user_id_name` and `user_db.starrocks_user_id_score` in the StarRocks database. The command is as follows:
```sql
[root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
mysql> use user_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> CREATE TABLE `starrocks_user_id_name` (
       `id` int(11) NOT NULL COMMENT "user id",
       `name` varchar(50) NOT NULL COMMENT "nickname"
       ) ENGINE=OLAP
       PRIMARY KEY(`id`)
       COMMENT "OLAP"
       DISTRIBUTED BY HASH(`id`) BUCKETS 1
       PROPERTIES (
       "replication_num" = "1"
       );
Query OK, 0 rows affected (0.06 sec)

mysql> CREATE TABLE `starrocks_user_id_score` (
       `id` int(11) NOT NULL COMMENT "user id",
       `score` double default 0
       ) ENGINE=OLAP
       PRIMARY KEY(`id`)
       COMMENT "OLAP"
       DISTRIBUTED BY HASH(`id`) BUCKETS 1
       PROPERTIES (
       "replication_num" = "1"
       );
Query OK, 0 rows affected (0.06 sec)
```

How to create a StarRocks Load Node

Usage for SQL API

  • For Single-sink: StarRocks load
```sql
[root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/starrocks/
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Session property has been set.

Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
[INFO] Session property has been set.

Flink SQL> CREATE TABLE cdc_mysql_source (
    >   id INT,
    >   name VARCHAR,
    >   dr TINYINT,
    >   PRIMARY KEY (id) NOT ENFORCED
    > ) WITH (
    >   'connector' = 'mysql-cdc-inlong',
    >   'hostname' = 'localhost',
    >   'port' = '3306',
    >   'username' = 'root',
    >   'password' = '123456',
    >   'database-name' = 'cdc',
    >   'table-name' = 'cdc_mysql_source'
    > );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE cdc_starrocks_sink (
    >   id INT,
    >   name STRING,
    >   dr TINYINT
    > ) WITH (
    >   'connector' = 'starrocks-inlong',
    >   'jdbc-url' = 'jdbc:mysql://localhost:9030',
    >   'load-url' = 'localhost:8030',
    >   'database-name' = 'cdc',
    >   'table-name' = 'cdc_starrocks_sink',
    >   'username' = 'username',
    >   'password' = 'password',
    >   'sink.properties.format' = 'json',
    >   'sink.properties.strip_outer_array' = 'true'
    > );
[INFO] Execute statement succeed.

Flink SQL> insert into cdc_starrocks_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5f89691571d7b3f3ca446589e3d0c3d3
```
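Once the job is running, the three rows inserted into `cdc.cdc_mysql_source` should be synchronized into the StarRocks sink table. A minimal verification sketch, assuming the same StarRocks FE connection used when creating the sink table (the actual output depends on your environment), is to query the sink table through the MySQL client:

```sql
-- Hypothetical verification step (not part of the original walkthrough):
-- connect to the StarRocks FE query port and inspect the sink table.
[root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
mysql> select * from cdc.cdc_starrocks_sink;
```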
  • For Multi-sink: StarRocks load
```sql
./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/starrocks/
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Session property has been set.

Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
[INFO] Session property has been set.

Flink SQL> CREATE TABLE cdc_mysql_source (
    >   id INT,
    >   name VARCHAR,
    >   dr TINYINT,
    >   PRIMARY KEY (id) NOT ENFORCED
    > ) WITH (
    >   'connector' = 'mysql-cdc-inlong',
    >   'hostname' = 'localhost',
    >   'port' = '3306',
    >   'username' = 'root',
    >   'password' = '123456',
    >   'database-name' = 'test',
    >   'table-name' = 'cdc_mysql_source'
    > );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE cdc_starrocks_sink (
    >   id INT,
    >   name STRING,
    >   dr TINYINT
    > ) WITH (
    >   'connector' = 'starrocks-inlong',
    >   'jdbc-url' = 'jdbc:mysql://localhost:9030',
    >   'load-url' = 'localhost:8030',
    >   'username' = 'username',
    >   'password' = 'password',
    >   'sink.multiple.enable' = 'true',
    >   'sink.multiple.format' = 'canal-json',
    >   'sink.multiple.database-pattern' = '${database}',
    >   'sink.multiple.table-pattern' = 'starrocks_${table}'
    > );
[INFO] Execute statement succeed.

Flink SQL> insert into cdc_starrocks_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e
```
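For the Multi-sink job, each captured source table should be routed to its own StarRocks table according to `sink.multiple.database-pattern` and `sink.multiple.table-pattern`. A minimal sketch of how to check the result, assuming the StarRocks tables created in the preparation step:

```sql
-- Hypothetical verification step: each source table should land in its own StarRocks table.
[root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
mysql> select * from user_db.starrocks_user_id_name;
mysql> select * from user_db.starrocks_user_id_score;
```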

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

StarRocks Load Node Options

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| connector | required | (none) | string | Specify which connector to use, valid value is: `starrocks-inlong` |
| jdbc-url | required | (none) | string | The JDBC URL used to execute queries against StarRocks. |
| load-url | required | (none) | string | `fe_ip:http_port;fe_ip:http_port` separated by `;`, used for batch sinking. |
| database-name | required | (none) | string | StarRocks database name |
| table-name | required | (none) | string | StarRocks table name |
| username | required | (none) | string | StarRocks connection username |
| password | required | (none) | string | StarRocks connection password |
| sink.semantic | optional | at-least-once | string | `at-least-once` or `exactly-once` (data is flushed at checkpoint only, and options such as `sink.buffer-flush.*` will not take effect). |
| sink.version | optional | AUTO | string | The implementation version of the exactly-once sink. Only available for connector 1.2.4+. If `V2`, use StarRocks' Stream Load transaction interface, which requires StarRocks 2.4+. If `V1`, use the non-transactional Stream Load interface. If `AUTO`, the connector chooses the Stream Load transaction interface automatically when StarRocks supports the feature, otherwise the non-transactional interface. |
| sink.buffer-flush.max-bytes | optional | 94371840 (90M) | string | The maximum batch size of the serialized data, range: [64MB, 10GB]. |
| sink.buffer-flush.max-rows | optional | 500000 | string | The maximum number of batched rows, range: [64,000, 5,000,000]. |
| sink.buffer-flush.interval-ms | optional | 300000 | string | The flush interval, range: [1000ms, 3600000ms]. |
| sink.max-retries | optional | 3 | string | The maximum number of retries for a Stream Load request, range: [0, 10]. |
| sink.connect.timeout-ms | optional | 1000 | string | Timeout in milliseconds for connecting to the `load-url`, range: [100, 60000]. |
| sink.properties.format | optional | CSV | string | The file format of data loaded into StarRocks. Valid values: CSV and JSON. Default value: CSV. |
| sink.properties.* | optional | NONE | string | Stream Load properties, e.g. `'sink.properties.columns' = 'k1, k2, k3'`; see STREAM LOAD for details. Since 2.4, flink-connector-starrocks supports partial updates for the Primary Key model. |
| sink.properties.ignore_json_size | optional | false | string | Ignore the batching size limit (100MB) of JSON data. |
| sink.multiple.enable | optional | false | boolean | Whether to enable multiple sink writing; default is false. When `sink.multiple.enable` is true, `sink.multiple.format`, `sink.multiple.database-pattern`, and `sink.multiple.table-pattern` must all be set correctly. |
| sink.multiple.format | optional | (none) | string | The format of multiple sink; it represents the real format of the raw binary data. Can be `canal-json` or `debezium-json` at present. See kafka - Dynamic Topic Extraction for more details. |
| sink.multiple.database-pattern | optional | (none) | string | Extract the database name from the raw binary data; only used in the multiple sink writing scenario. |
| sink.multiple.table-pattern | optional | (none) | string | Extract the table name from the raw binary data; only used in the multiple sink writing scenario. |
| inlong.metric.labels | optional | (none) | String | InLong metric labels, the value format is `groupId={groupId}&streamId={streamId}&nodeId={nodeId}`. |
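To make the option table above concrete, the following is a minimal sketch of a sink DDL that combines the required options with a few of the optional tuning options; the host names, ports, credentials, and table names are placeholders rather than values from a real deployment:

```sql
CREATE TABLE starrocks_sink_example (
  `id`   INT,
  `name` STRING
) WITH (
  'connector' = 'starrocks-inlong',
  -- FE MySQL-protocol address used to execute queries (placeholder)
  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  -- FE HTTP address(es) used for batch sinking, separated by ';' (placeholder)
  'load-url' = '127.0.0.1:8030',
  'database-name' = 'cdc',
  'table-name' = 'cdc_starrocks_sink',
  'username' = 'username',
  'password' = 'password',
  -- optional tuning options described in the table above
  'sink.semantic' = 'at-least-once',
  'sink.buffer-flush.interval-ms' = '5000',
  'sink.max-retries' = '3'
);
```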

Data Type Mapping

| Flink type | StarRocks type |
|------------|----------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | JSON / STRING |
| VARCHAR | JSON / STRING |
| STRING | JSON / STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| `ARRAY<T>` | `ARRAY<T>` |
| `MAP<KT,VT>` | JSON / JSON STRING |
| `ROW<arg T...>` | JSON / JSON STRING |
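As an illustration of the mapping above, a Flink table declared with the types in the left column would typically be paired with StarRocks columns using the types in the right column. This is only a sketch of how the types line up, not a tested DDL pair:

```sql
-- Flink SQL side (types from the "Flink type" column)
CREATE TABLE type_mapping_example (
  `id`       INT,             -- INTEGER      -> StarRocks INTEGER
  `amount`   DECIMAL(10, 2),  -- DECIMAL      -> StarRocks DECIMAL
  `name`     STRING,          -- STRING       -> StarRocks STRING (or JSON)
  `created`  TIMESTAMP(3),    -- TIMESTAMP(N) -> StarRocks DATETIME
  `birthday` DATE             -- DATE         -> StarRocks DATE
) WITH (
  'connector' = 'starrocks-inlong'
  -- remaining required options (jdbc-url, load-url, database-name, table-name, ...) omitted here
);
```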

See flink-connector-starrocks for more details.