Hadoop Compatibility (Beta)

Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows reusing code that was implemented for Hadoop MapReduce.

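You can:

  • use Hadoop’s Writable data types in Flink programs,
  • use any Hadoop InputFormat as a data source,
  • use any Hadoop OutputFormat as a data sink,
  • use a Hadoop Mapper as a FlatMapFunction, and
  • use a Hadoop Reducer as a GroupReduceFunction.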

This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the Connecting to other systems guide for reading from Hadoop-supported file systems.

Project Configuration

Support for Hadoop input/output formats is part of the flink-java and flink-scala Maven modules that are always required when writing Flink jobs. The code is located in org.apache.flink.api.java.hadoop and org.apache.flink.api.scala.hadoop in an additional sub-package for the mapred and mapreduce API.

Support for Hadoop Mappers and Reducers is contained in the flink-hadoop-compatibility Maven module. This code resides in the org.apache.flink.hadoopcompatibility package.

Add the following dependency to your pom.xml if you want to reuse Mappers and Reducers.

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>

See also how to configure Hadoop dependencies.

Using Hadoop InputFormats

To use Hadoop InputFormats with Flink, the format must first be wrapped using either readHadoopFile or createHadoopInput of the HadoopInputs utility class. The former is used for input formats derived from FileInputFormat, while the latter has to be used for general-purpose input formats. The resulting InputFormat can be used to create a data source by using ExecutionEnvironment#createInput.

The resulting DataSet contains 2-tuples where the first field is the key and the second field is the value retrieved from the Hadoop InputFormat.

The following example shows how to use Hadoop’s TextInputFormat.

Java:

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  DataSet<Tuple2<LongWritable, Text>> input =
      env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
          LongWritable.class, Text.class, textPath));

  // Do something with the data.
  [...]

Scala:

  val env = ExecutionEnvironment.getExecutionEnvironment

  val input: DataSet[(LongWritable, Text)] =
    env.createInput(HadoopInputs.readHadoopFile(
      new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))

  // Do something with the data.
  [...]
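
For TextInputFormat in particular, the key of each 2-tuple is the byte offset at which a line starts and the value is the line itself. The sketch below models that record shape in plain Java, without Flink or Hadoop on the classpath. The class TextInputFormatModel and its records method are hypothetical names used only for illustration, and java.util.Map.Entry stands in for Tuple2<LongWritable, Text>.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model; not part of Flink or Hadoop.
final class TextInputFormatModel {

    // Returns one (byte offset, line) pair per line of newline-separated UTF-8 text.
    // Simplification: edge cases such as trailing blank lines are not modelled.
    static List<Map.Entry<Long, String>> records(String content) {
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        if (content.isEmpty()) {
            return out; // an empty input yields no records
        }
        long offset = 0;
        for (String line : content.split("\n")) {
            out.add(new AbstractMap.SimpleEntry<Long, String>(offset, line));
            // Advance past the bytes of the line plus the newline separator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints "0 -> hello world" and then "12 -> bye".
        for (Map.Entry<Long, String> r : records("hello world\nbye\n")) {
            System.out.println(r.getKey() + " -> " + r.getValue());
        }
    }
}
```

Because the key is only a file position, programs that care about the text alone usually keep just the second tuple field.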

Using Hadoop OutputFormats

Flink provides a compatibility wrapper for Hadoop OutputFormats. Any class that implements org.apache.hadoop.mapred.OutputFormat or extends org.apache.hadoop.mapreduce.OutputFormat is supported. The OutputFormat wrapper expects its input data to be a DataSet containing 2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.

The following example shows how to use Hadoop’s TextOutputFormat.

Java (mapreduce API):

  // Obtain the result we want to emit.
  DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

  // Create the Hadoop Job that carries the output configuration.
  Job job = Job.getInstance();

  // Set up the Hadoop TextOutputFormat.
  HadoopOutputFormat<Text, IntWritable> hadoopOF =
      // Create the Flink wrapper.
      new HadoopOutputFormat<Text, IntWritable>(
          // Set the Hadoop OutputFormat and specify the job.
          new TextOutputFormat<Text, IntWritable>(), job
      );
  hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
  TextOutputFormat.setOutputPath(job, new Path(outputPath));

  // Emit data using the Hadoop TextOutputFormat.
  hadoopResult.output(hadoopOF);

Scala (mapred API):

  // Obtain your result to emit.
  val hadoopResult: DataSet[(Text, IntWritable)] = [...]

  val hadoopOF = new HadoopOutputFormat[Text, IntWritable](
    new TextOutputFormat[Text, IntWritable],
    new JobConf)

  hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
  FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

  hadoopResult.output(hadoopOF)

Using Hadoop Mappers and Reducers

Hadoop Mappers are semantically equivalent to Flink’s FlatMapFunctions and Hadoop Reducers are equivalent to Flink’s GroupReduceFunctions. Flink provides wrappers for implementations of Hadoop MapReduce’s Mapper and Reducer interfaces, so you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reducer interfaces of Hadoop’s mapred API (org.apache.hadoop.mapred) are supported.

The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output, where KEYIN and KEYOUT are the keys and VALUEIN and VALUEOUT are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers two GroupReduceFunction wrappers: one with a Combiner (HadoopReduceCombineFunction) and one without (HadoopReduceFunction). The wrappers accept an optional JobConf object to configure the Hadoop Mapper or Reducer.

Flink’s function wrappers are

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.

and can be used as regular Flink FlatMapFunctions or GroupReduceFunctions.
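
The difference between the two Reducer wrappers is whether records are pre-aggregated before the final reduce. The following plain-Java sketch models that idea only. CombinerModel and its methods are hypothetical names, no Flink or Hadoop classes are involved, and it assumes the reduce logic is associative and commutative (as summing is), which is what makes a combiner safe to use.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of reducing with and without a combiner.
final class CombinerModel {

    static Map.Entry<String, Long> pair(String key, long value) {
        return new AbstractMap.SimpleEntry<String, Long>(key, value);
    }

    // Sums the values per key; serves as both the combine and the reduce logic.
    static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return sums;
    }

    // Without a combiner, every raw pair from every partition reaches the reducer.
    static Map<String, Long> withoutCombiner(List<List<Map.Entry<String, Long>>> partitions) {
        List<Map.Entry<String, Long>> all = new ArrayList<>();
        for (List<Map.Entry<String, Long>> partition : partitions) {
            all.addAll(partition);
        }
        return sumByKey(all);
    }

    // With a combiner, each partition is pre-aggregated and only partial sums are merged.
    static Map<String, Long> withCombiner(List<List<Map.Entry<String, Long>>> partitions) {
        List<Map.Entry<String, Long>> partials = new ArrayList<>();
        for (List<Map.Entry<String, Long>> partition : partitions) {
            partials.addAll(sumByKey(partition).entrySet());
        }
        return sumByKey(partials);
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Long>>> partitions = Arrays.asList(
            Arrays.asList(pair("a", 1), pair("b", 1), pair("a", 1)),
            Arrays.asList(pair("a", 1), pair("b", 1)));
        System.out.println(withoutCombiner(partitions)); // {a=3, b=2}
        System.out.println(withCombiner(partitions));    // {a=3, b=2}
    }
}
```

Both paths produce the same result; the combiner only reduces how many records have to be moved to the final reduce step.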

The following example shows how to use Hadoop Mapper and Reducer functions.

  // Obtain data to process somehow.
  DataSet<Tuple2<LongWritable, Text>> text = [...]

  DataSet<Tuple2<Text, LongWritable>> result = text
      // Use Hadoop Mapper (Tokenizer) as MapFunction.
      .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
          new Tokenizer()
      ))
      .groupBy(0)
      // Use Hadoop Reducer (Counter) as Reduce- and CombineFunction.
      .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
          new Counter(), new Counter()
      ));

Please note: The Reducer wrapper works on groups as defined by Flink’s groupBy() operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the JobConf.
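
The example above uses Tokenizer and Counter without showing them. As a rough sketch of what such classes do and of how the wrappers drive them, the plain-Java model below mirrors the shape of the mapred Mapper and Reducer methods. Everything in it is an assumption made for illustration: Collector is a simplified stand-in for org.apache.hadoop.mapred.OutputCollector, String and Long replace Text and LongWritable, the Reporter argument is left out, and HadoopStyleWordCount is a hypothetical driver rather than Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for Hadoop's OutputCollector (assumption, not the real interface).
interface Collector<K, V> {
    void collect(K key, V value);
}

// Mapper-like class: one input record may emit zero or more pairs,
// which is why the matching Flink function type is a FlatMapFunction.
final class Tokenizer {
    void map(Long offset, String line, Collector<String, Long> out) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.collect(token, 1L);
            }
        }
    }
}

// Reducer-like class: called once per key with all values of that group,
// which matches a GroupReduceFunction.
final class Counter {
    void reduce(String word, Iterator<Long> counts, Collector<String, Long> out) {
        long sum = 0;
        while (counts.hasNext()) {
            sum += counts.next();
        }
        out.collect(word, sum);
    }
}

// Hypothetical driver that models flatMap -> groupBy(key) -> reduceGroup.
final class HadoopStyleWordCount {
    static Map<String, Long> count(List<String> lines) {
        // flatMap step: collect every emitted pair into a group keyed by word.
        Map<String, List<Long>> groups = new TreeMap<>();
        Tokenizer tokenizer = new Tokenizer();
        long offset = 0; // illustrative position only; the Tokenizer ignores it
        for (String line : lines) {
            tokenizer.map(offset, line,
                (k, v) -> groups.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
            offset += line.length() + 1;
        }
        // reduceGroup step: groups are formed by key alone, with no custom
        // partitioner, sort comparator, or grouping comparator involved.
        Map<String, Long> result = new TreeMap<>();
        Counter counter = new Counter();
        for (Map.Entry<String, List<Long>> g : groups.entrySet()) {
            counter.reduce(g.getKey(), g.getValue().iterator(), result::put);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {be=3, do=1, is=1, not=1, or=1, to=4}
        System.out.println(count(Arrays.asList("to be or not to be", "To be is to do")));
    }
}
```

Since summing is associative and commutative, a class like this Counter can be passed as both the reducer and the combiner, as the example above does with new Counter(), new Counter().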

Complete Hadoop WordCount Example

The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.

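  // Input and output formats use Hadoop's mapreduce API (configured through Job).
  // Tokenizer and Counter are user-provided org.apache.hadoop.mapred Mapper and
  // Reducer implementations (not shown); inputPath and outputPath are Strings.
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
  import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
  import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // Set up the Hadoop TextInputFormat.
  Job job = Job.getInstance();
  HadoopInputFormat<LongWritable, Text> hadoopIF =
      new HadoopInputFormat<LongWritable, Text>(
          new TextInputFormat(), LongWritable.class, Text.class, job
      );
  TextInputFormat.addInputPath(job, new Path(inputPath));

  // Read data using the Hadoop TextInputFormat.
  DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

  DataSet<Tuple2<Text, LongWritable>> result = text
      // Use Hadoop Mapper (Tokenizer) as MapFunction.
      .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
          new Tokenizer()
      ))
      .groupBy(0)
      // Use Hadoop Reducer (Counter) as Reduce- and CombineFunction.
      .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
          new Counter(), new Counter()
      ));

  // Set up the Hadoop TextOutputFormat.
  HadoopOutputFormat<Text, LongWritable> hadoopOF =
      new HadoopOutputFormat<Text, LongWritable>(
          new TextOutputFormat<Text, LongWritable>(), job
      );
  hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
  TextOutputFormat.setOutputPath(job, new Path(outputPath));

  // Emit data using the Hadoop TextOutputFormat.
  result.output(hadoopOF);

  // Execute the program.
  env.execute("Hadoop WordCount");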