Doris Connector

Doris是一款基于大规模并行处理技术的分布式 SQL 数据库,主要面向OLAP场景。 StreamX 基于Doris的stream load封装了DoirsSink用于向Doris实时写入数据。

StreamX 方式写入

StreamX写入 doris的数据, 目前DorisSink只支持JSON格式(单层)写入,如:{“id”:1,”name”:”streamx”} 运行程序样例为java,如下:

配置信息

  1. doris.sink:
  2. fenodes: 127.0.0.1:8030 //doris fe http 请求地址
  3. database: test //doris database
  4. table: test_tbl //doris table
  5. user: root
  6. password: 123456
  7. batchSize: 100 //doris sink 每次streamload的批次大小
  8. intervalMs: 3000 //doris sink 每次streamload的时间间隔
  9. maxRetries: 1 //stream load的重试次数
  10. streamLoad: //doris streamload 自身的参数
  11. format: json
  12. strip_outer_array: true
  13. max_filter_ratio: 1

写入doris

  • Java
  1. package com.streamxhub.streamx.test.flink.java.datastream;
  2. import com.streamxhub.streamx.flink.core.StreamEnvConfig;
  3. import com.streamxhub.streamx.flink.core.java.sink.doris.DorisSink;
  4. import com.streamxhub.streamx.flink.core.java.source.KafkaSource;
  5. import com.streamxhub.streamx.flink.core.scala.StreamingContext;
  6. import com.streamxhub.streamx.flink.core.scala.source.KafkaRecord;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. public class DorisJavaApp {
  10. public static void main(String[] args) {
  11. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  12. StreamingContext context = new StreamingContext(envConfig);
  13. DataStream<String> source = new KafkaSource<String>(context)
  14. .getDataStream()
  15. .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
  16. .returns(String.class);
  17. new DorisSink<String>(context).sink(source);
  18. context.start();
  19. }
  20. }
提示

建议设置 batchSize 来批量插入数据提高性能,同时为了提高实时性,支持间隔时间 intervalMs 来批次写入
可以通过设置 maxRetries 来增加streamload的重试次数。