时态表(Temporal Tables)

时态表(Temporal Table)代表基于表的(参数化)视图概念,该表记录变更历史,该视图返回表在某个特定时间点的内容。

变更表可以是跟踪变化的历史记录表(例如数据库变更日志),也可以是有具体更改的维表(例如数据库表)。

对于记录变更历史的表,Flink 可以追踪这些变化,并且允许查询这张表在某个特定时间点的内容。在 Flink 中,这类表由时态表函数(Temporal Table Function)表示。

对于变化的维表,Flink 允许查询这张表在处理时的内容,在 Flink 中,此类表由时态表(Temporal Table)表示。

设计初衷

与记录变更历史的表相关

假设我们有表 RatesHistory 如下所示。

  1. SELECT * FROM RatesHistory;
  2. rowtime currency rate
  3. ======= ======== ======
  4. 09:00 US Dollar 102
  5. 09:00 Euro 114
  6. 09:00 Yen 1
  7. 10:45 Euro 116
  8. 11:15 Euro 119
  9. 11:49 Pounds 108

RatesHistory 代表一个兑换日元货币汇率表(日元汇率为1),该表是不断增长的 append-only 表。例如,欧元日元09:0010:45 的汇率为 114。从 10:4511:15,汇率为 116

假设我们要输出 10:58 的所有当前汇率,则需要以下 SQL 查询来计算结果表:

  1. SELECT *
  2. FROM RatesHistory AS r
  3. WHERE r.rowtime = (
  4. SELECT MAX(rowtime)
  5. FROM RatesHistory AS r2
  6. WHERE r2.currency = r.currency
  7. AND r2.rowtime <= TIME '10:58');

子查询确定对应货币的最大时间小于或等于所需时间。外部查询列出具有最大时间戳的汇率。

下表显示了这种计算的结果。我们的示例中,在 10:58 时表的内容,考虑了 10:45欧元的更新,但未考虑 11:15 时的欧元更新和英镑的新值。

  1. rowtime currency rate
  2. ======= ======== ======
  3. 09:00 US Dollar 102
  4. 09:00 Yen 1
  5. 10:45 Euro 116

时态表的概念旨在简化此类查询,加快其执行速度,并减少 Flink 的状态使用。时态表是 append-only 表上的参数化视图,该视图将 append-only 表的行解释为表的变更日志,并在特定时间点提供该表的版本。将 append-only 表解释为变更日志需要指定主键属性和时间戳属性。主键确定哪些行将被覆盖,时间戳确定行有效的时间。

在上面的示例中,currencyRatesHistory 表的主键,而 rowtime 是时间戳属性。

在 Flink 中,这由时态表函数表示。

与维表变化相关

另一方面,某些用例需要连接变化的维表,该表是外部数据库表。

假设 LatestRates 是一个被物化的最新汇率表。LatestRates 是物化的 RatesHistory 历史。那么 LatestRates 表在 10:58 的内容将是:

  1. 10:58> SELECT * FROM LatestRates;
  2. currency rate
  3. ======== ======
  4. US Dollar 102
  5. Yen 1
  6. Euro 116

12:00LatestRates 表的内容将是:

  1. 12:00> SELECT * FROM LatestRates;
  2. currency rate
  3. ======== ======
  4. US Dollar 102
  5. Yen 1
  6. Euro 119
  7. Pounds 108

在 Flink 中,这由时态表表示。

时态表函数

为了访问时态表中的数据,必须传递一个时间属性,该属性确定将要返回的表的版本。 Flink 使用表函数的 SQL 语法提供一种表达它的方法。

定义后,时态表函数将使用单个时间参数 timeAttribute 并返回一个行集合。 该集合包含相对于给定时间属性的所有现有主键的行的最新版本。

假设我们基于 RatesHistory 表定义了一个时态表函数,我们可以通过以下方式查询该函数 Rates(timeAttribute)

  1. SELECT * FROM Rates('10:15');
  2. rowtime currency rate
  3. ======= ======== ======
  4. 09:00 US Dollar 102
  5. 09:00 Euro 114
  6. 09:00 Yen 1
  7. SELECT * FROM Rates('11:00');
  8. rowtime currency rate
  9. ======= ======== ======
  10. 09:00 US Dollar 102
  11. 10:45 Euro 116
  12. 09:00 Yen 1

Rates(timeAttribute) 的每个查询都将返回给定 timeAttributeRates 状态。

注意:当前 Flink 不支持使用常量时间属性参数直接查询时态表函数。目前,时态表函数只能在 join 中使用。上面的示例用于为函数 Rates(timeAttribute) 返回内容提供直观信息。

另请参阅有关用于持续查询的 join 页面,以获取有关如何与时态表 join 的更多信息。

定义时态表函数

以下代码段说明了如何从 append-only 表中创建时态表函数。

  1. import org.apache.flink.table.functions.TemporalTableFunction;
  2. (...)
  3. // 获取 stream 和 table 环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  6. // 提供一个汇率历史记录表静态数据集
  7. List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
  8. ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
  9. ratesHistoryData.add(Tuple2.of("Euro", 114L));
  10. ratesHistoryData.add(Tuple2.of("Yen", 1L));
  11. ratesHistoryData.add(Tuple2.of("Euro", 116L));
  12. ratesHistoryData.add(Tuple2.of("Euro", 119L));
  13. // 用上面的数据集创建并注册一个示例表
  14. // 在实际设置中,应使用自己的表替换它
  15. DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
  16. Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());
  17. tEnv.createTemporaryView("RatesHistory", ratesHistory);
  18. // 创建和注册时态表函数
  19. // 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
  20. TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
  21. tEnv.registerFunction("Rates", rates); // <==== (2)
  1. // 获取 stream 和 table 环境
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
  3. val tEnv = StreamTableEnvironment.create(env)
  4. // 提供一个汇率历史记录表静态数据集
  5. val ratesHistoryData = new mutable.MutableList[(String, Long)]
  6. ratesHistoryData.+=(("US Dollar", 102L))
  7. ratesHistoryData.+=(("Euro", 114L))
  8. ratesHistoryData.+=(("Yen", 1L))
  9. ratesHistoryData.+=(("Euro", 116L))
  10. ratesHistoryData.+=(("Euro", 119L))
  11. // 用上面的数据集创建并注册一个示例表
  12. // 在实际设置中,应使用自己的表替换它
  13. val ratesHistory = env
  14. .fromCollection(ratesHistoryData)
  15. .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
  16. tEnv.createTemporaryView("RatesHistory", ratesHistory)
  17. // 创建和注册时态表函数
  18. // 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
  19. val rates = ratesHistory.createTemporalTableFunction($"r_proctime", $"r_currency") // <==== (1)
  20. tEnv.registerFunction("Rates", rates) // <==== (2)

(1)创建了一个 rates 时态表函数, 这使我们可以在 Table API 中使用 rates 函数。

(2)在表环境中注册名称为 Rates 的函数,这使我们可以在 SQL 中使用 Rates 函数。

时态表

注意 仅 Blink planner 支持此功能。

为了访问时态表中的数据,当前必须使用 LookupableTableSource 定义一个 TableSource。Flink 使用 SQL:2011 中提出的 FOR SYSTEM_TIME AS OF 的 SQL 语法查询时态表。

假设我们定义了一个时态表 LatestRates,我们可以通过以下方式查询此表:

  1. SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
  2. currency rate
  3. ======== ======
  4. US Dollar 102
  5. Euro 114
  6. Yen 1
  7. SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
  8. currency rate
  9. ======== ======
  10. US Dollar 102
  11. Euro 116
  12. Yen 1

注意:当前,Flink 不支持以固定时间直接查询时态表。目前,时态表只能在 join 中使用。上面的示例用于为时态表 LatestRates 返回内容提供直观信息。

另请参阅有关用于持续查询的 join 页面,以获取有关如何与时态表 join 的更多信息。

定义时态表

  1. // 获取 stream 和 table 环境
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
  4. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  5. // or TableEnvironment tEnv = TableEnvironment.create(settings);
  6. // 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
  7. // 'currency' 列是 HBase 表中的 rowKey
  8. tEnv.executeSql(
  9. "CREATE TABLE LatestRates (" +
  10. " currency STRING," +
  11. " fam1 ROW<rate DOUBLE>" +
  12. ") WITH (" +
  13. " 'connector' = 'hbase-1.4'," +
  14. " 'table-name' = 'Rates'," +
  15. " 'zookeeper.quorum' = 'localhost:2181'" +
  16. ")");
  1. // 获取 stream 和 table 环境
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
  3. val settings = EnvironmentSettings.newInstance().build()
  4. val tEnv = StreamTableEnvironment.create(env, settings)
  5. // or val tEnv = TableEnvironment.create(settings)
  6. // 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
  7. // 'currency' 列是 HBase 表中的 rowKey
  8. tEnv.executeSql(
  9. s"""
  10. |CREATE TABLE LatestRates (
  11. | currency STRING,
  12. | fam1 ROW<rate DOUBLE>
  13. |) WITH (
  14. | 'connector' = 'hbase-1.4',
  15. | 'table-name' = 'Rates',
  16. | 'zookeeper.quorum' = 'localhost:2181'
  17. |)
  18. |""".stripMargin)

另请参阅有关如何定义 LookupableTableSource 的页面。