状态后台

Data StreamAPI编写的程序通常以各种形式保存状态:

  • Windows会在触发数据元或聚合之前收集数据元或聚合
  • 转换函数可以使用键/值状态接口来存储值
  • 转换函数可以实现CheckpointedFunction接口以使其局部变量具有容错能力另请参阅流API指南中的状态部分

激活检查点时,检查点会持续保持此类状态,以防止数据丢失并始终如一地恢复。状态如何在内部表示,以及在检查点上如何以及在何处持续取决于所选择的状态后台

可用的状态后台

开箱即用,Flink捆绑了这些状态后台:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend如果没有配置其他任何内容,系统将使用MemoryStateBackend。

MemoryStateBackend

MemoryStateBackend保存数据在内部作为Java堆的对象。键/值状态和窗口 算子包含存储值,触发器等的哈希表。

在检查点时,此状态后台将对状态进行SNAPSHOT,并将其作为检查点确认消息的一部分发送到JobManager(主服务器),JobManager也将其存储在其堆上。

可以将MemoryStateBackend配置为使用异步SNAPSHOT。虽然我们强烈建议使用异步SNAPSHOT来避免阻塞管道,但请注意,默认情况下,此函数目前处于启用状态。要禁用此函数,用户可以MemoryStateBackend在构造函数中将相应的布尔标志实例化为false(这应该仅用于调试),例如:

  1. new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

MemoryStateBackend的局限性:

  • 默认情况下,每个状态的大小限制为5MB。可以在MemoryStateBackend的构造函数中增加此值。
  • 无论配置的最大状态大小如何,状态都不能大于akka帧大小(请参阅配置)。
  • 聚合状态必须适合JobManager内存。鼓励MemoryStateBackend用于:

  • 本地开发和调试

  • 几乎没有状态的作业,例如仅包含一次记录函数的作业(Map,FlatMap,Filter,…)。Kafka消费者需要很少的状态。

FsStateBackend

所述FsStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010/Flink/检查点”或“文件:///数据/Flink/检查点”。

FsStateBackend将正在运行的数据保存在TaskManager的内存中。在检查点时,它将状态SNAPSHOT写入配置的文件系统和目录中的文件。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

FsStateBackend 默认使用异步SNAPSHOT,以避免在编写状态检查点时阻塞处理管道。要禁用此函数,用户可以FsStateBackend在构造函数集中使用相应的布尔标志来实例化a false,例如:

  1. new FsStateBackend(path, false);

鼓励FsStateBackend:

  • 具有大状态,长窗口,大键/值状态的作业。
  • 所有高可用性设置。

RocksDBStateBackend

所述RocksDBStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010/Flink/检查点”或“文件:///数据/Flink/检查点”。

RocksDBStateBackend将RocksDB数据库中的飞行中数据保存在(默认情况下)存储在TaskManager数据目录中。在检查点时,整个RocksDB数据库将被检查点到配置的文件系统和目录中。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

RocksDBStateBackend始终执行异步SNAPSHOT。

RocksDBStateBackend的局限性:

  • 由于RocksDB的JNI桥接API基于byte[],因此每个Keys和每个值的最大支持大小为2 ^ 31个字节。重要提示:在RocksDB中使用合并 算子操作的状态(例如ListState)可以静默地累积>2 ^ 31字节的值大小,然后在下次检索时失败。这是目前RocksDBJNI的一个限制。我们鼓励RocksDBStateBackend:

  • 具有非常大的状态,长窗口,大键/值状态的作业。

  • 所有高可用性设置。请注意,您可以保存的状态量仅受可用磁盘空间量的限制。与将状态保持在内存中的FsStateBackend相比,这允许保持非常大的状态。但是,这也意味着使用此状态后台可以实现的最大吞吐量更低。对此后台的所有读/写都必须通过去/序列化来检索/存储状态对象,这比使用堆基表示正在进行的堆上表示更昂贵。

RocksDBStateBackend是目前唯一提供增量检查点的后台(见这里)。

配置状态后台

如果您不指定任何内容,则默认状态后台是JobManager。如果要为群集上的所有作业建立不同的默认值,可以通过在flink-conf.yaml中定义新的默认状态后台来实现可以基于每个作业覆盖默认状态后台,如下所示。

设置每个作业状态后台

每个作业状态后台StreamExecutionEnvironment在作业上设置,如下例所示:

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

设置默认状态后台

可以flink-conf.yaml使用配置Keys在配置中配置默认状态后台state.backend

config条目的可能值包括jobmanager(MemoryStateBackend),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend),或实现状态后台工厂FsStateBackendFactory的类的完全限定类名,例如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactoryRocksDBStateBackend。

state.checkpoints.dir选项定义所有后台写入检查点数据和元数据文件的目录。您可以在此处找到有关检查点目录结构的更多详细信息

配置文件中的示例部分可能如下所示:

  1. # The backend that will be used to store operator state checkpoints
  2. state.backend: filesystem
  3. # Directory for storing checkpoints
  4. state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints