BROKER LOAD

description

    Broker load loads data into Doris through a Broker.
    Use `show broker;` to list the Brokers deployed in the cluster.

    The following data sources are supported:

    1. Baidu HDFS: HDFS for Baidu. Can only be used inside Baidu.
    2. Baidu AFS: AFS for Baidu. Can only be used inside Baidu.
    3. Baidu Object Storage (BOS): BOS on Baidu Cloud.
    4. Apache HDFS.

Syntax:

    LOAD LABEL load_label
    (
    data_desc1[, data_desc2, ...]
    )
    WITH BROKER broker_name
    [broker_properties]
    [opt_properties];

    1. load_label

        Unique load label within a database.
        syntax:
        [database_name.]your_label

    2. data_desc

        To describe the data source.
        syntax:
            [MERGE|APPEND|DELETE]
            DATA INFILE
            (
            "file_path1"[, file_path2, ...]
            )
            [NEGATIVE]
            INTO TABLE `table_name`
            [PARTITION (p1, p2)]
            [COLUMNS TERMINATED BY "column_separator"]
            [FORMAT AS "file_type"]
            [(column_list)]
            [SET (k1 = func(k2))]
            [WHERE predicate]
            [DELETE ON label=true]

        Explain:

        file_path:

            File path. Wildcards are supported. The path must match files, not directories.

        PARTITION:

            Data is loaded only into the specified partitions. Rows outside the range of those partitions are filtered out. If not specified, data is loaded into all partitions.

        NEGATIVE:

            If this parameter is specified, the load is equivalent to importing a batch of "negative" data that offsets the same batch of data loaded before.
            This parameter applies only when the table has value columns and the aggregation type of every value column is SUM.

        column_separator:

            Specifies the column separator in the import file. The default is `\t`.
            If the separator is an invisible character, write it in hexadecimal with the prefix `\\x`.
            For example, the Hive file separator `\x01` is specified as `\\x01`.

        file_type:

            Specifies the type of the imported file, such as parquet, orc, or csv. By default, the type is determined by the file name suffix.

        column_list:

            Specifies the correspondence between the columns in the import file and the columns in the table.
            To skip a column in the import file, give it a column name that does not exist in the table.
            syntax:
            (col_name1, col_name2, ...)

        SET:

            If this parameter is specified, a column of the source file can be transformed by a function, and the transformed result is loaded into the table. The grammar is `column_name = expression`. Two examples:
            Example 1: The table has three columns "c1, c2, c3". The first two columns in the source file correspond to (c1, c2) in order, and the sum of the last two columns corresponds to c3. In this case, specify columns (c1, c2, tmp_c3, tmp_c4) SET (c3 = tmp_c3 + tmp_c4).
            Example 2: The table has three columns "year, month, day". The source file has only one time column, in the format "2018-06-01 01:02:03". In this case, specify columns (tmp_time) SET (year = year(tmp_time), month = month(tmp_time), day = day(tmp_time)) to complete the import.

        WHERE:

            Filters the transformed data; only rows that satisfy the WHERE predicate are loaded. Only column names of the table can be referenced in the WHERE clause.

        merge_type:

            The data merge type. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default and means that the whole batch is appended to the existing data. DELETE means that all rows with the same keys as this batch are deleted. MERGE must be used together with a DELETE ON condition: rows that satisfy the condition are processed with DELETE semantics, and the remaining rows are processed with APPEND semantics.

        delete_on_predicates:

            The DELETE ON condition. Only used when the merge type is MERGE.
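
        The two SET examples above could be written as the following sketch, which follows the data_desc syntax given in this section. The label, file paths, table names, and broker name are placeholders chosen for illustration, and whether `year`, `month`, and `day` parse the time column depends on its actual format.

```sql
-- Sketch only: label, paths, table names, and broker name are placeholders.
LOAD LABEL example_db.label_set_sketch
(
    -- Example 1: the file has four columns; the last two are summed into c3.
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file1")
    INTO TABLE `my_table_1`
    COLUMNS TERMINATED BY ","
    (c1, c2, tmp_c3, tmp_c4)
    SET (c3 = tmp_c3 + tmp_c4),

    -- Example 2: the file has one time column, split into year, month, day.
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file2")
    INTO TABLE `my_table_2`
    COLUMNS TERMINATED BY ","
    (tmp_time)
    SET (
        year = year(tmp_time),
        month = month(tmp_time),
        day = day(tmp_time)
    )
)
WITH BROKER my_hdfs_broker
(
    "username" = "hdfs_user",
    "password" = "hdfs_passwd"
);
```

        The temporary names (tmp_c3, tmp_c4, tmp_time) do not exist in the target tables, so they act only as inputs to the SET expressions.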

    3. broker_name

        The name of the Broker to use. Available Brokers can be listed with the `show broker` command.

    4. broker_properties

        Provides the information the Broker needs to access the data source. Different Brokers and different access methods require different information.

        1. Baidu HDFS/AFS

            Access to Baidu's internal HDFS/AFS currently supports only simple authentication, which requires:
            username: hdfs username
            password: hdfs password

        2. BOS

            bos_endpoint: endpoint of BOS
            bos_accesskey: cloud user's accesskey
            bos_secret_accesskey: cloud user's secret_accesskey

        3. Apache HDFS

            The community version of HDFS supports simple authentication, Kerberos authentication, and HA configuration.

            Simple authentication:
            hadoop.security.authentication = simple (default)
            username: hdfs username
            password: hdfs password

            Kerberos authentication:
            hadoop.security.authentication = kerberos
            kerberos_principal: the Kerberos principal
            kerberos_keytab: path of the Kerberos keytab file. The file must be accessible to the Broker.
            kerberos_keytab_content: the content of the Kerberos keytab file, base64-encoded. Specify either this option or kerberos_keytab; only one of the two is needed.

            namenode HA:
            With namenode HA configured, the new active namenode is identified automatically when the namenode is switched.
            dfs.nameservices: name of the HDFS service, user-defined, e.g. "dfs.nameservices" = "my_ha"
            dfs.ha.namenodes.xxx: user-defined names of the namenodes, separated by commas, where xxx is the custom name set in dfs.nameservices, e.g. "dfs.ha.namenodes.my_ha" = "my_nn"
            dfs.namenode.rpc-address.xxx.nn: RPC address of a namenode, where nn is a namenode name configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
            dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode. Default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
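
        As an illustration of how these properties fit together, the sketch below combines Kerberos authentication with namenode HA in one broker_properties list. It uses only the property names described above. The combination itself, as well as the label, principal, keytab path, nameservice, namenode names, and hosts, are assumptions made for the example.

```sql
-- Sketch only: every name, path, and host below is a placeholder.
LOAD LABEL example_db.label_kerberos_ha_sketch
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
)
WITH BROKER my_hdfs_broker
(
    -- Kerberos authentication with a keytab file readable by the Broker.
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab" = "/home/palo/palo.keytab",
    -- Namenode HA: one nameservice with two namenodes.
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1,my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```

        Each name listed in dfs.ha.namenodes.my_ha needs its own dfs.namenode.rpc-address.my_ha.<name> entry.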

    5. opt_properties

        Specifies additional parameters.
        Syntax:
        [PROPERTIES ("key"="value", ...)]

        The following parameters can be specified:
        timeout: Timeout of the import operation, in seconds. The default timeout is 4 hours.
        max_filter_ratio: Maximum tolerated ratio of rows that can be filtered out (for example, because of irregular data). The default is zero tolerance.
        exec_mem_limit: Memory limit, in bytes. The default is 2GB.
        strict_mode: Whether to apply strict restrictions to the data. The default is false.
        timezone: Time zone used by functions that are affected by time zones, such as strftime, alignment_timestamp, and from_unixtime. See the time zone documentation for details. If not specified, the "Asia/Shanghai" time zone is used.
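
        A sketch of a load that sets all five properties is shown below. The values are arbitrary illustrations rather than recommendations, and the label, path, table, and broker names are placeholders.

```sql
-- Sketch only: values are illustrative; names and paths are placeholders.
LOAD LABEL example_db.label_properties_sketch
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
)
WITH BROKER my_hdfs_broker
(
    "username" = "hdfs_user",
    "password" = "hdfs_passwd"
)
PROPERTIES
(
    "timeout" = "7200",                -- seconds (2 hours)
    "max_filter_ratio" = "0.05",       -- tolerate up to 5% filtered rows
    "exec_mem_limit" = "4294967296",   -- bytes (4 GB)
    "strict_mode" = "true",
    "timezone" = "Asia/Shanghai"
);
```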

    6. Load data format sample

        Integer (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
        Float (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
        Date (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
        (Note: For other date formats, use the strftime or time_format function in the import command to convert them.)
        String (CHAR/VARCHAR): "I am a student", "a"
        NULL: \N

example

    1. Load a batch of data from HDFS, specifying the timeout and the filter ratio. Use the broker my_hdfs_broker with plaintext credentials (simple authentication).

        LOAD LABEL example_db.label1
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
        )
        WITH BROKER my_hdfs_broker
        (
            "username" = "hdfs_user",
            "password" = "hdfs_passwd"
        )
        PROPERTIES
        (
            "timeout" = "3600",
            "max_filter_ratio" = "0.1"
        );

        Here hdfs_host is the host of the namenode and hdfs_port is the fs.defaultFS port (default 9000).

    2. Load a batch of data consisting of multiple files from AFS into different tables, specifying the separators and the column correspondences.

        LOAD LABEL example_db.label2
        (
            DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file1")
            INTO TABLE `my_table_1`
            COLUMNS TERMINATED BY ","
            (k1, k3, k2, v1, v2),
            DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file2")
            INTO TABLE `my_table_2`
            COLUMNS TERMINATED BY "\t"
            (k1, k2, k3, v2, v1)
        )
        WITH BROKER my_afs_broker
        (
            "username" = "afs_user",
            "password" = "afs_passwd"
        )
        PROPERTIES
        (
            "timeout" = "3600",
            "max_filter_ratio" = "0.1"
        );

    3. Load a batch of data from HDFS, specifying Hive's default delimiter \\x01 and using the wildcard * to match all files in the directory. Use simple authentication and configure namenode HA at the same time.

        LOAD LABEL example_db.label3
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
            INTO TABLE `my_table`
            COLUMNS TERMINATED BY "\\x01"
        )
        WITH BROKER my_hdfs_broker
        (
            "username" = "hdfs_user",
            "password" = "hdfs_passwd",
            "dfs.nameservices" = "my_ha",
            "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
            "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
            "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
            "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        )

    4. Load a batch of "negative" data from HDFS. Use Kerberos authentication and provide the keytab file path.

        LOAD LABEL example_db.label4
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file")
            NEGATIVE
            INTO TABLE `my_table`
            COLUMNS TERMINATED BY "\t"
        )
        WITH BROKER my_hdfs_broker
        (
            "hadoop.security.authentication" = "kerberos",
            "kerberos_principal"="doris@YOUR.COM",
            "kerberos_keytab"="/home/palo/palo.keytab"
        )

    5. Load a batch of data from HDFS into specified partitions. Use Kerberos authentication and provide the base64-encoded content of the keytab file.

        LOAD LABEL example_db.label5
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
            PARTITION (p1, p2)
            COLUMNS TERMINATED BY ","
            (k1, k3, k2, v1, v2)
        )
        WITH BROKER my_hdfs_broker
        (
            "hadoop.security.authentication"="kerberos",
            "kerberos_principal"="doris@YOUR.COM",
            "kerberos_keytab_content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
        )

    6. Load a batch of data from BOS into specified partitions, transforming the columns of the imported file as follows:

        Table schema:
            k1 varchar(20)
            k2 int

        Assume that the data file has only one row of data:
            Adele,1,1

        The columns in the data file correspond to the columns specified in the load statement:
            k1,tmp_k2,tmp_k3

        The transformation is:
            1) k1: unchanged
            2) k2: sum of tmp_k2 and tmp_k3

        LOAD LABEL example_db.label6
        (
            DATA INFILE("bos://my_bucket/input/file")
            INTO TABLE `my_table`
            PARTITION (p1, p2)
            COLUMNS TERMINATED BY ","
            (k1, tmp_k2, tmp_k3)
            SET (
                k2 = tmp_k2 + tmp_k3
            )
        )
        WITH BROKER my_bos_broker
        (
            "bos_endpoint" = "http://bj.bcebos.com",
            "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
            "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
        )

    7. Load data into a table containing HLL columns. The HLL columns can be generated from columns in the table or from columns in the data.

        Assume the table has four columns (id, v1, v2, v3), where v1 and v2 are HLL columns, and the imported source file has three columns. The column_list declares the first column as id and temporarily names the second and third columns k1 and k2.
        In SET, each HLL column in the table must be declared explicitly with hll_hash. Column v1 in the table is set to hll_hash(k1) of the source data. Column v3 in the table has no corresponding value in the source data, so empty_hll is used to fill in a default value.

        LOAD LABEL example_db.label7
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
            PARTITION (p1, p2)
            COLUMNS TERMINATED BY ","
            (id, k1, k2)
            SET (
                v1 = hll_hash(k1),
                v2 = hll_hash(k2),
                v3 = empty_hll()
            )
        )
        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

        LOAD LABEL example_db.label8
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
            PARTITION (p1, p2)
            COLUMNS TERMINATED BY ","
            (k1, k2, tmp_k3, tmp_k4, v1, v2)
            SET (
                v1 = hll_hash(tmp_k3),
                v2 = hll_hash(tmp_k4)
            )
        )
        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    8. Load data from a Parquet file by specifying FORMAT AS "parquet". By default, the format is determined by the file suffix.

        LOAD LABEL example_db.label9
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
            FORMAT AS "parquet"
            (k1, k2, k3)
        )
        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    9. Extract partition fields from the file path.

        When needed, partition fields in the file path are resolved according to the field types defined in the table, similar to the Partition Discovery feature in Spark.

        LOAD LABEL example_db.label10
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
            INTO TABLE `my_table`
            FORMAT AS "csv"
            (k1, k2, k3)
            COLUMNS FROM PATH AS (city, utc_date)
            SET (uniq_id = md5sum(k1, city))
        )
        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

        The directory `hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing` contains the following files:

        [hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]

        The city and utc_date fields are extracted from the file path.

    10. Filter the loaded data: only rows whose k1 value is greater than the k2 value are imported.

        LOAD LABEL example_db.label11
        (
            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
            WHERE k1 > k2
        )
        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    11. Extract a datetime partition field from the file path when the value contains %3A (in an HDFS path, every ':' is replaced by '%3A').

        Assume we have the files:

            /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
            /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

        The table schema is:
            data_time DATETIME,
            k2 INT,
            k3 INT

        LOAD LABEL example_db.label12
        (
            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
            INTO TABLE `tbl12`
            COLUMNS TERMINATED BY ","
            (k2,k3)
            COLUMNS FROM PATH AS (data_time)
            SET (data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s'))
        )
        WITH BROKER "hdfs" ("username"="user", "password"="pass");

    13. Load a batch of data from HDFS, specifying the timeout and the filter ratio. Use the broker my_hdfs_broker with plaintext credentials (simple authentication). Rows with v2 > 100 are deleted; all other rows are appended.

        LOAD LABEL example_db.label13
        (
            MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
            INTO TABLE `my_table`
            COLUMNS TERMINATED BY "\t"
            (k1, k2, k3, v2, v1)
            DELETE ON v2 > 100
        )
        WITH BROKER my_hdfs_broker
        (
            "username" = "hdfs_user",
            "password" = "hdfs_passwd"
        )
        PROPERTIES
        (
            "timeout" = "3600",
            "max_filter_ratio" = "0.1"
        );

keyword

    BROKER, LOAD