RabbitMQ 连接器

RabbitMQ 连接器的许可证

Flink 的 RabbitMQ 连接器依赖了 “RabbitMQ AMQP Java Client”,它基于三种协议下发行:Mozilla Public License 1.1 (“MPL”)、GNU General Public License version 2 (“GPL”) 和 Apache License version 2 (“ASL”)。

Flink 自身既没有复用 “RabbitMQ AMQP Java Client” 的代码,也没有将 “RabbitMQ AMQP Java Client” 打二进制包。

如果用户发布的内容是基于 Flink 的 RabbitMQ 连接器的(进而重新发布了 “RabbitMQ AMQP Java Client” ),那么一定要注意这可能会受到 Mozilla Public License 1.1 (“MPL”)、GNU General Public License version 2 (“GPL”)、Apache License version 2 (“ASL”) 协议的限制.

RabbitMQ 连接器

这个连接器可以访问 RabbitMQ 的数据流。使用这个连接器,需要在工程里添加下面的依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-rabbitmq</artifactId>
  4. <version>1.15.0</version>
  5. </dependency>

Copied to clipboard!

注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 这里.

安装 RabbitMQ

安装 RabbitMQ 请参考 RabbitMQ 下载页面。安装完成之后,服务会自动拉起,应用程序就可以尝试连接到 RabbitMQ 了。

RabbitMQ Source

RMQSource 负责从 RabbitMQ 中消费数据,可以配置三种不同级别的保证:

  1. 精确一次: 保证精确一次需要以下条件 -
  • 开启 checkpointing: 开启 checkpointing 之后,消息在 checkpoints 完成之后才会被确认(然后从 RabbitMQ 队列中删除).
  • 使用关联标识(Correlation ids): 关联标识是 RabbitMQ 的一个特性,消息写入 RabbitMQ 时在消息属性中设置。 从 checkpoint 恢复时有些消息可能会被重复处理,source 可以利用关联标识对消息进行去重。
  • 非并发 source: 为了保证精确一次的数据投递,source 必须是非并发的(并行度设置为1)。 这主要是由于 RabbitMQ 分发数据时是从单队列向多个消费者投递消息的。
  1. 至少一次: 在 checkpointing 开启的条件下,如果没有使用关联标识或者 source 是并发的, 那么 source 就只能提供至少一次的保证。

  2. 无任何保证: 如果没有开启 checkpointing,source 就不能提供任何的数据投递保证。 使用这种设置时,source 一旦接收到并处理消息,消息就会被自动确认。

下面是一个保证 exactly-once 的 RabbitMQ source 示例。 注释部分展示了更加宽松的保证应该如何配置。

Java

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...);
  4. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build();
  9. final DataStream<String> stream = env
  10. .addSource(new RMQSource<String>(
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1); // non-parallel source is only required for exactly-once

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...)
  4. val connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build
  9. val stream = env
  10. .addSource(new RMQSource[String](
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema)) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1) // non-parallel source is only required for exactly-once

Python

  1. env = StreamExecutionEnvironment.get_execution_environment()
  2. # checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enable_checkpointing(...)
  4. connection_config = RMQConnectionConfig.Builder() \
  5. .set_host("localhost") \
  6. .set_port(5000) \
  7. ...
  8. .build()
  9. stream = env \
  10. .add_source(RMQSource(
  11. connection_config,
  12. "queueName",
  13. True,
  14. SimpleStringSchema(),
  15. )) \
  16. .set_parallelism(1)

服务质量 (QoS) / 消费者预取(Consumer Prefetch)

RabbitMQ Source 通过 RMQConnectionConfig 类提供了一种简单的方式,来设置 source channel 上的 basicQos(见下方示例)。要注意的是这里的 prefetch count 是对单个 channel 设置的,并且由于每个并发的 source 都持有一个 connection/channel,因此这个值实际上会乘以 source 的并行度,来表示同一时间可以向这个 job 总共发送多少条未确认的消息。如果需要更复杂的配置,可以通过重写 RMQSource#setupChannel(Connection) 方法来实现手动配置。

Java

  1. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  2. .setPrefetchCount(30_000)
  3. ...
  4. .build();

Scala

  1. val connectionConfig = new RMQConnectionConfig.Builder()
  2. .setPrefetchCount(30000)
  3. ...
  4. .build

RabbitMQ Source 默认情况下是不设置 prefetch count 的,这意味着 RabbitMQ 服务器将会无限制地向 source 发送消息。因此在生产环境中,最好要设置它。当消费海量数据的队列并且启用 checkpointing 时,消息只有在做完 checkpoint 后才会被确认,因此也许需要对 prefetch count 做一些调整来减少不必要的循环。

更多关于 QoS 以及 prefetch 相关的内容可以参考 这里. 更多关于在 AMQP 0-9-1 中可选的选项可以参考 这里.

RabbitMQ Sink

该连接器提供了一个 RMQSink 类,用来向 RabbitMQ 队列发送数据。下面是设置 RabbitMQ sink 的代码示例:

Java

  1. final DataStream<String> stream = ...
  2. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build();
  7. stream.addSink(new RMQSink<String>(
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema())); // serialization schema to turn Java objects to messages

Scala

  1. val stream: DataStream[String] = ...
  2. val connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build
  7. stream.addSink(new RMQSink[String](
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema)) // serialization schema to turn Java objects to messages

更多关于 RabbitMQ 的信息请参考 这里.