窗口去重

Streaming

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据。

对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好。通常,窗口去重直接用于 窗口表值函数 上。另外,它可以用于基于 窗口表值函数 的操作。比如 窗口聚合窗口TopN窗口关联

窗口Top-N的语法和普通的Top-N相同,更多信息参见:去重文档。 除此之外,窗口去重需要 PARTITION BY 子句包含表的 window_startwindow_end 列。 否则优化器无法翻译。

Flink 使用 ROW_NUMBER() 移除重复数据,就像 窗口 Top-N 一样。理论上,窗口是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的。

下面展示了窗口去重的语法:

  1. SELECT [column_list]
  2. FROM (
  3. SELECT [column_list],
  4. ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
  5. ORDER BY time_attr [asc|desc]) AS rownum
  6. FROM table_name) -- relation applied windowing TVF
  7. WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

参数说明:

  • ROW_NUMBER():为每一行分配一个唯一且连续的序号,从1开始。
  • PARTITION BY window_start, window_end [, col_key1...]: 指定分区字段,需要包含window_startwindow_end以及其他分区键。
  • ORDER BY time_attr [asc|desc]: 指定排序列,必须是 时间属性。目前 Flink 支持 处理时间属性事件时间属性。 Order by ASC 表示保留第一行,Order by DESC 表示保留最后一行。
  • WHERE (rownum = 1 | rownum <=1 | rownum < 2): 优化器通过 rownum = 1 | rownum <=1 | rownum < 2 来识别查询能否被翻译成窗口去重。

注意:必须严格遵循上述模式,否则优化器无法翻译查询。

示例

下面的示例展示了在10分钟的滚动窗口上保持最后一条记录。

  1. -- tables must have time attribute, e.g. `bidtime` in this table
  2. Flink SQL> DESC Bid;
  3. +-------------+------------------------+------+-----+--------+---------------------------------+
  4. | name | type | null | key | extras | watermark |
  5. +-------------+------------------------+------+-----+--------+---------------------------------+
  6. | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
  7. | price | DECIMAL(10, 2) | true | | | |
  8. | item | STRING | true | | | |
  9. +-------------+------------------------+------+-----+--------+---------------------------------+
  10. Flink SQL> SELECT * FROM Bid;
  11. +------------------+-------+------+
  12. | bidtime | price | item |
  13. +------------------+-------+------+
  14. | 2020-04-15 08:05 | 4.00 | C |
  15. | 2020-04-15 08:07 | 2.00 | A |
  16. | 2020-04-15 08:09 | 5.00 | D |
  17. | 2020-04-15 08:11 | 3.00 | B |
  18. | 2020-04-15 08:13 | 1.00 | E |
  19. | 2020-04-15 08:17 | 6.00 | F |
  20. +------------------+-------+------+
  21. Flink SQL> SELECT *
  22. FROM (
  23. SELECT bidtime, price, item, supplier_id, window_start, window_end,
  24. ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownum
  25. FROM TABLE(
  26. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  27. ) WHERE rownum <= 1;
  28. +------------------+-------+------+-------------+------------------+------------------+--------+
  29. | bidtime | price | item | supplier_id | window_start | window_end | rownum |
  30. +------------------+-------+------+-------------+------------------+------------------+--------+
  31. | 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:00 | 2020-04-15 08:10 | 1 |
  32. | 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:10 | 2020-04-15 08:20 | 1 |
  33. +------------------+-------+------+-------------+------------------+------------------+--------+

注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的0去掉了。例如:在 Flink SQL Client 中,如果类型是 TIMESTAMP(3)2020-04-15 08:05 应该显示成 2020-04-15 08:05:00.000

限制

在窗口表值函数后直接进行窗口去重的限制

目前,Flink 只支持在滚动窗口、滑动窗口和累积窗口的窗口表值函数后进行窗口去重。会话窗口的去重将在未来版本中支持。

根据时间属性排序的限制

目前,窗口去重只支持根据事件时间属性进行排序。根据处理时间排序将在未来版本中支持。