分组聚合

Batch Streaming

像大多数数据系统一样,Apache Flink支持聚合函数;包括内置的和用户定义的。用户自定义函数在使用前必须在目录中注册。

聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”(平均)、“MAX”(最大)和 “MIN”(最小)。

  1. SELECT COUNT(*) FROM Orders

对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止。而且它们会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。

Apache Flink 支持标准的 GROUP BY 子句来聚合数据。

  1. SELECT COUNT(*)
  2. FROM Orders
  3. GROUP BY order_id

对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。详情参见:查询配置

Flink 对于分组聚合提供了一系列性能优化的方法。更多参见:性能优化

DISTINCT 聚合

DISTINCT 聚合在聚合函数前去掉重复的数据。下面的示例计算 Orders 表中不同 order_ids 的数量,而不是总行数。

  1. SELECT COUNT(DISTINCT order_id) FROM Orders

对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小大多数情况下取决于去重行的数量和分组持续的时间,持续时间较短的 group 窗口不会产生状态过大的问题。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。详情参见:查询配置

GROUPING SETS

Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。

  1. SELECT supplier_id, rating, COUNT(*) AS total
  2. FROM (VALUES
  3. ('supplier1', 'product1', 4),
  4. ('supplier1', 'product2', 3),
  5. ('supplier2', 'product3', 3),
  6. ('supplier2', 'product4', 4))
  7. AS Products(supplier_id, product_id, rating)
  8. GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())

结果:

  1. +-------------+--------+-------+
  2. | supplier_id | rating | total |
  3. +-------------+--------+-------+
  4. | supplier1 | 4 | 1 |
  5. | supplier1 | (NULL) | 2 |
  6. | (NULL) | (NULL) | 4 |
  7. | supplier1 | 3 | 1 |
  8. | supplier2 | 3 | 1 |
  9. | supplier2 | (NULL) | 2 |
  10. | supplier2 | 4 | 1 |
  11. +-------------+--------+-------+

GROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。

对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。例如,上例中的 GROUPING SETS ((supplier_id), ()) 里的 () 就是空子列表,与其对应的结果数据中的 supplier_id 列使用 NULL 填充。

对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于 Grouping Sets 的数量以及聚合函数的类型。可以提供一个合适的状态 time-to-live (TTL)配置来防止状态过大.注意:这可能会影响查询结果的正确性.详情参见:查询配置

ROLLUP

ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。

例如:下面这个查询和上个例子是等效的。

  1. SELECT supplier_id, rating, COUNT(*)
  2. FROM (VALUES
  3. ('supplier1', 'product1', 4),
  4. ('supplier1', 'product2', 3),
  5. ('supplier2', 'product3', 3),
  6. ('supplier2', 'product4', 4))
  7. AS Products(supplier_id, product_id, rating)
  8. GROUP BY ROLLUP (supplier_id, rating)

CUBE

CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。

例如:下面两个查询是等效的。

  1. SELECT supplier_id, rating, product_id, COUNT(*)
  2. FROM (VALUES
  3. ('supplier1', 'product1', 4),
  4. ('supplier1', 'product2', 3),
  5. ('supplier2', 'product3', 3),
  6. ('supplier2', 'product4', 4))
  7. AS Products(supplier_id, product_id, rating)
  8. GROUP BY CUBE (supplier_id, rating, product_id)
  9. SELECT supplier_id, rating, product_id, COUNT(*)
  10. FROM (VALUES
  11. ('supplier1', 'product1', 4),
  12. ('supplier1', 'product2', 3),
  13. ('supplier2', 'product3', 3),
  14. ('supplier2', 'product4', 4))
  15. AS Products(supplier_id, product_id, rating)
  16. GROUP BY GROUPING SET (
  17. ( supplier_id, product_id, rating ),
  18. ( supplier_id, product_id ),
  19. ( supplier_id, rating ),
  20. ( supplier_id ),
  21. ( product_id, rating ),
  22. ( product_id ),
  23. ( rating ),
  24. ( )
  25. )

HAVING

HAVING 会删除 group 后不符合条件的行。 HAVINGWHERE 的不同点:WHEREGROUP BY 之前过滤单独的数据行。HAVING 过滤 GROUP BY 生成的数据行。 HAVING 条件中的每一列引用必须是明确的 grouping 列,除非它出现在聚合函数中。

  1. SELECT SUM(amount)
  2. FROM Orders
  3. GROUP BY users
  4. HAVING SUM(amount) > 50

即使没有 GROUP BY 子句,HAVING 的存在也会使查询变成一个分组查询。这与查询包含聚合函数但没有 GROUP BY 子句时的情况相同。查询认为所有被选中的行形成一个单一的组,并且 SELECT 列表和 HAVING 子句只能从聚合函数中引用列。如果 HAVING 条件为真,这样的查询将发出一条记录,如果不为真,则发出零条记录。