风暴兼容性Beta

Flink流与Apache Storm接口兼容,因此允许重用为Storm实现的代码。

您可以:

  • Topology在Flink 执行整个Storm 。
  • 在Flink流处理节目中使用Storm Spout/ Bolt作为源/算子。本文档介绍了如何在Flink中使用现有的Storm代码。

项目配置

支持Storm包含在flink-stormMaven模块中。代码驻留在org.apache.flink.storm包中。

pom.xml如果要在Flink中执行Storm代码,请将以下依赖项添加到您的

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-storm_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

请注意:不要添加storm-core为依赖项。它已包含在内flink-storm

请注意flink-storm不是提供的二进制Flink发行版的一部分。因此,您需要flink-storm在提交给Flink的JobManager的程序jar(也称为uber-jar或fat-jar)中包含类(及其依赖项)。字计数风暴flink-storm-examples/pom.xml的一个例子,如何正确地打包罐。

如果你想避免大尤伯杯罐子,你可以手动复制storm-core-0.9.4.jarjson-simple-1.1.jarflink-storm-1.7-SNAPSHOT.jar进入Flink的lib/每个群集节点的文件夹(之前在启动群集)。对于这种情况,仅将您自己的Spout和Bolt类(及其内部依赖项)包含在程序jar中就足够了。

执行Storm拓扑

Flink提供与Storm兼容的API(org.apache.flink.storm.api),它可以替代以下类:

  • StormSubmitter 取而代之 FlinkSubmitter
  • NimbusClientClient替换为FlinkClient
  • LocalCluster 取而代之 FlinkLocalCluster为了向Flink提交Storm拓扑,只需使用组装拓扑的Storm 客户端代码中的Flink替换来替换使用过的Storm类实际的运行时代码,即Spouts和Bolts,可以不加修改地使用如果拓扑在远程集群执行时,参数nimbus.hostnimbus.thrift.port被用作jobmanger.rpc.addressjobmanger.rpc.port分别。如果未指定参数,则取值flink-conf.yaml

  • Java

  1. TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
  2. // actual topology assembling code and used Spouts/Bolts can be used as-is
  3. builder.setSpout("source", new FileSpout(inputFilePath));
  4. builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
  5. builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
  6. builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
  7. Config conf = new Config();
  8. if(runLocal) { // submit to test cluster
  9. // replaces: LocalCluster cluster = new LocalCluster();
  10. FlinkLocalCluster cluster = new FlinkLocalCluster();
  11. cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
  12. } else { // submit to remote cluster
  13. // optional
  14. // conf.put(Config.NIMBUS_HOST, "remoteHost");
  15. // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
  16. // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
  17. FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
  18. }

在Flink流程序中嵌入Storm算子

作为替代方案,Spouts和Bolts可以嵌入到常规流处理节目中。Storm兼容层为每个提供了一个打包类,即SpoutWrapperBoltWrapperorg.apache.flink.storm.wrappers)。

每默认情况下,打包转换风暴输出元组Flink的元组类型(即,Tuple0Tuple25根据风暴元组的字段数)。对于单场输出元组,也可以转换为字段的数据类型(例如,String代替Tuple1<String>)。

由于Flink无法推断Storm 算子的输出字段类型,因此需要手动指定输出类型。为了获得正确的TypeInformation对象,TypeExtractor可以使用Flink

嵌入Spouts

要将Spout用作Flink源,请使用StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)Spout对象被传递给它的构造函数SpoutWrapper<OUT>,作为第一个参数addSource(…)泛型类型声明OUT指定源输出流的类型。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // stream has `raw` type (single field output streams only)
  3. DataStream<String> rawInput = env.addSource(
  4. new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
  5. TypeExtractor.getForClass(String.class)); // output type
  6. // process data stream
  7. [...]

如果Spout发出有限数量的元组,SpoutWrapper可以通过numberOfInvocations在其构造函数中设置参数来配置为自动终止这允许Flink程序在处理完所有数据后自动关闭。默认情况下,程序将一直运行,直到手动取消

嵌入螺栓

要使用Bolt作为Flink 算子,请使用DataStream.transform(String, TypeInformation, OneInputStreamOperator)Bolt对象被传递给它的构造函数BoltWrapper<IN,OUT>,作为最后一个参数transform(…)泛型类型声明IN并分别OUT指定 算子的输入和输出流的类型。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<String> text = env.readTextFile(localFilePath);
  3. DataStream<Tuple2<String, Integer>> counts = text.transform(
  4. "tokenizer", // operator name
  5. TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
  6. new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
  7. // do further processing
  8. [...]

嵌入式螺栓的命名属性访问

螺栓可以通过名称访问输入元组字段(另外通过索引访问)。要在嵌入式螺栓中使用此函数,您需要具有a

  • POJO类型输入流或
  • 元组类型输入流并指定输入模式(即名称到索引映射)对于POJO输入类型,Flink通过反射访问字段。对于这种情况,Flink期望相应的公共成员变量或公共getter方法。例如,如果Bolt通过名称sentence(例如String s = input.getStringByField("sentence");访问字段,则输入POJO类必须具有成员变量public String sentence;或方法public String getSentence() { … };(注意驼峰式命名)。

对于Tuple输入类型,需要使用Storm的Fields指定输入模式对于这种情况,构造函数BoltWrapper需要另外一个参数:new BoltWrapper<Tuple1<String>, …>(…, new Fields("sentence"))输入类型是Tuple1<String>Fields("sentence")指定input.getStringByField("sentence")相当于input.getString(0)

有关示例请参阅BoltTokenizerWordCountPojoBoltTokenizerWordCountWithNames

配置喷口和螺栓

在Storm中,Spouts和Bolts可以配置一个全局分布的Map对象,该对象被赋予submitTopology(…)方法LocalClusterStormSubmitterMap是由拓扑旁边的用户提供的,并作为参数转发给呼叫Spout.open(…)Bolt.prepare(…)如果在Flink中使用FlinkTopologyBuilder执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。

对于嵌入式使用,必须使用Flink的配置机制。可以在StreamExecutionEnvironmentvia中设置全局配置.getConfig().setGlobalJobParameters(…)Flink的常规Configuration课程可用于配置Spouts和Bolts。但是,Configuration不像Storm那样支持任意Keys数据类型(只String允许Keys)。因此,Flink还提供StormConfig了可以像raw一样使用的类,Map以提供与Storm的完全兼容性。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StormConfig config = new StormConfig();
  3. // set config values
  4. [...]
  5. // set global Storm configuration
  6. env.getConfig().setGlobalJobParameters(config);
  7. // assemble program with embedded Spouts and/or Bolts
  8. [...]

多输出流

Flink还可以处理Spout和Bolts的多个输出流的声明。如果在Flink中使用FlinkTopologyBuilder执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。

对于嵌入式使用,输出流将是数据类型SplitStreamType<T>,必须使用DataStream.split(…)拆分SplitStream.select(…)Flink提供预定义输出选择StormStreamSelector<T>.split(…)已经。此外,SplitStreamTuple<T>可以使用除去打包类型SplitStreamMapper<T>

  1. [...]
  2. // get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
  3. DataStream<SplitStreamType<SomeType>> multiStream = ...
  4. SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
  5. // remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
  6. DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
  7. DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
  8. // do further processing on s1 and s2
  9. [...]

有关完整示例,请参阅SpoutSplitExample.java

Flink Extensions

有限的喷口

在Flink中,流处理源可以是有限的,即发出有限数量的记录并在发出最后一条记录后停止。但是,Spouts通常会发出无限的流。两种方法之间的桥接是FiniteSpout除了IRichSpout包含reachedEnd()方法之外接口,其中用户可以指定停止条件。用户可以通过实现此接口而不是(或另外)来创建有限Spout IRichSpout,并reachedEnd()另外实现该方法。SpoutWrapper配置为发出有限数量的元组的FiniteSpout接口相比接口允许实现更复杂的终止标准。

尽管有限的Spout不需要将Spouts嵌入到Flink流程序中或向Flink提交整个Storm拓扑,但有些情况下它们可能会派上用场:

  • 实现原生Spout的行为与有限Flink源相同,只需要很少的修改
  • 用户想要只处理一段时间; 之后,Spout可以自动停止
  • 将文件读入流中
  • 用于测试目的有限Spout的示例,仅发出10秒的记录:

  • Java

  1. public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
  2. [...] // implement open(), nextTuple(), ...
  3. private long starttime = System.currentTimeMillis();
  4. public boolean reachedEnd() {
  5. return System.currentTimeMillis() - starttime > 10000l;
  6. }
  7. }

Storm兼容性示例

您可以在Maven模块中找到更多示例flink-storm-examples有关不同版本的WordCount,请参阅README.md要运行示例,您需要组装正确的jar文件。flink-storm-examples-1.7-SNAPSHOT.jarno / not作业执行有效的jar文件(这仅仅是一个标准的Maven神器)。

有嵌入式喷口和螺栓,即例如罐WordCount-SpoutSource.jarWordCount-BoltTokenizer.jar分别。比较pom.xml看两个罐子是如何构建的。此外,整个Storm拓扑(WordCount-StormTopology.jar有一个例子

您可以通过运行这些示例中的每一个bin/flink run <jarname>.jar每个jar的清单文件中都包含正确的入口点类。