Spark Doris Connector

Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

  • 支持从Doris中读取数据
  • 支持Spark DataFrame批量/流式 写入Doris
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在Doris端完成数据过滤,减少数据传输量。

版本兼容

ConnectorSparkDorisJavaScala
1.0.02.x0.12+82.11
1.0.03.x0.12.+82.12

编译与安装

extension/spark-doris-connector/ 源码目录下执行:

注意:

  1. 这里如果你没有整体编译过 doris 源码,需要首先编译一次 Doris 源码,不然会出现 thrift 命令找不到的情况,需要到 incubator-doris 目录下执行 sh build.sh
  2. 建议在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。
  1. sh build.sh 3 ## spark 3.x版本,默认是3.1.2
  2. sh build.sh 2 ## spark 2.x版本,默认是2.3.4

编译成功后,会在 output/ 目录下生成文件 doris-spark-1.0.0-SNAPSHOT.jar。将此文件复制到 SparkClassPath 中即可使用 Spark-Doris-Connector。例如,Local 模式运行的 Spark,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Spark,则将此文件放入预部署包中。

使用示例

读取

SQL

  1. CREATE TEMPORARY VIEW spark_doris
  2. USING doris
  3. OPTIONS(
  4. "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  5. "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  6. "user"="$YOUR_DORIS_USERNAME",
  7. "password"="$YOUR_DORIS_PASSWORD"
  8. );
  9. SELECT * FROM spark_doris;

DataFrame

  1. val dorisSparkDF = spark.read.format("doris")
  2. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  3. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  4. .option("user", "$YOUR_DORIS_USERNAME")
  5. .option("password", "$YOUR_DORIS_PASSWORD")
  6. .load()
  7. dorisSparkDF.show(5)

RDD

  1. import org.apache.doris.spark._
  2. val dorisSparkRDD = sc.dorisRDD(
  3. tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  4. cfg = Some(Map(
  5. "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  6. "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
  7. "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
  8. ))
  9. )
  10. dorisSparkRDD.collect()

写入

SQL

  1. CREATE TEMPORARY VIEW spark_doris
  2. USING doris
  3. OPTIONS(
  4. "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  5. "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  6. "user"="$YOUR_DORIS_USERNAME",
  7. "password"="$YOUR_DORIS_PASSWORD"
  8. );
  9. INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
  10. # or
  11. INSERT INTO spark_doris SELECT * FROM YOUR_TABLE

DataFrame(batch/stream)

  1. ## batch sink
  2. val mockDataDF = List(
  3. (3, "440403001005", "21.cn"),
  4. (1, "4404030013005", "22.cn"),
  5. (33, null, "23.cn")
  6. ).toDF("id", "mi_code", "mi_name")
  7. mockDataDF.show(5)
  8. mockDataDF.write.format("doris")
  9. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  10. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  11. .option("user", "$YOUR_DORIS_USERNAME")
  12. .option("password", "$YOUR_DORIS_PASSWORD")
  13. //其它选项
  14. //指定你要写入的字段
  15. .option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
  16. .save()
  17. ## stream sink(StructuredStreaming)
  18. val kafkaSource = spark.readStream
  19. .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  20. .option("startingOffsets", "latest")
  21. .option("subscribe", "$YOUR_KAFKA_TOPICS")
  22. .format("kafka")
  23. .load()
  24. kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
  25. .writeStream
  26. .format("doris")
  27. .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  28. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  29. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  30. .option("user", "$YOUR_DORIS_USERNAME")
  31. .option("password", "$YOUR_DORIS_PASSWORD")
  32. //其它选项
  33. //指定你要写入的字段
  34. .option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
  35. .start()
  36. .awaitTermination()

配置

通用配置项

KeyDefault ValueComment
doris.fenodesDoris FE http 地址,支持多个地址,使用逗号分隔
doris.table.identifierDoris 表名,如:db1.tbl1
doris.request.retries3向Doris发送请求的重试次数
doris.request.connect.timeout.ms30000向Doris发送请求的连接超时时间
doris.request.read.timeout.ms30000向Doris发送请求的读取超时时间
doris.request.query.timeout.s3600查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger.MAX_VALUE一个RDD Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size1024一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效
doris.write.fields指定写入Doris表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照Doris表字段顺序写入全部字段。

SQL 和 Dataframe 专有配置

KeyDefault ValueComment
user访问Doris的用户名
password访问Doris的密码
doris.filter.query.in.max.count100谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。

RDD 专有配置

KeyDefault ValueComment
doris.request.auth.user访问Doris的用户名
doris.request.auth.password访问Doris的密码
doris.read.field读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。

Doris 和 Spark 列类型映射关系

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.StringType1
DATETIMEDataTypes.StringType1
BINARYDataTypes.BinaryType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDataTypes.StringType
VARCHARDataTypes.StringType
DECIMALV2DecimalType
TIMEDataTypes.DoubleType
HLLUnsupported datatype
  • 注:Connector中,将DATEDATETIME映射为String。由于Doris底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 String 类型直接返回对应的时间可读文本。