BROKER LOAD

description

    Broker load loads data into Doris via a Broker process.
    Use `show broker;` to see the Brokers deployed in the cluster.

    The following data sources are supported:

    1. Baidu HDFS: Baidu's internal HDFS. Only available inside Baidu.
    2. Baidu AFS: Baidu's internal AFS. Only available inside Baidu.
    3. Baidu Object Storage (BOS): BOS on Baidu Cloud.
    4. Apache HDFS.
    5. Amazon S3.

Syntax:

    LOAD LABEL load_label
    (
    data_desc1[, data_desc2, ...]
    )
    WITH [BROKER broker_name | S3]
    [load_properties]
    [opt_properties];

    1. load_label

        Unique load label within a database.
        Syntax:
        [database_name.]your_label

    2. data_desc

        To describe the data source.
        Syntax:

        [MERGE|APPEND|DELETE]
        DATA INFILE
        (
        "file_path1"[, file_path2, ...]
        )
        [NEGATIVE]
        INTO TABLE `table_name`
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY "column_separator"]
        [FORMAT AS "file_type"]
        [(column_list)]
        [PRECEDING FILTER predicate]
        [SET (k1 = func(k2))]
        [WHERE predicate]
        [DELETE ON label=true]
        [read_properties]

        Explanation:

        file_path:
        File path. Wildcards are supported. The path must match files, not directories.

        PARTITION:
        Data will only be loaded into the specified partitions. Data outside the partitions' ranges will be filtered out. If not specified, all partitions are loaded.

        NEGATIVE:
        If this parameter is specified, it is equivalent to importing a batch of "negative" data to offset the same batch of data loaded before.
        This parameter applies only when value columns exist and the aggregation type of every value column is SUM.

        column_separator:
        Used to specify the column separator in the import file. Default is `\t`.
        If the character is invisible, it needs to be prefixed with `\\x` and written in hexadecimal.
        For example, the default separator `\x01` of Hive files is specified as `\\x01`.

        file_type:
        Used to specify the type of the imported file, such as parquet, orc, csv. By default the type is determined by the file extension.

        column_list:
        Used to specify the correspondence between columns in the import file and columns in the table.
        When you need to skip a column in the import file, specify it as a column name that does not exist in the table (see the sketch below).
        Syntax:
        (col_name1, col_name2, ...)
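
        For example, the following sketch skips the second column of a three-column source file by mapping it to a name that does not exist in the table. The label, path, broker name, credentials, and the name tmp_skip are illustrative placeholders; it assumes `my_table` only has columns k1 and k2.

        -- tmp_skip is not a column of my_table, so the second source column is discarded
        LOAD LABEL example_db.label_skip_column
        (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY ","
        (k1, tmp_skip, k2)
        )
        WITH BROKER my_hdfs_broker
        (
        "username" = "hdfs_user",
        "password" = "hdfs_passwd"
        );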

        PRECEDING FILTER predicate:
        Used to filter the original data, i.e. the data before column mapping and transformation. The user can filter the data before conversion, keep only the desired rows, and then transform them.

        SET:
        If this parameter is specified, a column of the source file can be transformed with a function, and the transformed result is loaded into the table. The syntax is `column_name = expression`. Two examples:
        Example 1: The table has three columns "c1, c2, c3". The first two columns of the source file map to (c1, c2) in turn, and the sum of the last two columns maps to c3. Then columns (c1, c2, tmp_c3, tmp_c4) SET (c3 = tmp_c3 + tmp_c4) should be specified.
        Example 2: The table has three columns "year, month, day". The source file contains only one time column, in a format such as "2018-06-01 01:02:03". Then you can specify columns (tmp_time) SET (year = year(tmp_time), month = month(tmp_time), day = day(tmp_time)) to complete the import. A full statement for this case is sketched below.
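
        As a complete statement, Example 2 might look like the following sketch; the label, file path, broker name, and credentials are placeholders.

        -- The single source column tmp_time is split into the year, month and day columns of the table.
        LOAD LABEL example_db.label_time_split
        (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY ","
        (tmp_time)
        SET (
        year = year(tmp_time),
        month = month(tmp_time),
        day = day(tmp_time)
        )
        )
        WITH BROKER my_hdfs_broker
        (
        "username" = "hdfs_user",
        "password" = "hdfs_passwd"
        );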

        WHERE:
        Filters the transformed data; only rows that satisfy the WHERE predicate are loaded. Only column names of the table can be referenced in the WHERE clause.

        merge_type:
        The data merge type. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default, meaning this batch of data is appended to the existing data. DELETE means all rows with the same keys as this batch of data are deleted. MERGE must be used together with a DELETE ON condition: rows that satisfy the DELETE ON condition are processed with DELETE semantics, and the rest with APPEND semantics.

        delete_on_predicates:
        Only used when the merge type is MERGE.

        read_properties:
        Used to specify some special parameters.
        Syntax:
        [PROPERTIES ("key"="value", ...)]

        The following parameters can be specified:

        line_delimiter: Used to specify the line delimiter in the load file. The default is `\n`. A combination of multiple characters can be used as the line delimiter.

        fuzzy_parse: Boolean. If true, the JSON schema is parsed from the first row, which can speed up the import, but it requires every row to keep the same key order as the first row. Default is false. Only used for the json format.

        jsonpaths: There are two ways to import json: simple mode and matched mode.
        Simple mode: the jsonpaths parameter is not set. In this mode, each JSON row is required to be an object, for example:
        {"k1": 1, "k2": 2, "k3": "hello"}, where k1, k2, k3 are column names.
        Matched mode: for relatively complex JSON data, the corresponding values are matched through the jsonpaths parameter (see the sketch below).

        strip_outer_array: Boolean. If true, the JSON data is expected to start with an array, and each object in the array is flattened into a row. Default is false. For example:
        [
        {"k1" : 1, "v1" : 2},
        {"k1" : 3, "v1" : 4}
        ]
        With strip_outer_array set to true, two rows of data are generated when this is imported into Doris.

        json_root: a valid JSONPath string that specifies the root node of the JSON document. The default value is "".

        num_as_string: Boolean. If true, numbers are parsed as strings when reading the JSON data, so they are imported without loss of precision. Default is false.
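
        As an illustration of matched mode, the following sketch assumes a JSON file shaped like {"RECORDS": [{"k1": 1, "v1": 2}, ...]}; the label, path, table, and credentials are placeholders.

        LOAD LABEL example_db.label_json_matched
        (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file.json")
        INTO TABLE `my_table`
        FORMAT AS "json"
        (k1, v1)
        -- json_root selects the inner array; jsonpaths maps its fields to k1 and v1
        properties("json_root"="$.RECORDS", "strip_outer_array"="true", "jsonpaths"="[\"$.k1\", \"$.v1\"]")
        )
        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");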

    3. broker_name

        The name of the Broker to use, which can be viewed with the `show broker` command.

    4. load_properties

        Provides the information the Broker needs to access the data source. Different brokers, and different access methods, require different information.

        1. Baidu HDFS/AFS

            Access to Baidu's internal HDFS/AFS currently only supports simple authentication, which requires:
            username: hdfs username
            password: hdfs password

        2. BOS

            bos_endpoint: BOS endpoint
            bos_accesskey: cloud user's accesskey
            bos_secret_accesskey: cloud user's secret_accesskey

        3. Apache HDFS

            The community version of HDFS supports simple authentication, Kerberos authentication, and HA configuration.

            Simple authentication:
            hadoop.security.authentication = simple (default)
            username: hdfs username
            password: hdfs password

            Kerberos authentication:
            hadoop.security.authentication = kerberos
            kerberos_principal: the Kerberos principal
            kerberos_keytab: path of the Kerberos keytab file. The file must be accessible to the Broker process.
            kerberos_keytab_content: the base64-encoded content of the Kerberos keytab file. This is an alternative to the kerberos_keytab configuration; specify one of the two.

            namenode HA:
            By configuring namenode HA, the new namenode can be identified automatically when a namenode switchover happens.
            dfs.nameservices: the HDFS service name, user defined, e.g. "dfs.nameservices" = "my_ha"
            dfs.ha.namenodes.xxx: user-defined namenode names, separated by commas, where xxx is the custom name from dfs.nameservices, e.g. "dfs.ha.namenodes.my_ha" = "my_nn"
            dfs.namenode.rpc-address.xxx.nn: the RPC address of a namenode, where nn is one of the namenode names configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
            dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode, by default org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

        4. Amazon S3

            fs.s3a.access.key: Amazon S3 access key
            fs.s3a.secret.key: Amazon S3 secret key
            fs.s3a.endpoint: Amazon S3 endpoint

        5. If using the S3 protocol to connect to the remote storage directly, you need to specify the following attributes (see the sketch below):

            (
            "AWS_ENDPOINT" = "",
            "AWS_ACCESS_KEY" = "",
            "AWS_SECRET_KEY" = "",
            "AWS_REGION" = ""
            )
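
            A minimal sketch of a load over the S3 protocol; the label, bucket, endpoint, region, and credentials are placeholders.

            LOAD LABEL example_db.label_s3
            (
            DATA INFILE("s3://your_bucket/input/file")
            INTO TABLE `my_table`
            COLUMNS TERMINATED BY ","
            )
            WITH S3
            (
            "AWS_ENDPOINT" = "http://s3.us-east-1.amazonaws.com",
            "AWS_ACCESS_KEY" = "your_access_key",
            "AWS_SECRET_KEY" = "your_secret_key",
            "AWS_REGION" = "us-east-1"
            )
            PROPERTIES
            (
            "timeout" = "3600"
            );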

        6. If loading directly via HDFS (WITH HDFS), you need to specify the following attributes:

            (
            "fs.defaultFS" = "",
            "hdfs_user" = "",
            "dfs.nameservices" = "my_ha",
            "dfs.ha.namenodes.xxx" = "my_nn1,my_nn2",
            "dfs.namenode.rpc-address.xxx.my_nn1" = "host1:port",
            "dfs.namenode.rpc-address.xxx.my_nn2" = "host2:port",
            "dfs.client.failover.proxy.provider.xxx" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            )

            fs.defaultFS: the default filesystem URI of the HDFS cluster
            hdfs_user: the HDFS user

            namenode HA:
            By configuring namenode HA, the new namenode can be identified automatically when a namenode switchover happens.
            dfs.nameservices: the HDFS service name, user defined, e.g. "dfs.nameservices" = "my_ha"
            dfs.ha.namenodes.xxx: user-defined namenode names, separated by commas, where xxx is the custom name from dfs.nameservices, e.g. "dfs.ha.namenodes.my_ha" = "my_nn"
            dfs.namenode.rpc-address.xxx.nn: the RPC address of a namenode, where nn is one of the namenode names configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
            dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode, by default org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    5. opt_properties

        Used to specify some special parameters.
        Syntax:
        [PROPERTIES ("key"="value", ...)]

        The following parameters can be specified (see the sketch below):

        timeout: the timeout of the load job, in seconds. The default timeout is 4 hours.
        max_filter_ratio: the maximum tolerated ratio of data that may be filtered out (because of data irregularities, etc.). The default is zero tolerance.
        exec_mem_limit: memory limit. Default is 2GB. Unit is bytes.
        strict_mode: whether to enable strict mode for the load. The default is false.
        timezone: the time zone used by functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation for details. If not specified, the "Asia/Shanghai" time zone is used.
        send_batch_parallelism: the parallelism used for sending batches. If this value exceeds `max_send_batch_parallelism_per_job` in the BE config, the coordinator BE uses the value of `max_send_batch_parallelism_per_job` instead.
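
        A sketch of an opt_properties clause combining these parameters; the values are illustrative and the clause is appended to the end of a LOAD statement, as in example 1 below.

        PROPERTIES
        (
        "timeout" = "7200",
        "max_filter_ratio" = "0.1",
        "exec_mem_limit" = "4294967296",  -- 4 GB, in bytes
        "strict_mode" = "true",
        "timezone" = "Asia/Shanghai",
        "send_batch_parallelism" = "2"
        )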

    6. Load data format sample

        Integer (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
        Float (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
        Date (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
        (Note: For other date formats, use the strftime or time_format functions to convert the value in the load command.)
        String (CHAR/VARCHAR): "I am a student", "a"
        NULL: \N

example

    1. Load a batch of data from HDFS, specifying the timeout and the filter ratio. Use the broker my_hdfs_broker with a plaintext ugi (simple authentication).

    LOAD LABEL example_db.label1
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    )
    WITH BROKER my_hdfs_broker
    (
    "username" = "hdfs_user",
    "password" = "hdfs_passwd"
    )
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    Here, hdfs_host is the host of the namenode and hdfs_port is the fs.defaultFS port (default 9000).

    2. Load a batch of data from AFS that consists of multiple files. Load into different tables, specifying the separators and column correspondences.

    LOAD LABEL example_db.label2
    (
    DATA INFILE("afs://afs_host:afs_port/user/palo/data/input/file1")
    INTO TABLE `my_table_1`
    COLUMNS TERMINATED BY ","
    (k1, k3, k2, v1, v2),
    DATA INFILE("afs://afs_host:afs_port/user/palo/data/input/file2")
    INTO TABLE `my_table_2`
    COLUMNS TERMINATED BY "\t"
    (k1, k2, k3, v2, v1)
    )
    WITH BROKER my_afs_broker
    (
    "username" = "afs_user",
    "password" = "afs_passwd"
    )
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    3. Load a batch of data from HDFS, using Hive's default delimiter \x01 and the wildcard * to match all files in the directory. Use simple authentication and configure namenode HA.

    LOAD LABEL example_db.label3
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY "\\x01"
    )
    WITH BROKER my_hdfs_broker
    (
    "username" = "hdfs_user",
    "password" = "hdfs_passwd",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );

    4. Load a batch of "negative" data from HDFS, using Kerberos authentication and providing the keytab file path.

    LOAD LABEL example_db.label4
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file")
    NEGATIVE
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY "\t"
    )
    WITH BROKER my_hdfs_broker
    (
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab" = "/home/palo/palo.keytab"
    );

    5. Load a batch of data from HDFS into the specified partitions, using Kerberos authentication and providing the base64-encoded keytab file content.

    LOAD LABEL example_db.label5
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, k3, k2, v1, v2)
    )
    WITH BROKER my_hdfs_broker
    (
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab_content" = "BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
    );

    6. Load a batch of data from BOS into the specified partitions, applying transformations to the columns of the imported file, as follows:

    Table schema:
    k1 varchar(20)
    k2 int

    Assume the data file has only one row:
    Adele,1,1

    The columns in the data file correspond to the columns specified in the load statement:
    k1,tmp_k2,tmp_k3

    Transformations:
    1) k1: unchanged
    2) k2: sum of tmp_k2 and tmp_k3

    LOAD LABEL example_db.label6
    (
    DATA INFILE("bos://my_bucket/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET (
    k2 = tmp_k2 + tmp_k3
    )
    )
    WITH BROKER my_bos_broker
    (
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyy"
    );

    7. Load data into a table containing HLL columns; the HLL columns can be generated from columns in the table or from columns in the data.

    Suppose the table has four columns (id, v1, v2, v3), where v1 and v2 are HLL columns, and the source file has three columns: the first maps to id, while the second and third are transformed into v1 and v2. The fourth column of the table, v3, does not exist in the source file.
    Then (column_list) declares that the first column is id, and the second and third columns are temporarily named k1 and k2.
    In SET, each HLL column of the table must be declared explicitly with hll_hash: v1 in the table equals hll_hash(k1) of the original data, and v2 equals hll_hash(k2). v3 has no corresponding value in the original data, so empty_hll() is used to fill it with the default value.

    LOAD LABEL example_db.label7
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (id, k1, k2)
    SET (
    v1 = hll_hash(k1),
    v2 = hll_hash(k2),
    v3 = empty_hll()
    )
    )
    WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    LOAD LABEL example_db.label8
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, k2, tmp_k3, tmp_k4, v1, v2)
    SET (
    v1 = hll_hash(tmp_k3),
    v2 = hll_hash(tmp_k4)
    )
    )
    WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    8. Load Parquet files, specifying FORMAT as parquet. By default the format is inferred from the file extension.

    LOAD LABEL example_db.label9
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    FORMAT AS "parquet"
    (k1, k2, k3)
    )
    WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    9. Extract partition fields from file paths.

    If necessary, partition fields in the file path are resolved based on the field types defined in the table, similar to the Partition Discovery feature in Spark.

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
    INTO TABLE `my_table`
    FORMAT AS "csv"
    (k1, k2, k3)
    COLUMNS FROM PATH AS (city, utc_date)
    SET (uniq_id = md5sum(k1, city))
    )
    WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    The directory `hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing` contains the following files:
    [hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]
    The city and utc_date fields are extracted from the file paths.

    10. Filter the data to be loaded: only rows whose k1 value is greater than their k2 value are imported.

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    WHERE k1 > k2
    )
    WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    11. Extract date partition fields from file paths. The date-time values contain %3A (in an HDFS path, all ':' characters are replaced with '%3A').

    Assume we have the files:
    /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
    /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

    The table schema is:
    data_time DATETIME,
    k2 INT,
    k3 INT

    LOAD LABEL example_db.label12
    (
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `tbl12`
    COLUMNS TERMINATED BY ","
    (k2,k3)
    COLUMNS FROM PATH AS (data_time)
    SET (data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s'))
    )
    WITH BROKER "hdfs" ("username"="user", "password"="pass");

    12. Load a batch of data from HDFS, specifying the timeout and the filter ratio. Use the broker my_hdfs_broker with a plaintext ugi (simple authentication). Rows with v2 > 100 are deleted; the rest are appended.

    LOAD LABEL example_db.label1
    (
    MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY "\t"
    (k1, k2, k3, v2, v1)
    DELETE ON v2 > 100
    )
    WITH BROKER my_hdfs_broker
    (
    "username" = "hdfs_user",
    "password" = "hdfs_passwd"
    )
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    13. Filter the original data first, then perform column mapping, transformation, and filtering.

    LOAD LABEL example_db.label_filter
    (
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `tbl1`
    COLUMNS TERMINATED BY ","
    (k1,k2,v1,v2)
    PRECEDING FILTER k1 > 2
    SET (k1 = k1 + 1)
    WHERE k1 > 3
    )
    WITH BROKER "hdfs" ("username"="user", "password"="pass");

    14. Load data from JSON files, specifying FORMAT as json (by default it is inferred from the file suffix), and set parameters for reading the data.

    LOAD LABEL example_db.label9
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
    INTO TABLE `my_table`
    FORMAT AS "json"
    (k1, k2, k3)
    properties("fuzzy_parse"="true", "strip_outer_array"="true")
    )
    WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

    15. LOAD WITH HDFS, for a normal (non-HA) HDFS cluster.

    LOAD LABEL example_db.label_filter
    (
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `tbl1`
    COLUMNS TERMINATED BY ","
    (k1,k2,v1,v2)
    )
    WITH HDFS (
    "fs.defaultFS"="hdfs://testFs",
    "hdfs_user"="user"
    );

    16. LOAD WITH HDFS, for an HDFS HA cluster.

    LOAD LABEL example_db.label_filter
    (
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `tbl1`
    COLUMNS TERMINATED BY ","
    (k1,k2,v1,v2)
    )
    WITH HDFS (
    "fs.defaultFS"="hdfs://testFs",
    "hdfs_user"="user",
    "dfs.nameservices"="my_ha",
    "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
    "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
    "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
    "dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );

keyword

    BROKER, LOAD