Elasticsearch Sliced Scroll分页检索案例分享

bboss

The best elasticsearch highlevel java rest api——bboss

Elasticsearch Sliced Scroll分页检索案例分享

我们在文章《Elasticsearch Scroll分页检索案例分享》中介绍了elasticsearch scroll的基本用法,本文介绍Elasticsearch Sliced Scroll分页检索功能:Sliced Scroll将一次scroll检索拆分为多个相互独立的slice(切片),每个slice可以单独遍历,因此既可以串行处理,也可以由多个线程并行处理。

1.准备工作

参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置es客户端

2.定义Sliced Scroll检索dsl

创建配置文件-在resources目录下定义文件scroll.xml

  1. esmapper/scroll.xml

文件内容包含Sliced Scroll检索dsl语句-scrollSliceQuery,其中$id为slice编号(从0开始),$max为slice总数,$size为每页返回的记录数。

  1. <!-- Sliced scroll DSL template. $id: slice index (the test methods pass 0..max-1); $max: total number of slices (the test methods pass 6 and note it should not exceed the shard count); $size: hits per page (the test methods pass 100). Only documents whose gc.jvmGcOldCount equals 3 are matched. --><property name="scrollSliceQuery">
  2. <![CDATA[
  3. {
  4. "slice": {
  5. "id": $id,
  6. "max": $max
  7. },
  8. "size":$size,
  9. "query": {
  10. "term" : {
  11. "gc.jvmGcOldCount" : 3
  12. }
  13. }
  14. }
  15. ]]>
  16. </property>

3.串行方式执行slice检索

  1. /**
  2. * 串行方式执行slice scroll操作
  3. */
  4. @Test
  5. public void testSliceScroll() {
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  7. List<String> scrollIds = new ArrayList<>();
  8. long starttime = System.currentTimeMillis();
  9. //scroll slice分页检索
  10. int max = 6;
  11. long realTotalSize = 0;
  12. for (int i = 0; i < max; i++) {
  13. Map params = new HashMap();
  14. params.put("id", i);
  15. params.put("max", max);//最多6个slice,不能大于share数
  16. params.put("size", 100);//每页100条记录
  17. ESDatas<Map> sliceResponse = clientUtil.searchList("agentstat-*/_search?scroll=1m",
  18. "scrollSliceQuery", params,Map.class);
  19. List<Map> sliceDatas = sliceResponse.getDatas();
  20. realTotalSize = realTotalSize + sliceDatas.size();
  21. long totalSize = sliceResponse.getTotalSize();
  22. String scrollId = sliceResponse.getScrollId();
  23. if (scrollId != null)
  24. scrollIds.add(scrollId);
  25. System.out.println("totalSize:" + totalSize);
  26. System.out.println("scrollId:" + scrollId);
  27. if (sliceDatas != null && sliceDatas.size() >= 100) {//每页100条记录,迭代scrollid,遍历scroll分页结果
  28. do {
  29. sliceResponse = clientUtil.searchScroll("1m", scrollId, Map.class);
  30. String sliceScrollId = sliceResponse.getScrollId();
  31. if (sliceScrollId != null)
  32. scrollIds.add(sliceScrollId);
  33. sliceDatas = sliceResponse.getDatas();
  34. if (sliceDatas == null || sliceDatas.size() < 100) {
  35. break;
  36. }
  37. realTotalSize = realTotalSize + sliceDatas.size();
  38. } while (true);
  39. }
  40. }
  41. //打印处理耗时和实际检索到的数据
  42. long endtime = System.currentTimeMillis();
  43. System.out.println("耗时:"+(endtime - starttime)+",realTotalSize:"+realTotalSize);
  44. //查询存在es服务器上的scroll上下文信息
  45. String scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
  46. System.out.println(scrolls);
  47. //处理完毕后清除scroll上下文信息
  48. if(scrollIds.size() > 0) {
  49. scrolls = clientUtil.deleteScrolls(scrollIds);
  50. System.out.println(scrolls);
  51. }
  52. //清理完毕后查看scroll上下文信息
  53. scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
  54. System.out.println(scrolls);
  55. }

4.并行方式执行slice检索

  1. //用来存放实际slice检索总记录数
  2. long realTotalSize ;
  3. //辅助方法,用来累计每次scroll获取到的记录数
  4. synchronized void incrementSize(int size){
  5. this.realTotalSize = this.realTotalSize + size;
  6. }
  7. /**
  8. * 并行方式执行slice scroll操作
  9. */
  10. @Test
  11. public void testParralSliceScroll() {
  12. final ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  13. final List<String> scrollIds = new ArrayList<>();
  14. long starttime = System.currentTimeMillis();
  15. //scroll slice分页检索
  16. final int max = 6;
  17. final CountDownLatch countDownLatch = new CountDownLatch(max);//线程任务完成计数器,每个线程对应一个sclice,每运行完一个slice任务,countDownLatch计数减去1
  18. for (int j = 0; j < max; j++) {//启动max个线程,并行处理每个slice任务
  19. final int i = j;
  20. Thread sliceThread = new Thread(new Runnable() {//多线程并行执行scroll操作做,每个线程对应一个sclice
  21. @Override
  22. public void run() {
  23. Map params = new HashMap();
  24. params.put("id", i);
  25. params.put("max", max);//最多6个slice,不能大于share数
  26. params.put("size", 100);//每页100条记录
  27. ESDatas<Map> sliceResponse = clientUtil.searchList("agentstat-*/_search?scroll=1m",
  28. "scrollSliceQuery", params,Map.class);
  29. List<Map> sliceDatas = sliceResponse.getDatas();
  30. incrementSize( sliceDatas.size());//统计实际处理的文档数量
  31. long totalSize = sliceResponse.getTotalSize();
  32. String scrollId = sliceResponse.getScrollId();
  33. if (scrollId != null)
  34. scrollIds.add(scrollId);
  35. System.out.println("totalSize:" + totalSize);
  36. System.out.println("scrollId:" + scrollId);
  37. if (sliceDatas != null && sliceDatas.size() >= 100) {//每页100条记录,迭代scrollid,遍历scroll分页结果
  38. do {
  39. sliceResponse = clientUtil.searchScroll("1m", scrollId, Map.class);
  40. String sliceScrollId = sliceResponse.getScrollId();
  41. if (sliceScrollId != null)
  42. scrollIds.add(sliceScrollId);
  43. sliceDatas = sliceResponse.getDatas();
  44. if (sliceDatas == null || sliceDatas.size() < 100) {
  45. break;
  46. }
  47. incrementSize( sliceDatas.size());//统计实际处理的文档数量
  48. } while (true);
  49. }
  50. countDownLatch.countDown();//slice检索完毕后计数器减1
  51. }
  52. });
  53. sliceThread.start();//启动线程
  54. }
  55. try {
  56. countDownLatch.await();//等待所有的线程执行完毕,计数器变成0
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. //打印处理耗时和实际检索到的数据
  61. long endtime = System.currentTimeMillis();
  62. System.out.println("耗时:"+(endtime - starttime)+",realTotalSize:"+realTotalSize);
  63. //查询存在es服务器上的scroll上下文信息
  64. String scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
  65. // System.out.println(scrolls);
  66. //处理完毕后清除scroll上下文信息
  67. if(scrollIds.size() > 0) {
  68. scrolls = clientUtil.deleteScrolls(scrollIds);
  69. // System.out.println(scrolls);
  70. }
  71. //清理完毕后查看scroll上下文信息
  72. scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
  73. // System.out.println(scrolls);
  74. }

对比串行和并行两种方式的运行结果可以看到:并行处理的性能要好很多,而两种方式实际检索到的文档数量完全一致。

5.参考文档

https://www.elastic.co/guide/en/elasticsearch/reference/6.2/search-request-scroll.html

6.开发交流

elasticsearch技术交流群:166471282

elasticsearch微信公众号:

img