Apache Kafka Connector

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanism to provide exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka’s consumer group offset tracking, but tracks and checkpoints these offsets internally as well.

Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the FlinkKafkaConsumer08 (part of flink-connector-kafka) is appropriate.

| Maven Dependency | Supported since | Consumer and Producer Class name | Kafka version | Notes |
| --- | --- | --- | --- | --- |
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08, FlinkKafkaProducer08 | 0.8.x | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink. |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09, FlinkKafkaProducer09 | 0.9.x | Uses the new Consumer API of Kafka. |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010, FlinkKafkaProducer010 | 0.10.x | This connector supports Kafka messages with timestamps both for producing and consuming. |
| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011, FlinkKafkaProducer011 | 0.11.x | Since 0.11.x Kafka does not support Scala 2.10. This connector supports Kafka transactional messaging to provide exactly-once semantics for the producer. |
| flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer, FlinkKafkaProducer | >= 1.0.0 | This universal Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Starting with the Flink 1.9 release, it uses the Kafka 2.2.0 client. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However, for Kafka 0.11.x and 0.10.x versions, we recommend using the dedicated flink-connector-kafka-0.11_2.11 and flink-connector-kafka-0.10_2.11 respectively. |

Then, import the connector in your maven project:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>

Note that the streaming connectors are currently not part of the binary distribution. See the Flink documentation on how to link with them for cluster execution.

Installing Apache Kafka

  • Follow the instructions from Kafka’s quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine’s IP address.

Kafka 1.0.0+ Connector

Starting with Flink 1.7, there is a new universal Kafka connector that does not track a specific Kafka major version. Rather, it tracks the latest version of Kafka at the time of the Flink release.

If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8), you should use the connector corresponding to the broker version.

Compatibility

The universal Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker. It is compatible with broker versions 0.11.0 or newer, depending on the features used. For details on Kafka compatibility, please refer to the Kafka documentation.

Migrating Kafka Connector from 0.11 to universal

In order to perform the migration, see the upgrading jobs and Flink versions guide and:

  • Use Flink 1.9 or newer for the whole process.
  • Do not upgrade Flink and the operators at the same time.
  • Make sure that the Kafka Consumer and/or Kafka Producer used in your job have unique identifiers (uid) assigned.
  • Use the stop with savepoint feature to take the savepoint (for example, by using the stop --withSavepoint CLI command).

Usage

To use the universal Kafka connector add a dependency to it:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>

Then instantiate the new source (FlinkKafkaConsumer) and sink (FlinkKafkaProducer). The API is backward compatible with the Kafka 0.11 connector, except that the specific Kafka version has been dropped from the module and class names.

Kafka Consumer

Flink’s Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics.

The constructor accepts the following arguments:

  • The topic name / list of topic names
  • A DeserializationSchema / KafkaDeserializationSchema for deserializing the data from Kafka
  • Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma separated list of Kafka brokers)
    • “zookeeper.connect” (comma separated list of Zookeeper servers) (only required for Kafka 0.8)
    • “group.id” the id of the consumer group

Example:

Java:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");
    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

Scala:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val stream = env
        .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
    stream.print()

The DeserializationSchema

The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The DeserializationSchema allows users to specify such a schema. The T deserialize(byte[] message) method gets called for each Kafka message, passing the value from Kafka.

It is usually helpful to start from the AbstractDeserializationSchema, which takes care of describing the produced Java/Scala type to Flink’s type system. Users that implement a vanilla DeserializationSchema need to implement the getProducedType(…) method themselves.

For accessing the key, value and metadata of the Kafka message, the KafkaDeserializationSchema has the following deserialize method: T deserialize(ConsumerRecord<byte[], byte[]> record).

For convenience, Flink provides the following schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) which creates a schema based on Flink’s TypeInformation. This is useful if the data is both written and read by Flink. This schema is a performant Flink-specific alternative to other generic serialization approaches.

  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) which turns the serialized JSON into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/…)(). The KeyValue objectNode contains a “key” and “value” field which contain all fields, as well as an optional “metadata” field that exposes the offset/partition/topic for this message.

  • AvroDeserializationSchema which reads data serialized with Avro format using a statically provided schema. It can infer the schema from Avro generated classes (AvroDeserializationSchema.forSpecific(…)) or it can work with GenericRecords with a manually provided schema (with AvroDeserializationSchema.forGeneric(…)). This deserialization schema expects that the serialized records DO NOT contain embedded schema.

    • There is also a version of this schema available that can look up the writer’s schema (the schema that was used to write the record) in Confluent Schema Registry. Using this deserialization schema, a record will be read with the schema that was retrieved from Schema Registry and transformed to a statically provided one (either through ConfluentRegistryAvroDeserializationSchema.forGeneric(…) or ConfluentRegistryAvroDeserializationSchema.forSpecific(…)).

To use these deserialization schemas, add the following additional dependency.

For AvroDeserializationSchema:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.10.0</version>
    </dependency>

For ConfluentRegistryAvroDeserializationSchema:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-confluent-registry</artifactId>
      <version>1.10.0</version>
    </dependency>

When encountering a corrupted message that cannot be deserialized for any reason, there are two options: either throwing an exception from the deserialize(…) method, which will cause the job to fail and be restarted, or returning null to allow the Flink Kafka consumer to silently skip the corrupted message. Note that due to the consumer’s fault tolerance (see the sections below for more details), failing the job on the corrupted message will let the consumer attempt to deserialize the message again. Therefore, if deserialization still fails, the consumer will fall into a non-stop restart and fail loop on that corrupted message.
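The null-returning option can be sketched in plain Java as follows. This is only an illustration: the class SkippingIntegerParser and its static deserialize method are hypothetical and do not implement Flink’s DeserializationSchema interface. In a real schema, the same try/catch would sit inside deserialize(…).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink: mimics the body of a deserialize(...)
// method that returns null for records it cannot parse, so they get skipped.
class SkippingIntegerParser {

    static Integer deserialize(byte[] message) {
        if (message == null) {
            return null; // nothing to parse: skip the record
        }
        try {
            String text = new String(message, StandardCharsets.UTF_8).trim();
            return Integer.valueOf(text);
        } catch (NumberFormatException e) {
            // Corrupted record: returning null skips it instead of failing
            // the job and re-reading the same record after every restart.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(deserialize("42".getBytes(StandardCharsets.UTF_8)));           // prints 42
        System.out.println(deserialize("not-a-number".getBytes(StandardCharsets.UTF_8))); // prints null
    }
}
```

Skipping trades completeness for availability: the job keeps running, but the corrupted record is dropped without any trace unless the schema logs or counts it.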

Kafka Consumers Start Position Configuration

The Flink Kafka Consumer allows configuring how the start position for Kafka partitions is determined.

Example:

Java:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    myConsumer.setStartFromEarliest(); // start from the earliest record possible
    myConsumer.setStartFromLatest(); // start from the latest record
    myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
    myConsumer.setStartFromGroupOffsets(); // the default behaviour
    DataStream<String> stream = env.addSource(myConsumer);
    ...

Scala:

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val myConsumer = new FlinkKafkaConsumer08[String](...)
    myConsumer.setStartFromEarliest() // start from the earliest record possible
    myConsumer.setStartFromLatest() // start from the latest record
    myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
    myConsumer.setStartFromGroupOffsets() // the default behaviour
    val stream = env.addSource(myConsumer)
    ...

All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position.

  • setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.
  • setStartFromEarliest() / setStartFromLatest(): Start from the earliest / latest record. Under these modes, committed offsets in Kafka will be ignored and not used as starting positions.
  • setStartFromTimestamp(long): Start from the specified timestamp. For each partition, the record whose timestamp is larger than or equal to the specified timestamp will be used as the start position. If a partition’s latest record is earlier than the timestamp, the partition will simply be read from the latest record. Under this mode, committed offsets in Kafka will be ignored and not used as starting positions.

You can also specify the exact offsets the consumer should start from for each partition:

Java:

    Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
    myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

Scala:

    val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
    myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

The above example configures the consumer to start from the specified offsets for partitions 0, 1, and 2 of topic myTopic. The offset values should be the next record that the consumer should read for each partition. Note that if the consumer needs to read a partition which does not have a specified offset within the provided offsets map, it will fall back to the default group offsets behaviour (i.e. setStartFromGroupOffsets()) for that particular partition.

Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).

Kafka Consumers and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

Java:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // checkpoint every 5000 msecs

Scala:

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

Kafka Consumers Topic and Partition Discovery

Partition discovery

The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the job starts running) will be consumed from the earliest possible offset.

By default, partition discovery is disabled. To enable it, set a non-negative value for flink.partition-discovery.interval-millis in the provided properties config, representing the discovery interval in milliseconds.
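As a minimal sketch of this setting, the snippet below builds consumer properties with discovery enabled. Only java.util.Properties is used. The class name PartitionDiscoveryConfig, the helper method, and the 30-second interval are illustrative choices, not recommendations from the Flink documentation.

```java
import java.util.Properties;

// Illustrative helper: builds the Properties object that would be passed
// to the consumer constructor.
class PartitionDiscoveryConfig {

    static Properties consumerProperties(long discoveryIntervalMillis) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        // A non-negative interval enables partition discovery.
        properties.setProperty(
            "flink.partition-discovery.interval-millis",
            Long.toString(discoveryIntervalMillis));
        return properties;
    }

    public static void main(String[] args) {
        // Check for new partitions every 30 seconds (arbitrary example value).
        Properties properties = consumerProperties(30_000L);
        System.out.println(properties.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```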

Limitation: When the consumer is restored from a savepoint from Flink versions prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run. If enabled, the restore would fail with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and then restore again from that.

Topic discovery

At a higher level, the Flink Kafka Consumer is also capable of discovering topics, based on pattern matching on the topic names using regular expressions. See below for an example:

Java:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");
    FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
        java.util.regex.Pattern.compile("test-topic-[0-9]"),
        new SimpleStringSchema(),
        properties);
    DataStream<String> stream = env.addSource(myConsumer);
    ...

Scala:

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val myConsumer = new FlinkKafkaConsumer08[String](
        java.util.regex.Pattern.compile("test-topic-[0-9]"),
        new SimpleStringSchema,
        properties)
    val stream = env.addSource(myConsumer)
    ...

In the above example, all topics with names that match the specified regular expression (starting with test-topic- and ending with a single digit) will be subscribed by the consumer when the job starts running.
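To see which topic names such a pattern selects, the sketch below applies the same regular expression to a few made-up topic names. It assumes that the whole topic name must match the pattern, which is what the description above implies. The class TopicPatternDemo and the topic list are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative only: filters topic names with full-match semantics.
class TopicPatternDemo {

    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
            .filter(topic -> pattern.matcher(topic).matches()) // entire name must match
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test-topic-[0-9]");
        List<String> topics = Arrays.asList(
            "test-topic-1", "test-topic-10", "my-test-topic-2", "test-topic-x");
        System.out.println(matchingTopics(pattern, topics)); // prints [test-topic-1]
    }
}
```

Under this assumption, test-topic-10 is not selected because it ends with two digits. A pattern such as test-topic-[0-9]+ would be needed to include it.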

To allow the consumer to discover dynamically created topics after the job started running, set a non-negative value for flink.partition-discovery.interval-millis. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.

Kafka Consumers Offset Committing Behaviour Configuration

The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

The way to configure offset commit behaviour is different, depending on whether or not checkpointing is enabled for the job.

  • Checkpointing disabled: if checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing capability of the internally used Kafka clients. Therefore, to disable or enable offset committing, simply set the enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms keys to appropriate values in the provided Properties configuration.

  • Checkpointing enabled: if checkpointing is enabled, the Flink Kafka Consumer will commit the offsets stored in the checkpointed states when the checkpoints are completed. This ensures that the committed offsets in Kafka brokers are consistent with the offsets in the checkpointed states. Users can choose to disable or enable offset committing by calling the setCommitOffsetsOnCheckpoints(boolean) method on the consumer (by default, the behaviour is true). Note that in this scenario, the automatic periodic offset committing settings in Properties are completely ignored.
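For the case with checkpointing disabled, the relevant keys can be set as in the sketch below. The class AutoCommitConfig, its helper method, and the 5-second interval are illustrative. The keys shown are the ones for Kafka 0.9 and later, while Kafka 0.8 uses auto.commit.enable as noted above.

```java
import java.util.Properties;

// Illustrative helper: consumer properties for a job WITHOUT checkpointing,
// where the Kafka client's own periodic auto-commit is in effect.
class AutoCommitConfig {

    static Properties withAutoCommit(boolean enabled, long intervalMillis) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", Boolean.toString(enabled));
        properties.setProperty("auto.commit.interval.ms", Long.toString(intervalMillis));
        return properties;
    }

    public static void main(String[] args) {
        // Commit offsets every 5 seconds (arbitrary example value).
        Properties properties = withAutoCommit(true, 5_000L);
        System.out.println(properties.getProperty("enable.auto.commit"));
        System.out.println(properties.getProperty("auto.commit.interval.ms"));
    }
}
```

Once checkpointing is enabled, these two keys no longer have any effect and setCommitOffsetsOnCheckpoints(boolean) is the switch to use instead.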

Kafka Consumers and Timestamp Extraction/Watermark Emission

In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka Consumer allows the specification of an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks.

You can specify your custom timestamp extractor/watermark emitter as described in the Flink documentation on timestamps and watermarks, or use one of the predefined ones. After doing so, you can pass it to your consumer in the following way:

Java:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");
    FlinkKafkaConsumer08<String> myConsumer =
        new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
    myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
    // print() returns a sink rather than a stream, so it is called separately
    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();

Scala:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
    myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    // print() returns a sink rather than a stream, so it is called separately
    val stream = env.addSource(myConsumer)
    stream.print()

Internally, an instance of the assigner is executed per Kafka partition. When such an assigner is specified, for each record read from Kafka, the extractTimestamp(T element, long previousElementTimestamp) method is called to assign a timestamp to the record, and the Watermark getCurrentWatermark() (for periodic) or the Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) (for punctuated) method is called to determine if a new watermark should be emitted and with which timestamp.

Note: If a watermark assigner depends on records read from Kafka to advance its watermarks (which is commonly the case), all topics and partitions need to have a continuous stream of records. Otherwise, the watermarks of the whole application cannot advance and all time-based operations, such as time windows or functions with timers, cannot make progress. A single idle Kafka partition causes this behavior. A Flink improvement is planned to prevent this from happening (see FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions). In the meantime, a possible workaround is to send heartbeat messages to all consumed partitions that advance the watermarks of idle partitions.
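The effect of an idle partition can be illustrated with a simplified model in which the watermark emitted downstream is the minimum of the per-partition watermarks. The class MinWatermarkDemo below is a hypothetical stand-alone sketch of that model, not Flink code, and the timestamps are made up.

```java
// Simplified model: the combined watermark is the minimum over all
// per-partition watermarks, so the slowest partition decides.
class MinWatermarkDemo {

    static long combinedWatermark(long[] partitionWatermarks) {
        long min = Long.MAX_VALUE; // result for an empty array
        for (long watermark : partitionWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        long idle = Long.MIN_VALUE; // partition that has not produced any record yet

        // Two active partitions and one idle partition: no progress at all.
        System.out.println(combinedWatermark(new long[] {1_000L, 2_000L, idle}) == Long.MIN_VALUE); // prints true

        // After a heartbeat record advances the idle partition to 900,
        // the combined watermark can move forward.
        System.out.println(combinedWatermark(new long[] {1_000L, 2_000L, 900L})); // prints 900
    }
}
```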

Kafka Producer

Flink’s Kafka Producer is called FlinkKafkaProducer011 (or 010 for Kafka 0.10.0.x versions, etc., or just FlinkKafkaProducer for Kafka >= 1.0.0 versions). It allows writing a stream of records to one or more Kafka topics.

Example:

Java:

    DataStream<String> stream = ...;
    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092", // broker list
        "my-topic", // target topic
        new SimpleStringSchema()); // serialization schema
    // versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
    // this method is not available for earlier Kafka versions
    myProducer.setWriteTimestampToKafka(true);
    stream.addSink(myProducer);

Scala:

    val stream: DataStream[String] = ...
    val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092", // broker list
        "my-topic", // target topic
        new SimpleStringSchema) // serialization schema
    // versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
    // this method is not available for earlier Kafka versions
    myProducer.setWriteTimestampToKafka(true)
    stream.addSink(myProducer)

The above examples demonstrate the basic usage of creating a Flink Kafka Producer to write streams to a single Kafka target topic. For more advanced usages, there are other constructor variants that allow providing the following:

  • Providing custom properties: The producer allows providing a custom properties configuration for the internal KafkaProducer. Please refer to the Apache Kafka documentation for details on how to configure Kafka Producers.
  • Custom partitioner: To assign records to specific partitions, you can provide an implementation of a FlinkKafkaPartitioner to the constructor. This partitioner will be called for each record in the stream to determine which exact partition of the target topic the record should be sent to. Please see Kafka Producer Partitioning Scheme for more details.
  • Advanced serialization schema: Similar to the consumer, the producer also allows using an advanced serialization schema called KeyedSerializationSchema, which allows serializing the key and value separately. It also allows overriding the target topic, so that one producer instance can send data to multiple topics.

Kafka Producer Partitioning Scheme

By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use a FlinkFixedPartitioner that maps each Flink Kafka Producer parallel subtask to a single Kafka partition (i.e., all records received by a sink subtask will end up in the same Kafka partition).
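One way to picture such a fixed mapping is a modulo assignment of subtask index to partition. The sketch below assumes exactly that. The class FixedPartitionerSketch and its method are hypothetical and only model the idea; they are not the FlinkFixedPartitioner source.

```java
// Hypothetical model of a fixed subtask-to-partition mapping (assumed modulo scheme).
class FixedPartitionerSketch {

    static int targetPartition(int subtaskIndex, int[] partitions) {
        // Every record handled by a given subtask goes to the same partition.
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        // With only 2 sink subtasks and 4 partitions, partitions 2 and 3 stay empty.
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                + targetPartition(subtask, partitions));
        }
    }
}
```

Under this model, a sink parallelism lower than the partition count leaves some partitions without data, and a higher parallelism makes several subtasks share a partition.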

A custom partitioner can be implemented by extending the FlinkKafkaPartitioner class. All Kafka versions’ constructors allow providing a custom partitioner when instantiating the producer. Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes. Also, keep in mind that any state in the partitioner will be lost on job failures since the partitioner is not part of the producer’s checkpointed state.

It is also possible to completely avoid using any kind of partitioner, and simply let Kafka partition the written records by their attached key (as determined for each record using the provided serialization schema). To do this, provide a null custom partitioner when instantiating the producer. It is important to provide null as the custom partitioner; as explained above, if a custom partitioner is not specified the FlinkFixedPartitioner is used instead.

Kafka Producers and Fault Tolerance

Kafka 0.8

Before 0.9 Kafka did not provide any mechanisms to guarantee at-least-once or exactly-once semantics.

Kafka 0.9 and 0.10

With Flink’s checkpointing enabled, the FlinkKafkaProducer09 and FlinkKafkaProducer010 can provide at-least-once delivery guarantees.

Besides enabling Flink’s checkpointing, you should also configure the setter methods setLogFailuresOnly(boolean) and setFlushOnCheckpoint(boolean) appropriately.

  • setLogFailuresOnly(boolean): by default, this is set to false. Enabling this will let the producer only log failures instead of catching and rethrowing them. This essentially counts the record as succeeded, even if it was never written to the target Kafka topic. This must be disabled for at-least-once.
  • setFlushOnCheckpoint(boolean): by default, this is set to true. With this enabled, Flink’s checkpoints will wait for any on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before succeeding the checkpoint. This ensures that all records before the checkpoint have been written to Kafka. This must be enabled for at-least-once.

In conclusion, the Kafka producer by default has at-least-once guarantees for versions 0.9 and 0.10, with setLogFailuresOnly set to false and setFlushOnCheckpoint set to true.

Note: By default, the number of retries is set to “0”. This means that when setLogFailuresOnly is set to false, the producer fails immediately on errors, including leader changes. The value is set to “0” by default to avoid duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes, we recommend setting the number of retries to a higher value.
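A higher retry count can be passed through the custom producer properties. The sketch below only builds that Properties object. The class ProducerRetriesConfig, its helper method, and the value 3 are illustrative, and retries is the standard Kafka producer configuration key.

```java
import java.util.Properties;

// Illustrative helper: producer properties with an explicit retry count.
class ProducerRetriesConfig {

    static Properties producerProperties(int retries) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // More retries tolerate transient errors such as leader changes,
        // at the cost of possible duplicates in the target topic.
        properties.setProperty("retries", Integer.toString(retries));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(producerProperties(3).getProperty("retries"));
    }
}
```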

Note: There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery into a Kafka topic.

Kafka 0.11 and newer

With Flink’s checkpointing enabled, the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.

Besides enabling Flink’s checkpointing, you can also choose among three different modes of operation by passing the appropriate semantic parameter to the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions):

  • Semantic.NONE: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
  • Semantic.AT_LEAST_ONCE (default setting): similar to setFlushOnCheckpoint(true) in FlinkKafkaProducer010. This guarantees that no records will be lost (although they can be duplicated).
  • Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget to set the desired isolation.level (read_committed or read_uncommitted; the latter is the default value) for any application consuming records from Kafka.

Caveats

Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between the Flink application crash and the completed restart is larger than Kafka’s transaction timeout, there will be data loss (Kafka will automatically abort transactions that exceeded the timeout). With this in mind, please configure your transaction timeout appropriately for your expected downtimes.

Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. This property will not allow setting transaction timeouts for the producers larger than its value. FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in the producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.
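The mismatch between the two defaults is easy to verify with a little arithmetic. The sketch below is a hypothetical helper, not a Kafka or Flink API: it simply checks whether a producer-side transaction.timeout.ms would fit under a broker-side transaction.max.timeout.ms.

```java
import java.time.Duration;

// Hypothetical check: a producer timeout is only acceptable if it does not
// exceed the broker's configured maximum.
class TransactionTimeoutCheck {

    static boolean fitsBrokerLimit(Duration producerTimeout, Duration brokerMaxTimeout) {
        return producerTimeout.compareTo(brokerMaxTimeout) <= 0;
    }

    public static void main(String[] args) {
        Duration brokerDefaultMax = Duration.ofMinutes(15);   // 900000 ms
        Duration flinkProducerDefault = Duration.ofHours(1);  // 3600000 ms

        // The defaults do not fit together.
        System.out.println(fitsBrokerLimit(flinkProducerDefault, brokerDefaultMax)); // prints false

        // Raising the broker limit to at least one hour resolves the mismatch.
        System.out.println(fitsBrokerLimit(flinkProducerDefault, Duration.ofHours(1))); // prints true
    }
}
```

Lowering transaction.timeout.ms in the producer properties to the broker limit would also pass this check, but it shortens the downtime the job can survive without data loss.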

In read_committed mode of KafkaConsumer, any transactions that were not finished (neither aborted nor completed) will block all reads from the given Kafka topic past any unfinished transaction. In other words, after the following sequence of events:

  • User started transaction1 and wrote some records using it
  • User started transaction2 and wrote some further records using it
  • User committed transaction2

Even if records from transaction2 are already committed, they will not be visible to the consumers until transaction1 is committed or aborted. This has two implications:

  • First of all, during normal operation of Flink applications, users can expect a delay in the visibility of records produced into Kafka topics, equal to the average time between completed checkpoints.
  • Secondly, in case of a Flink application failure, topics into which this application was writing will be blocked for readers until the application restarts or the configured transaction timeout passes. This remark only applies to cases where multiple agents/applications write to the same Kafka topic.
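The blocking behaviour in the sequence above can be modelled with a few lines of plain Java. This is a deliberately simplified sketch: the class ReadCommittedSketch is hypothetical, the log is a list of transaction ids in write order, and aborted transactions are ignored.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Simplified model of read_committed visibility: a consumer sees records
// only up to the first record that belongs to a still-open transaction.
class ReadCommittedSketch {

    static int visibleRecords(List<String> logTransactionIds, Set<String> openTransactions) {
        int visible = 0;
        for (String transactionId : logTransactionIds) {
            if (openTransactions.contains(transactionId)) {
                break; // everything from here on is blocked
            }
            visible++;
        }
        return visible;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
            "transaction1", "transaction1", "transaction2", "transaction2");

        // transaction2 is committed, transaction1 is still open: nothing is visible.
        System.out.println(visibleRecords(log, Collections.singleton("transaction1"))); // prints 0

        // Once transaction1 is committed too, all four records become visible.
        System.out.println(visibleRecords(log, Collections.<String>emptySet())); // prints 4
    }
}
```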

Note: Semantic.EXACTLY_ONCE mode uses a fixed-size pool of KafkaProducers for each FlinkKafkaProducer011 instance. One of these producers is used per checkpoint. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer011 will throw an exception and will fail the whole application. Please configure the max pool size and the max number of concurrent checkpoints accordingly.

Note: Semantic.EXACTLY_ONCE takes all possible measures to not leave any lingering transactions that would block the consumers from reading from a Kafka topic more than is necessary. However, in the event of a failure of the Flink application before the first checkpoint, after restarting such an application there is no information in the system about previous pool sizes. Thus it is unsafe to scale down a Flink application before the first checkpoint completes by a factor larger than FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR.

Since Apache Kafka 0.10+, Kafka’s messages can carry timestamps, indicating the time the event has occurred (see “event time” in Apache Flink) or the time when the message has been written to the Kafka broker.

The FlinkKafkaConsumer010 will emit records with the timestamp attached, if the time characteristic in Flink is set to TimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)).

The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in “Kafka Consumers and Timestamp Extraction/Watermark Emission” using the assignTimestampsAndWatermarks method are applicable.

There is no need to define a timestamp extractor when using the timestamps from Kafka. The previousElementTimestamp argument of the extractTimestamp() method contains the timestamp carried by the Kafka message.

A timestamp extractor for a Kafka consumer would look like this:

    public long extractTimestamp(Long element, long previousElementTimestamp) {
        return previousElementTimestamp;
    }

The FlinkKafkaProducer010 emits the record timestamp only if setWriteTimestampToKafka(true) is set.

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
    FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);

Kafka Connector metrics

Flink’s Kafka connectors provide some metrics through Flink’s metrics system to analyze the behavior of the connector. The producers export Kafka’s internal metrics through Flink’s metric system for all supported versions. The consumers export all metrics starting from Kafka version 0.9. The Kafka documentation lists all exported metrics.

In addition to these metrics, all consumers expose the current-offsets and committed-offsets for each topic partition. The current-offsets refers to the current offset in the partition, that is, the offset of the last element that was retrieved and emitted successfully. The committed-offsets is the last committed offset.

The Kafka Consumers in Flink commit the offsets back to Zookeeper (Kafka 0.8) or the Kafka brokers (Kafka 0.9+). If checkpointing is disabled, offsets are committed periodically. With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed to Flink, the system provides exactly once guarantees.

The offsets committed to ZK or the broker can also be used to track the read progress of the Kafka consumer. The difference between the committed offset and the most recent offset in each partition is called the consumer lag. If the Flink topology consumes data from the topic more slowly than new data is added, the lag will increase and the consumer will fall behind. For large production deployments we recommend monitoring that metric to avoid increasing latency.
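
As an illustration of the lag definition above, the following sketch computes the lag per partition from two offset maps. It is plain Java with no Kafka or Flink dependency. The class ConsumerLag and its methods are hypothetical, and how the two maps are obtained (for example from the exposed metrics or from a monitoring tool) is left open. Treating a partition without a committed offset as committed at 0 is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class ConsumerLag {
    private ConsumerLag() {}

    /**
     * Returns, for each partition, how far the committed offset trails the
     * most recent offset. Missing committed offsets count as 0 (assumption).
     */
    static Map<Integer, Long> perPartition(Map<Integer, Long> latestOffsets,
                                           Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Clamp at zero in case the two snapshots were taken at different times.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    /** Sums the per-partition lag into a single value for alerting. */
    static long total(Map<Integer, Long> lagPerPartition) {
        long sum = 0L;
        for (long value : lagPerPartition.values()) {
            sum += value;
        }
        return sum;
    }
}
```

A steadily growing total across successive samples indicates that the topology is falling behind the producers.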

Enabling Kerberos Authentication (for versions 0.9 and above only)

Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation configured for Kerberos. Simply configure Flink in flink-conf.yaml to enable Kerberos authentication for Kafka like so:

  • Configure Kerberos credentials by setting the following -
    • security.kerberos.login.use-ticket-cache: By default, this is true and Flink will attempt to use Kerberos credentials in ticket caches managed by kinit. Note that when using the Kafka connector in Flink jobs deployed on YARN, Kerberos authorization using ticket caches will not work. This is also the case when deploying using Mesos, as authorization using ticket cache is not supported for Mesos deployments.
    • security.kerberos.login.keytab and security.kerberos.login.principal: To use Kerberos keytabs instead, set values for both of these properties.
  • Append KafkaClient to security.kerberos.login.contexts: This tells Flink to provide the configured Kerberos credentials to the Kafka login context to be used for Kafka authentication.

Once Kerberos-based Flink security is enabled, you can authenticate to Kafka with either the Flink Kafka Consumer or Producer by simply including the following two settings in the provided properties configuration that is passed to the internal Kafka client:

  • Set security.protocol to SASL_PLAINTEXT (Kafka's default is PLAINTEXT): The protocol used to communicate with Kafka brokers. When using a standalone Flink deployment, you can also use SASL_SSL; please see how to configure the Kafka client for SSL here.
  • Set sasl.kerberos.service.name to kafka (default kafka): The value for this should match the sasl.kerberos.service.name used for Kafka broker configurations. A mismatch in service name between client and server configuration will cause the authentication to fail.
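
The two client settings above can be added to the properties object that is handed to the consumer or producer. The following is a minimal sketch using only java.util.Properties. The class KerberosKafkaProperties and its method are hypothetical helpers, and the service name kafka is assumed to match the broker configuration.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class KerberosKafkaProperties {
    private KerberosKafkaProperties() {}

    /**
     * Returns a copy of the given client properties with the two
     * Kerberos-related settings added. The input is left unchanged.
     */
    static Properties withKerberos(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Protocol used to communicate with the Kafka brokers.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Assumed to match sasl.kerberos.service.name on the brokers.
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}
```

The returned object would then be passed as the properties argument of the Flink Kafka Consumer or Producer in place of the original properties.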

For more information on Flink configuration for Kerberos security, please see here. You can also find here further details on how Flink internally sets up Kerberos-based security.

Troubleshooting

If you have a problem with Kafka when using Flink, keep in mind that Flink only wraps KafkaConsumer or KafkaProducer, so your problem might be independent of Flink and can sometimes be solved by upgrading Kafka brokers, reconfiguring Kafka brokers, or reconfiguring KafkaConsumer or KafkaProducer in Flink. Some examples of common problems are listed below.

Data loss

Depending on your Kafka configuration, even after Kafka acknowledges writes you can still experience data loss. In particular, keep in mind the following properties in the Kafka config:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

Default values for the above options can easily lead to data loss. Please refer to the Kafka documentation for more explanation.
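
Of the options listed above, acks is a producer-side setting, while the log.flush.* options belong to the broker configuration and cannot be set from the client. As a minimal sketch, the producer side could be tightened as follows. The class DurableProducerProperties is a hypothetical helper, and choosing acks=all (wait for all in-sync replicas) is one possible choice that trades latency for durability.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class DurableProducerProperties {
    private DurableProducerProperties() {}

    /** Builds producer properties that wait for all in-sync replicas. */
    static Properties create(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // "all" makes the leader wait for the full set of in-sync replicas
        // before acknowledging a write.
        props.setProperty("acks", "all");
        return props;
    }
}
```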

UnknownTopicOrPartitionException

One possible cause of this error is a new leader election taking place, for example during or after a Kafka broker restart. This is a retriable exception, so the Flink job should be able to restart and resume normal operation. It can also be circumvented by changing the retries property in the producer settings. However, this might cause reordering of messages, which, if undesired, can in turn be circumvented by setting max.in.flight.requests.per.connection to 1.
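
The two producer settings mentioned above can be combined as in the following sketch. The class OrderedRetryProperties and its method are hypothetical helpers, and the retry count is a value you would choose for your own environment.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class OrderedRetryProperties {
    private OrderedRetryProperties() {}

    /**
     * Returns a copy of the given producer properties with retries enabled
     * and at most one in-flight request per connection, so that a retried
     * batch cannot overtake a later one.
     */
    static Properties withOrderedRetries(Properties base, int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("retries", Integer.toString(retries));
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

Limiting in-flight requests to 1 preserves ordering at the cost of producer throughput, so it is worth applying only where ordering matters.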