Seatunnel

最新版本的 Apache SeaTunnel (原 waterdrop )Seatunnel Connector Flink Doris - 图1 (opens new window) 已经支持 Doris 的连接器, SeaTunnel 可以用过 Spark 引擎和 Flink 引擎同步数据至 Doris 中.

Seatunnel Flink Sink Doris 插件代码Seatunnel Connector Flink Doris - 图2 (opens new window)

参数列表

配置项类型必填默认值支持引擎
fenodesstringyes-Flink
databasestringyes-Flink
tablestringyes-Flink
userstringyes-Flink
passwordstringyes-Flink
batch_sizeintno100Flink
intervalintno1000Flink
max_retriesintno1Flink
doris.*-no-Flink

fenodes [string]

Doris Fe Http访问地址, eg: 127.0.01:8030

database [string]

写入 Doris 的库名

table [string]

写入 Doris 的表名

user [string]

Doris 访问用户

password [string]

Doris 访问用户密码

batch_size [int]

单次写Doris的最大行数,默认值100

interval [int]

flush 间隔时间(毫秒),超过该时间后异步线程将 缓存中数据写入Doris。设置为0表示关闭定期写入。

max_retries [int]

写Doris失败之后的重试次数

doris.* [string]

Stream load 的导入参数。例如:’doris.column_separator’ = ‘, ‘等

更多 Stream Load 参数配置Seatunnel Connector Flink Doris - 图3 (opens new window)

Examples

Socket 数据写入 Doris

  1. env {
  2. execution.parallelism = 1
  3. }
  4. source {
  5. SocketStream {
  6. host = 127.0.0.1
  7. port = 9999
  8. result_table_name = "socket"
  9. field_name = "info"
  10. }
  11. }
  12. transform {
  13. }
  14. sink {
  15. DorisSink {
  16. fenodes = "127.0.0.1:8030"
  17. user = root
  18. password = 123456
  19. database = test
  20. table = test_tbl
  21. batch_size = 5
  22. max_retries = 1
  23. interval = 5000
  24. }
  25. }

启动命令

  1. sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf