升级应用程序和 Flink 版本

Flink DataStream 程序通常设计为长时间运行,例如数周、数月甚至数年。与所有长时间运行的服务一样,Flink 流式应用程序需要维护,包括修复错误、实施改进或将应用程序迁移到更高版本的 Flink 集群。

本文档介绍了如何更新 Flink 流式应用程序以及如何将正在运行的流式应用程序迁移到不同的 Flink 集群。

API compatibility guarantees

The classes & methods of the Java/Scala APIs that are intended for users are annotated with the following stability annotations:

  • Public
  • PublicEvolving
  • Experimental

Annotations on a class also apply to all members of that class, unless otherwise annotated.

Any API without such an annotation is considered internal to Flink, with no guarantees being provided.

An API that is source compatible means that code written against the API will continue to compile against a later version.
An API that is binary compatible means that code compiled against the API will continue to run against a later version.

This table lists the source / binary compatibility guarantees for each annotation when upgrading to a particular release:

AnnotationMajor release
(Source / Binary)
Minor release
(Source / Binary)
Patch release
(Source / Binary)
Public///
PublicEvolving///
Experimental///

Example
Code written against a PublicEvolving API in 1.15.2 will continue to run in 1.15.3, without having to recompile the code.
That same code would have to be recompiled when upgrading to 1.16.0 though.

重启流式应用程序

升级流式应用程序或将应用程序迁移到不同集群的操作线基于 Flink 的 Savepoint 功能。Savepoint 是应用程序在特定时间点的状态的一致快照。 有两种方法可以从正在运行的流应用程序中获取 savepoint。

  • 获取 Savepoint 并继续处理。
  1. > ./bin/flink savepoint <jobID> [ Savepoint 的路径]

建议定期获取 Savepoint ,以便能够从之前的时间点重新启动应用程序。

  • 作获取 Savepoint 并停止应用程序。
  1. > ./bin/flink cancel -s [ Savepoint 的路径] <jobID>

这意味着应用程序在 Savepoint 完成后立即取消,即在 Savepoint 之后不进行其他 checkpoint。

给定从应用程序获取的 Savepoint ,可以从该 Savepoint 启动相同或兼容的应用程序(请参阅下面的 应用程序状态兼容性 部分)。从 Savepoint 启动应用程序意味着其算子的状态被初始化为 Savepoint 中保存的算子状态。这是通过使用 Savepoint 启动应用程序来完成的。

  1. > ./bin/flink run -d -s [ Savepoint 的路径] ~/application.jar

已启动应用程序的算子在获取 Savepoint 时使用原始应用程序(即获取 Savepoint 的应用程序)的算子状态进行初始化。启动的应用程序从此时开始继续处理。

Note: 即使 Flink 始终如一地恢复应用程序的状态,它也无法恢复对外部系统的写入。如果您从未停止应用程序的 Savepoint 恢复,这可能是一个问题。在这种情况下,应用程序可能在获取 Savepoint 后发出了数据。重新启动的应用程序可能(取决于您是否更改了应用程序逻辑)再次发出相同的数据。根据 SinkFunction 和存储系统,此行为的确切效果可能会有很大不同。在向 Cassandra 等键值存储进行幂等写入的情况下,发出两次的数据可能没问题,但在追加到 Kafka 等持久日志的情况下会出现问题。在任何情况下,您都应该仔细检查和测试重新启动的应用程序的行为。

应用程序状态兼容性

当升级应用程序以修复错误或改进应用程序时,通常目标是在保留其状态的同时替换正在运行的应用程序的应用程序逻辑。我们通过从原始应用程序获取的 Savepoint 启动升级的应用程序来做到这一点。但是,这只有在两个应用程序状态兼容的情况下才有效,这意味着升级应用程序的算子能够使用原始应用程序的算子的状态初始化他们的状态。

在本节中,我们将讨论如何修改应用程序以保持状态兼容。

DataStream API

匹配算子状态

当应用程序从 Savepoint 重新启动时,Flink 会将 Savepoint 中存储的算子状态与已启动应用程序的有状态算子进行匹配。匹配是基于算子 ID 完成的,算子 ID 也存储在 Savepoint 中。每个算子都有一个默认 ID,该 ID 源自算子在应用程序算子拓扑中的位置。因此,未修改的应用程序始终可以从其自己的 Savepoint 之一重新启动。但是,如果应用程序被修改,运营商的默认 ID 可能会发生变化。因此,只有明确指定了算子 ID,才能从 Savepoint 启动修改后的应用程序。为算子分配 ID 非常简单,使用 uid(String) 方法完成,如下所示:

  1. val mappedEvents: DataStream[(Int, Long)] = events
  2. .map(new MyStatefulMapFunc()).uid("mapper-1")

Note: 由于 Savepoint 中存储的算子 ID 和要启动的应用程序中的算子 ID 必须相等,因此强烈建议为将来可能升级的应用程序的所有算子分配唯一 ID。此建议适用于所有算子,即有和没有明确声明算子状态的算子,因为某些算子具有用户不可见的内部状态。在没有分配算子 ID 的情况下升级应用程序要困难得多,并且只能通过使用 setUidHash() 方法的低级解决方法来实现。

Important: 从 1.3.x 开始,链中的算子也是一样。

默认情况下, Savepoint 中存储的所有状态都必须与启动应用程序的算子匹配。但是,当从 Savepoint 启动应用程序时,用户可以明确同意跳过(从而丢弃)无法与算子匹配的状态。在 Savepoint 中未找到状态的有状态算子将使用其默认状态进行初始化。用户可以通过调用 ExecutionConfig#disableAutoGeneratedUIDs 来强制执行最佳实践,如果任何算子不包含自定义唯一 ID,则作业提交将失败。

有状态的算子和用户函数

升级应用程序时,用户功能和算子可以自由修改,有一个限制。无法更改算子状态的数据类型。这很重要,因为 Savepoint 的状态(当前)在加载到算子之前不能转换为不同的数据类型。因此,在升级应用程序时更改算子状态的数据类型会破坏应用程序状态的一致性,并防止升级后的应用程序从 Savepoint 重新启动。

算子状态可以是用户定义的或内部的。

  • 用户自定义算子状态: 在具有用户定义算子状态的函数中,状态的类型由用户明确定义。尽管无法更改算子状态的数据类型,但克服此限制的解决方法可以是定义具有不同数据类型的第二个状态,并实现将状态从原始状态迁移到新状态的逻辑。这种方法需要良好的迁移策略和对 key-partitioned state 行为的深刻理解。

  • 内部算子状态: 诸如窗口或连接算子之类的算子持有不向用户公开的内部算子状态。对于这些算子,内部状态的数据类型取决于算子的输入或输出类型。因此,更改相应的输入或输出类型会破坏应用程序状态的一致性并阻止升级。下表列出了具有内部状态的算子,并显示了状态数据类型与其输入和输出类型的关系。对于应用于 Keyed Stream 的算子,键类型 (KEY) 也始终是状态数据类型的一部分。

算子内部算子状态的数据类型
ReduceFunction[IOT]IOT (输入输出类型) [, KEY]
WindowFunction[IT, OT, KEY, WINDOW]IT (输入类型), KEY
AllWindowFunction[IT, OT, WINDOW]IT (输入类型)
JoinFunction[IT1, IT2, OT]IT1, IT2 (1. 和 2. 输入的类型), KEY
CoGroupFunction[IT1, IT2, OT]IT1, IT2 (1. 和 2. 输入的类型), KEY
内置聚合 (sum, min, max, minBy, maxBy)输入类型 [, KEY]

应用拓扑

除了改变一个或多个现有算子的逻辑外,还可以通过改变应用程序的拓扑结构来升级应用程序,即添加或删除算子、改变算子的并行度或修改算子链接行为。

通过更改拓扑来升级应用程序时,需要考虑一些事项以保持应用程序状态的一致性。

  • 添加或删除无状态算子: 除非是以下情况之一,否则这没有问题。
  • 添加有状态算子: 除非它接管另一个算子的状态,否则算子的状态将使用默认状态进行初始化。
  • 移除一个有状态的算子: 除非另一个算子接手,否则已移除算子的状态将丢失。启动升级后的应用程序时,您必须明确同意丢弃该状态。
  • 改变算子的输入输出类型: 在具有内部状态的算子之前或之后添加新算子时,您必须确保不修改有状态算子的输入或输出类型以保留内部算子状态的数据类型(详见上文)。
  • 更改算子链接: 算子可以链接在一起以提高性能。从 1.3.x 之后的 Savepoint 恢复时,可以在保持状态一致性的同时修改链。有可能会破坏链,从而将有状态的算子移出链。还可以将新的或现有的有状态算子附加或注入到链中,或者修改链中的算子顺序。但是,当将 Savepoint 升级到 1.3.x 时,最重要的是拓扑在链接方面没有改变。应为链中的所有算子分配一个 ID,如上面 匹配算子状态 部分所述。

Table API & SQL

由于 Table API 和 SQL 程序的声明性,底层算子拓扑和状态表示主要由 表规划器确定和优化。

请注意,查询和 Flink 版本的任何更改都可能导致状态不兼容。每个新的大-小 Flink 版本(例如 1.121.13)都 可能引入新的优化器规则或更专业的运行时算子来改变执行计划。 然而,社区试图保持补丁版本的状态兼容 (例如 1.13.11.13.2)。

有关详细信息, 请参阅 table state management section

本节介绍跨版本升级 Flink 以及在版本之间迁移作业的一般方法。

简而言之,此过程包括 2 个基本步骤:

  1. 在以前的旧 Flink 版本中为您要迁移的作业创建一个 Savepoint 。
  2. 在新的 Flink 版本下从之前使用的 Savepoint 恢复您的工作。

除了这两个基本步骤之外,还可能需要一些额外的步骤,具体取决于您想要更改 Flink 版本的方式。 在本指南中,我们区分了两种跨 Flink 版本升级的方法: 就地升级和卷影副本升级。

对于就地更新,在获取 Savepoint 后,您需要:

  1. 停止/取消所有正在运行的作业。
  2. 关闭运行旧 Flink 版本的集群。
  3. 将 Flink 升级到集群上的较新版本。
  4. 在新版本下重启集群。

对于卷影副本,您需要:

  1. 在从 Savepoint 恢复之前,除了旧的 Flink 安装之外,设置新的 Flink 版本的新安装。
  2. 使用新的 Flink 安装从 Savepoint 恢复。
  3. 如果一切正常,停止并关闭旧的 Flink 集群。

在下文中,我们将首先介绍成功进行作业迁移的先决条件, 然后详细介绍我们之前概述的步骤。

前提条件

在开始迁移之前,请检查您尝试迁移的作业是否遵循 savepoints 的最佳实践。

特别是,我们建议您检查是否为您的工作中的算子设置了明确的 uid

这是一个前提,如果您忘记分配 uid,恢复应该仍然有效。 如果您遇到这种情况不起作用,您可以调用 setUidHash(String hash) 手动将以前 Flink 版本中生成的遗留顶点 ID 添加到您的作业中。对于每个算子(在算子链中:仅头部算子) ,您必须分配 32 个字符的十六进制字符串, 表示您可以在 web ui 或日志中看到的算子的哈希值。

除了算子 uid,目前作业迁移还有两个硬性前提条件会导致迁移失败:

  1. RocksDB不支持迁移 semi-asynchronous 模式 Checkpoint 的状态。如果您的旧作业使用此模式,您仍然可以将作业更改为使用 fully-asynchronous 模式,然后再将 Savepoint 用作迁移的基础。

  2. 另一个重要的先决条件是所有 Savepoint 数据必须可以从新安装的相同(绝对)路径下访问。 这还包括访问从 Savepoint 文件内部引用的任何其他文件 (状态后端快照的输出), 包括但不限于使用 State Processor API

第 1 步:使用 Savepoint 停止现有作业

版本迁移的第一个主要步骤是获取 Savepoint 并停止 在旧 Flink 版本上运行的作业。

您可以使用以下命令执行此操作:

  1. $ bin/flink stop [--savepointPath :savepointPath] :jobId

更多详情,请阅读 savepoint documentation.

在这一步中,我们更新集群的框架版本。 基本上意味着用新版本替换 Flink 安装的内容。 此步骤可能取决于您在集群中运行 Flink 的方式(例如独立运行,…)。

如果您对在集群中安装 Flink 不熟悉,请阅读 deployment and cluster setup documentation.

作为作业迁移的最后一步,您从上面在更新的集群上采取的 Savepoint 恢复。 您可以使用以下命令执行此操作:

  1. $ bin/flink run -s :savepointPath [:runArgs]

更多详情,请查看savepoint documentation.

兼容性速查表

Savepoint 在 Flink 版本之间是兼容的,如下表所示:

创建于 \ 恢复于1.1.x1.2.x1.3.x1.4.x1.5.x1.6.x1.7.x1.8.x1.9.x1.10.x1.11.x1.12.x1.13.x1.14.x1.15.x1.16.x限制
1.1.xOOO从 Flink 1.1.x 迁移到 1.2.x+ 的作业的最大并行度目前固定为作业的并行度。 这意味着迁移后无法增加并行度。 在未来的错误修复版本中可能会删除此限制。
1.2.xOOOOOOOOOOOOOOO从 Flink 1.2.x 迁移到 Flink 1.3.x+ 时,不支持同时更改并行度。 迁移到 Flink 1.3.x+ 后, 用户必须先获取一个 Savepoint ,然后再更改并行度。

为 CEP 应用程序创建的 Savepoint 无法在 1.4.x+ 中恢复。

Flink 1.2 中包含 Scala TraversableSerializer 的 Savepoint 不再与 Flink 1.8 兼容, 因为此序列化程序中的更新。 您可以通过首先升级到 Flink 1.3 和 Flink 1.7 之间的版本,然后更新到 Flink 1.8 来绕过这个限制。
1.3.xOOOOOOOOOOOOOOM如果 Savepoint 包含 Scala 案例类,则从 Flink 1.3.0 迁移到 Flink 1.4.[0,1] 将失败。用户必须直接迁移到 1.4.2+。
1.4.xOOOOOOOOOOOOO
1.5.xOOOOOOOOOOOO在 1.6.x 到 1.6.2 和 1.7.0 版本中恢复使用 1.5.x 创建的广播状态存在一个已知问题:FLINK-11087. 升级到 1.6.x 或 1.7.x 系列的用户需要 直接迁移到次要版本分别高于 1.6.2 和 1.7.0。
1.6.xOOOOOOOOOOO
1.7.xOOOOOOOOOO
1.8.xOOOOOOOOO
1.9.xOOOOOOOO
1.10.xOOOOOOO
1.11.xOOOOOO
1.12.xOOOOO
1.13.xOOOODon’t upgrade from 1.12.x to 1.13.x with an unaligned checkpoint. Please use a savepoint for migrating.
1.14.xOOO
1.15.xOOFor Table API: 1.15.0 and 1.15.1 generated non-deterministic UIDs for operators that make it difficult/impossible to restore state or upgrade to next patch version. A new table.exec.uid.generation config option (with correct default behavior) disables setting a UID for new pipelines from non-compiled plans. Existing pipelines can set table.exec.uid.generation=ALWAYS if the 1.15.0/1 behavior was acceptable due to a stable environment. See FLINK-28861 for more information.
1.16.xO