Elasticsearch聚合查询案例分享

bboss

Elasticsearch聚合查询案例分享

1.案例介绍

本文包含三个案例:

案例1:统计特定时间范围内每个应用的总访问量、访问成功数、访问失败数,以及每个应用请求响应时间的分段统计(1秒内、1-3秒、3-5秒、5秒以上)

案例2:简单的term统计

案例3:简单的cardinality统计

2.准备工作

参考文档《高性能elasticsearch ORM开发库使用介绍》中的第1章节和第2章节,在自己的工程中导入bboss es依赖包和配置es参数

3.案例

3.1 案例1

3.1.1 定义统计dsl

在源码目录下新建文件esmapper/estrace/ESTracesMapper.xml,内容如下

  <properties>
      <!--
      Per-application summary statistics over a time window: total requests,
      success / failure counts (split on the "err" field) and a response-time
      distribution (split on the "elapsed" field).
      The bboss es dao references this DSL by its name: applicationSumStatic.
      -->
      <property name="applicationSumStatic">
          <![CDATA[
          {
              "query": {
                  "bool": {
                      "filter": [
                          #if($channelApplications && $channelApplications.size() > 0)
                          {
                              "terms": {
                                  ## Only count the listed applications. Each name is wrapped in
                                  ## double quotes so the array is valid JSON; Velocity emits
                                  ## $application.applicationName verbatim (not JSON-escaped), so
                                  ## names must not contain double quotes or backslashes.
                                  "applicationName.keyword": [
                                  #foreach($application in $channelApplications)
                                      #if($velocityCount > 0),#end "$application.applicationName"
                                  #end
                                  ]
                              }
                          },
                          #end
                          {
                              "range": {
                                  "startTime": {
                                      "gte": #[startTime], ## window start (inclusive)
                                      "lt": #[endTime]     ## window end (exclusive)
                                  }
                              }
                          }
                      ]
                  }
              },
              "size": 0,
              "aggs": {
                  "applicationsums": {
                      ## One bucket per application name: key = name, doc_count = total requests.
                      "terms": {
                          "field": "applicationName.keyword",
                          "size": 10000
                      },
                      "aggs": {
                          ## Per-application success / failure counts on the err flag
                          ## (0 = success, 1 = failure).
                          "successsums": {
                              "terms": {
                                  "field": "err"
                              }
                          },
                          ## Response-time distribution on elapsed (1000 = 1 second). keyed=true
                          ## returns the buckets as an object indexed by "key"; in an ES range
                          ## aggregation "from" is inclusive and "to" is exclusive.
                          "elapsed_ranges": {
                              "range": {
                                  "field": "elapsed",
                                  "keyed": true,
                                  "ranges": [
                                      { "key": "1秒", "to": 1000 },
                                      { "key": "3秒", "from": 1000, "to": 3000 },
                                      { "key": "5秒", "from": 3000, "to": 5000 },
                                      { "key": "5秒以上", "from": 5000 }
                                  ]
                              }
                          }
                      }
                  }
              }
          }
          ]]>
      </property>
  </properties>

3.1.2 编写统计dao及统计方法

Java代码

  1. public class TraceESDao {
  2. public List<ApplicationStatic> getApplicationSumStatic(TraceExtraCriteria traceExtraCriteria){
  3. ClientInterface clientUtil= ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTracesMapper.xml");
  4. //返回json统计报文,调试用,一遍根据json报文组装统计结果列表
  5. // String response = clientUtil.executeRequest("trace-*/_search",
  6. // "applicationSumStatic",traceExtraCriteria);
  7. //根据条件进行统计,在对象traceExtraCriteria中指定开始时间和结束时间
  8. MapRestResponse restResponse = clientUtil.search("trace-*/_search",
  9. "applicationSumStatic",traceExtraCriteria);
  10. //组装统计结果
  11. //获取应用统计列表,包含每个应用的名称、总访问量以及成功数和失败数
  12. List<Map<String,Object>> appstatics = (List<Map<String,Object>>)restResponse.getAggBuckets("applicationsums");
  13. if(appstatics != null && appstatics.size() > 0) {
  14. List<ApplicationStatic> applicationStatics = new ArrayList<ApplicationStatic>(appstatics.size());
  15. ApplicationStatic applicationStatic = null;
  16. for (int i = 0; i < appstatics.size(); i++) {
  17. applicationStatic = new ApplicationStatic();
  18. Map<String, Object> map = appstatics.get(i);
  19. //应用名称
  20. String appName = (String) map.get("key");
  21. applicationStatic.setApplicationName(appName);
  22. //应用总访问量
  23. Long totalsize = ResultUtil.longValue( map.get("doc_count"),0l);
  24. applicationStatic.setTotalSize(totalsize);
  25. //获取成功数和失败数
  26. List<Map<String, Object>> appstatic = (List<Map<String, Object>>)ResultUtil.getAggBuckets(map, "successsums");
  27. /**
  28. "buckets": [
  29. {
  30. "key": 0,
  31. "doc_count": 30
  32. }
  33. ]
  34. */
  35. //key 0
  36. Long success = 0l;//成功数
  37. Long failed = 0l;//失败数
  38. for (int j = 0; j < appstatic.size(); j++) {
  39. Map<String, Object> stats = appstatic.get(j);
  40. Integer key = (Integer) stats.get("key");//成功和错误标识
  41. if (key == 0)//成功
  42. success = ResultUtil.longValue( stats.get("doc_count"),0l);
  43. else if (key == 1)//失败
  44. failed = ResultUtil.longValue( stats.get("doc_count"),0l);
  45. }
  46. applicationStatic.setSuccessCount(success);
  47. applicationStatic.setFailCount(failed);
  48. List<ApplicationPeriodStatic> applicationPeriodStatics = new ArrayList<ApplicationPeriodStatic>(4);
  49. ApplicationPeriodStatic applicationPeriodStatic = null;
  50. //获取响应时间分段统计信息
  51. Map<String, Map<String, Object>> appPeriodstatic = (Map<String, Map<String, Object>>)ResultUtil.getAggBuckets(map, "elapsed_ranges");
  52. //1秒
  53. Map<String, Object> period = appPeriodstatic.get("1秒");
  54. applicationPeriodStatic = new ApplicationPeriodStatic();
  55. applicationPeriodStatic.setPeriod("1秒");
  56. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  57. applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),1000));
  58. applicationPeriodStatics.add(applicationPeriodStatic);
  59. //3秒
  60. period = appPeriodstatic.get("3秒");
  61. applicationPeriodStatic = new ApplicationPeriodStatic();
  62. applicationPeriodStatic.setPeriod("3秒");
  63. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  64. applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),1000));
  65. applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),3000));
  66. applicationPeriodStatics.add(applicationPeriodStatic);
  67. //5秒
  68. period = appPeriodstatic.get("5秒");
  69. applicationPeriodStatic = new ApplicationPeriodStatic();
  70. applicationPeriodStatic.setPeriod("5秒");
  71. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  72. applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),3000));
  73. applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),5000));
  74. applicationPeriodStatics.add(applicationPeriodStatic);
  75. //5秒以上
  76. period = appPeriodstatic.get("5秒以上");
  77. applicationPeriodStatic = new ApplicationPeriodStatic();
  78. applicationPeriodStatic.setPeriod("5秒以上");
  79. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  80. applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),5000));
  81. applicationPeriodStatics.add(applicationPeriodStatic);
  82. applicationStatic.setApplicationPeriodStatics(applicationPeriodStatics);
  83. applicationStatics.add(applicationStatic);
  84. }
  85. //返回统计结果
  86. return applicationStatics;
  87. }
  88. return null;
  89. }
  90. }

3.1.3 执行测试用例

Java代码

  1. @Test
  2. public void testAppStatic(){
  3. TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
  4. traceExtraCriteria.setStartTime(1516304868072l);
  5. traceExtraCriteria.setEndTime(1516349516377l);
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTracesMapper.xml");
  7. //通过下面的方法先得到查询的json报文,然后再通过MapRestResponse查询遍历结果,调试的时候打开String response的注释
  8. //String response = clientUtil.executeRequest("trace-*/_search","applicationSumStatic",traceExtraCriteria);
  9. //System.out.println(response);
  10. MapRestResponse restResponse = clientUtil.search("trace-*/_search","applicationSumStatic",traceExtraCriteria);
  11. List<Map<String,Object>> appstatics = restResponse.getAggBuckets("applicationsums",new ESTypeReference<List<Map<String,Object>>>(){});
  12. int doc_count_error_upper_bound = restResponse.getAggAttribute("applicationsums","doc_count_error_upper_bound",int.class);
  13. int sum_other_doc_count = restResponse.getAggAttribute("applicationsums","sum_other_doc_count",int.class);
  14. System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
  15. System.out.println("sum_other_doc_count:"+sum_other_doc_count);
  16. for(int i = 0; i < appstatics.size(); i ++){
  17. Map<String,Object> map = appstatics.get(i);
  18. //应用名称
  19. String appName = (String)map.get("key");
  20. //应用总访问量
  21. int totalsize = (int)map.get("doc_count");
  22. //获取成功数和失败数
  23. List<Map<String,Object>> appstatic = ResultUtil.getAggBuckets(map ,"successsums",new ESTypeReference<List<Map<String,Object>>>(){});
  24. doc_count_error_upper_bound = ResultUtil.getAggAttribute(map ,"successsums","doc_count_error_upper_bound",int.class);
  25. sum_other_doc_count = ResultUtil.getAggAttribute(map ,"successsums","sum_other_doc_count",int.class);
  26. System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
  27. System.out.println("sum_other_doc_count:"+sum_other_doc_count);
  28. /**
  29. "buckets": [
  30. {
  31. "key": 0,
  32. "doc_count": 30
  33. }
  34. ]
  35. */
  36. //key 0
  37. int success = 0;//成功数
  38. int failed = 0;//失败数
  39. for(int j = 0; j < appstatic.size(); i ++){
  40. Map<String,Object> stats = appstatic.get(i);
  41. int key = (int) stats.get("key");//成功和错误标识
  42. if(key == 0)
  43. success = (int)stats.get("doc_count");
  44. else if(key == 1)
  45. failed = (int)stats.get("doc_count");
  46. }
  47. }
  48. }

3.1.4 获取元数据信息的测试方法

Java代码

  1. @Test
  2. public void testAppStatic(){
  3. TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
  4. traceExtraCriteria.setStartTime(1516304868072l);
  5. traceExtraCriteria.setEndTime(1516349516377l);
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTracesMapper.xml");
  7. //通过下面的方法先得到查询的json报文,然后再通过MapRestResponse查询遍历结果,调试的时候打开String response的注释
  8. //String response = clientUtil.executeRequest("trace-*/_search","applicationSumStatic",traceExtraCriteria);
  9. //System.out.println(response);
  10. MapRestResponse restResponse = clientUtil.search("trace-*/_search","applicationSumStatic",traceExtraCriteria);
  11. List<Map<String,Object>> appstatics = restResponse.getAggBuckets("applicationsums",new ESTypeReference<List<Map<String,Object>>>(){});
  12. int doc_count_error_upper_bound = restResponse.getAggAttribute("applicationsums","doc_count_error_upper_bound",int.class);
  13. int sum_other_doc_count = restResponse.getAggAttribute("applicationsums","sum_other_doc_count",int.class);
  14. System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
  15. System.out.println("sum_other_doc_count:"+sum_other_doc_count);
  16. for(int i = 0; i < appstatics.size(); i ++){
  17. Map<String,Object> map = appstatics.get(i);
  18. //应用名称
  19. String appName = (String)map.get("key");
  20. //应用总访问量
  21. int totalsize = (int)map.get("doc_count");
  22. //获取成功数和失败数
  23. List<Map<String,Object>> appstatic = ResultUtil.getAggBuckets(map ,"successsums",new ESTypeReference<List<Map<String,Object>>>(){});
  24. doc_count_error_upper_bound = ResultUtil.getAggAttribute(map ,"successsums","doc_count_error_upper_bound",int.class);
  25. sum_other_doc_count = ResultUtil.getAggAttribute(map ,"successsums","sum_other_doc_count",int.class);
  26. System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
  27. System.out.println("sum_other_doc_count:"+sum_other_doc_count);
  28. /**
  29. "buckets": [
  30. {
  31. "key": 0,
  32. "doc_count": 30
  33. }
  34. ]
  35. */
  36. //key 0
  37. int success = 0;//成功数
  38. int failed = 0;//失败数
  39. for(int j = 0; j < appstatic.size(); i ++){
  40. Map<String,Object> stats = appstatic.get(i);
  41. int key = (int) stats.get("key");//成功和错误标识
  42. if(key == 0)
  43. success = (int)stats.get("doc_count");
  44. else if(key == 1)
  45. failed = (int)stats.get("doc_count");
  46. }
  47. }
  48. }

3.2 案例2 简单的term统计

3.2.1 定义dsl

建立dsl配置文件esmapper/testagg.xml,定义termAgg:

  <property name="termAgg">
      <![CDATA[
      {
          ## Query conditions (active): only documents whose applicationName equals
          ## #[application], whose rpc equals #[rpc] and whose startTime lies in
          ## [startTime, endTime) are aggregated.
          "query": {
              "bool": {
                  "filter": [
                      {
                          "term": {
                              "applicationName.keyword": #[application]
                          }
                      },
                      {
                          "term": {
                              "rpc.keyword": #[rpc]
                          }
                      },
                      {
                          "range": {
                              "startTime": {
                                  "gte": #[startTime],
                                  "lt": #[endTime]
                              }
                          }
                      }
                  ]
              }
          },
          "size": 0,
          ## Terms aggregation: one bucket per rpc value (key = rpc, doc_count = requests).
          ## Because the query also filters on a single rpc value, at most one bucket is
          ## returned; drop the rpc term filter to get a bucket per service.
          "aggs": {
              "traces": {
                  "terms": {
                      "field": "rpc.keyword",
                      "size": 10000
                  }
              }
          }
      }
      ]]>
  </property>

3.2.2 执行dsl

  1. @Test
  2. public void termAgg(){
  3. ClientInterface clientInterface = ElasticSearchHelper.getConfigRestClientUtil("esmapper/testagg.xml");
  4. //ESDatas<Map> traces = clientInterface.searchAll("trace-*",1000,Map.class);//获取总记录集合
  5. Map params = new HashMap();//聚合统计条件参数
  6. params.put("application","testweb");
  7. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");
  8. try {
  9. params.put("startTime",format.parse("1999-01-01 00:00:00").getTime());
  10. params.put("endTime",new Date().getTime());
  11. params.put("rpc","/testweb/jsp/logoutredirect.jsp");
  12. } catch (ParseException e) {
  13. e.printStackTrace();
  14. }
  15. //一行代码,执行每个服务的访问量总数统计
  16. ESAggDatas<LongAggHit> response = clientInterface.searchAgg("trace-*/_search",//从trace-开头的索引表中检索数据
  17. "termAgg", //配置在esmapper/testagg.xml中的dsl语句
  18. params, //dsl语句termAgg中需要的查询参数
  19. LongAggHit.class, //封装聚合统计中每个服务地址及服务访问量的地址
  20. "traces"); //term统计桶的名称,参见dsl语句
  21. List<LongAggHit> aggHitList = response.getAggDatas();//每个服务的访问量
  22. long totalSize = response.getTotalSize();//总访问量
  23. }

3.3 案例3 简单的cardinality统计

3.3.1 定义dsl

在3.2.1中建立的dsl配置文件esmapper/testagg.xml中,再定义基数(cardinality)统计dsl candicateAgg:

  <property name="candicateAgg">
      <![CDATA[
      {
          ## Query conditions, currently disabled: everything between the Velocity block-comment
          ## markers below is not rendered, so the aggregation runs over all documents of the
          ## searched indices. Remove the markers to restrict it by application, rpc and time.
          #*
          "query": {
              "bool": {
                  "filter": [
                      {
                          "term": {
                              "applicationName.keyword": #[application]
                          }
                      },
                      {
                          "term": {
                              "rpc.keyword": #[rpc]
                          }
                      },
                      {
                          "range": {
                              "startTime": {
                                  "gte": #[startTime],
                                  "lt": #[endTime]
                              }
                          }
                      }
                  ]
              }
          },
          *#
          "size": 0,
          ## Cardinality aggregation: approximate number of distinct rpc values (services).
          ## precision_threshold trades memory for accuracy; counts below it are expected to be
          ## close to exact.
          "aggs": {
              "traces": {
                  "cardinality": {
                      "field": "rpc.keyword",
                      "precision_threshold": 100
                  }
              }
          }
      }
      ]]>
  </property>

3.3.2 执行dsl

  1. @Test
  2. public void candicateAgg(){
  3. ClientInterface clientInterface = ElasticSearchHelper.getConfigRestClientUtil("esmapper/testagg.xml");
  4. Map params = null;//单值聚合统计条件参数
  5. //一行代码,执行服务基数统计
  6. ESAggDatas<SingleLongAggHit> response = clientInterface.searchAgg("trace-*/_search","candicateAgg",params,SingleLongAggHit.class,"traces");
  7. SingleLongAggHit aggHitList = response.getSingleAggData();
  8. long value = aggHitList.getValue();
  9. long totalSize = response.getTotalSize();//总访问量
  10. }

4.相关资料

bboss elasticsearch交流:166471282