Apache Pulsar Connector

Flink provides an Apache Pulsar source connector for reading data from Pulsar topics with exactly-once guarantees.

Dependency

You can use the connector with Pulsar 2.7.0 or higher. However, because the Pulsar source connector supports Pulsar transactions, Pulsar 2.8.0 or higher is recommended. For details on Pulsar compatibility, refer to PIP-72.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-pulsar_2.11</artifactId>
        <version>1.14.4</version>
    </dependency>

Flink’s streaming connectors are not currently part of the binary distribution. See how to link with them for cluster execution here.

Pulsar Source

This part describes the Pulsar source based on the new data source API.

If you want to use the legacy SourceFunction, or your project runs on Flink 1.13 or lower, use StreamNative’s pulsar-flink connector instead.

Usage

Pulsar source provides a builder class for constructing an instance of PulsarSource. The code snippet below shows how to build a PulsarSource that consumes messages from the earliest cursor of the topic “persistent://public/default/my-topic”, uses the Exclusive subscription my-subscription, and deserializes the raw payload of the messages as strings.

    PulsarSource<String> pulsarSource = PulsarSource.builder()
        .setServiceUrl(serviceUrl)
        .setAdminUrl(adminUrl)
        .setStartCursor(StartCursor.earliest())
        .setTopics("my-topic")
        .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
        .setSubscriptionName("my-subscription")
        .setSubscriptionType(SubscriptionType.Exclusive)
        .build();

    // Pass the source built above to the execution environment.
    env.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");

The following properties are required for building a PulsarSource:

  • Pulsar service url, configured by setServiceUrl(String)
  • Pulsar service http url (aka. admin url), configured by setAdminUrl(String)
  • Pulsar subscription name, configured by setSubscriptionName(String)
  • Topics / partitions to subscribe, see the following Topic-partition subscription for more details.
  • Deserializer to parse Pulsar messages, see the following Deserializer for more details.

Topic-partition Subscription

Pulsar source provides two ways of topic-partition subscription:

  • Topic list, subscribing messages from all partitions in a list of topics. For example:

    PulsarSource.builder().setTopics("some-topic1", "some-topic2")
    // Partition 0 and 2 of topic "topic-a"
    PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
  • Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:

    // Regular expression: "topic-" followed by any characters
    PulsarSource.builder().setTopicPattern("topic-.*")

Flexible Topic Naming

Since Pulsar 2.0, all topic names internally have the form {persistent|non-persistent}://tenant/namespace/topic. For persistent topics, you can use short names in many cases. This flexible naming works because a Pulsar cluster has a default topic type, tenant, and namespace.

| Topic property | Default |
|---|---|
| topic type | persistent |
| tenant | public |
| namespace | default |

This table lists a mapping relationship between your input topic name and translated topic name:

| Input topic name | Translated topic name |
|---|---|
| my-topic | persistent://public/default/my-topic |
| my-tenant/my-namespace/my-topic | persistent://my-tenant/my-namespace/my-topic |

For non-persistent topics, you must still specify the entire topic name, because the default-based rules do not apply to non-persistent topics. Thus, you cannot use a short name like non-persistent://my-topic and need to use non-persistent://public/default/my-topic instead.
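
The translation rules above can be sketched in plain Java. This is a minimal illustration that assumes only the defaults listed in the table; the class and method names are hypothetical, and it is not Pulsar's actual name-parsing code.

```java
class TopicNameSketch {
    // Defaults taken from the "Topic property / Default" table.
    static final String DEFAULT_TYPE = "persistent";
    static final String DEFAULT_TENANT = "public";
    static final String DEFAULT_NAMESPACE = "default";

    /** Expands a short topic name into the full form (illustrative only). */
    static String expand(String name) {
        if (name.contains("://")) {
            // A name with an explicit topic type must already be complete.
            String rest = name.substring(name.indexOf("://") + 3);
            if (rest.split("/").length != 3) {
                throw new IllegalArgumentException("Full topic name required: " + name);
            }
            return name;
        }
        String[] parts = name.split("/");
        if (parts.length == 1) {
            return DEFAULT_TYPE + "://" + DEFAULT_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
        }
        if (parts.length == 3) {
            return DEFAULT_TYPE + "://" + parts[0] + "/" + parts[1] + "/" + parts[2];
        }
        throw new IllegalArgumentException("Invalid short topic name: " + name);
    }

    public static void main(String[] args) {
        System.out.println(expand("my-topic"));
        System.out.println(expand("my-tenant/my-namespace/my-topic"));
    }
}
```

In this sketch, a name such as non-persistent://my-topic is rejected because it carries a topic type but no tenant or namespace.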

Subscribing Pulsar Topic Partition

Internally, Pulsar divides a partitioned topic into a set of non-partitioned topics, one per partition.

For example, if a simple-string topic with 3 partitions is created under the sample tenant in the flink namespace, the topics on Pulsar would be:

| Topic name | Partitioned |
|---|---|
| persistent://sample/flink/simple-string | Y |
| persistent://sample/flink/simple-string-partition-0 | N |
| persistent://sample/flink/simple-string-partition-1 | N |
| persistent://sample/flink/simple-string-partition-2 | N |

You can consume messages directly from the topic partitions by using the non-partitioned topic names above. For example, PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2") consumes partitions 1 and 2 of the sample/flink/simple-string topic.
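
As a small illustration of the naming scheme in the table, the following sketch derives the per-partition topic names from a partitioned topic name. The helper name is hypothetical; only the "-partition-N" suffix comes from the table above.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionNameSketch {
    /** Lists the non-partitioned topic names behind a partitioned topic. */
    static List<String> partitionTopics(String topic, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        partitionTopics("persistent://sample/flink/simple-string", 3)
                .forEach(System.out::println);
    }
}
```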

RegexSubscriptionMode for Topic Pattern

The Pulsar connector extracts the topic type (persistent or non-persistent) from the given topic pattern. For example, PulsarSource.builder().setTopicPattern("non-persistent://my-topic.*") subscribes to non-persistent topics only. If the regular expression does not include a topic type, the topic type defaults to persistent.

To consume both persistent and non-persistent topics with a topic pattern, use setTopicPattern("topic-.*", RegexSubscriptionMode.AllTopics). The Pulsar connector filters the available topics according to the RegexSubscriptionMode.
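
The filtering described above can be pictured with a plain-Java sketch. It assumes the pattern is matched against the last segment of the full topic name, and it uses a local enum as a hypothetical stand-in for RegexSubscriptionMode; the connector's real matching logic may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternSketch {
    // Hypothetical stand-in for the connector's RegexSubscriptionMode.
    enum Mode { PERSISTENT_ONLY, NON_PERSISTENT_ONLY, ALL_TOPICS }

    /** Keeps the topics whose type fits the mode and whose local name matches the regex. */
    static List<String> filter(List<String> fullTopicNames, String localNameRegex, Mode mode) {
        Pattern pattern = Pattern.compile(localNameRegex);
        List<String> matched = new ArrayList<>();
        for (String topic : fullTopicNames) {
            boolean persistent = topic.startsWith("persistent://");
            boolean typeOk = mode == Mode.ALL_TOPICS
                    || (mode == Mode.PERSISTENT_ONLY && persistent)
                    || (mode == Mode.NON_PERSISTENT_ONLY && !persistent);
            // Assumption: match on the segment after the last '/'.
            String localName = topic.substring(topic.lastIndexOf('/') + 1);
            if (typeOk && pattern.matcher(localName).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "persistent://public/default/topic-a",
                "non-persistent://public/default/topic-b",
                "persistent://public/default/other");
        System.out.println(filter(topics, "topic-.*", Mode.ALL_TOPICS));
    }
}
```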

Deserializer

A deserializer (Deserialization schema) is required for parsing Pulsar messages. The deserializer is configured by setDeserializationSchema(PulsarDeserializationSchema). The PulsarDeserializationSchema defines how to deserialize a Pulsar Message<byte[]>.

If only the raw payload of a message (message data in bytes) is needed, you can use the predefined PulsarDeserializationSchema. Pulsar connector provides three types of implementation.

  • Decode the message by using Pulsar’s Schema.

    // Primitive types
    PulsarDeserializationSchema.pulsarSchema(Schema)
    // Struct types (JSON, Protobuf, Avro, etc.)
    PulsarDeserializationSchema.pulsarSchema(Schema, Class)
    // KeyValue type
    PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
  • Decode the message by using Flink’s DeserializationSchema.

    PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
  • Decode the message by using Flink’s TypeInformation.

    PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)

Pulsar Message<byte[]> contains some extra properties, such as message key, message publish time, message time, application defined key/value pairs that will be attached to the message, etc. These properties could be acquired by the Message<byte[]> interface.

If you want to deserialize the Pulsar message using these properties, you need to implement PulsarDeserializationSchema. Make sure that the TypeInformation returned by PulsarDeserializationSchema.getProducedType() is correct, because Flink uses this TypeInformation when passing messages to downstream operators.

Pulsar Subscriptions

A Pulsar subscription is a named configuration rule that determines how messages are delivered to Flink readers. The subscription name is required for consuming messages. Pulsar connector supports four subscription types:

  • Exclusive
  • Shared
  • Failover
  • Key_Shared

There is no difference between Exclusive and Failover in the Pulsar connector. When a Flink reader crashes, all (non-acknowledged and subsequent) messages are redelivered to the available Flink readers.

By default, if no subscription type is defined, Pulsar source uses Shared subscription.

    // Shared subscription with name "my-shared"
    PulsarSource.builder().setSubscriptionName("my-shared")
    // Exclusive subscription with name "my-exclusive"
    PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)

To use the Key_Shared subscription type with the Pulsar connector, provide a RangeGenerator implementation. The RangeGenerator generates a set of key hash ranges so that each reader subtask only receives messages whose key hash falls within its range.

If no RangeGenerator is provided for the Key_Shared subscription type, the Pulsar connector uses a UniformRangeGenerator, which divides the range by the Flink source parallelism.
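
To make the idea of dividing the key hash range by the parallelism concrete, here is a minimal sketch. It assumes a hash range of 0 to 65535 and one contiguous range per subtask; both are assumptions for illustration, and the code is not the connector's UniformRangeGenerator.

```java
class UniformRangeSketch {
    // Assumption: key hashes fall in [0, 65535].
    static final int RANGE_SIZE = 65536;

    /** Returns {start, end} (both inclusive) for each subtask index. */
    static int[][] divide(int parallelism) {
        int[][] ranges = new int[parallelism][2];
        for (int i = 0; i < parallelism; i++) {
            ranges[i][0] = (int) ((long) i * RANGE_SIZE / parallelism);
            ranges[i][1] = (int) ((long) (i + 1) * RANGE_SIZE / parallelism) - 1;
        }
        return ranges;
    }

    /** Finds the subtask whose range contains the given key hash, or -1 if none does. */
    static int subtaskFor(int keyHash, int[][] ranges) {
        for (int i = 0; i < ranges.length; i++) {
            if (keyHash >= ranges[i][0] && keyHash <= ranges[i][1]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (int[] r : divide(4)) {
            System.out.println(r[0] + " to " + r[1]);
        }
    }
}
```

With a parallelism of 4, each subtask in this sketch owns 16384 consecutive hash values, and every hash maps to exactly one subtask.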

Starting Position

Pulsar source is able to consume messages starting from different positions by setStartCursor(StartCursor). Built-in start cursors include:

  • Start from the earliest available message in the topic.

    StartCursor.earliest()
  • Start from the latest available message in the topic.

    StartCursor.latest()
  • Start from a specified message between the earliest and the latest. Pulsar connector would consume from the latest available message if the message id doesn’t exist.

    The start message is included in consuming result.

    StartCursor.fromMessageId(MessageId)
  • Start from a specified message between the earliest and the latest. Pulsar connector would consume from the latest available message if the message id doesn’t exist.

    Include or exclude the start message by using the second boolean parameter.

    StartCursor.fromMessageId(MessageId, boolean)
  • Start from the specified message time by Message<byte[]>.getPublishTime().

    StartCursor.fromMessageTime(long)

Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID (MessageId) of the message is ordered in that sequence. MessageId also contains extra information (the ledger, entry, and partition) about how the message is stored. You can create a MessageId by using DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex).

Boundedness

Pulsar source supports streaming and batch running modes. By default, the PulsarSource is set to run in the streaming mode.

In streaming mode, Pulsar source never stops until the Flink job fails or is cancelled. However, you can make the Pulsar source stop at a given position by using setUnboundedStopCursor(StopCursor). The Pulsar source will finish when all partitions reach their specified stop positions.

You can use setBoundedStopCursor(StopCursor) to specify a stop position so that the Pulsar source can run in the batch mode. When all partitions have reached their stop positions, the source will finish.

Built-in stop cursors include:

  • The connector never stops consuming.

    StopCursor.never()
  • Stop at the latest available message in Pulsar when the connector starts consuming.

    StopCursor.latest()
  • Stop when the connector meets the given message, or a message produced after it.

    StopCursor.atMessageId(MessageId)
  • Stop after the given message, including it in the consuming result.

    StopCursor.afterMessageId(MessageId)
  • Stop at the specified message time by Message<byte[]>.getPublishTime().

    StopCursor.atPublishTime(long)

StopCursor.atEventTime(long) is now deprecated.
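
The difference between stopping at a message and stopping after it can be illustrated with a small sketch. Message IDs are modeled here as increasing long values, which is a simplification of the real MessageId; the class and its parameters are hypothetical and only mirror the semantics described in the list above.

```java
import java.util.ArrayList;
import java.util.List;

class StopCursorSketch {
    /**
     * Returns the IDs consumed before the stop position is reached.
     * includeStop = false mimics "at message id"; true mimics "after message id".
     */
    static List<Long> consume(List<Long> ids, long stopId, boolean includeStop) {
        List<Long> consumed = new ArrayList<>();
        for (long id : ids) {
            boolean pastStop = includeStop ? id > stopId : id >= stopId;
            if (pastStop) {
                break;
            }
            consumed.add(id);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L, 4L, 5L);
        System.out.println(consume(ids, 3L, false));
        System.out.println(consume(ids, 3L, true));
    }
}
```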

Source Configurable Options

In addition to configuration options described above, you can set arbitrary options for PulsarClient, PulsarAdmin, Pulsar Consumer and PulsarSource by using setConfig(ConfigOption<T>, T) and setConfig(Configuration).

PulsarClient Options

Pulsar connector uses the client API to create the Consumer instance. It extracts most parts of Pulsar’s ClientConfigurationData, which is required for creating a PulsarClient, as Flink configuration options in PulsarOptions.

| Key | Default | Type | Description |
|---|---|---|---|
| pulsar.client.authParamMap | (none) | Map | Parameters for the authentication plugin. |
| pulsar.client.authParams | (none) | String | Parameters for the authentication plugin. Example: key1:val1,key2:val2 |
| pulsar.client.authPluginClassName | (none) | String | Name of the authentication plugin. |
| pulsar.client.concurrentLookupRequest | 5000 | Integer | The number of concurrent lookup requests allowed on each broker connection, to prevent overload on the broker. Configure a higher value only if the created PulsarClient needs to produce to or subscribe to thousands of topics. |
| pulsar.client.connectionTimeoutMs | 10000 | Integer | Duration (in ms) to wait for a connection to a broker to be established. If the duration passes without a response from the broker, the connection attempt is dropped. |
| pulsar.client.connectionsPerBroker | 1 | Integer | The maximum number of connections that the client library will open to a single broker. By default, the connection pool uses a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high latency connection. |
| pulsar.client.enableBusyWait | false | Boolean | Option to enable busy-wait settings. This option enables spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning consumes 100% CPU even when the broker is not doing any work. It is recommended to reduce the number of IO threads and BookKeeper client threads so that fewer CPU cores are kept busy. |
| pulsar.client.enableTransaction | false | Boolean | If transaction is enabled, start the transactionCoordinatorClient with PulsarClient. |
| pulsar.client.initialBackoffIntervalNanos | 100000000 | Long | Default duration (in nanoseconds) for a backoff interval. |
| pulsar.client.keepAliveIntervalSeconds | 30 | Integer | Interval (in seconds) for keeping the connection between the Pulsar client and broker alive. |
| pulsar.client.listenerName | (none) | String | Configure the listenerName so that the broker returns the corresponding advertisedListener. |
| pulsar.client.maxBackoffIntervalNanos | 60000000000 | Long | The maximum duration (in nanoseconds) for a backoff interval. |
| pulsar.client.maxLookupRedirects | 20 | Integer | The maximum number of times a lookup request is redirected to a broker. |
| pulsar.client.maxLookupRequest | 50000 | Integer | The maximum number of lookup requests allowed on each broker connection, to prevent overload on the broker. It should be greater than maxConcurrentLookupRequests. Requests within maxConcurrentLookupRequests are sent to the broker right away; requests beyond maxConcurrentLookupRequests and under maxLookupRequests wait on each client connection. |
| pulsar.client.maxNumberOfRejectedRequestPerConnection | 50 | Integer | The maximum number of requests rejected by a broker in a certain period (30s), after which the current connection is closed and the client creates a new connection to a different broker. |
| pulsar.client.memoryLimitBytes | 0 | Long | The limit (in bytes) on the amount of direct memory that will be allocated by this client instance. Note: at this moment this only limits the memory for producers. Setting this to 0 disables the limit. |
| pulsar.client.numIoThreads | 1 | Integer | The number of threads used for handling connections to brokers. |
| pulsar.client.numListenerThreads | 1 | Integer | The number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers that are using a listener model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering. |
| pulsar.client.operationTimeoutMs | 30000 | Integer | Operation timeout (in ms). Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval. If the operation is not completed during this interval, the operation is marked as failed. |
| pulsar.client.proxyProtocol | SNI | Enum | Protocol type to determine the type of proxy routing when a client connects to the proxy using pulsar.client.proxyServiceUrl. Possible values: “SNI”. |
| pulsar.client.proxyServiceUrl | (none) | String | Proxy-service URL when a client connects to the broker via the proxy. The client can choose the type of proxy-routing. |
| pulsar.client.requestTimeoutMs | 60000 | Integer | Maximum duration (in ms) for completing a request. |
| pulsar.client.serviceUrl | (none) | String | Service URL provider for Pulsar service. To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL. You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. Example for localhost: pulsar://localhost:6650. With multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652. For a production Pulsar cluster: pulsar://pulsar.us-west.example.com:6650. With TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651. |
| pulsar.client.sslProvider | (none) | String | The name of the security provider used for SSL connections. The default value is the default security provider of the JVM. |
| pulsar.client.statsIntervalSeconds | 60 | Long | Interval between each stats info. Stats is activated with a positive statsInterval. Set statsIntervalSeconds to 1 second at least. |
| pulsar.client.tlsAllowInsecureConnection | false | Boolean | Whether the Pulsar client accepts untrusted TLS certificates from the broker. |
| pulsar.client.tlsCiphers | | List<String> | A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported. |
| pulsar.client.tlsHostnameVerificationEnable | false | Boolean | Whether to enable TLS hostname verification. When a client connects to the broker over TLS, it validates the incoming x509 certificate and matches the provided hostname (CN/SAN) against the expected broker’s host name. It follows RFC 2818, 3.1. Server Identity hostname verification. |
| pulsar.client.tlsProtocols | | List<String> | The SSL protocol used to generate the SSLContext. By default, it is set to TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1. |
| pulsar.client.tlsTrustCertsFilePath | (none) | String | Path to the trusted TLS certificate file. |
| pulsar.client.tlsTrustStorePassword | (none) | String | The store password for the key store file. |
| pulsar.client.tlsTrustStorePath | (none) | String | The location of the trust store file. |
| pulsar.client.tlsTrustStoreType | “JKS” | String | The file format of the trust store file. |
| pulsar.client.useKeyStoreTls | false | Boolean | If TLS is enabled, whether to use the KeyStore type as the TLS configuration parameter. If it is set to false, the default pem type configuration is used. |
| pulsar.client.useTcpNoDelay | true | Boolean | Whether to use the TCP no-delay flag on the connection to disable the Nagle algorithm. The no-delay feature ensures that packets are sent out on the network as soon as possible, which is critical to achieve low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall throughput. Therefore, if latency is not a concern, it is recommended to set the useTcpNoDelay flag to false. By default, it is set to true. |

PulsarAdmin Options

The admin API is used for querying topic metadata and for discovering the desired topics when the Pulsar connector uses topic pattern subscription. It shares most of its configuration options with the client API. The configuration options listed here are used only by the admin API. They are also defined in PulsarOptions.

| Key | Default | Type | Description |
|---|---|---|---|
| pulsar.admin.adminUrl | (none) | String | The Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS. |
| pulsar.admin.autoCertRefreshTime | 300000 | Integer | The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication. |
| pulsar.admin.connectTimeout | 60000 | Integer | The connection time out (in ms) for the PulsarAdmin client. |
| pulsar.admin.readTimeout | 60000 | Integer | The server response read timeout (in ms) for the PulsarAdmin client for any request. |
| pulsar.admin.requestTimeout | 300000 | Integer | The server request timeout (in ms) for the PulsarAdmin client for any request. |

Pulsar Consumer Options

In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios. Flink’s Pulsar connector uses the Consumer API. It extracts most parts of Pulsar’s ConsumerConfigurationData as Flink configuration options in PulsarSourceOptions.

| Key | Default | Type | Description |
|---|---|---|---|
| pulsar.consumer.ackReceiptEnabled | false | Boolean | Acknowledgement will return a receipt, but this does not mean that the message will not be resent after getting the receipt. |
| pulsar.consumer.ackTimeoutMillis | 0 | Long | The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second. By default, the acknowledge timeout is disabled, which means that messages delivered to a consumer are not re-delivered unless the consumer crashes. When the acknowledgement timeout is enabled, a message that is not acknowledged within the specified timeout is re-delivered to the consumer (possibly to a different consumer in the case of a shared subscription). |
| pulsar.consumer.acknowledgementsGroupTimeMicros | 100000 | Long | Group consumer acknowledgments for a specified time (in μs). By default, a consumer uses a 100 ms (100000 μs) grouping time to send out acknowledgments to a broker. If the group time is set to 0, acknowledgments are sent out immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure. |
| pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull | false | Boolean | Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure, which can be guarded against with the pulsar.consumer.maxPendingChunkedMessage threshold. Once a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging them if pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull is true. Otherwise, it marks them for redelivery. |
| pulsar.consumer.autoUpdatePartitionsIntervalSeconds | 60 | Integer | The interval (in seconds) of updating partitions. This only works if autoUpdatePartitions is enabled. |
| pulsar.consumer.consumerName | (none) | String | The consumer name is informative and can be used to identify a particular consumer instance from the topic stats. |
| pulsar.consumer.cryptoFailureAction | FAIL | Enum | The action the consumer takes when it receives a message that cannot be decrypted. FAIL: the default option, fail messages until crypto succeeds. DISCARD: silently acknowledge but do not deliver messages to an application. CONSUME: deliver encrypted messages to applications; it is the application’s responsibility to decrypt the message. With CONSUME, decompressing the messages fails, and if a message contains batched messages, the client is not able to retrieve the individual messages in the batch. The delivered encrypted message contains an EncryptionContext with the encryption and compression information, which an application can use to decrypt the consumed message payload. Possible values: “FAIL”, “DISCARD”, “CONSUME”. |
| pulsar.consumer.deadLetterPolicy.deadLetterTopic | (none) | String | Name of the dead letter topic where the failed messages are sent. |
| pulsar.consumer.deadLetterPolicy.maxRedeliverCount | 0 | Integer | The maximum number of times that a message is redelivered before being sent to the dead letter queue. |
| pulsar.consumer.deadLetterPolicy.retryLetterTopic | (none) | String | Name of the retry topic where the failed messages are sent. |
| pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis | 60000 | Long | If a producer fails to publish all the chunks of a message, the consumer can expire the incomplete chunks if it cannot receive all chunks within the expire time (default 1 minute, in ms). |
| pulsar.consumer.maxPendingChunkedMessage | 10 | Integer | The consumer buffers chunk messages in memory until it receives all the chunks of the original message. While consuming chunk messages, chunks from the same message might not be contiguous in the stream and might be mixed with other messages’ chunks. So, the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages. For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages. Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message. Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure, which can be guarded against with the pulsar.consumer.maxPendingChunkedMessage threshold. Once a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging them or by asking the broker to redeliver them later by marking them unacknowledged. This behavior can be controlled by the pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull option. |
| pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions | 50000 | Integer | The maximum total receiver queue size across partitions. This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value. |
| pulsar.consumer.negativeAckRedeliveryDelayMicros | 60000000 | Long | Delay (in μs) to wait before redelivering messages that failed to be processed. When an application uses Consumer.negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. |
| pulsar.consumer.poolMessages | false | Boolean | Enable pooling of messages and the underlying data buffers. |
| pulsar.consumer.priorityLevel | 0 | Integer | Priority level of a consumer. The broker gives precedence to consumers with a higher priority when dispatching messages in the shared subscription type. The broker follows descending priorities, for example, 0=max-priority, 1, 2,… In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits. Otherwise, the broker considers consumers on the next priority level. Example 1: if a subscription has consumer A with priorityLevel 0 and consumer B with priorityLevel 1, then the broker only dispatches messages to consumer A until it runs out of permits and then starts dispatching messages to consumer B. Example 2: given the consumers (priority level, permits) C1 (0, 2), C2 (0, 1), C3 (0, 1), C4 (1, 2), C5 (1, 1), the order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4. |
| pulsar.consumer.properties | | Map | A name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting topic stats, this metadata is associated with the consumer stats for easier identification. |
| pulsar.consumer.readCompacted | false | Boolean | If readCompacted is enabled, a consumer reads messages from a compacted topic rather than reading the full message backlog of a topic. A consumer only sees the latest value for each key in the compacted topic, up to the point in the topic backlog that has been compacted. Beyond that point, messages are sent as normal. readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (like failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to the subscription call throwing a PulsarClientException. |
| pulsar.consumer.receiverQueueSize | 1000 | Integer | Size of a consumer’s receiver queue, that is, the number of messages accumulated by a consumer before an application calls receive. A value higher than the default value increases consumer throughput, though at the expense of more memory utilization. |
| pulsar.consumer.replicateSubscriptionState | false | Boolean | If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters. |
| pulsar.consumer.retryEnable | false | Boolean | If enabled, the consumer will automatically retry messages. |
| pulsar.consumer.subscriptionInitialPosition | Latest | Enum | Initial position at which to set the cursor when subscribing to a topic for the first time. Possible values: “Latest”, “Earliest”. |
| pulsar.consumer.subscriptionMode | Durable | Enum | Select the subscription mode to be used when subscribing to the topic. Durable: the subscription is backed by a durable cursor that retains messages and persists the current position. NonDurable: a lightweight subscription mode that doesn’t have a durable cursor associated. Possible values: “Durable”, “NonDurable”. |
| pulsar.consumer.subscriptionName | (none) | String | Specify the subscription name for this consumer. This argument is required when constructing the consumer. |
| pulsar.consumer.subscriptionType | Shared | Enum | Subscription type. Four subscription types are available: Exclusive, Failover, Shared, Key_Shared. Possible values: “Exclusive”, “Shared”, “Failover”, “Key_Shared”. |
| pulsar.consumer.tickDurationMillis | 1000 | Long | Granularity (in ms) of the ack-timeout redelivery. A greater (for example, 1 hour) tickDurationMillis reduces the memory overhead to track messages. |
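
The dispatch order in Example 2 of pulsar.consumer.priorityLevel can be reproduced with a short simulation. The sketch assumes round-robin dispatch among consumers on the same priority level, which is consistent with the documented order but is an assumption here; the class names are hypothetical and this is not broker code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PriorityDispatchSketch {
    static final class Consumer {
        final String name;
        final int priorityLevel;
        int permits;

        Consumer(String name, int priorityLevel, int permits) {
            this.name = name;
            this.priorityLevel = priorityLevel;
            this.permits = permits;
        }
    }

    /** Returns the consumer names in the order messages would be dispatched. */
    static List<String> dispatchOrder(List<Consumer> consumers) {
        // TreeMap iterates levels in ascending order: level 0 is the highest priority.
        Map<Integer, List<Consumer>> byLevel = new TreeMap<>();
        for (Consumer c : consumers) {
            byLevel.computeIfAbsent(c.priorityLevel, k -> new ArrayList<>()).add(c);
        }
        List<String> order = new ArrayList<>();
        for (List<Consumer> level : byLevel.values()) {
            boolean dispatched = true;
            // Assumption: round-robin within a level until all permits are used.
            while (dispatched) {
                dispatched = false;
                for (Consumer c : level) {
                    if (c.permits > 0) {
                        c.permits--;
                        order.add(c.name);
                        dispatched = true;
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Consumer> consumers = new ArrayList<>();
        consumers.add(new Consumer("C1", 0, 2));
        consumers.add(new Consumer("C2", 0, 1));
        consumers.add(new Consumer("C3", 0, 1));
        consumers.add(new Consumer("C4", 1, 2));
        consumers.add(new Consumer("C5", 1, 1));
        System.out.println(dispatchOrder(consumers));
    }
}
```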

PulsarSource Options

The configuration options below are mainly used for customizing the performance and message acknowledgement behavior. You can leave them unchanged if you do not encounter any performance issues.

| Key | Default | Type | Description |
|---|---|---|---|
| pulsar.source.autoCommitCursorInterval | 5000 | Long | This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription. The cursor is committed automatically at the given interval (in ms). |
| pulsar.source.enableAutoAcknowledgeMessage | false | Boolean | Flink commits the consuming position with Pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transactions for your Pulsar cluster, ensure that you have set this option to true. The source then uses the Pulsar client’s internal mechanism and commits the cursor in one of two ways. For Key_Shared and Shared subscriptions, the cursor is committed once the message is consumed. For Exclusive and Failover subscriptions, the cursor is committed at a given interval. |
| pulsar.source.maxFetchRecords | 100 | Integer | The maximum number of records to fetch when polling. A larger value increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchTime. |
| pulsar.source.maxFetchTime | 10000 | Long | The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchRecords. |
| pulsar.source.partitionDiscoveryIntervalMs | 30000 | Long | The interval (in ms) for the Pulsar source to discover the new partitions. A non-positive value disables the partition discovery. |
| pulsar.source.transactionTimeoutMillis | 10800000 | Long | This option is used in Shared or Key_Shared subscription. You should configure this option when you do not enable the pulsar.source.enableAutoAcknowledgeMessage option. The value (in ms) should be greater than the checkpoint interval. |
| pulsar.source.verifyInitialOffsets | WARN_ON_MISMATCH | Enum | Upon (re)starting the source, check whether the expected message can be read. If failure is enabled, the application fails. Otherwise, it logs a warning. A possible solution is to adjust the retention settings in Pulsar or to ignore the check result. Possible values: “FAIL_ON_MISMATCH”, “WARN_ON_MISMATCH”. |
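
The interplay of pulsar.source.maxFetchRecords and pulsar.source.maxFetchTime (a batch ends when either limit is hit first) can be sketched as follows. The queue stands in for incoming messages; the class is hypothetical and does not reflect the connector's internal reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FetchBatchSketch {
    /** Collects records until maxFetchRecords is reached or maxFetchTimeMs has elapsed. */
    static <T> List<T> fetchBatch(BlockingQueue<T> queue, int maxFetchRecords, long maxFetchTimeMs) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxFetchTimeMs);
        try {
            while (batch.size() < maxFetchRecords) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // time limit reached
                }
                T record = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (record == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag and return what we have
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 250; i++) {
            queue.add(i);
        }
        System.out.println(fetchBatch(queue, 100, 10000).size());
    }
}
```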

Dynamic Partition Discovery

To handle scenarios like topic scaling-out or topic creation without restarting the Flink job, Pulsar source can be configured to periodically discover new partitions that match the provided topic-partition subscription pattern. To enable partition discovery, set a positive value for the option PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS:

    // Discover new partitions every 10 seconds (the option type is Long)
    PulsarSource.builder()
        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000L);
  • Partition discovery is enabled by default. The Pulsar connector queries the topic metadata every 30 seconds.
  • Set the partition discovery interval to a non-positive value to disable this feature.
  • Partition discovery is disabled in batch mode even if you set this option to a positive value.
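
A discovery round boils down to comparing the partitions found now with those already known. The sketch below follows the option table's rule that a non-positive interval disables discovery, and the note that batch mode disables it as well; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

class PartitionDiscoverySketch {
    /** Discovery runs only with a positive interval and an unbounded (streaming) source. */
    static boolean discoveryEnabled(long intervalMs, boolean bounded) {
        return intervalMs > 0 && !bounded;
    }

    /** Returns the partitions seen in this round that were not known before. */
    static Set<String> newPartitions(Set<String> known, Set<String> discovered) {
        Set<String> added = new TreeSet<>(discovered);
        added.removeAll(known);
        return added;
    }

    public static void main(String[] args) {
        Set<String> known = Set.of("topic-a-partition-0", "topic-a-partition-1");
        Set<String> discovered = Set.of(
                "topic-a-partition-0", "topic-a-partition-1", "topic-a-partition-2");
        System.out.println(newPartitions(known, discovered));
    }
}
```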

Event Time and Watermarks

By default, the message uses the timestamp embedded in Pulsar Message<byte[]> as the event time. You can define your own WatermarkStrategy to extract the event time from the message, and emit the watermark downstream:

  env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");

This documentation describes details about how to define a WatermarkStrategy.
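
As an illustration of such a strategy, the sketch below builds a WatermarkStrategy with Flink's built-in bounded-out-of-orderness generator and a custom timestamp assigner. The class name PayloadTimestampWatermarks, the payload layout "<epochMillis>,<body>" and the 5-second bound are assumptions made for this example only; they are not part of the connector.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Hypothetical helper class, not part of Flink or the Pulsar connector.
public class PayloadTimestampWatermarks {

    // Assumes each String payload looks like "<epochMillis>,<body>" (illustrative format).
    public static WatermarkStrategy<String> create() {
        return WatermarkStrategy
                // Tolerate events that arrive up to 5 seconds out of order (assumed bound).
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // The second lambda argument is the timestamp already attached to the
                // record; it is ignored here in favour of the payload's own timestamp.
                .withTimestampAssigner((payload, recordTimestamp) ->
                        Long.parseLong(payload.substring(0, payload.indexOf(','))));
    }
}
```

It could then be passed to the source in place of the custom class shown earlier, for example env.fromSource(pulsarSource, PayloadTimestampWatermarks.create(), "Pulsar Source").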

Message Acknowledgement

When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when the connector acknowledges that they have all been processed successfully. The Pulsar connector supports four subscription types, and the acknowledgement behavior varies between them.

Acknowledgement on Exclusive and Failover Subscription Types

Exclusive and Failover subscription types support cumulative acknowledgment. In these subscription types, Flink only needs to acknowledge the latest successfully consumed message. All messages before that message are marked as consumed.

The Pulsar source acknowledges the message currently being consumed when a checkpoint completes, which keeps Flink’s checkpoint state consistent with the committed position on the Pulsar brokers.

If checkpointing is disabled, Pulsar source periodically acknowledges messages. You can set the acknowledgement period by using the PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL option.

The Pulsar source does NOT rely on committed positions for fault tolerance. For these two subscription types, acknowledging messages only exposes the consumer's progress for monitoring.

Acknowledgement on Shared and Key_Shared Subscription Types

Shared and Key_Shared subscription types need to acknowledge messages one by one. You can acknowledge a message in a transaction and commit it to Pulsar.

When using these two subscription types in the connector, enable transactions in the Pulsar broker.conf file:

  transactionCoordinatorEnabled=true

Pulsar transactions are created with a default timeout of 3 hours. Make sure that the timeout is greater than the checkpoint interval plus the maximum recovery time. A shorter checkpoint interval increases consuming performance. You can change the transaction timeout by using the PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS option.
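
The timeout rule above is simple arithmetic, and a small check like the following sketch can make it explicit before a job is submitted. The class and method names are hypothetical, and the checkpoint interval and recovery time in main are assumed values for illustration, not recommendations.

```java
// Hypothetical helper, not part of the connector: checks the rule
// "transaction timeout > checkpoint interval + maximum recovery time".
public class TransactionTimeoutCheck {

    public static boolean isTimeoutSufficient(
            long transactionTimeoutMs, long checkpointIntervalMs, long maxRecoveryTimeMs) {
        return transactionTimeoutMs > checkpointIntervalMs + maxRecoveryTimeMs;
    }

    public static void main(String[] args) {
        long defaultTimeoutMs = 10_800_000L;  // connector default: 3 hours
        long checkpointIntervalMs = 60_000L;  // assumed job setting: 1 minute
        long maxRecoveryTimeMs = 600_000L;    // assumed worst-case recovery: 10 minutes

        System.out.println(
                isTimeoutSufficient(defaultTimeoutMs, checkpointIntervalMs, maxRecoveryTimeMs));
    }
}
```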

If checkpointing is disabled or you cannot enable transactions on the Pulsar broker, set PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE to true. Each message is then acknowledged immediately after it is consumed. Consistency cannot be guaranteed in this scenario.
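
A minimal sketch of this fallback, assuming a String-typed builder, might look as follows. The wrapper class SharedSubscriptionWithoutTransactions is hypothetical; the option and the builder methods are the ones named in this document.

```java
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.pulsar.client.api.SubscriptionType;

// Hypothetical helper class, not part of the connector.
public class SharedSubscriptionWithoutTransactions {

    // Applies the auto-acknowledge fallback to an existing builder.
    // Messages are acknowledged as soon as they are consumed, so
    // consistency is not guaranteed after a failure.
    public static PulsarSourceBuilder<String> configure(PulsarSourceBuilder<String> builder) {
        return builder
                .setSubscriptionType(SubscriptionType.Shared)
                .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
    }
}
```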

All acknowledgements in a transaction are recorded on the Pulsar broker side.

Upgrading to the Latest Connector Version

The generic upgrade steps are outlined in the upgrading jobs and Flink versions guide. The Pulsar connector does not store any state on the Flink side; it pushes and stores all state on the Pulsar side. For Pulsar, you additionally need to know these limitations:

  • Do not upgrade the Pulsar connector and Pulsar broker version at the same time.
  • Always use a newer Pulsar client with the Pulsar connector for consuming messages from Pulsar.

Troubleshooting

If you have a problem with Pulsar when using Flink, keep in mind that Flink only wraps PulsarClient or PulsarAdmin, so your problem might be independent of Flink. It can sometimes be solved by upgrading the Pulsar brokers, reconfiguring the Pulsar brokers, or reconfiguring the Pulsar connector in Flink.

Messages can be delayed on low volume topics

When the Pulsar source connector reads from a low volume topic, users might observe a 10-second delay between messages. The Pulsar source buffers messages from topics by default. Before records are emitted to downstream operators, the number of buffered records must be greater than or equal to PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS. If the data volume is low, filling the buffer can take longer than PULSAR_MAX_FETCH_TIME (10 seconds by default). In that case, the messages are emitted only after this time has passed.

To avoid this behaviour, change either the maximum number of buffered records or the maximum fetch time.
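
Both settings can be adjusted on the builder. The sketch below is one possible tuning for a low volume topic; the wrapper class LowVolumeFetchTuning and the concrete values are assumptions chosen for illustration, and lower values trade throughput for latency.

```java
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;

// Hypothetical helper class, not part of the connector.
public class LowVolumeFetchTuning {

    public static PulsarSourceBuilder<String> tune(PulsarSourceBuilder<String> builder) {
        return builder
                // Emit a batch once 10 records are buffered (illustrative value;
                // this option is assumed to be of type Integer).
                .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 10)
                // Wait at most 1 second for a batch instead of the default
                // 10 seconds (illustrative value; the option is of type Long).
                .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_TIME, 1000L);
    }
}
```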