关于SeaTunnel

SeaTunnel是一个非常简单易用的超高性能分布式数据集成平台,支持海量数据的实时同步。每天稳定高效地同步数百亿数据

Connector-V2

2.3.1版本的 Apache SeaTunnel Connector-V2 支持了Doris Sink,并且支持exactly-once的精准一次写入和CDC数据同步

插件代码

SeaTunnel Doris Sink 插件代码

参数列表

nametyperequireddefault value
fenodesstringyes-
usernamestringyes-
passwordstringyes-
table.identifierstringyes-
sink.label-prefixstringyes-
sink.enable-2pcboolnotrue
sink.enable-deleteboolnofalse
doris.configmapyes-

fenodes [string]

Doris 集群 FE 节点地址,格式为 "fe_ip:fe_http_port,..."

username [string]

Doris 用户名

password [string]

Doris 用户密码

table.identifier [string]

Doris 表名称,格式为 DBName.TableName

sink.label-prefix [string]

Stream Load 导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义

sink.enable-2pc [bool]

是否启用两阶段提交(2pc),默认为true,以确保exact - once语义。关于两阶段提交,请参考这里

sink.enable-delete [bool]

是否启用删除。该选项需要Doris表开启批量删除功能(默认开启0.15+版本),且只支持Unique表模型。你可以在这个链接获得更多细节:

批量删除

doris.config [map]

Stream Load data_desc 的参数,你可以在这个链接获得更多细节:

更多Stream Load 参数

使用示例

使用JSON格式导入数据

  1. sink {
  2. Doris {
  3. fenodes = "doris_fe:8030"
  4. username = root
  5. password = ""
  6. table.identifier = "test.table_sink"
  7. sink.enable-2pc = "true"
  8. sink.label-prefix = "test_json"
  9. doris.config = {
  10. format="json"
  11. read_json_by_line="true"
  12. }
  13. }
  14. }

使用CSV格式导入数据

  1. sink {
  2. Doris {
  3. fenodes = "doris_fe:8030"
  4. username = root
  5. password = ""
  6. table.identifier = "test.table_sink"
  7. sink.enable-2pc = "true"
  8. sink.label-prefix = "test_csv"
  9. doris.config = {
  10. format = "csv"
  11. column_separator = ","
  12. line_delimiter = "\n"
  13. }
  14. }
  15. }

Connector-V1

2.1.0的 Apache SeaTunnel 支持 Doris 的连接器, SeaTunnel 可以通过 Spark 引擎和 Flink 引擎同步数据至 Doris 中.

插件代码

Seatunnel Flink Sink Doris 插件代码

参数列表

配置项类型必填默认值支持引擎
fenodesstringyes-Flink
databasestringyes-Flink
tablestringyes-Flink
userstringyes-Flink
passwordstringyes-Flink
batch_sizeintno100Flink
intervalintno1000Flink
max_retriesintno1Flink
doris.*-no-Flink

fenodes [string]

Doris Fe Http访问地址, eg: 127.0.01:8030

database [string]

写入 Doris 的库名

table [string]

写入 Doris 的表名

user [string]

Doris 访问用户

password [string]

Doris 访问用户密码

batch_size [int]

单次写Doris的最大行数,默认值100

interval [int]

flush 间隔时间(毫秒),超过该时间后异步线程将 缓存中数据写入Doris。设置为0表示关闭定期写入。

max_retries [int]

写Doris失败之后的重试次数

doris.* [string]

Stream load 的导入参数。例如:’doris.column_separator’ = ‘, ‘等

更多 Stream Load 参数配置

Examples

Socket 数据写入 Doris

  1. env {
  2. execution.parallelism = 1
  3. }
  4. source {
  5. SocketStream {
  6. host = 127.0.0.1
  7. port = 9999
  8. result_table_name = "socket"
  9. field_name = "info"
  10. }
  11. }
  12. transform {
  13. }
  14. sink {
  15. DorisSink {
  16. fenodes = "127.0.0.1:8030"
  17. user = root
  18. password = 123456
  19. database = test
  20. table = test_tbl
  21. batch_size = 5
  22. max_retries = 1
  23. interval = 5000
  24. }
  25. }

启动命令

  1. sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf

Spark Sink Doris

插件代码

Spark Sink Doris 的插件代码在这里

参数列表

参数名参数类型是否必要默认值引擎类型
fenodesstringyes-Spark
databasestringyes-Spark
tablestringyes-Spark
userstringyes-Spark
passwordstringyes-Spark
batch_sizeintyes100Spark
doris.*stringno-Spark

fenodes [string]

Doris Fe节点地址:8030

database [string]

写入 Doris 的库名

table [string]

写入 Doris 的表名

user [string]

Doris 访问用户

password [string]

Doris 访问用户密码

batch_size [string]

Spark 通过 Stream Load 方式写入,每个批次提交条数

doris. [string]

Stream Load 方式写入的 Http 参数优化,在官网参数前加上’Doris.’前缀

更多 Stream Load 参数配置

Examples

Hive 迁移数据至 Doris

  1. env{
  2. spark.app.name = "hive2doris-template"
  3. }
  4. spark {
  5. spark.sql.catalogImplementation = "hive"
  6. }
  7. source {
  8. hive {
  9. preSql = "select * from tmp.test"
  10. result_table_name = "test"
  11. }
  12. }
  13. transform {
  14. }
  15. sink {
  16. Console {
  17. }
  18. Doris {
  19. fenodes="xxxx:8030"
  20. database="tmp"
  21. table="test"
  22. user="root"
  23. password="root"
  24. batch_size=1000
  25. doris.column_separator="\t"
  26. doris.columns="date_key,date_value,day_in_year,day_in_month"
  27. }
  28. }

启动命令

  1. sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf