时间属性

Flink 可以基于几种不同的 时间 概念来处理数据。

  • 处理时间 指的是执行具体操作时的机器时间(也称作”挂钟时间”)
  • 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。
  • 摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。

对于时间相关的更多信息,可以参考 事件时间和Watermark

本页面说明了如何在 Flink Table API & SQL 里面定义时间以及相关的操作。

时间属性介绍

像窗口(在 Table APISQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

每种类型的表都可以有时间属性,可以在用CREATE TABLE DDL创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

只要时间属性没有被修改,而是简单地从一个表传递到另一个表,它就仍然是一个有效的时间属性。时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。

Table API 程序需要在 streaming environment 中指定时间属性:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
  3. // 或者:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
  3. // 或者:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  1. env = StreamExecutionEnvironment.get_execution_environment()
  2. env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default
  3. # 或者:
  4. # env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
  5. # env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

处理时间

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。

共有三种方法可以定义处理时间。

在创建表的 DDL 中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间。关于计算列,更多信息可以参考:CREATE TABLE DDL

  1. CREATE TABLE user_actions (
  2. user_name STRING,
  3. data STRING,
  4. user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
  5. ) WITH (
  6. ...
  7. );
  8. SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  9. FROM user_actions
  10. GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

在 DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。

  1. DataStream<Tuple2<String, String>> stream = ...;
  2. // 声明一个额外的字段作为时间属性字段
  3. Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
  4. WindowedTable windowedTable = table.window(
  5. Tumble.over(lit(10).minutes())
  6. .on($("user_action_time"))
  7. .as("userActionWindow"));
  1. val stream: DataStream[(String, String)] = ...
  2. // 声明一个额外的字段作为时间属性字段
  3. val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
  4. val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

使用 TableSource 定义

处理时间属性可以在实现了 DefinedProctimeAttributeTableSource 中定义。逻辑的时间属性会放在 TableSource 已有物理字段的最后

  1. // 定义一个由处理时间属性的 table source
  2. public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
  3. @Override
  4. public TypeInformation<Row> getReturnType() {
  5. String[] names = new String[] {"user_name" , "data"};
  6. TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
  7. return Types.ROW(names, types);
  8. }
  9. @Override
  10. public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
  11. // create stream
  12. DataStream<Row> stream = ...;
  13. return stream;
  14. }
  15. @Override
  16. public String getProctimeAttribute() {
  17. // 这个名字的列会被追加到最后,作为第三列
  18. return "user_action_time";
  19. }
  20. }
  21. // register table source
  22. tEnv.registerTableSource("user_actions", new UserActionSource());
  23. WindowedTable windowedTable = tEnv
  24. .from("user_actions")
  25. .window(Tumble
  26. .over(lit(10).minutes())
  27. .on($("user_action_time"))
  28. .as("userActionWindow"));
  1. // 定义一个由处理时间属性的 table source
  2. class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
  3. override def getReturnType = {
  4. val names = Array[String]("user_name" , "data")
  5. val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
  6. Types.ROW(names, types)
  7. }
  8. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
  9. // create stream
  10. val stream = ...
  11. stream
  12. }
  13. override def getProctimeAttribute = {
  14. // 这个名字的列会被追加到最后,作为第三列
  15. "user_action_time"
  16. }
  17. }
  18. // register table source
  19. tEnv.registerTableSource("user_actions", new UserActionSource)
  20. val windowedTable = tEnv
  21. .from("user_actions")
  22. .window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

事件时间属性也有类似于处理时间的三种定义方式:在DDL中定义、在 DataStream 到 Table 转换时定义、用 TableSource 定义。

在 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。更多信息可以参考:CREATE TABLE DDL

  1. CREATE TABLE user_actions (
  2. user_name STRING,
  3. data STRING,
  4. user_action_time TIMESTAMP(3),
  5. -- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
  6. WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
  7. ) WITH (
  8. ...
  9. );
  10. SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  11. FROM user_actions
  12. GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

在 DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。

在从 DataStreamTable 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

  • 在 schema 的结尾追加一个新的字段
  • 替换一个已经存在的字段。

不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。

  1. // Option 1:
  2. // 基于 stream 中的事件产生时间戳和 watermark
  3. DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  4. // 声明一个额外的逻辑字段作为事件时间属性
  5. Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
  6. // Option 2:
  7. // 从第一个字段获取事件时间,并且产生 watermark
  8. DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  9. // 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
  10. Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
  11. // Usage:
  12. WindowedTable windowedTable = table.window(Tumble
  13. .over(lit(10).minutes())
  14. .on($("user_action_time"))
  15. .as("userActionWindow"));
  1. // Option 1:
  2. // 基于 stream 中的事件产生时间戳和 watermark
  3. val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
  4. // 声明一个额外的逻辑字段作为事件时间属性
  5. val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)
  6. // Option 2:
  7. // 从第一个字段获取事件时间,并且产生 watermark
  8. val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
  9. // 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
  10. val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
  11. // Usage:
  12. val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

使用 TableSource 定义

事件时间属性可以在实现了 DefinedRowTimeAttributesTableSource 中定义。getRowtimeAttributeDescriptors() 方法返回 RowtimeAttributeDescriptor 的列表,包含了描述事件时间属性的字段名字、如何计算事件时间、以及 watermark 生成策略等信息。

同时需要确保 getDataStream 返回的 DataStream 已经定义好了时间属性。 只有在定义了 StreamRecordTimestamp 时间戳分配器的时候,才认为 DataStream 是有时间戳信息的。 只有定义了 PreserveWatermarks watermark 生成策略的 DataStream 的 watermark 才会被保留。反之,则只有时间字段的值是生效的。

  1. // 定义一个有事件时间属性的 table source
  2. public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
  3. @Override
  4. public TypeInformation<Row> getReturnType() {
  5. String[] names = new String[] {"user_name", "data", "user_action_time"};
  6. TypeInformation[] types =
  7. new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
  8. return Types.ROW(names, types);
  9. }
  10. @Override
  11. public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
  12. // 构造 DataStream
  13. // ...
  14. // 基于 "user_action_time" 定义 watermark
  15. DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
  16. return stream;
  17. }
  18. @Override
  19. public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
  20. // 标记 "user_action_time" 字段是事件时间字段
  21. // 给 "user_action_time" 构造一个时间属性描述符
  22. RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
  23. "user_action_time",
  24. new ExistingField("user_action_time"),
  25. new AscendingTimestamps());
  26. List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
  27. return listRowtimeAttrDescr;
  28. }
  29. }
  30. // register the table source
  31. tEnv.registerTableSource("user_actions", new UserActionSource());
  32. WindowedTable windowedTable = tEnv
  33. .from("user_actions")
  34. .window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));
  1. // 定义一个有事件时间属性的 table source
  2. class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  3. override def getReturnType = {
  4. val names = Array[String]("user_name" , "data", "user_action_time")
  5. val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
  6. Types.ROW(names, types)
  7. }
  8. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
  9. // 构造 DataStream
  10. // ...
  11. // 基于 "user_action_time" 定义 watermark
  12. val stream = inputStream.assignTimestampsAndWatermarks(...)
  13. stream
  14. }
  15. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
  16. // 标记 "user_action_time" 字段是事件时间字段
  17. // 给 "user_action_time" 构造一个时间属性描述符
  18. val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
  19. "user_action_time",
  20. new ExistingField("user_action_time"),
  21. new AscendingTimestamps)
  22. val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
  23. listRowtimeAttrDescr
  24. }
  25. }
  26. // register the table source
  27. tEnv.registerTableSource("user_actions", new UserActionSource)
  28. val windowedTable = tEnv
  29. .from("user_actions")
  30. .window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")