Operators

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.

DataStream Transformations

Transformation | Description (Java)
Map: DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:

    DataStream<Integer> dataStream = //…
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });
FlatMap: DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });
Filter: DataStream → DataStream
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that removes zero values:

    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });
KeyBy: DataStream → KeyedStream
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple

Attention: A type cannot be a key if:
- it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
- it is an array of any type.
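The restriction on key types follows from hash partitioning: records are routed by the hash code of their key, so two keys that are logically equal must also hash identically. The plain-Java sketch below is not Flink's implementation (its real key-to-partition mapping is more involved); `partitionFor` is a hypothetical helper that only illustrates the idea, and the array comparison shows why arrays are unsuitable as keys.

```java
import java.util.Arrays;
import java.util.List;

final class KeyHashSketch {
    // Hypothetical helper: map a key to one of `parallelism` partitions by its hash code.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // String has a value-based hashCode(): equal keys land in the same partition.
        System.out.println(partitionFor("someKey", 4) == partitionFor(new String("someKey"), 4)); // true

        // Arrays inherit identity-based equals()/hashCode() from Object,
        // so two arrays with the same content do not count as the same key.
        int[] a = {1, 2};
        int[] b = {1, 2};
        System.out.println(a.equals(b));         // false: compared by identity
        System.out.println(Arrays.equals(a, b)); // true: compared by content

        // A List has content-based equals()/hashCode() and hashes consistently.
        System.out.println(List.of(1, 2).hashCode() == List.of(1, 2).hashCode()); // true
    }
}
```

The same reasoning applies to a POJO that relies on Object.hashCode(): two instances with identical fields would generally hash differently.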
Reduce: KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. A reduce function that creates a stream of partial sums:

    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    });
Fold: KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", …

    DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });
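The "rolling" behaviour of reduce and fold can be traced without a cluster. The following is a plain-Java sketch, not Flink code: `rollingReduce` and `rollingFold` are hypothetical helpers that process a finite list for a single key and record every emitted value, assuming the first element of a reduce is emitted unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

final class RollingSketch {
    // Emits one value per input element: the running result of `reducer`.
    static <T> List<T> rollingReduce(List<T> input, BinaryOperator<T> reducer) {
        List<T> emitted = new ArrayList<>();
        for (T element : input) {
            T next = emitted.isEmpty()
                    ? element
                    : reducer.apply(emitted.get(emitted.size() - 1), element);
            emitted.add(next);
        }
        return emitted;
    }

    // Like rollingReduce, but starts from an initial value of a possibly different type.
    static <T, R> List<R> rollingFold(R initial, List<T> input, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R current = initial;
        for (T element : input) {
            current = folder.apply(current, element);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        // Prints [1, 3, 6, 10, 15]
        System.out.println(rollingReduce(input, Integer::sum));
        // Prints [start-1, start-1-2, start-1-2-3, start-1-2-3-4, start-1-2-3-4-5]
        System.out.println(rollingFold("start", input, (acc, v) -> acc + "-" + v));
    }
}
```

The difference that matters in practice is the output type: a reduce must return the input type, while a fold can accumulate into another type, here a String.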
Aggregations: KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");
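The distinction between min and minBy is easy to miss, so here is a plain-Java sketch of it (not Flink code). Records are modelled as (name, value) pairs; `rollingMin` and `rollingMinBy` are hypothetical helpers. The sketch assumes that min only tracks the aggregated field and leaves the remaining fields as they were in the first record, whereas minBy keeps the whole record that holds the minimum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class MinVsMinBySketch {
    // min: only the aggregated value is updated; the name stays that of the first record (assumption).
    static List<Map.Entry<String, Integer>> rollingMin(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        Map.Entry<String, Integer> current = null;
        for (Map.Entry<String, Integer> r : input) {
            if (current == null) {
                current = r;
            } else {
                int smallest = Math.min(current.getValue(), r.getValue());
                current = Map.entry(current.getKey(), smallest);
            }
            out.add(current);
        }
        return out;
    }

    // minBy: the complete record with the smallest value is kept.
    static List<Map.Entry<String, Integer>> rollingMinBy(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        Map.Entry<String, Integer> current = null;
        for (Map.Entry<String, Integer> r : input) {
            if (current == null || r.getValue() < current.getValue()) {
                current = r;
            }
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input =
                List.of(Map.entry("a", 5), Map.entry("b", 3), Map.entry("c", 4));
        System.out.println(rollingMin(input));   // the name stays "a" throughout
        System.out.println(rollingMinBy(input)); // the name switches to "b" once 3 arrives
    }
}
```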
Window: KeyedStream → WindowedStream
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
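To make "group the data according to some characteristic" concrete, the following plain-Java sketch shows how a timestamp could be mapped to a tumbling window of fixed size. It is an illustration, not Flink's window assigner: `windowStart` and `windowEnd` are hypothetical helpers, and the sketch assumes windows aligned to the epoch with no offset.

```java
final class TumblingWindowSketch {
    // Start (inclusive) of the window containing the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of the window containing the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the example above
        // An element at 12.3 s falls into the window [10000, 15000).
        System.out.println(windowStart(12_300L, size)); // 10000
        System.out.println(windowEnd(12_300L, size));   // 15000
    }
}
```

Every element of a key whose timestamp maps to the same start value ends up in the same window.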
WindowAll: DataStream → AllWindowedStream
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Window Apply: WindowedStream → DataStream, AllWindowedStream → DataStream
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

    windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
        public void apply(Tuple tuple,
                Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            // Sum the second field of every element in the window.
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
        public void apply(Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });
Window Reduce: WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.

    windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
        }
    });
Window Fold: WindowedStream → DataStream
Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

    windowedStream.fold("start", new FoldFunction<Integer, String>() {
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });
Aggregations on windows: WindowedStream → DataStream
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    windowedStream.sum(0);
    windowedStream.sum("key");
    windowedStream.min(0);
    windowedStream.min("key");
    windowedStream.max(0);
    windowedStream.max("key");
    windowedStream.minBy(0);
    windowedStream.minBy("key");
    windowedStream.maxBy(0);
    windowedStream.maxBy("key");
Union: DataStream* → DataStream
Union of two or more data streams, creating a new stream containing all the elements from all the streams.

Note: If you union a data stream with itself you will get each element twice in the resulting stream.

    dataStream.union(otherStream1, otherStream2, …);
Window Join: DataStream,DataStream → DataStream
Join two data streams on a given key and a common window.

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {…});
Interval Join: KeyedStream,KeyedStream → DataStream
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive() // optional
        .lowerBoundExclusive() // optional
        .process(new ProcessJoinFunction() {…});
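The join condition above, including the effect of the optional exclusive bounds, can be written as a small predicate. This is a plain-Java sketch of the inequality only, not Flink code; `joins` and its parameters are hypothetical names, and timestamps are plain milliseconds.

```java
final class IntervalJoinSketch {
    // True if the right element falls inside the interval around the left element.
    static boolean joins(long leftTs, long rightTs,
                         long lowerBound, long upperBound,
                         boolean lowerExclusive, boolean upperExclusive) {
        long lowest = leftTs + lowerBound;
        long highest = leftTs + upperBound;
        boolean aboveLower = lowerExclusive ? rightTs > lowest : rightTs >= lowest;
        boolean belowUpper = upperExclusive ? rightTs < highest : rightTs <= highest;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Inclusive bounds (the default condition): leftTs - 2 <= rightTs <= leftTs + 2
        System.out.println(joins(10, 12, -2, 2, false, false)); // true
        // Both bounds exclusive: leftTs - 2 < rightTs < leftTs + 2
        System.out.println(joins(10, 12, -2, 2, true, true));   // false
        System.out.println(joins(10, 11, -2, 2, true, true));   // true
    }
}
```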
Window CoGroup: DataStream,DataStream → DataStream
Cogroups two data streams on a given key and a common window.

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new CoGroupFunction () {…});
Connect: DataStream,DataStream → ConnectedStreams
"Connects" two data streams retaining their types. Connecting allows for shared state between the two streams.

    DataStream<Integer> someStream = //…
    DataStream<String> otherStream = //…

    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap: ConnectedStreams → DataStream
Similar to map and flatMap on a connected data stream.

    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }

        @Override
        public Boolean map2(String value) {
            return false;
        }
    });
    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
        @Override
        public void flatMap1(Integer value, Collector<String> out) {
            out.collect(value.toString());
        }

        @Override
        public void flatMap2(String value, Collector<String> out) {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });
Split: DataStream → SplitStream
Split the stream into two or more streams according to some criterion.

    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
Select: SplitStream → DataStream
Select one or more streams from a split stream.

    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd");
Iterate: DataStream → IterativeStream → DataStream
Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map(/* do something */);
    // Elements greater than 0 go back into the loop.
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    // All other elements leave the loop.
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value <= 0;
        }
    });
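The control flow of such a feedback loop can be simulated on a finite input. The sketch below is plain Java, not Flink code: the iteration body is assumed to subtract one from each value (the original leaves the body open), the feedback channel is modelled as a queue, and `run` is a hypothetical helper name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class IterationSketch {
    static List<Long> run(List<Long> initial) {
        Deque<Long> pending = new ArrayDeque<>(initial); // input plus fed-back elements
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long value = pending.poll() - 1; // iteration body (assumed: subtract one)
            if (value > 0) {
                pending.add(value);          // feedback channel: re-enters the loop
            } else {
                output.add(value);           // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // 3 goes around the loop three times, 1 only once; both leave as 0.
        System.out.println(run(List.of(3L, 1L))); // [0, 0]
    }
}
```

On a real, unbounded stream the loop never drains; the simulation terminates only because its input is finite and every value eventually stops satisfying the feedback condition.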
Extract Timestamps: DataStream → DataStream
Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

    stream.assignTimestamps (new TimeStampExtractor() {…});
Transformation | Description (Scala)
Map: DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:

    dataStream.map { x => x * 2 }
FlatMap: DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

    dataStream.flatMap { str => str.split(" ") }
Filter: DataStream → DataStream
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that removes zero values:

    dataStream.filter { _ != 0 }
KeyBy: DataStream → KeyedStream
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce: KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. A reduce function that creates a stream of partial sums:

    keyedStream.reduce { _ + _ }
Fold: KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", …

    val result: DataStream[String] =
      keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations: KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    keyedStream.sum(0)
    keyedStream.sum("key")
    keyedStream.min(0)
    keyedStream.min("key")
    keyedStream.max(0)
    keyedStream.max("key")
    keyedStream.minBy(0)
    keyedStream.minBy("key")
    keyedStream.maxBy(0)
    keyedStream.maxBy("key")
Window: KeyedStream → WindowedStream
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
WindowAll: DataStream → AllWindowedStream
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply: WindowedStream → DataStream, AllWindowedStream → DataStream
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

    windowedStream.apply { WindowFunction }

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply { AllWindowFunction }
Window Reduce: WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.

    windowedStream.reduce { _ + _ }
Window Fold: WindowedStream → DataStream
Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

    val result: DataStream[String] =
      windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows: WindowedStream → DataStream
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

    windowedStream.sum(0)
    windowedStream.sum("key")
    windowedStream.min(0)
    windowedStream.min("key")
    windowedStream.max(0)
    windowedStream.max("key")
    windowedStream.minBy(0)
    windowedStream.minBy("key")
    windowedStream.maxBy(0)
    windowedStream.maxBy("key")
Union: DataStream* → DataStream
Union of two or more data streams, creating a new stream containing all the elements from all the streams.

Note: If you union a data stream with itself you will get each element twice in the resulting stream.

    dataStream.union(otherStream1, otherStream2, …)
Window Join: DataStream,DataStream → DataStream
Join two data streams on a given key and a common window.

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply { }
Window CoGroup: DataStream,DataStream → DataStream
Cogroups two data streams on a given key and a common window.

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply {}
Connect: DataStream,DataStream → ConnectedStreams
"Connects" two data streams retaining their types, allowing for shared state between the two streams.

    val someStream: DataStream[Int] = // …
    val otherStream: DataStream[String] = // …

    val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap: ConnectedStreams → DataStream
Similar to map and flatMap on a connected data stream.

    connectedStreams.map(
        (_ : Int) => true,
        (_ : String) => false
    )
    connectedStreams.flatMap(
        (_ : Int) => true,
        (_ : String) => false
    )
Split: DataStream → SplitStream
Split the stream into two or more streams according to some criterion.

    val split = someDataStream.split(
      (num: Int) =>
        (num % 2) match {
          case 0 => List("even")
          case 1 => List("odd")
        }
    )
Select: SplitStream → DataStream
Select one or more streams from a split stream.

    val even = split select "even"
    val odd = split select "odd"
    val all = split.select("even","odd")
Iterate: DataStream → IterativeStream → DataStream
Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

    initialStream.iterate {
      iteration => {
        val iterationBody = iteration.map { /* do something */ }
        // First stream is fed back into the loop, second is forwarded downstream.
        (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
      }
    }
Extract Timestamps: DataStream → DataStream
Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

    stream.assignTimestamps { timestampExtractor }

Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:

    val data: DataStream[(Int, String, Double)] = // [...]
    data.map {
      case (id, name, temperature) => // [...]
    }

is not supported by the API out of the box. To use this feature, use a Scala API extension.

The following transformations are available on data streams of Tuples:

Transformation | Description
Project: DataStream → DataStream
Selects a subset of fields from the tuples.

    DataStream<Tuple3<Integer, Double, String>> in = // […]
    DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Physical partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

Transformation | Description (Java)
Custom partitioning: DataStream → DataStream
Uses a user-defined Partitioner to select the target task for each element.

    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0);
Random partitioning: DataStream → DataStream
Partitions elements randomly according to a uniform distribution.

    dataStream.shuffle();
Rebalancing (Round-robin partitioning): DataStream → DataStream
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

    dataStream.rebalance();
Rescaling: DataStream → DataStream
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want pipelines where, for example, each parallel instance of a source fans out to a subset of several mappers to distribute load, without the full rebalance that rebalance() would incur. This requires only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6, then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

[Figure: visualization of the connection pattern in the above example]

    dataStream.rescale();
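The connection pattern for the example above can be computed directly. The following plain-Java sketch is not Flink's scheduling code: `downstreamTargets` and `downstreamTarget` are hypothetical helpers, they cover only the case where one parallelism is a multiple of the other, and they assume that each upstream instance is wired to a contiguous block of downstream instances.

```java
import java.util.ArrayList;
import java.util.List;

final class RescaleSketch {
    // Fan-out case: downstream parallelism is a multiple of upstream parallelism.
    // Returns the downstream instances that one upstream instance cycles through.
    static List<Integer> downstreamTargets(int upstreamIndex, int upstream, int downstream) {
        if (downstream % upstream != 0) {
            throw new IllegalArgumentException("sketch covers multiples only");
        }
        int fanOut = downstream / upstream;
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < fanOut; i++) {
            targets.add(upstreamIndex * fanOut + i); // contiguous block (assumption)
        }
        return targets;
    }

    // Fan-in case: upstream parallelism is a multiple of downstream parallelism.
    // Returns the single downstream instance an upstream instance sends to.
    static int downstreamTarget(int upstreamIndex, int upstream, int downstream) {
        if (upstream % downstream != 0) {
            throw new IllegalArgumentException("sketch covers multiples only");
        }
        return upstreamIndex / (upstream / downstream);
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 6.
        System.out.println(downstreamTargets(0, 2, 6)); // [0, 1, 2]
        System.out.println(downstreamTargets(1, 2, 6)); // [3, 4, 5]
        // Upstream parallelism 6, downstream parallelism 2.
        System.out.println(downstreamTarget(2, 6, 2));  // 0
        System.out.println(downstreamTarget(3, 6, 2));  // 1
    }
}
```

In contrast, rebalance() would let every upstream instance cycle through all downstream instances, which is what makes it a full, network-wide redistribution.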
Broadcasting: DataStream → DataStream
Broadcasts elements to every partition.

    dataStream.broadcast();
Transformation | Description (Scala)
Custom partitioning: DataStream → DataStream
Uses a user-defined Partitioner to select the target task for each element.

    dataStream.partitionCustom(partitioner, "someKey")
    dataStream.partitionCustom(partitioner, 0)
Random partitioning: DataStream → DataStream
Partitions elements randomly according to a uniform distribution.

    dataStream.shuffle()
Rebalancing (Round-robin partitioning): DataStream → DataStream
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

    dataStream.rebalance()
Rescaling: DataStream → DataStream
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want pipelines where, for example, each parallel instance of a source fans out to a subset of several mappers to distribute load, without the full rebalance that rebalance() would incur. This requires only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

[Figure: visualization of the connection pattern in the above example]

    dataStream.rescale()
Broadcasting: DataStream → DataStream
Broadcasts elements to every partition.

    dataStream.broadcast()

Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(…).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink (see slots). You can manually isolate operators in separate slots if desired.

Transformation | Description (Java)
Start new chain
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

    someStream.filter(…).map(…).startNewChain().map(…);
Disable chaining
Do not chain the map operator.

    someStream.map(…).disableChaining();
Set slot sharing group
Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

    someStream.filter(…).slotSharingGroup("name");
Transformation | Description (Scala)
Start new chain
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

    someStream.filter(…).map(…).startNewChain().map(…)
Disable chaining
Do not chain the map operator.

    someStream.map(…).disableChaining()
Set slot sharing group
Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

    someStream.filter(…).slotSharingGroup("name")