导入本地数据

Stream Load 用于将本地文件导入到 Doris 中。

不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。

该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

  • BE 的 HTTP 协议端口,默认为 8040。
  • FE 的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 BE 所在机器。

本文文档我们以 curl 命令为例演示如何进行数据导入。

文档最后,我们给出一个使用 Java 导入数据的代码示例

导入数据

Stream Load 的请求体如下:

  1. PUT /api/{db}/{table}/_stream_load
  1. 创建一张表

    通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:

    1. CREATE TABLE IF NOT EXISTS load_local_file_test
    2. (
    3. id INT,
    4. age TINYINT,
    5. name VARCHAR(50)
    6. )
    7. unique key(id)
    8. DISTRIBUTED BY HASH(id) BUCKETS 3;
  2. 导入数据

    执行以下 curl 命令导入本地文件:

    1. curl -u user:passwd -H "label:load_local_file_test" -T /path/to/local/demo.txt http://host:port/api/demo/load_local_file_test/_stream_load
    • user:passwd 为在 Doris 中创建的用户。初始用户为 admin / root,密码初始状态下为空。
    • host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看。
    • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。

    关于 Stream Load 命令的更多高级操作,请参阅 Stream Load 命令文档。

  3. 等待导入结果

    Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

    1. {
    2. "TxnId": 1003,
    3. "Label": "load_local_file_test",
    4. "Status": "Success",
    5. "Message": "OK",
    6. "NumberTotalRows": 1000000,
    7. "NumberLoadedRows": 1000000,
    8. "NumberFilteredRows": 1,
    9. "NumberUnselectedRows": 0,
    10. "LoadBytes": 40888898,
    11. "LoadTimeMs": 2144,
    12. "BeginTxnTimeMs": 1,
    13. "StreamLoadPutTimeMs": 2,
    14. "ReadDataTimeMs": 325,
    15. "WriteDataTimeMs": 1933,
    16. "CommitAndPublishTimeMs": 106,
    17. "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
    18. }
    • Status 字段状态为 Success 即表示导入成功。
    • 其他字段的详细介绍,请参阅 Stream Load 命令文档。

导入建议

  • Stream Load 只能导入本地文件。
  • 建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。

Java 代码示例

这里通过一个简单的 JAVA 示例来执行 Stream Load:

  1. package demo.doris;
  2. import org.apache.commons.codec.binary.Base64;
  3. import org.apache.http.HttpHeaders;
  4. import org.apache.http.client.methods.CloseableHttpResponse;
  5. import org.apache.http.client.methods.HttpPut;
  6. import org.apache.http.entity.FileEntity;
  7. import org.apache.http.impl.client.CloseableHttpClient;
  8. import org.apache.http.impl.client.DefaultRedirectStrategy;
  9. import org.apache.http.impl.client.HttpClientBuilder;
  10. import org.apache.http.impl.client.HttpClients;
  11. import org.apache.http.util.EntityUtils;
  12. import java.io.File;
  13. import java.io.IOException;
  14. import java.nio.charset.StandardCharsets;
  15. /*
  16. 这是一个 Doris Stream Load 示例,需要依赖
  17. <dependency>
  18. <groupId>org.apache.httpcomponents</groupId>
  19. <artifactId>httpclient</artifactId>
  20. <version>4.5.13</version>
  21. </dependency>
  22. */
  23. public class DorisStreamLoader {
  24. //可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
  25. private final static String HOST = "your_host";
  26. private final static int PORT = 8040;
  27. private final static String DATABASE = "db1"; // 要导入的数据库
  28. private final static String TABLE = "tbl1"; // 要导入的表
  29. private final static String USER = "root"; // Doris 用户名
  30. private final static String PASSWD = ""; // Doris 密码
  31. private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径
  32. private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
  33. HOST, PORT, DATABASE, TABLE);
  34. private final static HttpClientBuilder httpClientBuilder = HttpClients
  35. .custom()
  36. .setRedirectStrategy(new DefaultRedirectStrategy() {
  37. @Override
  38. protected boolean isRedirectable(String method) {
  39. // 如果连接目标是 FE,则需要处理 307 redirect。
  40. return true;
  41. }
  42. });
  43. public void load(File file) throws Exception {
  44. try (CloseableHttpClient client = httpClientBuilder.build()) {
  45. HttpPut put = new HttpPut(loadUrl);
  46. put.setHeader(HttpHeaders.EXPECT, "100-continue");
  47. put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
  48. // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
  49. put.setHeader("label","label1");
  50. put.setHeader("column_separator",",");
  51. // 设置导入文件。
  52. // 这里也可以使用 StringEntity 来传输任意数据。
  53. FileEntity entity = new FileEntity(file);
  54. put.setEntity(entity);
  55. try (CloseableHttpResponse response = client.execute(put)) {
  56. String loadResult = "";
  57. if (response.getEntity() != null) {
  58. loadResult = EntityUtils.toString(response.getEntity());
  59. }
  60. final int statusCode = response.getStatusLine().getStatusCode();
  61. if (statusCode != 200) {
  62. throw new IOException(
  63. String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
  64. }
  65. System.out.println("Get load result: " + loadResult);
  66. }
  67. }
  68. }
  69. private String basicAuthHeader(String username, String password) {
  70. final String tobeEncode = username + ":" + password;
  71. byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
  72. return "Basic " + new String(encoded);
  73. }
  74. public static void main(String[] args) throws Exception{
  75. DorisStreamLoader loader = new DorisStreamLoader();
  76. File file = new File(LOAD_FILE_NAME);
  77. loader.load(file);
  78. }
  79. }

注意:这里 http client 的版本要是4.5.13

org.apache.httpcomponentshttpclient4.5.13