Apache NiFi 连接器

The NiFi connector is deprecated and will be removed with Flink 1.16.

Apache NiFi 连接器提供了可以读取和写入的 Source 和 Sink。 使用这个连接器,需要在工程中添加下面的依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-nifi</artifactId>
  4. <version>1.15.0</version>
  5. </dependency>

Copied to clipboard!

注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 这里

安装 Apache NiFi

安装 Apache NiFi 集群请参考 这里

Apache NiFi Source

该连接器提供了一个 Source 可以用来从 Apache NiFi 读取数据到 Apache Flink。

NiFiSource(…) 类有两个构造方法。

  • NiFiSource(SiteToSiteConfig config) - 构造一个 NiFiSource(…) ,需要指定参数 SiteToSiteConfig ,采用默认的等待时间 1000 ms。

  • NiFiSource(SiteToSiteConfig config, long waitTimeMs) - 构造一个 NiFiSource(…),需要指定参数 SiteToSiteConfig 和等待时间(单位为毫秒)。

示例:

Java

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data for Flink")
  5. .requestBatchCount(5)
  6. .buildConfig();
  7. SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);

Scala

  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data for Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSource = new NiFiSource(clientConfig)

数据从 Apache NiFi Output Port 读取,Apache NiFi Output Port 也被称为 “Data for Flink”,是 Apache NiFi Site-to-site 协议配置的一部分。

Apache NiFi Sink

该连接器提供了一个 Sink 可以用来把 Apache Flink 的数据写入到 Apache NiFi。

NiFiSink(…) 类只有一个构造方法。

  • NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>) 构造一个 NiFiSink(…),需要指定 SiteToSiteConfigNiFiDataPacketBuilder 参数 ,NiFiDataPacketBuilder 可以将Flink数据转化成可以被NiFi识别的 NiFiDataPacket.

示例:

Java

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig();
  7. SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
  8. streamExecEnv.addSink(nifiSink);

Scala

  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
  8. streamExecEnv.addSink(nifiSink)

更多关于 Apache NiFi Site-to-Site Protocol 的信息请参考 这里