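Operators

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.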

DataStream Transformations

Transformations and descriptions (Java):

Map (DataStream → DataStream): Takes one element and produces one element. A map function that doubles the values of the input stream:

    DataStream<Integer> dataStream = //…
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });

FlatMap (DataStream → DataStream): Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

Filter (DataStream → DataStream): Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });

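KeyBy (DataStream → KeyedStream): Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

    dataStream.keyBy("someKey"); // Key by field "someKey"
    dataStream.keyBy(0);         // Key by the first element of a Tuple
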
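Attention: A type cannot be a key if:
- it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation;
- it is an array of any type.
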
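The two restrictions above follow from how hash partitioning picks a partition. The plain-Java sketch below illustrates the idea without using Flink: the class KeyPartitionSketch and the method partitionFor are hypothetical names, and "hash code modulo parallelism" is a deliberate simplification, not Flink's actual partitioning code.

```java
import java.util.Arrays;

// Simplified model of hash partitioning (an assumption, not Flink's implementation).
final class KeyPartitionSketch {

    // Maps a key to one of `parallelism` partitions based on its hashCode().
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Equal keys have equal hash codes, so they always share a partition.
        int p1 = partitionFor("user-42", 4);
        int p2 = partitionFor(new String("user-42"), 4);
        System.out.println(p1 == p2); // true

        // Arrays inherit the identity-based Object.hashCode(): two arrays with
        // the same contents are not guaranteed to produce the same hash code.
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};
        System.out.println(Arrays.equals(a, b)); // true: same contents
        System.out.println(a.hashCode() == b.hashCode()); // usually false
    }
}
```

A POJO that relies on Object.hashCode() behaves like the arrays in this sketch: logically equal keys could end up in different partitions.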
Reduce (KeyedStream → DataStream): A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. A reduce function that creates a stream of partial sums:

    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    });

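To make the "rolling" behavior concrete, here is a minimal plain-Java model of a reduce that keeps one running value per key and emits a result for every input record. It does not use Flink; RollingReduceSketch and rollingReduce are hypothetical names, and the single-threaded list processing is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a rolling reduce on a keyed stream; not Flink code.
final class RollingReduceSketch {

    // For every (key, value) record, combines the value with the last reduced
    // value of that key and emits the new result.
    static List<Integer> rollingReduce(List<Map.Entry<String, Integer>> input,
                                       BinaryOperator<Integer> reducer) {
        Map<String, Integer> lastReduced = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            // merge() stores the value for a new key, otherwise applies the reducer.
            Integer current = lastReduced.merge(record.getKey(), record.getValue(), reducer);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 10), Map.entry("a", 3));
        // Partial sums are tracked independently per key.
        System.out.println(rollingReduce(input, Integer::sum)); // [1, 3, 10, 6]
    }
}
```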
Fold (KeyedStream → DataStream): A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", …

    DataStream<String> result =
        keyedStream.fold("start", new FoldFunction<Integer, String>() {
            @Override
            public String fold(String current, Integer value) {
                return current + "-" + value;
            }
        });

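The difference from a reduce is that a fold starts from an initial value whose type may differ from the element type. The sketch below models this for a single key in plain Java; RollingFoldSketch and rollingFold are hypothetical names and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of a rolling fold for the elements of one key; not Flink code.
final class RollingFoldSketch {

    // Starts from `initial`, folds each element into the accumulator and
    // emits the accumulator after every element.
    static <T, R> List<R> rollingFold(R initial, List<T> input, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R current = initial;
        for (T value : input) {
            current = folder.apply(current, value);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> result = rollingFold("start", List.of(1, 2, 3), (acc, v) -> acc + "-" + v);
        System.out.println(result); // [start-1, start-1-2, start-1-2-3]
    }
}
```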
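Aggregations (KeyedStream → DataStream): Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");
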
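The min versus minBy distinction only shows up when elements carry more than one field. The following plain-Java sketch models both on (label, value) pairs of a single key. MinVersusMinBySketch and its methods are hypothetical names. The sketch also assumes that min keeps the non-aggregated field of the first element it saw, which should be read as an assumption rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of rolling min and minBy on (label, value) pairs; not Flink code.
final class MinVersusMinBySketch {

    // min: tracks only the minimum value; the label is NOT taken from the
    // element that carried the minimum (assumed: it stays at the first label).
    static List<Map.Entry<String, Integer>> rollingMin(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        Map.Entry<String, Integer> current = null;
        for (Map.Entry<String, Integer> element : input) {
            if (current == null) {
                current = element;
            } else if (element.getValue() < current.getValue()) {
                current = Map.entry(current.getKey(), element.getValue());
            }
            emitted.add(current);
        }
        return emitted;
    }

    // minBy: returns the whole element that has the minimum value.
    static List<Map.Entry<String, Integer>> rollingMinBy(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        Map.Entry<String, Integer> current = null;
        for (Map.Entry<String, Integer> element : input) {
            if (current == null || element.getValue() < current.getValue()) {
                current = element;
            }
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("first", 5), Map.entry("second", 3), Map.entry("third", 4));
        System.out.println(rollingMin(input));   // [first=5, first=3, first=3]
        System.out.println(rollingMinBy(input)); // [first=5, second=3, second=3]
    }
}
```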
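Window (KeyedStream → WindowedStream): Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
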
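As a rough illustration of what "grouping by a 5-second tumbling window" means, the sketch below assigns event timestamps to fixed, non-overlapping windows in plain Java. TumblingWindowSketch, windowStart and assign are hypothetical names, and the sketch assumes windows are aligned to timestamp 0 with no offset. It models the grouping only, not watermarks or triggering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of tumbling window assignment; not Flink code.
final class TumblingWindowSketch {

    // Inclusive start of the window of length sizeMillis that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups timestamps by the start of the window they fall into.
    static Map<Long, List<Long>> assign(List<Long> timestamps, long sizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        // 5-second windows: [0, 5000), [5000, 10000), [10000, 15000), ...
        System.out.println(assign(List.of(1_000L, 4_999L, 5_000L, 12_000L), 5_000L));
        // {0=[1000, 4999], 5000=[5000], 10000=[12000]}
    }
}
```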
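WindowAll (DataStream → AllWindowedStream): Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
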
Window Apply (WindowedStream → DataStream, AllWindowedStream → DataStream): Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

    windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
        public void apply(Tuple tuple,
                Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
        public void apply(Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

Window Reduce (WindowedStream → DataStream): Applies a functional reduce function to the window and returns the reduced value.

    windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
        }
    });

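Window Fold (WindowedStream → DataStream): Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

    windowedStream.fold("start", new FoldFunction<Integer, String>() {
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });
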
Aggregations on windows (WindowedStream → DataStream): Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    windowedStream.sum(0);
    windowedStream.sum("key");
    windowedStream.min(0);
    windowedStream.min("key");
    windowedStream.max(0);
    windowedStream.max("key");
    windowedStream.minBy(0);
    windowedStream.minBy("key");
    windowedStream.maxBy(0);
    windowedStream.maxBy("key");

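Union (DataStream* → DataStream): Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

    dataStream.union(otherStream1, otherStream2, …);
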
Window Join (DataStream, DataStream → DataStream): Joins two data streams on a given key and a common window.

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction () {…});

Interval Join (KeyedStream, KeyedStream → DataStream): Joins two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive(true) // optional
        .lowerBoundExclusive(true) // optional
        .process(new ProcessJoinFunction() {…});

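The join condition above can be written out as a small predicate. The plain-Java sketch below checks whether a right-hand timestamp falls into the interval around a left-hand timestamp, with optional exclusive bounds. IntervalJoinSketch and matches are hypothetical names; the key comparison is left out and assumed to have happened already.

```java
// Plain-Java model of the interval join time condition; not Flink code.
final class IntervalJoinSketch {

    // True if leftTs + lowerBound <= rightTs <= leftTs + upperBound, where each
    // comparison becomes strict when the corresponding bound is exclusive.
    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound,
                           boolean lowerExclusive, boolean upperExclusive) {
        long lowest = leftTs + lowerBound;
        long highest = leftTs + upperBound;
        boolean aboveLower = lowerExclusive ? rightTs > lowest : rightTs >= lowest;
        boolean belowUpper = upperExclusive ? rightTs < highest : rightTs <= highest;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Bounds of -2 ms and +2 ms around a left timestamp of 10.
        System.out.println(matches(10, 12, -2, 2, false, false)); // true: 12 <= 12
        System.out.println(matches(10, 12, -2, 2, true, true));   // false: 12 < 12 fails
        System.out.println(matches(10, 11, -2, 2, true, true));   // true: 8 < 11 < 12
    }
}
```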
Window CoGroup (DataStream, DataStream → DataStream): Cogroups two data streams on a given key and a common window.

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new CoGroupFunction () {…});

Connect (DataStream, DataStream → ConnectedStreams): "Connects" two data streams retaining their types. Connect allows for shared state between the two streams.

    DataStream<Integer> someStream = //…
    DataStream<String> otherStream = //…

    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap (ConnectedStreams → DataStream): Similar to map and flatMap on a connected data stream.

    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }

        @Override
        public Boolean map2(String value) {
            return false;
        }
    });
    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
        @Override
        public void flatMap1(Integer value, Collector<String> out) {
            out.collect(value.toString());
        }

        @Override
        public void flatMap2(String value, Collector<String> out) {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

Split (DataStream → SplitStream): Splits the stream into two or more streams according to some criterion.

    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });

Select (SplitStream → DataStream): Selects one or more streams from a split stream.

    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even", "odd");

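Iterate (DataStream → IterativeStream → DataStream): Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map(/*do something*/);
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value <= 0;
        }
    });
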
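The feedback loop can be pictured as a queue that the iteration body keeps draining and refilling. The plain-Java sketch below models this with a hypothetical body that subtracts 1 from each element. FeedbackLoopSketch and run are made-up names, and the single-threaded queue is an assumption that ignores how a real streaming job schedules the feedback edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java model of an iteration with a feedback channel; not Flink code.
final class FeedbackLoopSketch {

    // Applies `body` to every pending element. Results greater than 0 are fed
    // back into the loop; all other results are forwarded to the output.
    // Note: a body that never yields a value <= 0 would loop forever.
    static List<Long> run(List<Long> initial, UnaryOperator<Long> body) {
        Deque<Long> pending = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long result = body.apply(pending.poll());
            if (result > 0) {
                pending.add(result); // feedback channel
            } else {
                output.add(result);  // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // 3 needs three passes through the body, 1 needs one, 0 leaves as -1.
        System.out.println(run(List.of(3L, 1L, 0L), v -> v - 1)); // [0, -1, 0]
    }
}
```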
Extract Timestamps (DataStream → DataStream): Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

    stream.assignTimestamps(new TimeStampExtractor() {…});

Transformations and descriptions (Scala):

Map (DataStream → DataStream): Takes one element and produces one element. A map function that doubles the values of the input stream:

    dataStream.map { x => x * 2 }

FlatMap (DataStream → DataStream): Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

    dataStream.flatMap { str => str.split(" ") }

Filter (DataStream → DataStream): Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

    dataStream.filter { _ != 0 }

KeyBy (DataStream → KeyedStream): Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0)         // Key by the first element of a Tuple

Reduce (KeyedStream → DataStream): A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. A reduce function that creates a stream of partial sums:

    keyedStream.reduce { _ + _ }

Fold (KeyedStream → DataStream): A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", …

    val result: DataStream[String] =
      keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations (KeyedStream → DataStream): Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    keyedStream.sum(0)
    keyedStream.sum("key")
    keyedStream.min(0)
    keyedStream.min("key")
    keyedStream.max(0)
    keyedStream.max("key")
    keyedStream.minBy(0)
    keyedStream.minBy("key")
    keyedStream.maxBy(0)
    keyedStream.maxBy("key")

Window (KeyedStream → WindowedStream): Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

WindowAll (DataStream → AllWindowedStream): Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

Window Apply (WindowedStream → DataStream, AllWindowedStream → DataStream): Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

    windowedStream.apply { WindowFunction }

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply { AllWindowFunction }

Window Reduce (WindowedStream → DataStream): Applies a functional reduce function to the window and returns the reduced value.

    windowedStream.reduce { _ + _ }

Window Fold (WindowedStream → DataStream): Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

    val result: DataStream[String] =
      windowedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations on windows (WindowedStream → DataStream): Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    windowedStream.sum(0)
    windowedStream.sum("key")
    windowedStream.min(0)
    windowedStream.min("key")
    windowedStream.max(0)
    windowedStream.max("key")
    windowedStream.minBy(0)
    windowedStream.minBy("key")
    windowedStream.maxBy(0)
    windowedStream.maxBy("key")

Union (DataStream* → DataStream): Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

    dataStream.union(otherStream1, otherStream2, …)

Window Join (DataStream, DataStream → DataStream): Joins two data streams on a given key and a common window.

    dataStream.join(otherStream)
      .where(<key selector>).equalTo(<key selector>)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply { }

Window CoGroup (DataStream, DataStream → DataStream): Cogroups two data streams on a given key and a common window.

    dataStream.coGroup(otherStream)
      .where(0).equalTo(1)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply {}

Connect (DataStream, DataStream → ConnectedStreams): "Connects" two data streams retaining their types, allowing for shared state between the two streams.

    val someStream: DataStream[Int] = // ...
    val otherStream: DataStream[String] = // ...

    val connectedStreams = someStream.connect(otherStream)

CoMap, CoFlatMap (ConnectedStreams → DataStream): Similar to map and flatMap on a connected data stream.

    connectedStreams.map(
      (_: Int) => true,
      (_: String) => false
    )
    // flatMap functions return a collection of results per input element
    connectedStreams.flatMap(
      (_: Int) => Seq(true),
      (_: String) => Seq(false)
    )

Split (DataStream → SplitStream): Splits the stream into two or more streams according to some criterion.

    val split = someDataStream.split(
      (num: Int) =>
        (num % 2) match {
          case 0 => List("even")
          case 1 => List("odd")
        }
    )

Select (SplitStream → DataStream): Selects one or more streams from a split stream.

    val even = split select "even"
    val odd = split select "odd"
    val all = split.select("even", "odd")

Iterate (DataStream → IterativeStream → DataStream): Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

    initialStream.iterate {
      iteration => {
        val iterationBody = iteration.map { /* do something */ }
        (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
      }
    }

Extract Timestamps (DataStream → DataStream): Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

    stream.assignTimestamps { timestampExtractor }

Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:

    val data: DataStream[(Int, String, Double)] = // [...]
    data.map {
      case (id, name, temperature) => // [...]
    }

is not supported by the API out-of-the-box. To use this feature, you should use a Scala API extension.

The following transformations are available on data streams of Tuples:

Transformations and descriptions (Java):

Project (DataStream → DataStream): Selects a subset of fields from the tuples.

    DataStream<Tuple3<Integer, Double, String>> in = // […]
    DataStream<Tuple2<String, Integer>> out = in.project(2,0);

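Physical partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

Transformations and descriptions (Java):
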
Custom partitioning (DataStream → DataStream): Uses a user-defined Partitioner to select the target task for each element.

    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0);

Random partitioning (DataStream → DataStream): Partitions elements randomly according to a uniform distribution.

    dataStream.shuffle();

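Rebalancing (round-robin partitioning) (DataStream → DataStream): Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

    dataStream.rebalance();
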
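Rescaling (DataStream → DataStream): Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load, but do not want the full rebalance that rebalance() would incur. This requires only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation distributes elements to three downstream operations while the other upstream operation distributes to the other three. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6, then three upstream operations distribute to one downstream operation while the other three distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:

[Figure: Checkpoint barriers in data streams]

    dataStream.rescale();
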
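The parallelism 2 and 6 example can be reproduced with a small model of the connection pattern. The plain-Java sketch below computes which downstream subtasks a given upstream subtask sends to. RescaleSketch and targets are hypothetical names, and assigning contiguous blocks of subtask indices is an assumption for illustration; the wiring Flink actually chooses may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the rescale connection pattern; not Flink's implementation.
final class RescaleSketch {

    // Returns the downstream subtask indices that the given upstream subtask sends to.
    static List<Integer> targets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> result = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan-out: every downstream subtask is fed by exactly one upstream subtask.
            for (int d = 0; d < downstreamParallelism; d++) {
                if (d * upstreamParallelism / downstreamParallelism == upstreamIndex) {
                    result.add(d);
                }
            }
        } else {
            // Fan-in: every upstream subtask sends to exactly one downstream subtask.
            result.add(upstreamIndex * downstreamParallelism / upstreamParallelism);
        }
        return result;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 6: three targets each.
        System.out.println(targets(0, 2, 6)); // [0, 1, 2]
        System.out.println(targets(1, 2, 6)); // [3, 4, 5]
        // Upstream parallelism 6, downstream parallelism 2: three senders per target.
        System.out.println(targets(2, 6, 2)); // [0]
        System.out.println(targets(3, 6, 2)); // [1]
        // Not a multiple (2 and 5): the subsets have different sizes.
        System.out.println(targets(0, 2, 5)); // [0, 1, 2]
        System.out.println(targets(1, 2, 5)); // [3, 4]
    }
}
```

Within its subset, each upstream subtask would then distribute elements round-robin; the sketch only models which subtasks are connected.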
Broadcasting (DataStream → DataStream): Broadcasts elements to every partition.

    dataStream.broadcast();

Transformations and descriptions (Scala):

Custom partitioning (DataStream → DataStream): Uses a user-defined Partitioner to select the target task for each element.

    dataStream.partitionCustom(partitioner, "someKey")
    dataStream.partitionCustom(partitioner, 0)

Random partitioning (DataStream → DataStream): Partitions elements randomly according to a uniform distribution.

    dataStream.shuffle()

Rebalancing (round-robin partitioning) (DataStream → DataStream): Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

    dataStream.rebalance()

Rescaling (DataStream → DataStream): Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load, but do not want the full rebalance that rebalance() would incur. This requires only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation distributes elements to two downstream operations while the other upstream operation distributes to the other two. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations distribute to one downstream operation while the other two distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:

[Figure: Checkpoint barriers in data streams]

    dataStream.rescale()

Broadcasting (DataStream → DataStream): Broadcasts elements to every partition.

    dataStream.broadcast()

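Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(…).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.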

Transformations and descriptions (Java):

Start new chain: Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

    someStream.filter(…).map(…).startNewChain().map(…);

Disable chaining: Do not chain the map operator.

    someStream.map(…).disableChaining();

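Set slot sharing group: Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

    someStream.filter(…).slotSharingGroup("name");
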
Transformations and descriptions (Scala):

Start new chain: Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

    someStream.filter(…).map(…).startNewChain().map(…)

Disable chaining: Do not chain the map operator.

    someStream.map(…).disableChaining()

Set slot sharing group: Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

    someStream.filter(…).slotSharingGroup("name")