Streaming File Sink

这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。

由于在流处理中输入可能是无限的,所以流处理的文件 sink 会将数据写入到桶中。如何分桶是可以配置的,一种有效的默认策略是基于时间的分桶,这种策略每个小时写入一个新的桶,这些桶各包含了无限输出流的一部分数据。

在一个桶内部,会进一步将输出基于滚动策略切分成更小的文件。这有助于防止桶文件变得过大。滚动策略也是可以配置的,默认策略会根据文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间。

StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet

使用行编码输出格式

只需要配置一个输出路径和一个 Encoder。Encoder负责为每个文件的 OutputStream 序列化数据。

基本用法如下:

  1. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
  2. import org.apache.flink.core.fs.Path;
  3. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
  4. DataStream<String> input = ...;
  5. final StreamingFileSink<String> sink = StreamingFileSink
  6. .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
  7. .build();
  8. input.addSink(sink);
  1. import org.apache.flink.api.common.serialization.SimpleStringEncoder
  2. import org.apache.flink.core.fs.Path
  3. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
  4. val input: DataStream[String] = ...
  5. val sink: StreamingFileSink[String] = StreamingFileSink
  6. .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  7. .build()
  8. input.addSink(sink)

上面的代码创建了一个按小时分桶、按默认策略滚动的 sink。默认分桶器是DateTimeBucketAssigner,默认滚动策略是DefaultRollingPolicy。可以为 sink builder 自定义BucketAssignerRollingPolicy。更多配置操作以及分桶器和滚动策略的工作机制和相互影响请参考:StreamingFileSink

使用批量编码输出格式

上面的示例使用 Encoder 分别序列化每一个记录。除此之外,流式文件 sink 还支持批量编码的输出格式,比如 Apache Parquet。使用这种编码格式需要用 StreamingFileSink.forBulkFormat() 来代替 StreamingFileSink.forRowFormat() ,然后指定一个 BulkWriter.Factory

ParquetAvroWriters中包含了为各种类型创建 BulkWriter.Factory 的静态方法。

重要: 批量编码格式只能和 OnCheckpointRollingPolicy 结合使用,每次做 checkpoint 时滚动文件。

关于S3的重要内容

重要提示 1: 对于 S3,StreamingFileSink 只支持基于 Hadoop 的文件系统实现,不支持基于 Presto 的实现。如果想使用 StreamingFileSink 向 S3 写入数据并且将 checkpoint 放在基于 Presto 的文件系统,建议明确指定 “s3a://” (for Hadoop)作为sink的目标路径方案,并且为 checkpoint 路径明确指定 “s3p://” (for Presto)。如果 Sink 和 checkpoint 都使用 “s3://” 路径的话,可能会导致不可预知的行为,因为双方的实现都在“监听”这个路径。

重要提示 2: StreamingFileSink 使用 S3 的 Multi-part Upload(后续使用MPU代替)特性可以保证精确一次的语义。这个特性支持以独立的块(因此被称为”multi-part”)模式上传文件,当 MPU 的所有部分文件成功上传之后,可以合并成原始文件。对于失效的 MPUs,S3 提供了一个基于桶生命周期的规则,用户可以用这个规则来丢弃在指定时间内未完成的MPU。如果在一些部分文件还未上传时触发 savepoint,并且这个规则设置的比较严格,这意味着相关的 MPU在作业重启之前可能会超时。后续的部分文件没有写入到 savepoint, 那么在 Flink 作业从 savepoint 恢复时,会因为拿不到缺失的部分文件,导致任务失败并抛出异常。