Update API

更新请求

UpdateRequest 要求下列参数:

  1. UpdateRequest request = new UpdateRequest(
  2. "posts", // Index
  3. "doc", // Type
  4. "1"); // Document id

Update API允许通过使用脚本或传递部分文档来更新现有文档。

使用脚本更新

该脚本可以是一个内联脚本:

  1. Map<String, Object> parameters = singletonMap("count", 4); // 脚本参数以一个 Map 对象提供。
  2. Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters); // 使用 painless 语言创建内联脚本,并传入参数值。
  3. request.script(inline); // 将脚本传递给更新请求对象

或者是一个存储脚本:

  1. // 引用一个名为 increment-field 的painless 的存储脚本
  2. Script stored =
  3. new Script(ScriptType.STORED, "painless", "increment-field", parameters);
  4. request.script(stored); // 给请求设置脚本

使用局部文档更新

使用局部文档更新时,局部文档将与现有文档合并。

局部文档可以以不同的方式提供:

  1. UpdateRequest request = new UpdateRequest("posts", "doc", "1");
  2. String jsonString = "{" +
  3. "\"updated\":\"2017-01-01\"," +
  4. "\"reason\":\"daily update\"" +
  5. "}";
  6. request.doc(jsonString, XContentType.JSON); // 以一个 JSON 格式的字符串提供的局部文档源
  1. Map<String, Object> jsonMap = new HashMap<>();
  2. jsonMap.put("updated", new Date());
  3. jsonMap.put("reason", "daily update");
  4. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  5. .doc(jsonMap); // 以一个可自动转换为 JSON 格式的 Map 提供局部文档源
  1. XContentBuilder builder = XContentFactory.jsonBuilder();
  2. builder.startObject();
  3. {
  4. builder.field("updated", new Date());
  5. builder.field("reason", "daily update");
  6. }
  7. builder.endObject();
  8. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  9. .doc(builder); // 以 XContentBuilder 对象提供局部文档源, Elasticsearch 构建辅助器将生成 JSON 格式。
  1. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  2. .doc("updated", new Date(),
  3. "reason", "daily update"); // 以 Object 键值对提供局部文档源,它将被转换为 JSON 格式。

更新或插入

如果文档不存在,可以使用upsert方法定义一些将作为新文档插入的内容:

  1. String jsonString = "{\"created\":\"2017-01-01\"}";
  2. request.upsert(jsonString, XContentType.JSON); // 以字符串提供更新插入的文档源
  3. 与局部文档更新类似,可以使用接受 String Map XContentBuilder Object 键值对的方式使用upsert 方法更新或插入文档的内容。

可选参数

提供下列可选参数:

  1. request.routing("routing"); // 路由值
  2. request.parent("parent"); //Parent 值
  3. request.timeout(TimeValue.timeValueMinutes(2)); // TimeValue 类型的等待主分片可用的超时时间
  4. request.timeout("2m"); // 字符串类型的等待主分片可用的超时时间
  5. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // Refresh policy as a WriteRequest.RefreshPolicy instance
  6. request.setRefreshPolicy("wait_for"); // Refresh policy as a String
  7. request.retryOnConflict(3); // How many times to retry the update operation if the document to update has been changed by another operation between the get and indexing phases of the update operation
  8. request.fetchSource(true); //Enable source retrieval, disabled by default
  9. request.version(2); // 版本号
  10. request.detectNoop(false); // Disable the noop detection
  11. request.scriptedUpsert(true); // Indicate that the script must run regardless of whether the document exists or not, ie the script takes care of creating the document if it does not already exist.
  12. request.docAsUpsert(true); // Indicate that the partial document must be used as the upsert document if it does not exist yet.
  13. request.waitForActiveShards(2); //Sets the number of shard copies that must be active before proceeding with the update operation.
  14. request.waitForActiveShards(ActiveShardCount.ALL); //Number of shard copies provided as a ActiveShardCount: can be ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)
  1. String[] includes = new String[]{"updated", "r*"};
  2. String[] excludes = Strings.EMPTY_ARRAY;
  3. request.fetchSource(new FetchSourceContext(true, includes, excludes)); // Configure source inclusion for specific fields
  1. String[] includes = Strings.EMPTY_ARRAY;
  2. String[] excludes = new String[]{"updated"};
  3. request.fetchSource(new FetchSourceContext(true, includes, excludes)); //Configure source exclusion for specific fields

同步执行

  1. UpdateResponse updateResponse = client.update(request);

异步执行

  1. client.updateAsync(request, new ActionListener<UpdateResponse>() {
  2. @Override
  3. public void onResponse(UpdateResponse updateResponse) {
  4. // Called when the execution is successfully completed. The response is provided as an argument.
  5. }
  6. @Override
  7. public void onFailure(Exception e) {
  8. // Called in case of failure. The raised exception is provided as an argument.
  9. }
  10. });

更新响应

返回的UpdateResponse允许获取执行操作的相关信息,如下所示:

  1. String index = updateResponse.getIndex();
  2. String type = updateResponse.getType();
  3. String id = updateResponse.getId();
  4. long version = updateResponse.getVersion();
  5. if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
  6. //Handle the case where the document was created for the first time (upsert)
  7. } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  8. // Handle the case where the document was updated
  9. } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
  10. //Handle the case where the document was deleted
  11. } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
  12. //Handle the case where the document was not impacted by the update, ie no operation (noop) was executed on the document
  13. }

当通过 fetchSource 方法在UpdateRequest 里设置了启用接收源,相应对象将包含被更新的文档源:

  1. GetResult result = updateResponse.getGetResult(); //Retrieve the updated document as a GetResult
  2. if (result.isExists()) {
  3. String sourceAsString = result.sourceAsString(); //Retrieve the source of the updated document as a String
  4. Map<String, Object> sourceAsMap = result.sourceAsMap(); //Retrieve the source of the updated document as a Map<String, Object>
  5. byte[] sourceAsBytes = result.source(); //Retrieve the source of the updated document as a byte[]
  6. } else {
  7. //Handle the scenario where the source of the document is not present in the response (this is the case by default)
  8. }

这也可以用了检查分片故障:

  1. ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
  2. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  3. //Handle the situation where number of successful shards is less than total shards
  4. }
  5. if (shardInfo.getFailed() > 0) {
  6. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  7. String reason = failure.reason(); //Handle the potential failures
  8. }
  9. }

当对一个不存在的文档执行 UpdateRequest 时,响应将包含 404 状态码,并抛出一个需要如下所示处理的 ElasticsearchException 异常:

  1. UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist").doc("field", "value");
  2. try {
  3. UpdateResponse updateResponse = client.update(request);
  4. } catch (ElasticsearchException e) {
  5. if (e.status() == RestStatus.NOT_FOUND) {
  6. // 处理由于文档不存在导致的异常。
  7. }
  8. }

如果有文档版本冲突,也会抛出 ElasticsearchException:

  1. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  2. .doc("field", "value")
  3. .version(1);
  4. try {
  5. UpdateResponse updateResponse = client.update(request);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. // 表明异常是由于返回来了版本冲突错误导致的
  9. }
  10. }