Index API

Index 请求

IndexRequest 要求下列参数:

  1. IndexRequest request = new IndexRequest(
  2. "posts", //Index
  3. "doc", //Type
  4. "1"); //Document id
  5. String jsonString = "{" +
  6. "\"user\":\"kimchy\"," +
  7. "\"postDate\":\"2013-01-30\"," +
  8. "\"message\":\"trying out Elasticsearch\"" +
  9. "}";
  10. request.source(jsonString, XContentType.JSON); /以字符串提供的 Document source

文档来源

文件来源可以以不同的方式提供:

  1. Map<String, Object> jsonMap = new HashMap<>();
  2. jsonMap.put("user", "kimchy");
  3. jsonMap.put("postDate", new Date());
  4. jsonMap.put("message", "trying out Elasticsearch");
  5. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1").source(jsonMap); //Map 作为文档源,它可以自动转换为 JSON 格式。
  1. XContentBuilder builder = XContentFactory.jsonBuilder();
  2. builder.startObject();
  3. {
  4. builder.field("user", "kimchy");
  5. builder.field("postDate", new Date());
  6. builder.field("message", "trying out Elasticsearch");
  7. }
  8. builder.endObject();
  9. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1").source(builder); //XContentBuilder 对象作为文档源,由 Elasticsearch 内置的帮助器生成 JSON 内容
  1. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  2. .source("user", "kimchy",
  3. "postDate", new Date(),
  4. "message", "trying out Elasticsearch"); //以键值对对象作为文档来源,它自动转换为 JSON 格式

可选参数

下列参数可选:

  1. request.routing("routing"); //Routing 值
  1. request.parent("parent"); //Parent 值
  1. request.timeout(TimeValue.timeValueSeconds(1)); //`TimeValue`类型的等待主分片可用的超时时间
  2. request.timeout("1s"); //`String` 类型的等待主分片可用的超时时间
  1. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //以 WriteRequest.RefreshPolicy 实例的刷新策略参数
  2. request.setRefreshPolicy("wait_for"); // 字符串刷新策略参数
  1. request.version(2); //版本
  1. request.versionType(VersionType.EXTERNAL); //版本类型
  1. request.opType(DocWriteRequest.OpType.CREATE); //提供一个 DocWriteRequest.OpType 值作为操作类型
  2. request.opType("create"); //字符串类型的操作类型参数: 可以是 create 或 update (默认值)
  1. request.setPipeline("pipeline"); //在索引文档之前要执行的摄取管道的名称

同步执行

  1. IndexResponse indexResponse = client.index(request);

异步执行

  1. client.indexAsync(request, new ActionListener<IndexResponse>() {
  2. @Override
  3. public void onResponse(IndexResponse indexResponse) {
  4. //当操作成功完成的时候被调用。响应对象以参数的形式传入。
  5. }
  6. @Override
  7. public void onFailure(Exception e) {
  8. //故障时被调用。异常对象以参数的形式传入
  9. }
  10. });

Index 响应

返回的“IndexResponse”可以检索有关执行操作的信息,如下所示:

  1. String index = indexResponse.getIndex();
  2. String type = indexResponse.getType();
  3. String id = indexResponse.getId();
  4. long version = indexResponse.getVersion();
  5. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
  6. //处理(如果需要)首次创建文档的情况
  7. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  8. //处理(如果需要)文档已经存在时被覆盖的情况
  9. }
  10. ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
  11. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  12. //处理成功碎片数少于总碎片的情况
  13. }
  14. if (shardInfo.getFailed() > 0) {
  15. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  16. String reason = failure.reason();//处理潜在的故障
  17. }
  18. }

如果存在版本冲突,将抛出 ElasticsearchException :

  1. IndexRequest request = new IndexRequest("posts", "doc", "1")
  2. .source("field", "value")
  3. .version(1);
  4. try {
  5. IndexResponse response = client.index(request);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. //表示是由于返回了版本冲突错误引发的异常
  9. }
  10. }

发生同样的情况发生在opType设置为create但是已经存在具有相同索引,类型和id的文档时:

  1. IndexRequest request = new IndexRequest("posts", "doc", "1")
  2. .source("field", "value")
  3. .opType(DocWriteRequest.OpType.CREATE);
  4. try {
  5. IndexResponse response = client.index(request);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. //表示由于返回了版本冲突错误引发的异常
  9. }
  10. }