DataStream API Tutorial

In this guide we will start from scratch and go from setting up a Flink project to running a streaming analysis program on a Flink cluster.

Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to read this channel in Flink and count the number of bytes that each user edits within a given window of time. This is easy enough to implement in a few minutes using Flink, but it will give you a good foundation from which to start building more complex analysis programs on your own.

Setting up a Maven Project

We are going to use a Flink Maven Archetype for creating our project structure. Please see Java API Quickstart for more details about this. For our purposes, the command to run is this:

  $ mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.9.0 \
      -DgroupId=wiki-edits \
      -DartifactId=wiki-edits \
      -Dversion=0.1 \
      -Dpackage=wikiedits \
      -DinteractiveMode=false

You can edit the groupId, artifactId and package if you like. With the above parameters, Maven will create a project structure that looks like this:

  $ tree wiki-edits
  wiki-edits/
  ├── pom.xml
  └── src
      └── main
          ├── java
          │   └── wikiedits
          │       ├── BatchJob.java
          │       └── StreamingJob.java
          └── resources
              └── log4j.properties

In the root directory there is our pom.xml file, which already has the Flink dependencies added, along with several example Flink programs in src/main/java. We can delete the example programs, since we are going to start from scratch:

  $ rm wiki-edits/src/main/java/wikiedits/*.java

As a last step we need to add the Flink Wikipedia connector as a dependency so that we can use it in our program. Edit the dependencies section of the pom.xml so that it looks like this:

  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-wikiedits_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
  </dependencies>

Notice the flink-connector-wikiedits_2.11 dependency that was added. (This example and the Wikipedia connector were inspired by the Hello Samza example of Apache Samza.)

It’s coding time. Fire up your favorite IDE and import the Maven project, or open a text editor and create the file src/main/java/wikiedits/WikipediaAnalysis.java:

  package wikiedits;

  public class WikipediaAnalysis {

      public static void main(String[] args) throws Exception {

      }
  }

The program is very basic now, but we will fill it in as we go. Note that I’ll not give import statements here since IDEs can add them automatically. At the end of this section I’ll show the complete code with import statements if you simply want to skip ahead and enter that in your editor.

The first step in a Flink program is to create a StreamExecutionEnvironment (or ExecutionEnvironment if you are writing a batch job). This can be used to set execution parameters and create sources for reading from external systems. So let’s go ahead and add this to the main method:

  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
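If you want to tweak how the job runs, this environment object is also where execution parameters are set. Here is a small, purely illustrative sketch; the defaults are fine for this tutorial and none of this is needed for it:

  // Illustrative execution parameters (not needed for this tutorial):
  see.setParallelism(2);                            // default parallelism for all operators in this job
  see.getConfig().setAutoWatermarkInterval(1000L);  // how often watermarks are generated, in milliseconds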

Next we will create a source that reads from the Wikipedia IRC log:

  DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

This creates a DataStream of WikipediaEditEvent elements that we can further process. For the purposes of this example we are interested in determining the number of added or removed bytes that each user causes in a certain time window, let’s say five seconds. For this we first have to specify that we want to key the stream on the user name, that is to say that operations on this stream should take the user name into account. In our case the summation of edited bytes in the windows should be per unique user. For keying a Stream we have to provide a KeySelector, like this:

  KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
          @Override
          public String getKey(WikipediaEditEvent event) {
              return event.getUser();
          }
      });
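If you are on Java 8 or later, the same key selector can usually be written more compactly as a lambda. This is only a sketch and is not used in the rest of the tutorial; Flink’s type extraction normally infers the String key here:

  // Same keying expressed as a lambda (sketch, not used further in this tutorial):
  KeyedStream<WikipediaEditEvent, String> keyedEditsLambda =
      edits.keyBy(event -> event.getUser());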

This gives us a Stream of WikipediaEditEvent that has a String key, the user name. We can now specify that we want to have windows imposed on this stream and compute a result based on elements in these windows. A window specifies a slice of a Stream on which to perform a computation. Windows are required when computing aggregations on an infinite stream of elements. In our example we will say that we want to aggregate the sum of edited bytes for every five seconds:

  DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> createAccumulator() {
              return new Tuple2<>("", 0L);
          }

          @Override
          public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
              accumulator.f0 = value.getUser();
              accumulator.f1 += value.getByteDiff();
              return accumulator;
          }

          @Override
          public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
              return accumulator;
          }

          @Override
          public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
              return new Tuple2<>(a.f0, a.f1 + b.f1);
          }
      });

The first call, .timeWindow(), specifies that we want to have tumbling (non-overlapping) windows of five seconds. The second call specifies an Aggregate transformation on each window slice for each unique key. In our case we start from an initial value of ("", 0L) and add to it the byte difference of every edit in that time window for a user. The resulting Stream now contains a Tuple2<String, Long> for every user, which gets emitted every five seconds.
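For context, .timeWindow() is shorthand for picking a window assigner explicitly. The sketch below shows the relationship, assuming the environment’s default processing-time characteristic; nothing in the tutorial changes, it is only meant to illustrate what the shorthand expands to and what a sliding variant would look like:

  import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

  // Equivalent to .timeWindow(Time.seconds(5)) under the default processing-time characteristic:
  keyedEdits.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

  // A sliding variant: windows of 10 seconds, evaluated every 5 seconds, so they overlap:
  keyedEdits.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));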

The only thing left to do is print the stream to the console and start execution:

  result.print();
  see.execute();

That last call is necessary to start the actual Flink job. All operations, such as creating sources, transformations and sinks, only build up a graph of internal operations. Only when execute() is called is this graph of operations submitted to a cluster or executed on your local machine.
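As an aside, execute() also accepts a job name (which shows up in the Flink dashboard) and returns a JobExecutionResult once the job finishes. A minimal sketch, with an illustrative job name; for a never-ending streaming job like ours the result is only reached after the job is cancelled:

  import org.apache.flink.api.common.JobExecutionResult;

  // Named execution; the result object carries statistics such as the net runtime:
  JobExecutionResult jobResult = see.execute("Wikipedia edit analysis");
  System.out.println("Net runtime: " + jobResult.getNetRuntime() + " ms");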

The complete code so far is this:

  package wikiedits;

  import org.apache.flink.api.common.functions.AggregateFunction;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.KeyedStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
  import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

  public class WikipediaAnalysis {

      public static void main(String[] args) throws Exception {

          StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

          DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

          KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
              .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                  @Override
                  public String getKey(WikipediaEditEvent event) {
                      return event.getUser();
                  }
              });

          DataStream<Tuple2<String, Long>> result = keyedEdits
              .timeWindow(Time.seconds(5))
              .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
                  @Override
                  public Tuple2<String, Long> createAccumulator() {
                      return new Tuple2<>("", 0L);
                  }

                  @Override
                  public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
                      accumulator.f0 = value.getUser();
                      accumulator.f1 += value.getByteDiff();
                      return accumulator;
                  }

                  @Override
                  public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
                      return accumulator;
                  }

                  @Override
                  public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                      return new Tuple2<>(a.f0, a.f1 + b.f1);
                  }
              });

          result.print();

          see.execute();
      }
  }

You can run this example in your IDE or on the command line, using Maven:

  $ mvn clean package
  $ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

The first command builds our project and the second executes our main class. The output should be similar to this:

  1> (Fenix down,114)
  6> (AnomieBOT,155)
  8> (BD2412bot,-3690)
  7> (IgnorantArmies,49)
  3> (Ckh3111,69)
  5> (Slade360,0)
  7> (Narutolovehinata5,2195)
  6> (Vuyisa2001,79)
  4> (Ms Sarah Welch,269)
  4> (KasparBot,-245)

The number in front of each line tells you on which parallel instance of the print sink the output was produced.
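If you would rather see a single, unprefixed stream of results, you can force the print sink to run with a parallelism of one. This is purely optional; a sketch of one way to do it:

  // Run the print sink as a single parallel instance so the "N>" prefix disappears:
  result.print().setParallelism(1);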

This should get you started with writing your own Flink programs. To learn more you can check out our guides about basic concepts and the DataStream API. Stick around for the bonus exercise if you want to learn about setting up a Flink cluster on your own machine and writing results to Kafka.

Bonus Exercise: Running on a Cluster and Writing to Kafka

Please follow our local setup tutorial for setting up a Flink distribution on your machine and refer to the Kafka quickstart for setting up a Kafka installation before we proceed.

As a first step, we have to add the Flink Kafka connector as a dependency so that we can use the Kafka sink. Add this to the pom.xml file in the dependencies section:

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
      <version>${flink.version}</version>
  </dependency>

Next, we need to modify our program. We’ll remove the print() sink and instead use a Kafka sink. The new code looks like this:

  result
      .map(new MapFunction<Tuple2<String, Long>, String>() {
          @Override
          public String map(Tuple2<String, Long> tuple) {
              return tuple.toString();
          }
      })
      .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

The related classes also need to be imported:

  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.api.common.functions.MapFunction;
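If you need to pass more producer settings than just the broker list, FlinkKafkaProducer011 can also be constructed with a Properties object. This is only a sketch of that variant; the simple constructor used above is all this tutorial needs:

  import java.util.Properties;

  // Configure the producer via Properties instead of the plain broker-list constructor:
  Properties kafkaProps = new Properties();
  kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

  FlinkKafkaProducer011<String> kafkaSink =
      new FlinkKafkaProducer011<>("wiki-result", new SimpleStringSchema(), kafkaProps);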

Note how we first transform the Stream of Tuple2<String, Long> to a Stream of String using a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then we create a Kafka sink. You might have to adapt the hostname and port to your setup. "wiki-result" is the name of the Kafka topic that we are going to create next, before running our program.

Build the project using Maven because we need the jar file for running on the cluster:

  $ mvn clean package

The resulting jar file will be in the target subfolder: target/wiki-edits-0.1.jar. We’ll use this later.

Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go to the location where you installed Flink and start a local cluster:

  $ cd my/flink/directory
  $ bin/start-cluster.sh

We also have to create the Kafka Topic, so that our program can write to it:

  $ cd my/kafka/directory
  $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-result

Now we are ready to run our jar file on the local Flink cluster:

  $ cd my/flink/directory
  $ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wiki-edits-0.1.jar

The output of that command should look similar to this, if everything went according to plan:

  03/08/2016 15:09:27 Job execution switched to status RUNNING.
  03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
  03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
  03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
  03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
  03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
  03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

You can see how the individual operators start running. There are only two, because the operations after the window get folded into one operation for performance reasons. In Flink we call this chaining.
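If you ever want every operator to show up as its own task, for example while debugging, chaining can be switched off on the execution environment. This is only an illustration and not something you need here:

  // Disable operator chaining for the whole job; each operator then runs separately:
  see.disableOperatorChaining();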

You can observe the output of the program by inspecting the Kafka topic using the Kafka console consumer:

  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result

You can also check out the Flink dashboard, which should be running at http://localhost:8081. You get an overview of your cluster resources and running jobs:

[Screenshot: JobManager Overview]

If you click on your running job you will get a view where you can inspect individual operations and, for example, see the number of processed elements:

[Screenshot: Example Job View]

This concludes our little tour of Flink. If you have any questions, please don’t hesitate to ask on our Mailing Lists.