Preface

Recently, several customers running performance tests on HybridDB have hit performance-tuning problems with Count Distinct. In this article we summarize the different ways HybridDB processes Count Distinct.

We will use one customer's case as the running example. Their typical business scenario is counting, in a user behavior log, the number of distinct actions per category. There are several thousand categories, and the total number of distinct actions is large, in the tens of millions. To analyze the behavior, they need to query the per-category distinct action counts over a time window, with a query like the following (see the appendix for the DDL of the test table):

    select category, count(distinct actionId) as ct from test_user_log
    where receivetime between '2017-03-07 11:00:00' and '2017-03-07 12:00:00' group by category
    order by ct desc limit 10;

Using this query, let's look at how Count Distinct is processed.

The basic way Count Distinct is handled

Use the explain analyze command to look at the execution details of the query:

    test=# explain analyze select category, count(distinct actionId) as ct from test_user_log
    where receivetime between '2017-03-07 11:00:00' and '2017-03-07 12:00:00' group by category
    order by ct desc limit 10;
    QUERY PLAN
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
    Limit (cost=0.00..431.00 rows=10 width=16)
    Gather Motion 16:1 (slice2; segments: 16) (cost=5968.98..5968.99 rows=1 width=40)
    Merge Key: ct
    Rows out: 745 rows at destination with 2469 ms to end, start offset by 77 ms.
    -> Sort (cost=5968.98..5968.99 rows=1 width=40)
    Sort Key: ct
    Rows out: Avg 46.6 rows x 16 workers. Max 55 rows (seg0) with 2461 ms to end, start offset by 85 ms.
    Executor memory: 58K bytes avg, 58K bytes max (seg0).
    Work_mem used: 58K bytes avg, 58K bytes max (seg0). Workfile: (0 spilling, 0 reused)
    -> GroupAggregate (cost=5968.94..5968.97 rows=1 width=40)
    Group By: public.test_user_log.category
    Rows out: Avg 46.6 rows x 16 workers. Max 55 rows (seg0) with 2460 ms to end, start offset by 85 ms.
    -> Sort (cost=5968.94..5968.94 rows=1 width=40)
    Sort Key: public.test_user_log.category
    Rows out: Avg 461.6 rows x 16 workers. Max 572 rows (seg4) with 2458 ms to end, start offset by 88 ms.
    Executor memory: 85K bytes avg, 145K bytes max (seg4).
    Work_mem used: 85K bytes avg, 145K bytes max (seg4). Workfile: (0 spilling, 0 reused)
    -> Redistribute Motion 16:16 (slice1; segments: 16) (cost=5960.60..5968.93 rows=1 width=40)
    Hash Key: public.test_user_log.category
    Rows out: Avg 461.6 rows x 16 workers at destination. Max 572 rows (seg4) with 2316 ms to first row, 2458 ms to end, start offset by 88 ms.
    -> GroupAggregate (cost=5960.60..5968.91 rows=1 width=40)
    Group By: public.test_user_log.category
    Rows out: Avg 461.6 rows x 16 workers. Max 472 rows (seg7) with 536 ms to first row, 2455 ms to end, start offset by 89 ms.
    Executor memory: 318587K bytes avg, 330108K bytes max (seg7).
    Work_mem used: 8544K bytes avg, 8544K bytes max (seg0).
    Work_mem wanted: 8414K bytes avg, 8472K bytes max (seg14) to lessen workfile I/O affecting 16 workers.
    -> Sort (cost=5960.60..5963.37 rows=70 width=64)
    Sort Key: public.test_user_log.category
    Rows out: Avg 367982.3 rows x 16 workers. Max 369230 rows (seg8) with 527 ms to first row, 625 ms to end, start offset by 90 ms.
    Executor memory: 61433K bytes avg, 61433K bytes max (seg0).
    Work_mem used: 61433K bytes avg, 61433K bytes max (seg0). Workfile: (0 spilling, 0 reused)
    -> Append (cost=0.00..5904.72 rows=70 width=64)
    Rows out: Avg 367982.3 rows x 16 workers. Max 369230 rows (seg8) with 2.710 ms to first row, 265 ms to end, start offset by 91 ms.
    -> Append-only Columnar Scan on test_user_log_1_prt_usual test_user_log (cost=0.00..0.00 rows=1 width=64)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    Rows out: 0 rows (seg0) with 0.542 ms to end, start offset by 93 ms.
    -> Append-only Columnar Scan on test_user_log_1_prt_157 test_user_log (cost=0.00..3134.45 rows=37 width=64)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    Rows out: Avg 367882.1 rows x 16 workers. Max 369131 rows (seg8) with 2.178 ms to first row, 132 ms to end, start offset by 91 ms.
    -> Append-only Columnar Scan on test_user_log_1_prt_158 test_user_log (cost=0.00..2770.27 rows=33 width=64)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    Rows out: Avg 100.2 rows x 16 workers. Max 124 rows (seg11) with 2.135 ms to first row, 73 ms to end, start offset by 394 ms.
    Slice statistics:
    Settings: effective_cache_size=8GB; gp_statistics_use_fkeys=on; optimizer=off
    Optimizer status: legacy query optimizer
    Total runtime: 2546.862 ms
    (51 rows)

As you can see, a seemingly simple query has a fairly involved processing pipeline. The overall flow is roughly as follows:

    Scan (Columnar Scan + Append) -> Sort (category) -> Group by (category) -> Redistribute -> Sort (category) -> Group by (category) -> Sort -> Gather

The recorded execution times of the individual nodes show that most of the time is spent in the first three steps; once they finish, the intermediate result is down to just a few hundred rows, so these three steps are what we focus on. Their logic is easy to follow: the scanned data is sorted directly, and the sorted result is then scanned once more while the aggregation is computed.

One detail worth noting: the queried table test_user_log is distributed by actionId, so rows with the same actionId always land on the same segment. After each segment groups its local data by category, it records, for each group, the distinct actionId values seen in that group, and the final aggregation result is each category together with its corresponding actionId count.
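
As a quick, purely illustrative sanity check of this distribution property, the following query uses Greenplum's per-row system column gp_segment_id; it should return no rows when actionId is the distribution key, because each actionId value can then live on only one segment:

    -- each actionId should appear on exactly one segment when it is the distribution key
    select actionId, count(distinct gp_segment_id) as segments
    from test_user_log
    group by actionId
    having count(distinct gp_segment_id) > 1;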

This raises a question: category has very few distinct values (only a few hundred), which makes it well suited to hash-based aggregation. So why did the planner choose the sort-based approach instead of hashing?

Looking at the estimate for partition test_user_log_1_prt_157 in the plan above ((cost=0.00..3134.45 rows=37 width=64)), the planner expected only 37 rows, while the actual output was Rows out: Avg 367882.1 rows, i.e. more than 360,000 rows. This tells us the table's statistics are inaccurate.
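
One way to see where the mis-estimate comes from is to look at the per-partition statistics the planner relies on. A minimal check (using the standard pg_class columns, which should also apply to HybridDB) looks like this:

    -- reltuples/relpages are the row and page counts recorded by the last analyze;
    -- stale values here lead to estimates like rows=37 against ~368k actual rows
    select relname, reltuples, relpages
    from pg_class
    where relname in ('test_user_log_1_prt_157', 'test_user_log_1_prt_158');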

Run Analyze to refresh the statistics:

    test=# analyze test_user_log_1_prt_157;
    ANALYZE
    test=# analyze test_user_log_1_prt_158;
    ANALYZE

With the statistics updated, running the query again indeed produces a different plan: the sort-based aggregation has turned into hash-based aggregation, and the execution time drops from 2546.862 ms to 2099.144 ms.

    QUERY PLAN
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
    Limit (cost=0.00..431.00 rows=10 width=16)
    Gather Motion 16:1 (slice2; segments: 16) (cost=320695.70..320695.92 rows=10 width=40)
    Merge Key: ct
    -> Limit (cost=320695.70..320695.72 rows=1 width=40)
    -> Sort (cost=320695.70..320695.94 rows=7 width=40)
    Sort Key (Limit): ct
    -> HashAggregate (cost=320691.23..320692.46 rows=7 width=40)
    Group By: partial_aggregation.category
    -> HashAggregate (cost=303756.92..311454.34 rows=38488 width=64)
    Group By: public.test_user_log.category, public.test_user_log.actionId
    -> Redistribute Motion 16:16 (slice1; segments: 16) (cost=280664.69..292980.55 rows=38488 width=64)
    Hash Key: public.test_user_log.category
    -> HashAggregate (cost=280664.69..280664.69 rows=38488 width=64)
    Group By: public.test_user_log.category, public.test_user_log.actionId
    -> Append (cost=0.00..236527.23 rows=367813 width=43)
    -> Append-only Columnar Scan on test_user_log_1_prt_usual test_user_log (cost=0.00..0.00 rows=1 width=64)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    -> Append-only Columnar Scan on test_user_log_1_prt_157 test_user_log (cost=0.00..124267.69 rows=367813 width=42)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    -> Append-only Columnar Scan on test_user_log_1_prt_158 test_user_log (cost=0.00..112259.54 rows=1 width=42)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone

The main flow of the plan above is shown below. The two Group by (Hash(category, actionId)) steps both serve to remove duplicates, guaranteeing that each (category, actionId) combination appears only once; an equivalent manual rewrite is sketched right after the flow.

    Scan (Columnar Scan + Append) -> Group by (Hash(category, actionId)) -> Redistribute (category) -> Group by (Hash(category, actionId)) -> Group by (Hash(category)) -> Sort -> Gather
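
To make the role of the two (category, actionId) Group by steps concrete, here is a logically equivalent manual rewrite of the original query (same table and columns, only the SQL shape differs): the inner GROUP BY removes duplicate (category, actionId) pairs, so the outer aggregate only has to count rows per category.

    select category, count(actionId) as ct
    from (
        -- de-duplicate (category, actionId) pairs, as the inner HashAggregate does
        select category, actionId
        from test_user_log
        where receivetime between '2017-03-07 11:00:00' and '2017-03-07 12:00:00'
        group by category, actionId
    ) dedup
    group by category
    order by ct desc limit 10;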

Are there other possible ways to process this query? HybridDB also supports the newer orca optimizer, which considers more execution strategies. Let's try generating the plan with orca.

    test=# set optimizer=on;
    SET
    test=# explain analyze select category, count(distinct actionId) as ct from test_user_log
    where receivetime between '2017-03-07 11:00:00' and '2017-03-07 12:00:00' group by category
    order by ct desc limit 10;
    QUERY PLAN
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
    Limit (cost=0.00..431.00 rows=1 width=16)
    Rows out: 10 rows with 2690 ms to end, start offset by 0.500 ms.
    -> Gather Motion 16:1 (slice2; segments: 16) (cost=0.00..431.00 rows=1 width=16)
    Merge Key: ct
    Rows out: 10 rows at destination with 2690 ms to end, start offset by 0.501 ms.
    -> Sort (cost=0.00..431.00 rows=1 width=16)
    Sort Key: ct
    Rows out: Avg 46.6 rows x 16 workers. Max 55 rows (seg0) with 2688 ms to end, start offset by 2.356 ms.
    Executor memory: 33K bytes avg, 33K bytes max (seg0).
    Work_mem used: 33K bytes avg, 33K bytes max (seg0). Workfile: (0 spilling, 0 reused)
    -> GroupAggregate (cost=0.00..431.00 rows=1 width=16)
    Group By: category
    Rows out: Avg 46.6 rows x 16 workers. Max 55 rows (seg0) with 2687 ms to first row, 2688 ms to end, start offset by 2.372 ms.
    -> Sort (cost=0.00..431.00 rows=1 width=16)
    Sort Key: category
    Rows out: Avg 461.6 rows x 16 workers. Max 572 rows (seg4) with 2687 ms to end, start offset by 3.104 ms.
    Executor memory: 85K bytes avg, 145K bytes max (seg4).
    Work_mem used: 85K bytes avg, 145K bytes max (seg4). Workfile: (0 spilling, 0 reused)
    -> Redistribute Motion 16:16 (slice1; segments: 16) (cost=0.00..431.00 rows=1 width=16)
    Hash Key: category
    Rows out: Avg 461.6 rows x 16 workers at destination. Max 572 rows (seg4) with 2442 ms to first row, 2687 ms to end, start offset by 3.113 ms.
    -> Result (cost=0.00..431.00 rows=1 width=16)
    Rows out: Avg 461.6 rows x 16 workers. Max 472 rows (seg7) with 1070 ms to first row, 2583 ms to end, start offset by 3.898 ms.
    -> GroupAggregate (cost=0.00..431.00 rows=1 width=16)
    Group By: category
    Rows out: Avg 461.6 rows x 16 workers. Max 472 rows (seg7) with 1070 ms to first row, 2583 ms to end, start offset by 3.898 ms.
    Executor memory: 316808K bytes avg, 328245K bytes max (seg7).
    Work_mem used: 8184K bytes avg, 8184K bytes max (seg0).
    Work_mem wanted: 8414K bytes avg, 8472K bytes max (seg14) to lessen workfile I/O affecting 16 workers.
    -> Sort (cost=0.00..431.00 rows=1 width=16)
    Sort Key: category, actionId
    Rows out: Avg 367982.3 rows x 16 workers. Max 369230 rows (seg8) with 1064 ms to first row, 1143 ms to end, start offset by 3.812 ms.
    Executor memory: 143353K bytes avg, 143353K bytes max (seg0).
    Work_mem used: 143353K bytes avg, 143353K bytes max (seg0). Workfile: (0 spilling, 0 reused)
    -> Sequence (cost=0.00..431.00 rows=1 width=24)
    Rows out: Avg 367982.3 rows x 16 workers. Max 369230 rows (seg8) with 1.905 ms to first row, 241 ms to end, start offset by 4.032 ms.
    -> Partition Selector for test_user_log (dynamic scan id: 1) (cost=10.00..100.00 rows=7 width=4)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    Partitions selected: 3 (out of 745)
    Rows out: 0 rows (seg0) with 0.013 ms to end, start offset by 4.033 ms.
    -> Dynamic Table Scan on test_user_log (dynamic scan id: 1) (cost=0.00..431.00 rows=1 width=24)
    Filter: receivetime >= '2017-03-07 11:00:00'::timestamp without time zone AND receivetime <= '2017-03-07 12:00:00'::timestamp without time zone
    Rows out: Avg 367982.3 rows x 16 workers. Max 369230 rows (seg8) with 1.891 ms to first row, 202 ms to end, start offset by 4.046 ms.
    Partitions scanned: Avg 3.0 (out of 745) x 16 workers. Max 3 parts (seg0).
    Slice statistics:
    (slice0) Executor memory: 351K bytes.
    (slice1) * Executor memory: 152057K bytes avg x 16 workers, 152057K bytes max (seg0). Work_mem: 143353K bytes max, 8472K bytes wanted.
    (slice2) Executor memory: 363K bytes avg x 16 workers, 423K bytes max (seg4). Work_mem: 145K bytes max.
    Statement statistics:
    Memory used: 2047000K bytes
    Memory wanted: 34684K bytes
    Settings: effective_cache_size=8GB; gp_statistics_use_fkeys=on; optimizer=on
    Optimizer status: PQO version 1.609
    Total runtime: 2690.675 ms
    (54 rows)

The plan generated by orca goes back to Sort + Group by for the aggregation. This is because our Analyze only refreshed the statistics of the child partitions, while orca only considers the statistics on the parent table; to get orca's plan to switch to the hash approach, Analyze would have to be run on the parent table, which we will not pursue further here (a minimal example is sketched after the plan discussion below). Still, the orca plan differs considerably from the one produced by the default optimizer. It uses the following flow:

    Scan (Dynamic Scan) -> Sort (category, actionId) -> Group by (category) -> Redistribute -> Sort (category) -> Group by (category) -> Sort -> Gather

Note that the first Sort is on the combination of two columns, (category, actionId), while the subsequent Group by uses only the single column category. This is a special aggregation strategy: because the input arrives sorted by (category, actionId), the executor only needs to keep one running actionId count per distinct category, rather than retaining all distinct actionId values for each category as in the default optimizer's plan. This saves the time of building a hash-table-like data structure. However, because the Sort operates on two columns instead of one, it is more expensive, and the plan as a whole ends up slower than the one produced by the default optimizer.
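
As noted in the parenthetical above, orca only reads statistics from the parent table. If you do want orca to see up-to-date statistics, the straightforward (if potentially expensive) option is a minimal sketch like the following:

    -- analyzing the parent table collects statistics orca can use; on a large
    -- partitioned table this re-analyzes the leaf partitions too, so it can be slow
    analyze test_user_log;

Newer Greenplum releases also offer ANALYZE ROOTPARTITION to collect only root-level statistics, if the HybridDB version in use supports it.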

Further notes

In the example discussed above, the table happens to be distributed by the very column that Count Distinct operates on (actionId). Distributing by a different column produces different query plans, but the underlying principles are similar.
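
For example, one could experiment with a copy of the table distributed by category instead (the table name below is made up for illustration, and the copy is not partitioned, so its plans will differ in more than just the motion nodes):

    -- hypothetical copy of the table, distributed by category instead of actionId
    create table test_user_log_by_cat (like test_user_log)
    with (APPENDONLY=true, ORIENTATION=column)
    distributed by (category);

    insert into test_user_log_by_cat select * from test_user_log;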

We also have not covered queries with Count Distinct on more than one column; readers are encouraged to try that themselves, for example with a query like the one sketched below.
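
The following example (code is simply another column of the test table) counts two different columns distinctly per category, and is worth running under explain analyze with both optimizers:

    select category,
           count(distinct actionId) as ct_action,
           count(distinct code) as ct_code
    from test_user_log
    where receivetime between '2017-03-07 11:00:00' and '2017-03-07 12:00:00'
    group by category
    order by ct_action desc limit 10;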

Appendix

  • Table creation statement
    create table test_user_log
    (
        actionId text,
        code text,
        receiveTime timestamp,
        gmtCreate timestamp,
        category text,
        version text,
        tag text,
        siteId int4
    )
    with (APPENDONLY=true, ORIENTATION=column, BLOCKSIZE=524288)
    distributed by (actionId)
    PARTITION BY RANGE (receivetime)
    (START ('2017-03-07') INCLUSIVE END ('2017-03-07') EXCLUSIVE EVERY (INTERVAL '1 hour' ), DEFAULT PARTITION usual);