Hadoop formats

Project Configuration

对 Hadoop 的支持位于 flink-hadoop-compatibility Maven 模块中。

将以下依赖添加到 pom.xml 中使用 hadoop

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-hadoop-compatibility_2.12</artifactId>
  4. <version>1.15.0</version>
  5. </dependency>

如果你想在本地运行你的 Flink 应用(例如在 IDE 中),你需要按照如下所示将 hadoop-client 依赖也添加到 pom.xml

  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-client</artifactId>
  4. <version>2.8.5</version>
  5. <scope>provided</scope>
  6. </dependency>

Using Hadoop InputFormats

在 Flink 中使用 Hadoop InputFormats,必须首先使用 HadoopInputs 工具类的 readHadoopFilecreateHadoopInput 包装 Input Format。 前者用于从 FileInputFormat 派生的 Input Format,而后者必须用于通用的 Input Format。 生成的 InputFormat 可通过使用 ExecutionEnvironmen#createInput 创建数据源。

生成的 DataStream 包含 2 元组,其中第一个字段是键,第二个字段是从 Hadoop InputFormat 接收的值。

下面的示例展示了如何使用 Hadoop 的 TextInputFormat

Java

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<Tuple2<LongWritable, Text>> input =
  3. env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
  4. LongWritable.class, Text.class, textPath));
  5. // 对数据进行一些处理。
  6. [...]

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val input: DataStream[(LongWritable, Text)] =
  3. env.createInput(HadoopInputs.readHadoopFile(
  4. new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
  5. // 对数据进行一些处理。
  6. [...]

Using Hadoop OutputFormats

Flink 为 Hadoop OutputFormats 提供了一个兼容性包装器。支持任何实现 org.apache.hadoop.mapred.OutputFormat 或扩展 org.apache.hadoop.mapreduce.OutputFormat 的类。 OutputFormat 包装器期望其输入数据是包含键和值的 2-元组的 DataSet。这些将由 Hadoop OutputFormat 处理。

下面的示例展示了如何使用 Hadoop 的 TextOutputFormat

Java

  1. // 获取我们希望发送的结果
  2. DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...];
  3. // 设置 the Hadoop TextOutputFormat。
  4. HadoopOutputFormat<Text, IntWritable> hadoopOF =
  5. // 创建 Flink wrapper.
  6. new HadoopOutputFormat<Text, IntWritable>(
  7. // 设置 Hadoop OutputFormat 并指定 job。
  8. new TextOutputFormat<Text, IntWritable>(), job
  9. );
  10. hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
  11. TextOutputFormat.setOutputPath(job, new Path(outputPath));
  12. // 使用 Hadoop TextOutputFormat 发送数据。
  13. hadoopResult.output(hadoopOF);

Scala

  1. // 获取我们希望发送的结果
  2. val hadoopResult: DataStream[(Text, IntWritable)] = [...]
  3. val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  4. new TextOutputFormat[Text, IntWritable],
  5. new JobConf)
  6. hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
  7. FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
  8. hadoopResult.output(hadoopOF)