Scala API Extensions

In order to keep a fair amount of consistency between the Scala and Java APIs, some of the features that allow a high level of expressiveness in Scala have been left out of the standard APIs for both batch and streaming.

If you want to enjoy the full Scala experience, you can choose to opt in to extensions that enhance the Scala API via implicit conversions.

To use all the available extensions, you can simply add an import for the DataSet API

  import org.apache.flink.api.scala.extensions._

or the DataStream API

  import org.apache.flink.streaming.api.scala.extensions._

Alternatively, you can import individual extensions à la carte to use only those you prefer.

Accept partial functions

Normally, neither the DataSet nor the DataStream API accepts anonymous pattern matching functions to deconstruct tuples, case classes, or collections, as in the following:

  val data: DataSet[(Int, String, Double)] = // [...]
  data.map {
    case (id, name, temperature) => // [...]
    // The previous line causes the following compilation error:
    // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
  }
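The restriction comes from how these APIs are declared rather than from Scala itself: Flink's map is overloaded (it also accepts a MapFunction), so the expected argument type of a bare case block is not fully determined and the compiler rejects it per SLS 8.5. On a standard Scala collection, whose map has a single fully known signature, the same pattern-matching lambda compiles fine. A minimal plain-Scala sketch of the contrast (no Flink dependency; names are illustrative only):

```scala
object PatternLambdaDemo {
  def main(args: Array[String]): Unit = {
    val data = Seq((1, "flink", 20.5), (2, "spark", 19.0))
    // Seq#map has a single signature, here ((Int, String, Double)) => R,
    // so the pattern's argument types are fully known and this compiles:
    val names = data.map { case (_, name, _) => name }
    println(names)  // List(flink, spark)
  }
}
```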

This extension introduces new methods in both the DataSet and DataStream Scala APIs that have a one-to-one correspondence in the extended API. These delegating methods do support anonymous pattern matching functions.
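As a rough sketch of the mechanism (hypothetical stand-in types, not Flink's actual implementation): an implicit class adds a `*With` method whose single, non-overloaded function parameter gives the compiler a fully known expected type, and then delegates to the original method.

```scala
object AcceptPartialFunctionsSketch {
  // Stand-ins mimicking the shape of Flink's API, for illustration only.
  trait MapFunction[T, R] { def apply(t: T): R }

  final case class FakeDataSet[T](elems: Seq[T]) {
    // Overloaded like Flink's map: a bare `{ case ... }` block is ambiguous here.
    def map[R](fun: T => R): FakeDataSet[R] =
      FakeDataSet(elems.map(fun))
    def map[R](fun: MapFunction[T, R]): FakeDataSet[R] =
      FakeDataSet(elems.map(fun.apply))
  }

  // The extension: a single non-overloaded signature that delegates to map.
  implicit class OnFakeDataSet[T](private val ds: FakeDataSet[T]) {
    def mapWith[R](fun: T => R): FakeDataSet[R] = ds.map(fun)
  }

  def main(args: Array[String]): Unit = {
    val data = FakeDataSet(Seq((1, "a"), (2, "b")))
    // Compiles: mapWith's expected type ((Int, String)) => R is fully known.
    val names = data.mapWith { case (_, name) => name }
    println(names.elems)  // List(a, b)
  }
}
```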

DataSet API

mapWith (delegates to map on DataSet)

  data.mapWith { case (_, value) => value.toString }

mapPartitionWith (delegates to mapPartition on DataSet)

  data.mapPartitionWith { case head #:: _ => head }

flatMapWith (delegates to flatMap on DataSet)

  data.flatMapWith { case (_, name, visitTimes) => visitTimes.map(name -> _) }

filterWith (delegates to filter on DataSet)

  data.filterWith { case Train(_, isOnTime) => isOnTime }

reduceWith (delegates to reduce on DataSet and GroupedDataSet)

  data.reduceWith { case ((_, amount1), (_, amount2)) => amount1 + amount2 }

reduceGroupWith (delegates to reduceGroup on GroupedDataSet)

  data.reduceGroupWith { case id #:: value #:: _ => id -> value }

groupingBy (delegates to groupBy on DataSet)

  data.groupingBy { case (id, _, _) => id }

sortGroupWith (delegates to sortGroup on GroupedDataSet)

  grouped.sortGroupWith(Order.ASCENDING) { case House(_, value) => value }

combineGroupWith (delegates to combineGroup on GroupedDataSet)

  grouped.combineGroupWith { case header #:: amounts => amounts.sum }

projecting (delegates to apply on JoinDataSet and CrossDataSet)

  data1.join(data2).
    whereClause(case (pk, _) => pk).
    isEqualTo(case (_, fk) => fk).
    projecting { case ((pk, tx), (products, fk)) => tx -> products }

  data1.cross(data2).projecting { case ((a, _), (_, b)) => a -> b }

projecting (delegates to apply on CoGroupDataSet)

  data1.coGroup(data2).
    whereClause(case (pk, _) => pk).
    isEqualTo(case (_, fk) => fk).
    projecting { case (head1 #:: _, head2 #:: _) => head1 -> head2 }

DataStream API

mapWith (delegates to map on DataStream)

  data.mapWith { case (_, value) => value.toString }

flatMapWith (delegates to flatMap on DataStream)

  data.flatMapWith { case (_, name, visits) => visits.map(name -> _) }

filterWith (delegates to filter on DataStream)

  data.filterWith { case Train(_, isOnTime) => isOnTime }

keyingBy (delegates to keyBy on DataStream)

  data.keyingBy { case (id, _, _) => id }

mapWith (delegates to map on ConnectedDataStream)

  data.mapWith(
    map1 = case (_, value) => value.toString,
    map2 = case (_, _, value, _) => value + 1)

flatMapWith (delegates to flatMap on ConnectedDataStream)

  data.flatMapWith(
    flatMap1 = case (_, json) => parse(json),
    flatMap2 = case (_, _, json, _) => parse(json))

keyingBy (delegates to keyBy on ConnectedDataStream)

  data.keyingBy(
    key1 = case (_, timestamp) => timestamp,
    key2 = case (id, _, _) => id)

reduceWith (delegates to reduce on KeyedStream and WindowedStream)

  data.reduceWith { case ((_, sum1), (_, sum2)) => sum1 + sum2 }

foldWith (delegates to fold on KeyedStream and WindowedStream)

  data.foldWith(User(bought = 0)) { case (User(b), (_, items)) => User(b + items.size) }

applyWith (delegates to apply on WindowedStream)

  data.applyWith(0)(
    foldFunction = case (sum, amount) => sum + amount,
    windowFunction = case (k, w, sum) => // [...])

projecting (delegates to apply on JoinedStream)

  data1.join(data2).
    whereClause(case (pk, _) => pk).
    isEqualTo(case (_, fk) => fk).
    projecting { case ((pk, tx), (products, fk)) => tx -> products }

For more information on the semantics of each method, please refer to the DataSet and DataStream API documentation.

To use this extension exclusively, you can add the following import:

  import org.apache.flink.api.scala.extensions.acceptPartialFunctions

for the DataSet extensions and

  import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions

for the DataStream extensions.

The following snippet shows a minimal example of how to use these extension methods together (with the DataSet API):

  object Main {
    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.extensions._

    case class Point(x: Double, y: Double)

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
      ds.filterWith {
        case Point(x, _) => x > 1
      }.reduceWith {
        case (Point(x1, y1), Point(x2, y2)) => Point(x1 + y1, x2 + y2)
      }.mapWith {
        case Point(x, y) => (x, y)
      }.flatMapWith {
        case (x, y) => Seq("x" -> x, "y" -> y)
      }.groupingBy {
        case (id, value) => id
      }
    }
  }