StarRocks

概览

StarRocks Load 节点支持将数据写入 StarRocks 数据库。 支持单表写入和多表写入两种模式:单表写入为指定固定库名表名写入;多表写入支持根据源端数据格式自定义库名表名写入,适用于源端多表写入或者整库同步等场景。 本文档介绍如何设置 StarRocks Load 节点实现写入 StarRocks 数据库表。

支持的版本

Load 节点StarRocks 版本
StarRocks2.0+

依赖

为了设置 StarRocks Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)所需要的依赖信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-starrocks</artifactId>
  4. <version>1.8.0</version>
  5. </dependency>

准备

创建 MySQL Extract 表

  • 单表写入:在 MySQL cdc 数据库中创建表 cdc_mysql_source。 命令如下:
  1. [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
  2. mysql> use cdc;
  3. Database changed
  4. mysql> CREATE TABLE `cdc_mysql_source` (
  5. `id` int(11) NOT NULL AUTO_INCREMENT,
  6. `name` varchar(64) DEFAULT NULL,
  7. `dr` tinyint(3) DEFAULT 0,
  8. PRIMARY KEY (`id`)
  9. );
  10. Query OK, 0 rows affected (0.02 sec)
  11. mysql> insert into cdc_mysql_source values(1, 'zhangsan', 0),(2, 'lisi', 0),(3, 'wangwu', 0);
  12. Query OK, 3 rows affected (0.01 sec)
  13. Records: 3 Duplicates: 0 Warnings: 0
  14. mysql> select * from cdc_mysql_source;
  15. +----+----------+----+
  16. | id | name | dr |
  17. +----+----------+----+
  18. | 1 | zhangsan | 0 |
  19. | 2 | lisi | 0 |
  20. | 3 | wangwu | 0 |
  21. +----+----------+----+
  22. 3 rows in set (0.07 sec)
  • 多表写入:在 MySQL user_db 数据库中创建表 user_id_nameuser_id_score。 命令如下:
  1. [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
  2. mysql> use user_db;
  3. Database changed
  4. mysql> CREATE TABLE `user_id_name` (
  5. `id` int(11) NOT NULL AUTO_INCREMENT,
  6. `name` varchar(64) DEFAULT NULL
  7. PRIMARY KEY (`id`)
  8. );
  9. Query OK, 0 rows affected (0.02 sec)
  10. mysql> CREATE TABLE `user_id_score` (
  11. `id` int(11) NOT NULL AUTO_INCREMENT,
  12. `score` double default 0,
  13. PRIMARY KEY (`id`)
  14. );
  15. Query OK, 0 rows affected (0.02 sec)
  16. mysql> insert into user_id_name values(1001, 'lily'),(1002, 'tom'),(1003, 'alan');
  17. Query OK, 3 rows affected (0.01 sec)
  18. Records: 3 Duplicates: 0 Warnings: 0
  19. mysql> insert into user_id_score values(1001, 99),(1002, 96),(1003, 98);
  20. Query OK, 3 rows affected (0.01 sec)
  21. Records: 3 Duplicates: 0 Warnings: 0
  22. mysql> select * from user_id_name;
  23. +------+--------+
  24. | id | name |
  25. +------+--------+
  26. | 1001 | lily |
  27. | 1002 | tom |
  28. | 1003 | alan |
  29. +----+----------+
  30. 3 rows in set (0.07 sec)
  31. mysql> select * from user_id_score;
  32. +------+------+
  33. | id | name |
  34. +------+------+
  35. | 1001 | 99 |
  36. | 1002 | 96 |
  37. | 1003 | 98 |
  38. +----+--------+
  39. 3 rows in set (0.07 sec)

创建 StarRocks Load 表

  • 单表写入:在 StarRocks cdc数据库中创建表cdc_starrocks_sink。命令如下:
  1. [root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
  2. mysql> use cdc;
  3. Reading table information for completion of table and column names
  4. You can turn off this feature to get a quicker startup with -A
  5. Database changed
  6. mysql> CREATE TABLE `cdc_starrocks_sink` (
  7. `id` int(11) NOT NULL COMMENT "用户id",
  8. `name` varchar(50) NOT NULL COMMENT "昵称",
  9. `dr` tinyint(4) NULL COMMENT "逻辑删除"
  10. ) ENGINE=OLAP
  11. PRIMARY KEY(`id`)
  12. COMMENT "OLAP"
  13. DISTRIBUTED BY HASH(`id`) BUCKETS 1
  14. PROPERTIES (
  15. "replication_allocation" = "tag.location.default: 1"
  16. );
  17. Query OK, 0 rows affected (0.06 sec)
  • 多表写入:在 StarRocks user_db数据库中创建表starrocks_user_id_namestarrocks_user_id_score。命令如下:
  1. [root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
  2. mysql> use user_db;
  3. Reading table information for completion of table and column names
  4. You can turn off this feature to get a quicker startup with -A
  5. Database changed
  6. mysql> CREATE TABLE `starrocks_user_id_name` (
  7. `id` int(11) NOT NULL COMMENT "用户id",
  8. `name` varchar(50) NOT NULL COMMENT "昵称"
  9. ) ENGINE=OLAP
  10. PRIMARY KEY(`id`)
  11. COMMENT "OLAP"
  12. DISTRIBUTED BY HASH(`id`) BUCKETS 1
  13. PROPERTIES (
  14. "replication_allocation" = "tag.location.default: 1"
  15. );
  16. Query OK, 0 rows affected (0.06 sec)
  17. mysql> CREATE TABLE `starrocks_user_id_score` (
  18. `id` int(11) NOT NULL COMMENT "用户id",
  19. `score` double default 0
  20. ) ENGINE=OLAP
  21. PRIMARY KEY(`id`)
  22. COMMENT "OLAP"
  23. DISTRIBUTED BY HASH(`id`) BUCKETS 1
  24. PROPERTIES (
  25. "replication_allocation" = "tag.location.default: 1"
  26. );
  27. Query OK, 0 rows affected (0.06 sec)

如何创建 StarRocks Load 节点

SQL API 用法

  • 单表写入: StarRocks 单表写入
  1. [root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/starrocks/
  2. Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  3. [INFO] Session property has been set.
  4. Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
  5. [INFO] Session property has been set.
  6. Flink SQL> CREATE TABLE cdc_mysql_source (
  7. > id int
  8. > ,name VARCHAR
  9. > ,dr TINYINT
  10. > ,PRIMARY KEY (id) NOT ENFORCED
  11. > ) WITH (
  12. > 'connector' = 'mysql-cdc-inlong',
  13. > 'hostname' = 'localhost',
  14. > 'port' = '3306',
  15. > 'username' = 'root',
  16. > 'password' = '123456',
  17. > 'database-name' = 'cdc',
  18. > 'table-name' = 'cdc_mysql_source'
  19. > );
  20. [INFO] Execute statement succeed.
  21. Flink SQL> CREATE TABLE cdc_starrocks_sink (
  22. > id INT,
  23. > name STRING,
  24. > dr TINYINT
  25. > ) WITH (
  26. > 'connector' = 'starrocks-inlong',
  27. > 'fenodes' = 'localhost:8030',
  28. > 'table.identifier' = 'cdc.cdc_starrocks_sink',
  29. > 'username' = 'username',
  30. > 'password' = 'password',
  31. > 'sink.properties.format' = 'json'
  32. > );
  33. [INFO] Execute statement succeed.
  34. Flink SQL> insert into cdc_starrocks_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
  35. [INFO] Submitting SQL update statement to the cluster...
  36. [INFO] SQL update statement has been successfully submitted to the cluster:
  37. Job ID: 5f89691571d7b3f3ca446589e3d0c3d3
  • 多表写入: StarRocks 多表写入
  1. ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/starrocks/
  2. Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  3. [INFO] Session property has been set.
  4. Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
  5. [INFO] Session property has been set.
  6. Flink SQL> CREATE TABLE cdc_mysql_source (
  7. > id int
  8. > ,name VARCHAR
  9. > ,dr TINYINT
  10. > ,PRIMARY KEY (id) NOT ENFORCED
  11. > ) WITH (
  12. > 'connector' = 'mysql-cdc-inlong',
  13. > 'hostname' = 'localhost',
  14. > 'port' = '3306',
  15. > 'username' = 'root',
  16. > 'password' = '123456',
  17. > 'database-name' = 'test',
  18. > 'table-name' = 'cdc_mysql_source'
  19. > );
  20. [INFO] Execute statement succeed.
  21. Flink SQL> CREATE TABLE cdc_starrocks_sink (
  22. > id INT,
  23. > name STRING,
  24. > dr TINYINT
  25. > ) WITH (
  26. > 'connector' = 'starrocks-inlong',
  27. > 'fenodes' = 'localhost:8030',
  28. > 'username' = 'username',
  29. > 'password' = 'password',
  30. > 'sink.multiple.enable' = 'true',
  31. > 'sink.multiple.format' = 'canal-json',
  32. > 'sink.multiple.database-pattern' = '${database}',
  33. > 'sink.multiple.table-pattern' = 'StarRocks_${table}'
  34. > );
  35. [INFO] Execute statement succeed.
  36. Flink SQL> insert into cdc_starrocks_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
  37. [INFO] Submitting SQL update statement to the cluster...
  38. [INFO] SQL update statement has been successfully submitted to the cluster:
  39. Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

StarRocks Load 节点参数

参数是否必选默认值数据类型描述
connector必选string指定使用哪个 connector ,合法值为 starrocks-inlong
jdbc-url必选string用于在 StarRocks 中执行查询
load-url必选string格式为 fe_ip:http_port;fe_ip:http_port 用分号(;)隔开。用于向 StarRocks 批量写入数据。
database-name必选stringStarRocks 的数据库名
table-name必选stringStarRocks 的表名
username必选stringStarRocks 连接的用户名
password必选stringStarRocks 连接的口令
sink.semantic可选at-least-oncestring可选值为 at-least-once 或 exactly-once (仅在 checkpoint 时刷新数据,sink.buffer-flush. 等参数将不再工作)
sink.version可选AUTOstringexectly-once语义的实现版本,只有 connector 在1.2.4及以上的版本时才可用。如果填 V2,则使用 StarRocks 的 stream load 事务接口需要 2.4 及以上的 StarRocks 版本。如果填 V1,则使用 stream load 非事务接口。如果填 AUTO,则 connector 根据 StarRocks 是否支持事务的特性来自动选择 stream load 的事务接口。
sink.buffer-flush.max-bytes可选94371840(90M)string批量刷新缓存数据的大小阈值,范围:[64MB, 10GB]
sink.buffer-flush.max-rows可选500000string批量刷新缓存数据的行数阈值,范围:[64,000, 5000,000]
sink.buffer-flush.interval-ms可选300000string批量刷新缓存数据的时间间隔,范围:[1000ms, 3600000ms]
sink.max-retries可选3stringStream load 请求的最大重试次数,范围:[0, 10]
sink.connect.timeout-ms可选1000string连接到指定的 load-url 的超时时间,单位:毫秒,范围:[100, 60000]
sink.properties.format可选CSVstring导入到 StarRocks 的数据文件格式,可选的值为:CSV 和 JSON 。默认为: CSV
sink.properties.可选stringStream load 的属性,例如:’sink.properties.columns’ = ‘k1, k2, k3’。从 StarRocks 2.4 开始,flink-connector-starrocks 支持 Primary Key 模式下的数据部分更新。
sink.properties.ignore_json_size可选falsestring忽略 json 数据的批量大小限制(100MB)
sink.multiple.enable可选falseboolean决定是否开始多表(整库)写入特性,默认为 false 。当 sink.multiple.enabletrue 时,也需要设置 sink.multiple.formatsink.multiple.database-patternsink.multiple.table-pattern
sink.multiple.format可选string多表(整库)写入的数据格式,它表示 connector 之间流转的原始二进制数据的实际格式,目前支持 canal-jsondebezium-json 。可以查看 kafka — Dynamic Topic Extraction获取更多信息。
sink.multiple.database-pattern可选string从原始二进制数据中提取数据库名,仅在多表(整库)同步场景中使用。
sink.multiple.table-pattern可选string从原始二进制数据中提取表名,仅在多表(整库)同步场景中使用。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为 groupId={groupId}&streamId={streamId}&nodeId={nodeId}
sink.multiple.schema-update.policy可选(none)string往 StarRocks 表同步数据时,如果 StarRocks 表不存在或字段长度超过限制,StarRocks 服务器会抛出异常。

当该属性设置为 THROW_WITH_STOP,异常会向上抛给 Flink 框架。Flink 框架会自动重启任务,尝试恢复。

当该属性设置为 STOP_PARTIAL 时,StarRocks connector 会忽略该表的写入,新数据不再往该表写入,其它表则正常同步。

当该属性设置为 LOG_WITH_IGNORE 时,异常会打印到日志中,不会向上抛出。后续新数据到来时,继续尝试往该表写入。
dirty.ignore可选(none)boolean往 StarRocks 表同步数据时,如果遇到错误和异常,通过该变量可以控制是否忽略脏数据。如果设置为 false ,则忽略脏数据,不归档。如果为 true ,则根据其它的 dirty.side-output.* 的配置决定如何归档数据。
dirty.side-output.connector可选(none)string支持 s3log 两种配置。当配置为 log 时,仅打印日志,不归档数据。当配置为 s3 时,可以将数据归档到亚马逊 S3 或腾讯云 COS 存储。
dirty.side-output.s3.bucket可选(none)stringS3 或 COS 的桶名称
dirty.side-output.s3.endpoint可选(none)stringS3 或 COS 的 endpoint 地址
dirty.side-output.s3.key可选(none)stringS3 或 COS 的 key
dirty.side-output.s3.region可选(none)stringS3 或 COS 的区域
dirty.side-output.line-delimiter可选(none)string脏数据的行分隔符
dirty.side-output.field-delimiter可选(none)string脏数据的字段分隔符
dirty.side-output.s3.secret-key-id可选(none)stringS3 或 COS 的 secret key
dirty.side-output.s3.access-key-id可选(none)stringS3 或 COS 的 access key
dirty.side-output.format可选(none)string脏数据归档的格式,支持 jsoncsv
dirty.side-output.log-tag可选(none)string脏数据的 tag 。通过该变量区分每条脏数据归属于 StarRocks 的哪个库表。
dirty.identifier可选(none)string归档后的文件名
dirty.side-output.labels可选(none)string归档后的每条数据包括标签和业务数据两部分。标签在前面,业务数据在后面。

数据类型映射

Flink类型StarRocks类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARJSON / STRING
VARCHARJSON / STRING
STRINGJSON / STRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAY<T>ARRAY<T>
MAP<KT,VT>JSON / JSON STRING
ROW<arg T…>JSON / JSON STRING

查看 flink-connector-starrocks 获取更多信息。