Using scrolls in Java

首先需要阅读 scroll documentation

一般搜索请求都是返回一”页”数据,无论数据量多大都一起返回给用户,Scroll API可以允许我们检索大量数据(甚至全部数据)。Scroll API允许我们做一个初始阶段搜索并且持续批量从Elasticsearch里拉取结果直到没有结果剩下。这有点像传统数据库里的cursors(游标)。
Scroll API的创建并不是为了实时的用户响应,而是为了处理大量的数据(Scrolling is not intended for real time user requests, but rather for processing large amounts of data)。从 scroll 请求返回的结果只是反映了 search 发生那一时刻的索引状态,就像一个快照(The results that are returned from a scroll request reflect the state of the index at the time that the initial search request was made, like a snapshot in time)。后续的对文档的改动(索引、更新或者删除)都只会影响后面的搜索请求。

  1. import static org.elasticsearch.index.query.QueryBuilders.*;
  1. QueryBuilder qb = termQuery("multi", "test");
  2. SearchResponse scrollResp = client.prepareSearch(test)
  3. .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
  4. .setScroll(new TimeValue(60000)) //为了使用 scroll,初始搜索请求应该在查询中指定 scroll 参数,告诉 Elasticsearch 需要保持搜索的上下文环境多长时间(滚动时间)
  5. .setQuery(qb)
  6. .setSize(100).get(); //max of 100 hits will be returned for each scroll
  7. //Scroll until no hits are returned
  8. do {
  9. for (SearchHit hit : scrollResp.getHits().getHits()) {
  10. //Handle the hit...
  11. }
  12. scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
  13. } while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

如果超过滚动时间,继续使用该滚动ID搜索数据,则会报错:

  1. Caused by: SearchContextMissingException[No search context found for id [2861]]
  2. at org.elasticsearch.search.SearchService.findContext(SearchService.java:613)
  3. at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:403)
  4. at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:384)
  5. at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:381)
  6. at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
  7. at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
  8. at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
  9. at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
  10. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  11. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  12. at java.lang.Thread.run(Thread.java:745)

虽然当滚动有效时间已过,搜索上下文(Search Context)会自动被清除,但是一值保持滚动代价也是很大的,所以当我们不在使用滚动时要尽快使用Clear-Scroll API进行清除。

清除Scroll

  1. /**
  2. * 清除滚动ID
  3. * @param client
  4. * @param scrollIdList
  5. * @return
  6. */
  7. public static boolean clearScroll(Client client, List<String> scrollIdList){
  8. ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
  9. clearScrollRequestBuilder.setScrollIds(scrollIdList);
  10. ClearScrollResponse response = clearScrollRequestBuilder.get();
  11. return response.isSucceeded();
  12. }
  13. /**
  14. * 清除滚动ID
  15. * @param client
  16. * @param scrollId
  17. * @return
  18. */
  19. public static boolean clearScroll(Client client, String scrollId){
  20. ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
  21. clearScrollRequestBuilder.addScrollId(scrollId);
  22. ClearScrollResponse response = clearScrollRequestBuilder.get();
  23. return response.isSucceeded();
  24. }

实例

  1. public class ScrollsAPI extends ElasticsearchClientBase {
  2. private String scrollId;
  3. @Test
  4. public void testScrolls() throws Exception {
  5. SearchResponse scrollResp = client.prepareSearch("twitter")
  6. .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
  7. .setScroll(new TimeValue(60000)) //为了使用 scroll,初始搜索请求应该在查询中指定 scroll 参数,告诉 Elasticsearch 需要保持搜索的上下文环境多长时间(滚动时间)
  8. .setQuery(QueryBuilders.termQuery("user", "kimchy")) // Query 查询条件
  9. .setSize(5).get(); //max of 100 hits will be returned for each scroll
  10. //Scroll until no hits are returned
  11. scrollId = scrollResp.getScrollId();
  12. do {
  13. for (SearchHit hit : scrollResp.getHits().getHits()) {
  14. //Handle the hit...
  15. System.out.println("" + hit.getSource().toString());
  16. }
  17. scrollResp = client.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();
  18. }
  19. while (scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.
  20. }
  21. @Override
  22. public void tearDown() throws Exception {
  23. ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
  24. clearScrollRequestBuilder.addScrollId(scrollId);
  25. ClearScrollResponse response = clearScrollRequestBuilder.get();
  26. if (response.isSucceeded()) {
  27. System.out.println("成功清除");
  28. }
  29. super.tearDown();
  30. }
  31. }