Bulk API

注意: Java高级REST客户端提供批量处理器来协助大量请求

Bulk 请求

BulkRequest 可以被用在使用单个请求执行多个 索引,更新 和/或 删除 操作的情况下。

它要求至少要一个操作被添加到 Bulk 请求上:

  1. BulkRequest request = new BulkRequest(); // 创建 BulkRequest
  2. request.add(new IndexRequest("posts", "doc", "1") // 添加第一个 IndexRequest 到 Bulk 请求上, 参看 [Index API](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-index.html) 获取更多关于如何构建 IndexRequest 的信息.
  3. .source(XContentType.JSON,"field", "foo"));
  4. request.add(new IndexRequest("posts", "doc", "2") // 添加第二个 IndexRequest
  5. .source(XContentType.JSON,"field", "bar"));
  6. request.add(new IndexRequest("posts", "doc", "3") // 添加第三个 IndexRequest
  7. .source(XContentType.JSON,"field", "baz"));

警告: Bulk API仅支持以JSON或SMILE编码的文档。 以任何其他格式提供文件将导致错误。

不同的操作类型也可以添加到同一个BulkRequest中:

  1. BulkRequest request = new BulkRequest();
  2. request.add(new DeleteRequest("posts", "doc", "3")); //
  3. Adds a DeleteRequest to the BulkRequest. See Delete API for more information on how to build DeleteRequest.
  4. request.add(new UpdateRequest("posts", "doc", "2")
  5. .doc(XContentType.JSON,"other", "test")); //
  6. Adds an UpdateRequest to the BulkRequest. See Update API for more information on how to build UpdateRequest.
  7. request.add(new IndexRequest("posts", "doc", "4") //Adds an IndexRequest using the SMILE format
  8. .source(XContentType.JSON,"field", "baz"));

可选参数

提供下列可选参数:

  1. request.timeout(TimeValue.timeValueMinutes(2));
  2. request.timeout("2m");
  3. Timeout to wait for the bulk request to be performed as a TimeValue
  4. Timeout to wait for the bulk request to be performed as a String
  5. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
  6. request.setRefreshPolicy("wait_for");
  7. Refresh policy as a WriteRequest.RefreshPolicy instance
  8. Refresh policy as a String
  9. request.waitForActiveShards(2);
  10. request.waitForActiveShards(ActiveShardCount.ALL);
  11. Sets the number of shard copies that must be active before proceeding with the index/update/delete operations.
  12. Number of shard copies provided as a ActiveShardCount: can be ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)

同步执行

  1. BulkResponse bulkResponse = client.bulk(request);

异步执行

  1. client.bulkAsync(request, new ActionListener<BulkResponse>() {
  2. @Override
  3. public void onResponse(BulkResponse bulkResponse) {
  4. //Called when the execution is successfully completed. The response is provided as an argument and contains a list of individual results for each operation that was executed. Note that one or more operations might have failed while the others have been successfully executed.
  5. }
  6. @Override
  7. public void onFailure(Exception e) {
  8. //Called when the whole BulkRequest fails. In this case the raised exception is provided as an argument and no operation has been executed.
  9. }
  10. });

Bulk 响应

返回的BulkResponse包含有关执行操作的信息,并允许对每个结果进行迭代,如下所示:

  1. for (BulkItemResponse bulkItemResponse : bulkResponse) { //迭代所有操作的结果
  2. DocWriteResponse itemResponse = bulkItemResponse.getResponse(); //Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
  3. if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
  4. || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
  5. // Handle the response of an index operation
  6. IndexResponse indexResponse = (IndexResponse) itemResponse;
  7. } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
  8. //Handle the response of a update operation
  9. UpdateResponse updateResponse = (UpdateResponse) itemResponse;
  10. } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
  11. // Handle the response of a delete operation
  12. DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
  13. }
  14. }

批量响应提供了一种快速检查一个或多个操作是否失败的方法:

  1. if (bulkResponse.hasFailures()) { // 只要有一个操作失败了,这个方法就返回 true
  2. }

在这种情况下,有必要迭代所有运算结果,以检查操作是否失败,如果是,则检索相应的故障:

  1. for (BulkItemResponse bulkItemResponse : bulkResponse) {
  2. if (bulkItemResponse.isFailed()) { //Indicate if a given operation failed
  3. BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); //Retrieve the failure of the failed operation
  4. }
  5. }

Bulk 处理器

BulkProcessor通过提供允许索引/更新/删除操作在添加到处理器时透明执行的实用程序类来简化Bulk API的使用。

为了执行请求,BulkProcessor需要3个组件:

  • RestHighLevelClient

这个客户端用来执行 BulkRequest 并接收 BulkResponse 。

  • BulkProcessor.Listener

这个监听器会在每个 BulkRequest 执行之前和之后被调用,或者当 BulkRequest 失败时调用。

  • ThreadPool

BulkRequest执行是使用这个池的线程完成的,允许BulkProcessor以非阻塞的方式工作,并允许在批量请求执行的同时接受新的索引/更新/删除请求。

然后 BulkProcessor.Builder 类可以被用来构建新的 BulkProcessor :

  1. ThreadPool threadPool = new ThreadPool(settings); // 使用已有的 Settings 对象创建 ThreadPool。
  2. BulkProcessor.Listener listener = new BulkProcessor.Listener() { // 创建 BulkProcessor.Listener
  3. @Override
  4. public void beforeBulk(long executionId, BulkRequest request) {
  5. //This method is called before each execution of a BulkRequest
  6. }
  7. @Override
  8. public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
  9. // This method is called after each execution of a BulkRequest
  10. }
  11. @Override
  12. public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
  13. //This method is called when a BulkRequest failed
  14. }
  15. };
  16. BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
  17. .build(); // 通过调用 BulkProcessor.Builder 的 build() 方法创建 BulkProcessor。 RestHighLevelClient.bulkAsync() 将被用来执行 BulkRequest。

BulkProcessor.Builder 提供了方法来配置 BulkProcessor 应该如何处理请求的执行:

  1. BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
  2. builder.setBulkActions(500); //Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)
  3. builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // Set when to flush a new bulk request based on the size of actions currently added (defaults to 5Mb, use -1 to disable it)
  4. builder.setConcurrentRequests(0); //Set the number of concurrent requests allowed to be executed (default to 1, use 0 to only allow the execution of a single request)
  5. builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // Set a flush interval flushing any BulkRequest pending if the interval passes (defaults to not set)
  6. builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.

一旦创建了BulkProcessor,可以向其添加请求:

  1. IndexRequest one = new IndexRequest("posts", "doc", "1").
  2. source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
  3. IndexRequest two = new IndexRequest("posts", "doc", "2")
  4. .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
  5. IndexRequest three = new IndexRequest("posts", "doc", "3")
  6. .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
  7. bulkProcessor.add(one);
  8. bulkProcessor.add(two);
  9. bulkProcessor.add(three);

这些请求将由 BulkProcessor 执行,它负责为每个批量请求调用 BulkProcessor.Listener 。
监听器提供方法接收 BulkResponse 和 BulkResponse :

  1. BulkProcessor.Listener listener = new BulkProcessor.Listener() {
  2. @Override
  3. public void beforeBulk(long executionId, BulkRequest request) {
  4. int numberOfActions = request.numberOfActions(); //Called before each execution of a BulkRequest, this method allows to know the number of operations that are going to be executed within the BulkRequest
  5. logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
  6. }
  7. @Override
  8. public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
  9. if (response.hasFailures()) {
  10. //在每个 BulkRequest 执行之后调用,此方法允许获知 BulkResponse 是否包含错误
  11. logger.warn("Bulk [{}] executed with failures", executionId);
  12. } else {
  13. logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
  14. }
  15. }
  16. @Override
  17. public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
  18. logger.error("Failed to execute bulk", failure); //如果 BulkRequest 执行失败则调用,此方法可获知失败情况。
  19. }
  20. };

一旦将所有请求都添加到BulkProcessor,其实例需要使用两种可用的关闭方法之一关闭。

一旦所有请求都被添加到了 BulkProcessor, 它的实例就需要使用两种可用的关闭方法之一进行关闭。

awaitClose() 可以被用来等待到所有请求都被处理,或者到指定的等待时间:

  1. boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

如果所有批量请求完成,则该方法返回 true ,如果在完成所有批量请求之前等待时间过长,则返回 false 。

close() 方法可以被用来立即关闭 BulkProcessor :

  1. bulkProcessor.close();

在关闭处理器之前,两个方法都会刷新已经被天教导处理器的请求,并禁止添加任何新的请求。