BROKER-LOAD

Name

BROKER LOAD

Description

该命令主要用于通过 Broker 服务进程来导入远端存储(如S3、HDFS)上的数据。

  1. LOAD LABEL load_label
  2. (
  3. data_desc1[, data_desc2, ...]
  4. )
  5. WITH BROKER broker_name
  6. [broker_properties]
  7. [load_properties];
  • load_label

    每个导入需要指定一个唯一的 Label。后续可以通过这个 label 来查看作业进度。

    [database.]label_name

  • data_desc1

    用于描述一组需要导入的文件。

    1. [MERGE|APPEND|DELETE]
    2. DATA INFILE
    3. (
    4. "file_path1"[, file_path2, ...]
    5. )
    6. [NEGATIVE]
    7. INTO TABLE `table_name`
    8. [PARTITION (p1, p2, ...)]
    9. [COLUMNS TERMINATED BY "column_separator"]
    10. [FORMAT AS "file_type"]
    11. [(column_list)]
    12. [COLUMNS FROM PATH AS (c1, c2, ...)]
    13. [PRECEDING FILTER predicate]
    14. [SET (column_mapping)]
    15. [WHERE predicate]
    16. [DELETE ON expr]
    17. [ORDER BY source_sequence]
    • [MERGE|APPEND|DELETE]

      数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 [DELETE ON] 语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。

    • DATA INFILE

      指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。

    • NEGTIVE

      该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。

    • PARTITION(p1, p2, ...)

      可以指定仅导入表的某些分区。不再分区范围内的数据将被忽略。

    • COLUMNS TERMINATED BY

      指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。

    • FORMAT AS

      指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。

    • column list

      用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。

      (k1, k2, tmpk1)

    • COLUMNS FROM PATH AS

      指定从导入文件路径中抽取的列。

    • PRECEDING FILTER predicate

      前置过滤条件。数据首先根据 column listCOLUMNS FROM PATH AS 按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。

    • SET (column_mapping)

      指定列的转换函数。

    • WHERE predicate

      根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。

    • DELETE ON expr

      需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。

    • ORDER BY

      仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。

  • WITH BROKER broker_name

    指定需要使用的 Broker 服务名称。在公有云 Doris 中。Broker 服务名称为 bos

  • broker_properties

    指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。关于具体信息,可参阅 Broker 文档。

    1. (
    2. "key1" = "val1",
    3. "key2" = "val2",
    4. ...
    5. )
  • load_properties

    指定导入的相关参数。目前支持以下参数:

    • timeout

      导入超时时间。默认为 4 小时。单位秒。

    • max_filter_ratio

      最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。

    • exec_mem_limit

      导入内存限制。默认为 2GB。单位为字节。

    • strict_mode

      是否对数据进行严格限制。默认为 false。

    • timezone

      指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 时区 文档。如果不指定,则使用 “Asia/Shanghai” 时区

Example

  1. 从 HDFS 导入一批数据

    1. LOAD LABEL example_db.label1
    2. (
    3. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
    4. INTO TABLE `my_table`
    5. COLUMNS TERMINATED BY ","
    6. )
    7. WITH BROKER hdfs
    8. (
    9. "username"="hdfs_user",
    10. "password"="hdfs_password"
    11. );

    导入文件 file.txt,按逗号分隔,导入到表 my_table

  2. 从 HDFS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中。

    1. LOAD LABEL example_db.label2
    2. (
    3. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
    4. INTO TABLE `my_table1`
    5. PARTITION (p1)
    6. COLUMNS TERMINATED BY ","
    7. (k1, tmp_k2, tmp_k3)
    8. SET (
    9. k2 = tmp_k2 + 1,
    10. k3 = tmp_k3 + 1
    11. )
    12. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
    13. INTO TABLE `my_table2`
    14. COLUMNS TERMINATED BY ","
    15. (k1, k2, k3)
    16. )
    17. WITH BROKER hdfs
    18. (
    19. "username"="hdfs_user",
    20. "password"="hdfs_password"
    21. );

    使用通配符匹配导入两批文件 file-10*file-20*。分别导入到 my_table1my_table2 两张表中。其中 my_table1 指定导入到分区 p1 中,并且将导入源文件中第二列和第三列的值 +1 后导入。

  3. 从 HDFS 导入一批数据。

    1. LOAD LABEL example_db.label3
    2. (
    3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*")
    4. INTO TABLE `my_table`
    5. COLUMNS TERMINATED BY "\\x01"
    6. )
    7. WITH BROKER my_hdfs_broker
    8. (
    9. "username" = "",
    10. "password" = "",
    11. "dfs.nameservices" = "my_ha",
    12. "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    13. "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    14. "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    15. "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    16. );

    指定分隔符为 Hive 的默认分隔符 \\x01,并使用通配符 * 指定 data 目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。

  4. 导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断

    1. LOAD LABEL example_db.label4
    2. (
    3. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
    4. INTO TABLE `my_table`
    5. FORMAT AS "parquet"
    6. (k1, k2, k3)
    7. )
    8. WITH BROKER hdfs
    9. (
    10. "username"="hdfs_user",
    11. "password"="hdfs_password"
    12. );
  5. 导入数据,并提取文件路径中的分区字段

    1. LOAD LABEL example_db.label10
    2. (
    3. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
    4. INTO TABLE `my_table`
    5. FORMAT AS "csv"
    6. (k1, k2, k3)
    7. COLUMNS FROM PATH AS (city, utc_date)
    8. )
    9. WITH BROKER hdfs
    10. (
    11. "username"="hdfs_user",
    12. "password"="hdfs_password"
    13. );

    my_table 表中的列为 k1, k2, k3, city, utc_date

    其中 hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing 目录下包括如下文件:

    1. hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
    2. hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
    3. hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
    4. hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv

    文件中只包含 k1, k2, k3 三列数据,city, utc_date 这两列数据会从文件路径中提取。

  6. 对待导入数据进行过滤。

    1. LOAD LABEL example_db.label6
    2. (
    3. DATA INFILE("hdfs://host:port/input/file")
    4. INTO TABLE `my_table`
    5. (k1, k2, k3)
    6. PRECEDING FILTER k1 = 1
    7. SET (
    8. k2 = k2 + 1
    9. )
    10. WHERE k1 > k2
    11. )
    12. WITH BROKER hdfs
    13. (
    14. "username"="user",
    15. "password"="pass"
    16. );

    只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。

  7. 导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ‘:’,所有 ‘:’ 会由 %3A 替换)

    1. LOAD LABEL example_db.label7
    2. (
    3. DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    4. INTO TABLE `tbl12`
    5. COLUMNS TERMINATED BY ","
    6. (k2,k3)
    7. COLUMNS FROM PATH AS (data_time)
    8. SET (
    9. data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
    10. )
    11. )
    12. WITH BROKER hdfs
    13. (
    14. "username"="user",
    15. "password"="pass"
    16. );

    路径下有如下文件:

    1. /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
    2. /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

    表结构为:

    1. data_time DATETIME,
    2. k2 INT,
    3. k3 INT
  8. 从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中v2 大于100 的列相匹配的列删除,其他列正常导入

    1. LOAD LABEL example_db.label8
    2. (
    3. MERGE DATA INFILE("HDFS://test:802/input/file")
    4. INTO TABLE `my_table`
    5. (k1, k2, k3, v2, v1)
    6. DELETE ON v2 > 100
    7. )
    8. WITH HDFS
    9. (
    10. "hadoop.username"="user",
    11. "password"="pass"
    12. )
    13. PROPERTIES
    14. (
    15. "timeout" = "3600",
    16. "max_filter_ratio" = "0.1"
    17. );

    使用 MERGE 方式导入。my_table 必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。

    导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。

  9. 导入时指定source_sequence列,保证UNIQUE_KEYS表中的替换顺序:

    1. LOAD LABEL example_db.label9
    2. (
    3. DATA INFILE("HDFS://test:802/input/file")
    4. INTO TABLE `my_table`
    5. COLUMNS TERMINATED BY ","
    6. (k1,k2,source_sequence,v1,v2)
    7. ORDER BY source_sequence
    8. )
    9. WITH HDFS
    10. (
    11. "hadoop.username"="user",
    12. "password"="pass"
    13. )

    my_table 必须是 Unqiue Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中 source_sequence 列的值来保证顺序性。

Keywords

  1. BROKER, LOAD

Best Practice

  1. 查看导入任务状态

    Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 SHOW LOAD 命令查看。

  2. 取消导入任务

    已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。

  3. Label、导入事务、多表原子性

    Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 文档。

  4. 列映射、衍生列和过滤

    Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 文档。

  5. 错误数据过滤

    Doris 的导入任务可以容忍一部分格式错误的数据。容忍了通过 max_filter_ratio 设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,Doris 会自动跳过哪些数据格式不正确的行。

    关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 文档。

  6. 严格模式

    strict_mode 属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 文档。

  7. 超时时间

    Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。

  8. 数据量和任务数限制

    Broker Load 适合在一个导入任务中导入100GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。

    同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。

    Doris 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。