Basic API Concepts

Flink programs are regular programs that implement transformations on distributed collections (e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Collections are initially created from sources (e.g., by reading from files, Kafka topics, or local, in-memory collections). Results are returned via sinks, which may, for example, write the data to (distributed) files or to standard output (for example, the command line terminal). Flink programs run in a variety of contexts, standalone or embedded in other programs. The execution can happen in a local JVM or on clusters of many machines.

Depending on the type of data source, i.e., bounded or unbounded, you would write either a batch program or a streaming program, where the DataSet API is used for batch and the DataStream API is used for streaming. This guide introduces the basic concepts that are common to both APIs; please see our Streaming Guide and Batch Guide for concrete information about writing programs with each API.

NOTE: When showing actual examples of how the APIs can be used, we will use StreamExecutionEnvironment and the DataStream API. The concepts are exactly the same in the DataSet API; just replace them with ExecutionEnvironment and DataSet.

DataSet and DataStream

Flink has the special classes DataSet and DataStream to represent data in a program. You can think of them as immutable collections of data that can contain duplicates. In the case of DataSet the data is finite, while for a DataStream the number of elements can be unbounded.

These collections differ from regular Java collections in some key ways. First, they are immutable, meaning that once they are created you cannot add or remove elements. You also cannot simply inspect the elements inside.

A collection is initially created by adding a source in a Flink program, and new collections are derived from these by transforming them using API methods such as map, filter, and so on.

Flink programs look like regular programs that transform collections of data. Each program consists of the same basic parts:

  • Obtain an execution environment,
  • Load/create the initial data,
  • Specify transformations on this data,
  • Specify where to put the results of your computations,
  • Trigger the program execution

We will now give an overview of each of those steps; please refer to the respective sections for more details. Note that all core classes of the Java DataSet API are found in the package org.apache.flink.api.java, while the classes of the Java DataStream API can be found in org.apache.flink.streaming.api.

The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

    getExecutionEnvironment()
    createLocalEnvironment()
    createRemoteEnvironment(String host, int port, String... jarFiles)

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program, it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program and invoke it through the command line, the Flink cluster manager will execute your main method, and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources, the execution environment has several methods to read from files: you can read them line by line, as CSV files, or using completely custom data input formats. To just read a text file as a sequence of lines, you can use:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = env.readTextFile("file:///path/to/file");

This will give you a DataStream on which you can then apply transformations to create new derived DataStreams.

You apply transformations by calling methods on DataStream with a transformation function. For example, a map transformation looks like this:

    DataStream<String> input = ...;
    DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    });

This will create a new DataStream by converting every String in the original collection to an Integer.

Once you have a DataStream containing your final results, you can write it to an outside system by creating a sink. These are just some example methods for creating a sink:

    writeAsText(String path)
    print()

We will now give an overview of each of those steps; please refer to the respective sections for more details. Note that all core classes of the Scala DataSet API are found in the package org.apache.flink.api.scala, while the classes of the Scala DataStream API can be found in org.apache.flink.streaming.api.scala.

The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

    getExecutionEnvironment()
    createLocalEnvironment()
    createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program, it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program and invoke it through the command line, the Flink cluster manager will execute your main method, and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources, the execution environment has several methods to read from files: you can read them line by line, as CSV files, or using completely custom data input formats. To just read a text file as a sequence of lines, you can use:

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val text: DataStream[String] = env.readTextFile("file:///path/to/file")

This will give you a DataStream on which you can then apply transformations to create new derived DataStreams.

You apply transformations by calling methods on DataStream with a transformation function. For example, a map transformation looks like this:

    val input: DataStream[String] = ...
    val mapped = input.map { x => x.toInt }

This will create a new DataStream by converting every String in the original collection to an Int.

Once you have a DataStream containing your final results, you can write it to an outside system by creating a sink. These are just some example methods for creating a sink:

    writeAsText(path: String)
    print()

Once you have specified the complete program, you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment, the execution will be triggered on your local machine or your program will be submitted for execution on a cluster.

The execute() method returns a JobExecutionResult, which contains execution times and accumulator results.

Please see the Streaming Guide for information about streaming data sources and sinks and for more in-depth information about the supported transformations on DataStream.

Check out the Batch Guide for information about batch data sources and sinks and for more in-depth information about the supported transformations on DataSet.

Lazy Evaluation

All Flink programs are executed lazily: when the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program’s plan. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
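The deferred execution described above can be imitated in plain Java. The following is only a rough analogy and not Flink's implementation: LazyPlan, LazyPlanDemo, and all of their methods are hypothetical names that exist only in this sketch. Calling map or filter merely records a step; the user functions run only when execute() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for a program plan; this is not a Flink class.
final class LazyPlan<T> {
    // Deferred computation that produces the elements of this plan.
    private final Supplier<List<T>> compute;

    private LazyPlan(Supplier<List<T>> compute) {
        this.compute = compute;
    }

    static <T> LazyPlan<T> fromElements(List<T> elements) {
        List<T> copy = new ArrayList<>(elements);
        return new LazyPlan<T>(() -> copy);
    }

    // Records a map step; fn is not called here.
    <R> LazyPlan<R> map(Function<T, R> fn) {
        return new LazyPlan<R>(() -> {
            List<R> out = new ArrayList<>();
            for (T element : compute.get()) {
                out.add(fn.apply(element));
            }
            return out;
        });
    }

    // Records a filter step; predicate is not called here.
    LazyPlan<T> filter(Predicate<T> predicate) {
        return new LazyPlan<T>(() -> {
            List<T> out = new ArrayList<>();
            for (T element : compute.get()) {
                if (predicate.test(element)) {
                    out.add(element);
                }
            }
            return out;
        });
    }

    // Only this call runs the recorded steps.
    List<T> execute() {
        return compute.get();
    }
}

final class LazyPlanDemo {
    // Counts how often the user-defined map function has actually run.
    static final AtomicInteger CALLS = new AtomicInteger();

    static LazyPlan<Integer> buildPlan() {
        return LazyPlan.fromElements(Arrays.asList("1", "2", "3"))
                .map(s -> {
                    CALLS.incrementAndGet();
                    return Integer.parseInt(s);
                })
                .filter(i -> i % 2 == 1);
    }
}
```

After buildPlan() returns, the counter is still zero; it only advances once execute() is invoked on the returned plan.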

Specifying Keys

Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data to be grouped on a key before they are applied.

A DataSet is grouped as

    DataSet<...> input = // [...]
    DataSet<...> reduced = input
        .groupBy(/*define key here*/)
        .reduceGroup(/*do something*/);

while a key can be specified on a DataStream using

    DataStream<...> input = // [...]
    DataStream<...> windowed = input
        .keyBy(/*define key here*/)
        .window(/*window specification*/);

The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data set types into keys and values. Keys are “virtual”: they are defined as functions over the actual data to guide the grouping operator.
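To illustrate what a “virtual” key means, here is a small plain-Java sketch. It does not use Flink; VirtualKeys, Visit, and groupByKey are hypothetical names chosen for this example. The records are never wrapped into key-value pairs; the key is just a function evaluated on each record, and the same records can be grouped by different functions.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical example types; none of these are Flink classes.
final class VirtualKeys {

    // An ordinary record with no dedicated "key" slot.
    static final class Visit {
        final String user;
        final String page;

        Visit(String user, String page) {
            this.user = user;
            this.page = page;
        }
    }

    // Groups the records by whatever the key function returns.
    // The records themselves are stored unchanged in the groups.
    static <T, K> Map<K, List<T>> groupByKey(List<T> records, Function<T, K> keyFn) {
        return records.stream().collect(Collectors.groupingBy(keyFn));
    }
}
```

For example, groupByKey(visits, v -> v.user) and groupByKey(visits, v -> v.page.length()) group the same list in two different ways without changing the Visit type.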

NOTE: In the following discussion we will use the DataStream API and keyBy. For the DataSet API you just have to replace them with DataSet and groupBy.

Define keys for Tuples

The simplest case is grouping Tuples on one or more fields of the Tuple:

Java:

    DataStream<Tuple3<Integer,String,Long>> input = // [...]
    KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0);

Scala:

    val input: DataStream[(Int, String, Long)] = // [...]
    val keyed = input.keyBy(0)

The tuples are grouped on the first field (the one of Integer type).

Java:

    DataStream<Tuple3<Integer,String,Long>> input = // [...]
    KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1);

Scala:

    val input: DataStream[(Int, String, Long)] = // [...]
    val keyed = input.keyBy(0,1)

Here, we group the tuples on a composite key consisting of the first and the second field.

A note on nested Tuples: If you have a DataStream with a nested tuple, such as:

    DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

Specifying keyBy(0) will cause the system to use the full Tuple2 as a key (with the Integer and Float being the key). If you want to “navigate” into the nested Tuple2, you have to use field expression keys which are explained below.

Define keys using Field Expressions

You can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping.

Field expressions make it very easy to select fields in (nested) composite types such as Tuple and POJO types.

In the example below, we have a WC POJO with two fields “word” and “count”. To group by the field word, we just pass its name to the keyBy() function.

    // some ordinary POJO (Plain old Java Object)
    public class WC {
        public String word;
        public int count;
    }
    DataStream<WC> words = // [...]
    DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

Field Expression Syntax:

  • Select POJO fields by their field name. For example "user" refers to the “user” field of a POJO type.

  • Select Tuple fields by their field name or 0-offset field index. For example "f0" and "5" refer to the first and sixth field of a Java Tuple type, respectively.

  • You can select nested fields in POJOs and Tuples. For example "user.zip" refers to the “zip” field of a POJO which is stored in the “user” field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as "f1.user.zip" or "user.f3.1.zip".

  • You can select the full type using the "*" wildcard expression. This also works for types which are not Tuple or POJO types.
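To make the dotted syntax more concrete, the following plain-Java sketch walks an expression such as "user.zip" over public fields using reflection. This is not how Flink resolves field expressions, and it covers neither tuples, getters, nor wildcards. FieldExpressions, Order, User, and resolve are hypothetical names used only here.

```java
import java.lang.reflect.Field;

// Hypothetical helper; it only illustrates the idea of nested field access.
final class FieldExpressions {

    static final class User {
        public String zip;

        User(String zip) {
            this.zip = zip;
        }
    }

    static final class Order {
        public User user;
        public int quantity;

        Order(User user, int quantity) {
            this.user = user;
            this.quantity = quantity;
        }
    }

    // Follows each name in the dot-separated expression through public fields.
    static Object resolve(Object root, String expression) {
        Object current = root;
        for (String name : expression.split("\\.")) {
            if (current == null) {
                return null; // nothing left to navigate into
            }
            try {
                Field field = current.getClass().getField(name);
                current = field.get(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                        "Cannot resolve '" + name + "' in \"" + expression + "\"", e);
            }
        }
        return current;
    }
}
```

With an Order whose user has the zip "10115", resolve(order, "user.zip") returns that string, while an unknown name such as "user.city" raises an IllegalArgumentException.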

Field Expression Example:

    public static class WC {
        public ComplexNestedClass complex; // nested POJO
        private int count;
        // getter / setter for private field (count)
        public int getCount() {
            return count;
        }
        public void setCount(int c) {
            this.count = c;
        }
    }
    public static class ComplexNestedClass {
        public Integer someNumber;
        public float someFloat;
        public Tuple3<Long, Long, String> word;
        public IntWritable hadoopCitizen;
    }

These are valid field expressions for the example code above:

  • "count": The count field in the WC class.

  • "complex": Recursively selects all fields of the field complex of POJO type ComplexNestedClass.

  • "complex.word.f2": Selects the last field of the nested Tuple3.

  • "complex.hadoopCitizen": Selects the Hadoop IntWritable type.

In the example below, we have a WC POJO with two fields “word” and “count”. To group by the field word, we just pass its name to the keyBy() function.

    // some ordinary POJO (Plain old Java Object)
    class WC(var word: String, var count: Int) {
      def this() { this("", 0) }
    }
    val words: DataStream[WC] = // [...]
    val wordCounts = words.keyBy("word").window(/*window specification*/)

    // or, as a case class, which is less typing
    case class WC(word: String, count: Int)
    val words: DataStream[WC] = // [...]
    val wordCounts = words.keyBy("word").window(/*window specification*/)

Field Expression Syntax:

  • Select POJO fields by their field name. For example "user" refers to the “user” field of a POJO type.

  • Select Tuple fields by their 1-offset field name or 0-offset field index. For example "_1" and "5" refer to the first and sixth field of a Scala Tuple type, respectively.

  • You can select nested fields in POJOs and Tuples. For example "user.zip" refers to the “zip” field of a POJO which is stored in the “user” field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as "_2.user.zip" or "user._4.1.zip".

  • You can select the full type using the "_" wildcard expression. This also works for types which are not Tuple or POJO types.

Field Expression Example:

    class WC(var complex: ComplexNestedClass, var count: Int) {
      def this() { this(null, 0) }
    }
    class ComplexNestedClass(
        var someNumber: Int,
        var someFloat: Float,
        var word: (Long, Long, String),
        var hadoopCitizen: IntWritable) {
      def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
    }

These are valid field expressions for the example code above:

  • "count": The count field in the WC class.

  • "complex": Recursively selects all fields of the field complex of POJO type ComplexNestedClass.

  • "complex.word._3": Selects the last field of the nested Tuple3.

  • "complex.hadoopCitizen": Selects the Hadoop IntWritable type.

Define keys using Key Selector Functions

An additional way to define keys is through “key selector” functions. A key selector function takes a single element as input and returns the key for the element. The key can be of any type and can be derived from deterministic computations.

The following example shows a key selector function that simply returns the field of an object:

Java:

    // some ordinary POJO
    public class WC {public String word; public int count;}
    DataStream<WC> words = // [...]
    KeyedStream<WC, String> keyed = words
        .keyBy(new KeySelector<WC, String>() {
            @Override
            public String getKey(WC wc) { return wc.word; }
        });

Scala:

    // some ordinary case class
    case class WC(word: String, count: Int)
    val words: DataStream[WC] = // [...]
    val keyed = words.keyBy( _.word )

Specifying Transformation Functions

Most transformations require user-defined functions. This section lists different ways they can be specified.

Implementing an interface

The most basic way is to implement one of the provided interfaces:

    class MyMapFunction implements MapFunction<String, Integer> {
        public Integer map(String value) { return Integer.parseInt(value); }
    }
    data.map(new MyMapFunction());

Anonymous classes

You can pass a function as an anonymous class:

    data.map(new MapFunction<String, Integer>() {
        public Integer map(String value) { return Integer.parseInt(value); }
    });

Java 8 Lambdas

Flink also supports Java 8 Lambdas in the Java API.

    data.filter(s -> s.startsWith("http://"));

    data.reduce((i1,i2) -> i1 + i2);

Rich functions

All transformations that require a user-defined function can instead take a rich function as argument. For example, instead of

    class MyMapFunction implements MapFunction<String, Integer> {
        public Integer map(String value) { return Integer.parseInt(value); }
    }

you can write

    class MyMapFunction extends RichMapFunction<String, Integer> {
        public Integer map(String value) { return Integer.parseInt(value); }
    }

and pass the function as usual to a map transformation:

    data.map(new MyMapFunction());

Rich functions can also be defined as an anonymous class:

    data.map(new RichMapFunction<String, Integer>() {
        public Integer map(String value) { return Integer.parseInt(value); }
    });

Lambda Functions

As already seen in previous examples, all operations accept lambda functions for describing the operation:

    val data: DataSet[String] = // [...]
    data.filter { _.startsWith("http://") }

    val data: DataSet[Int] = // [...]
    data.reduce { (i1,i2) => i1 + i2 }
    // or
    data.reduce { _ + _ }

Rich functions

All transformations that take a lambda function as argument can instead take a rich function as argument. For example, instead of

    data.map { x => x.toInt }

you can write

    class MyMapFunction extends RichMapFunction[String, Int] {
      def map(in: String): Int = { in.toInt }
    }

and pass the function to a map transformation:

    data.map(new MyMapFunction())

Rich functions can also be defined as an anonymous class:

    data.map(new RichMapFunction[String, Int] {
      def map(in: String): Int = { in.toInt }
    })

Rich functions provide, in addition to the user-defined function (map, reduce, etc.), four methods: open, close, getRuntimeContext, and setRuntimeContext. These are useful for parameterizing the function (see Passing Parameters to Functions), creating and finalizing local state, accessing broadcast variables (see Broadcast Variables), and for accessing runtime information such as accumulators and counters (see Accumulators and Counters), and information on iterations (see Iterations).
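The role of open and close for creating and finalizing local state can be pictured with the following plain-Java sketch. It assumes a simplified lifecycle (open once, then one call per element, then close once) and deliberately avoids Flink's classes; LifecycleMapper, PrefixMapper, and LifecycleRunner are hypothetical names, and the sketch leaves out the runtime context entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class that mimics a function with lifecycle hooks.
abstract class LifecycleMapper<I, O> {
    void open() {}          // called once before the first element
    abstract O map(I value); // called once per element
    void close() {}         // called once after the last element
}

// Example function that allocates its working state in open().
final class PrefixMapper extends LifecycleMapper<String, String> {
    final List<String> events = new ArrayList<>();
    private StringBuilder buffer;

    @Override
    void open() {
        buffer = new StringBuilder(); // create local state
        events.add("open");
    }

    @Override
    String map(String value) {
        buffer.setLength(0); // reuse the buffer for every element
        return buffer.append("item-").append(value).toString();
    }

    @Override
    void close() {
        buffer = null; // release local state
        events.add("close");
    }
}

// Hypothetical driver that plays the role of the runtime in this sketch.
final class LifecycleRunner {
    static <I, O> List<O> run(LifecycleMapper<I, O> fn, List<I> input) {
        List<O> out = new ArrayList<>();
        fn.open();
        try {
            for (I value : input) {
                out.add(fn.map(value));
            }
        } finally {
            fn.close(); // close even if map throws
        }
        return out;
    }
}
```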

Supported Data Types

Flink places some restrictions on the type of elements that can be in a DataSet or DataStream. The reason for this is that the system analyzes the types to determine efficient execution strategies.

There are seven different categories of data types:

  • Java Tuples and Scala Case Classes
  • Java POJOs
  • Primitive Types
  • Regular Classes
  • Values
  • Hadoop Writables
  • Special Types

Tuples and Case Classes

Tuples are composite types that contain a fixed number of fields with various types. The Java API provides classes from Tuple1 up to Tuple25. Every field of a tuple can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a tuple can be accessed directly using the field’s name as tuple.f4, or using the generic getter method tuple.getField(int position). The field indices start at 0. Note that this stands in contrast to the Scala tuples, but it is more consistent with Java’s general indexing.

    DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
        new Tuple2<String, Integer>("hello", 1),
        new Tuple2<String, Integer>("world", 2));
    wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer map(Tuple2<String, Integer> value) throws Exception {
            return value.f1;
        }
    });
    wordCounts.keyBy(0); // also valid .keyBy("f0")

Scala case classes (and Scala tuples, which are a special case of case classes) are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as _1 for the first field. Case class fields are accessed by their name.

    case class WordCount(word: String, count: Int)
    val input = env.fromElements(
        WordCount("hello", 1),
        WordCount("world", 2)) // Case Class Data Set
    input.keyBy("word") // key by field expression "word"

    val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
    input2.keyBy(0, 1) // key by field positions 0 and 1

POJOs

Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:

  • The class must be public.

  • It must have a public constructor without arguments (default constructor).

  • All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().

  • The type of a field must be supported by a registered serializer.
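The first three requirements can be checked mechanically. The following reflection-based sketch shows one way to do so in plain Java; it is not Flink's type analysis. PojoRules and looksLikePojo are hypothetical names, the serializer requirement is not checked at all, and static or synthetic fields are skipped as a simplification.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical checker for the structural POJO rules listed above.
final class PojoRules {

    static boolean looksLikePojo(Class<?> type) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        // Rule 2: there must be a public constructor without arguments.
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every field is public or has a public getter and setter.
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || field.isSynthetic()) {
                continue; // simplification made by this sketch
            }
            if (!Modifier.isPublic(mods) && !hasAccessors(type, field)) {
                return false;
            }
        }
        return true;
    }

    // Looks for getFoo() and setFoo(T) for a field named foo of type T.
    private static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Example classes used to exercise the checker.
    public static class Good {
        public String word;
        private int count;
        public Good() {}
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    public static class NoDefaultConstructor {
        public String word;
        public NoDefaultConstructor(String word) { this.word = word; }
    }

    public static class HiddenField {
        private int secret;
        public HiddenField() {}
    }

    static class NotPublic {
        public NotPublic() {}
    }
}
```

Only Good passes: NoDefaultConstructor violates the second rule, HiddenField the third, and NotPublic the first.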

POJOs are generally represented with a PojoTypeInfo and serialized with the PojoSerializer (using Kryo as configurable fallback). The exception is when the POJOs are actually Avro types (Avro Specific Records) or produced as “Avro Reflect Types”. In that case the POJOs are represented by an AvroTypeInfo and serialized with the AvroSerializer. You can also register your own custom serializer if required; see Serialization for further information.

Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.

The following example shows a simple POJO with two public fields.

Java:

    public class WordWithCount {
        public String word;
        public int count;
        public WordWithCount() {}
        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
    DataStream<WordWithCount> wordCounts = env.fromElements(
        new WordWithCount("hello", 1),
        new WordWithCount("world", 2));
    wordCounts.keyBy("word"); // key by field expression "word"

Scala:

    class WordWithCount(var word: String, var count: Int) {
      def this() {
        this(null, -1)
      }
    }
    val input = env.fromElements(
        new WordWithCount("hello", 1),
        new WordWithCount("world", 2)) // POJO data stream
    input.keyBy("word") // key by field expression "word"

Primitive Types

Flink supports all Java and Scala primitive types such as Integer, String, and Double.

General Class Types

Flink supports most Java and Scala classes (API and custom). Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native resources. Classes that follow the Java Beans conventions work well in general.

All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types. Flink treats these data types as black boxes and is not able to access their content (e.g., for efficient sorting). General types are de/serialized using the serialization framework Kryo.

Values

Value types describe their serialization and deserialization manually. Instead of going through a general purpose serialization framework, they provide custom code for those operations by implementing the org.apache.flink.types.Value interface with the methods read and write. Using a Value type is reasonable when general purpose serialization would be highly inefficient. An example would be a data type that implements a sparse vector of elements as an array. Knowing that the array is mostly zero, one can use a special encoding for the non-zero elements, while the general purpose serialization would simply write all array elements.
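The sparse-vector idea can be sketched in plain Java as follows. The class below does not implement Flink's Value interface; it is a hypothetical SparseVector that only borrows the method names write and read, and it uses java.io.DataOutput and java.io.DataInput as stand-ins for whatever the real interface expects. Only the length, the number of non-zero entries, and the (index, value) pairs of those entries are written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical type with hand-written serialization; not a Flink class.
final class SparseVector {
    private double[] values;

    SparseVector(double[] values) {
        this.values = values.clone();
    }

    double[] toArray() {
        return values.clone();
    }

    // Writes the length, the non-zero count, then index/value pairs.
    void write(DataOutput out) throws IOException {
        int nonZero = 0;
        for (double v : values) {
            if (v != 0.0) {
                nonZero++;
            }
        }
        out.writeInt(values.length);
        out.writeInt(nonZero);
        for (int i = 0; i < values.length; i++) {
            if (values[i] != 0.0) {
                out.writeInt(i);
                out.writeDouble(values[i]);
            }
        }
    }

    // Rebuilds the dense array; unwritten positions stay zero.
    void read(DataInput in) throws IOException {
        int length = in.readInt();
        int nonZero = in.readInt();
        double[] restored = new double[length];
        for (int n = 0; n < nonZero; n++) {
            int index = in.readInt();
            restored[index] = in.readDouble();
        }
        values = restored;
    }

    static byte[] serialize(SparseVector vector) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            vector.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static SparseVector deserialize(byte[] data) {
        try {
            SparseVector vector = new SparseVector(new double[0]);
            vector.read(new DataInputStream(new ByteArrayInputStream(data)));
            return vector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A vector with 1000 slots and two non-zero entries takes 4 + 4 + 2 × (4 + 8) = 32 bytes in this encoding, whereas writing every element as a double would take 8000 bytes.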

The org.apache.flink.types.CopyableValue interface supports manual internal cloning logic in a similar way.

Flink comes with pre-defined Value types that correspond to basic data types (ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue). These Value types act as mutable variants of the basic data types: their value can be altered, allowing programmers to reuse objects and take pressure off the garbage collector.

Hadoop Writables

You can use types that implement the org.apache.hadoop.io.Writable interface. The serialization logic defined in the write() and readFields() methods will be used for serialization.

Special Types

You can use special types, including Scala’s Either, Option, and Try. The Java API has its own custom implementation of Either. Similarly to Scala’s Either, it represents a value of two possible types, Left or Right. Either can be useful for error handling or operators that need to output two different types of records.

Type Erasure & Type Inference

Note: This Section is only relevant for Java.

The Java compiler throws away much of the generic type information after compilation. This is known as type erasure in Java. It means that at runtime, an instance of an object does not know its generic type any more. For example, instances of DataStream<String> and DataStream<Long> look the same to the JVM.
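The effect is easy to observe with ordinary JDK collections. The short sketch below uses java.util.ArrayList instead of DataStream so that it runs without Flink; ErasureDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates type erasure using JDK collections only.
final class ErasureDemo {

    // Both lists share one runtime class; the element type is gone.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        return strings.getClass() == longs.getClass();
    }

    // The runtime class name carries no generic parameter.
    static String runtimeClassName() {
        List<String> strings = new ArrayList<>();
        return strings.getClass().getName();
    }
}
```

sameRuntimeClass() returns true, and runtimeClassName() returns "java.util.ArrayList" without any mention of String.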

Flink requires type information at the time when it prepares the program for execution (when the main method of the program is called). The Flink Java API tries to reconstruct the type information that was thrown away in various ways and store it explicitly in the data sets and operators. You can retrieve the type via DataStream.getType(). The method returns an instance of TypeInformation, which is Flink’s internal way of representing types.

The type inference has its limits and needs the “cooperation” of the programmer in some cases. Examples for that are methods that create data sets from collections, such as ExecutionEnvironment.fromCollection(), where you can pass an argument that describes the type. But also generic functions like MapFunction<I, O> may need extra type information.
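One general Java mechanism that makes such reconstruction possible is that a class which implements a generic interface keeps the type arguments in its class metadata, where reflection can read them. The sketch below shows this with java.util.function.Function standing in for MapFunction; it is an illustration of the JDK mechanism, not a description of Flink's extraction code, and TypeRecovery is a hypothetical name. It also shows a case where nothing can be recovered, which is one reason extra type information may be needed.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Reads generic type arguments back from a function object, if present.
final class TypeRecovery {

    // Returns the type arguments of Function<I, O> declared by the
    // object's class, or an empty array if the class declares none.
    static Type[] functionTypeArguments(Function<?, ?> fn) {
        for (Type type : fn.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == Function.class) {
                    return parameterized.getActualTypeArguments();
                }
            }
        }
        return new Type[0];
    }

    // An anonymous class records Function<String, Integer> in its metadata.
    static Function<String, Integer> anonymousParser() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String value) {
                return Integer.parseInt(value);
            }
        };
    }

    // A lambda's runtime class only implements the raw interface.
    static Function<String, Integer> lambdaParser() {
        return value -> Integer.parseInt(value);
    }
}
```

For the anonymous class the method yields String and Integer, while for the lambda it yields an empty array.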

The ResultTypeQueryable interface can be implemented by input formats and functions to tell the API explicitly about their return type. The input types that the functions are invoked with can usually be inferred by the result types of the previous operations.

Accumulators & Counters

Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job has ended.

The most straightforward accumulator is a counter: you can increment it using the Accumulator.add(V value) method. At the end of the job Flink will sum up (merge) all partial results and send the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.

Flink currently has the following built-in accumulators. Each of them implements the Accumulator interface.

  • IntCounter, LongCounter and DoubleCounter: See below for an example using a counter.
  • Histogram: A histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. You can use this to compute distributions of values, e.g., the distribution of words-per-line for a word count program.

How to use accumulators:

First you have to create an accumulator object (here a counter) in the user-defined transformation function where you want to use it.

    private IntCounter numLines = new IntCounter();

Second you have to register the accumulator object, typically in the open() method of the rich function. Here you also define the name.

    getRuntimeContext().addAccumulator("num-lines", this.numLines);

You can now use the accumulator anywhere in the operator function, including in the open() and close() methods.

    this.numLines.add(1);

The overall result will be stored in the JobExecutionResult object which is returned from the execute() method of the execution environment (currently this only works if the execution waits for the completion of the job).

    myJobExecutionResult.getAccumulatorResult("num-lines")

All accumulators share a single namespace per job. Thus you can use the same accumulator in different operator functions of your job. Flink will internally merge all accumulators with the same name.
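The merging by name can be pictured with a small plain-Java sketch. It is only a model of the behaviour described above, not Flink's implementation: CounterSketch and mergeByName are hypothetical names, and each map in the input list stands for the accumulators that one parallel function instance registered under its chosen names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical counter with an add and a merge operation.
final class CounterSketch {
    private int localValue;

    void add(int value) {
        localValue += value;
    }

    int getLocalValue() {
        return localValue;
    }

    void merge(CounterSketch other) {
        localValue += other.localValue;
    }

    // Merges partial counters from several registries by their name.
    static Map<String, Integer> mergeByName(List<Map<String, CounterSketch>> partials) {
        Map<String, CounterSketch> merged = new HashMap<>();
        for (Map<String, CounterSketch> partial : partials) {
            for (Map.Entry<String, CounterSketch> entry : partial.entrySet()) {
                merged.computeIfAbsent(entry.getKey(), name -> new CounterSketch())
                        .merge(entry.getValue());
            }
        }
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, CounterSketch> entry : merged.entrySet()) {
            result.put(entry.getKey(), entry.getValue().getLocalValue());
        }
        return result;
    }
}
```

If two registries each hold a counter named "num-lines" with local values 3 and 4, the merged result for "num-lines" is 7.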

A note on accumulators and iterations: Currently the result of accumulators is only available after the overall job has ended. We plan to also make the result of the previous iteration available in the next iteration. You can use Aggregators to compute per-iteration statistics and base the termination of iterations on such statistics.

Custom accumulators:

To implement your own accumulator you simply have to write your implementation of the Accumulator interface. Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.

You have the choice to implement either Accumulator or SimpleAccumulator.

Accumulator<V,R> is most flexible: it defines a type V for the value to add, and a result type R for the final result. E.g., for a histogram, V is a number and R is a histogram. SimpleAccumulator is for the cases where both types are the same, e.g., for counters.
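The split between the value type V and the result type R can be sketched as follows. The interface below is a hypothetical, reduced stand-in written for this example and is not Flink's Accumulator interface, whose exact set of methods is not reproduced here. The histogram adds single Integer values (V) and exposes a map from value to count (R).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, reduced accumulator contract used only in this sketch.
interface SketchAccumulator<V, R> {
    void add(V value);

    R getLocalValue();

    void merge(SketchAccumulator<V, R> other);
}

// V is a single Integer, R is a map from value to its number of occurrences.
final class HistogramSketch implements SketchAccumulator<Integer, Map<Integer, Integer>> {
    private final TreeMap<Integer, Integer> bins = new TreeMap<>();

    @Override
    public void add(Integer value) {
        bins.merge(value, 1, Integer::sum); // increment the bin for this value
    }

    @Override
    public Map<Integer, Integer> getLocalValue() {
        return bins;
    }

    @Override
    public void merge(SketchAccumulator<Integer, Map<Integer, Integer>> other) {
        // Add the other accumulator's counts bin by bin.
        for (Map.Entry<Integer, Integer> entry : other.getLocalValue().entrySet()) {
            bins.merge(entry.getKey(), entry.getValue(), Integer::sum);
        }
    }
}
```

A counter would use the same type for V and R, which is the case the text above assigns to SimpleAccumulator.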