Doris

概览

Doris Load 节点支持将数据写入 Doris 数据库。 本文档介绍如何设置 Doris Load 节点以对 Doris 数据库运行 SQL 查询。

支持的版本

Load 节点Doris 版本
Doris0.13+

依赖

为了设置 Doris Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)所需要的依赖信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-doris</artifactId>
  4. <version>1.3.0-SNAPSHOT</version>
  5. </dependency>

准备

创建 MySql Extract 表

在 MySql 数据库中创建表 cdc_mysql_source, 命令如下:

  1. [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
  2. mysql> use test;
  3. Database changed
  4. mysql> CREATE TABLE `cdc_mysql_source` (
  5. `id` int(11) NOT NULL AUTO_INCREMENT,
  6. `name` varchar(64) DEFAULT NULL,
  7. `dr` tinyint(3) DEFAULT 0,
  8. PRIMARY KEY (`id`)
  9. );
  10. Query OK, 0 rows affected (0.02 sec)
  11. mysql> insert into cdc_mysql_source values(1, 'zhangsan', 0),(2, 'lisi', 0),(3, 'wangwu', 0);
  12. Query OK, 3 rows affected (0.01 sec)
  13. Records: 3 Duplicates: 0 Warnings: 0
  14. mysql> select * from cdc_mysql_source;
  15. +----+----------+----+
  16. | id | name | dr |
  17. +----+----------+----+
  18. | 1 | zhangsan | 0 |
  19. | 2 | lisi | 0 |
  20. | 3 | wangwu | 0 |
  21. +----+----------+----+
  22. 3 rows in set (0.07 sec)

创建 Doris Load 表

在 Doris 数据库中创建表 cdc_doris_sink, 命令如下:

  1. [root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
  2. mysql> use test;
  3. Reading table information for completion of table and column names
  4. You can turn off this feature to get a quicker startup with -A
  5. Database changed
  6. mysql> CREATE TABLE `cdc_doris_sink` (
  7. `id` int(11) NOT NULL COMMENT "用户id",
  8. `name` varchar(50) NOT NULL COMMENT "昵称",
  9. `dr` tinyint(4) NULL COMMENT "逻辑删除"
  10. ) ENGINE=OLAP
  11. UNIQUE KEY(`id`)
  12. COMMENT "OLAP"
  13. DISTRIBUTED BY HASH(`id`) BUCKETS 1
  14. PROPERTIES (
  15. "replication_allocation" = "tag.location.default: 1"
  16. );
  17. Query OK, 0 rows affected (0.06 sec)

如何创建 Doris Load 节点

SQL API 用法

  1. [root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
  2. Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  3. [INFO] Session property has been set.
  4. Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
  5. [INFO] Session property has been set.
  6. Flink SQL> CREATE TABLE cdc_mysql_source (
  7. > id int
  8. > ,name VARCHAR
  9. > ,dr TINYINT
  10. > ,PRIMARY KEY (id) NOT ENFORCED
  11. > ) WITH (
  12. > 'connector' = 'mysql-cdc-inlong',
  13. > 'hostname' = 'localhost',
  14. > 'port' = '3306',
  15. > 'username' = 'root',
  16. > 'password' = '123456',
  17. > 'database-name' = 'test',
  18. > 'table-name' = 'cdc_mysql_source'
  19. > );
  20. [INFO] Execute statement succeed.
  21. Flink SQL> CREATE TABLE cdc_doris_sink (
  22. > id INT,
  23. > name STRING,
  24. > dr TINYINT
  25. > ) WITH (
  26. > 'connector' = 'doris',
  27. > 'fenodes' = 'localhost:8030',
  28. > 'table.identifier' = 'test.cdc_doris_sink',
  29. > 'username' = 'root',
  30. > 'password' = '000000',
  31. > 'sink.properties.format' = 'json',
  32. > 'sink.properties.strip_outer_array' = 'true',
  33. > 'sink.enable-delete' = 'true'
  34. > );
  35. [INFO] Execute statement succeed.
  36. -- 支持删除事件同步(sink.enable-delete='true'), 需要 Doris 表开启批量删除功能
  37. Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
  38. [INFO] Submitting SQL update statement to the cluster...
  39. [INFO] SQL update statement has been successfully submitted to the cluster:
  40. Job ID: 5f89691571d7b3f3ca446589e3d0c3d3

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

Doris Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)string指定要使用的连接器 doris
fenodes必选(none)stringDoris FE http 地址
table.identifier必选(none)stringDoris 表名,如:db1.tbl1
username必选(none)string访问 Doris 的用户名
password必选(none)string访问 Doris 的密码
doris.request.retries可选3int向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms可选30000int向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms可选30000int向 Doris 发送请求的读取超时时间
doris.request.query.timeout.s可选3600int查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.size可选Integer.MAX_VALUEint一个 Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.batch.size可选1024int一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit可选2147483648long单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.async可选falseboolean是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size可选64int异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.read.field可选(none)string读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.filter.query可选(none)string过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。
sink.batch.size可选10000int单次写 BE 的最大行数
sink.max-retries可选1int写 BE 失败之后的重试次数
sink.batch.interval可选10sstringFlush 间隔时间,超过该时间后异步线程将缓存中数据写入 BE。 默认值为10秒,支持时间单位 ms、s、min、h和d。设置为0表示关闭定期写入。
sink.properties.*可选(none)stringStream load 的导入参数

例如:
‘sink.properties.column_separator’ = ‘, ‘
定义列分隔符

‘sink.properties.escape_delimiters’ = ‘true’
特殊字符作为分隔符,’\x01’会被转换为二进制的0x01

‘sink.properties.format’ = ‘json’
‘sink.properties.strip_outer_array’ = ‘true’
JSON格式导入
sink.enable-delete可选trueboolean是否启用删除。此选项需要 Doris 表开启批量删除功能(0.15+版本默认开启),只支持 Uniq 模型。

数据类型映射

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATESTRING
DATETIMESTRING
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype

请参阅 flink-doris-connector 页面以获取更多细节。