RabbitMQ Connector

License of the RabbitMQ Connector

Flink’s RabbitMQ connector defines a Maven dependency on the “RabbitMQ AMQP Java Client”, is triple-licensed under the Mozilla Public License 1.1 (“MPL”), the GNU General Public License version 2 (“GPL”) and the Apache License version 2 (“ASL”).

Flink itself neither reuses source code from the “RabbitMQ AMQP Java Client” nor packages binaries from the “RabbitMQ AMQP Java Client”.

Users that create and publish derivative work based on Flink’s RabbitMQ connector (thereby re-distributing the “RabbitMQ AMQP Java Client”) must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 (“MPL”), the GNU General Public License version 2 (“GPL”) and the Apache License version 2 (“ASL”).

RabbitMQ Connector

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-rabbitmq_2.11</artifactId>
  4. <version>1.14.4</version>
  5. </dependency>

Copied to clipboard!

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.

RabbitMQ Source

This connector provides a RMQSource class to consume messages from a RabbitMQ queue. This source provides three different levels of guarantees, depending on how it is configured with Flink:

  1. Exactly-once: In order to achieve exactly-once guarantees with the RabbitMQ source, the following is required -
  • Enable checkpointing: With checkpointing enabled, messages are only acknowledged (hence, removed from the RabbitMQ queue) when checkpoints are completed.
  • Use correlation ids: Correlation ids are a RabbitMQ application feature. You have to set it in the message properties when injecting messages into RabbitMQ. The correlation id is used by the source to deduplicate any messages that have been reprocessed when restoring from a checkpoint.
  • Non-parallel source: The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once. This limitation is mainly due to RabbitMQ’s approach to dispatching messages from a single queue to multiple consumers.
  1. At-least-once: When checkpointing is enabled, but correlation ids are not used or the source is parallel, the source only provides at-least-once guarantees.

  2. No guarantee: If checkpointing isn’t enabled, the source does not have any strong delivery guarantees. Under this setting, instead of collaborating with Flink’s checkpointing, messages will be automatically acknowledged once the source receives and processes them.

Below is a code example for setting up an exactly-once RabbitMQ source. Inline comments explain which parts of the configuration can be ignored for more relaxed guarantees.

Java

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...);
  4. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build();
  9. final DataStream<String> stream = env
  10. .addSource(new RMQSource<String>(
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1); // non-parallel source is only required for exactly-once

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...)
  4. val connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build
  9. val stream = env
  10. .addSource(new RMQSource[String](
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema)) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1) // non-parallel source is only required for exactly-once

Quality of Service (QoS) / Consumer Prefetch

The RabbitMQ Source provides a simple way to set the basicQos on the source’s channel through the RMQConnectionConfig. Since there is one connection/ channel per-parallel source, this prefetch count will effectively be multiplied by the source’s parallelism for how many total unacknowledged messages can be sent to the job at one time. If more complex configuration is required, RMQSource#setupChannel(Connection) can be overridden and manually configured.

Java

  1. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  2. .setPrefetchCount(30_000)
  3. ...
  4. .build();

Scala

  1. val connectionConfig = new RMQConnectionConfig.Builder()
  2. .setPrefetchCount(30000)
  3. ...
  4. .build

The prefetch count is unset by default, meaning the RabbitMQ server will send unlimited messages. In production, it is best to set this value. For high volume queues and checkpointing enabled, some tuning may be required to reduce wasted cycles, as messages are only acknowledged on checkpoints if enabled.

More about QoS and prefetch can be found here and more about the options available in AMQP 0-9-1 here.

RabbitMQ Sink

This connector provides a RMQSink class for sending messages to a RabbitMQ queue. Below is a code example for setting up a RabbitMQ sink.

Java

  1. final DataStream<String> stream = ...
  2. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build();
  7. stream.addSink(new RMQSink<String>(
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema())); // serialization schema to turn Java objects to messages

Scala

  1. val stream: DataStream[String] = ...
  2. val connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build
  7. stream.addSink(new RMQSink[String](
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema)) // serialization schema to turn Java objects to messages

More about RabbitMQ can be found here.