Pulsar Java client

可以使用 Pulsar 的 Java 客户端来创建消息的 生产者消费者读者,以及完成Pulsar 管理的任务。 The current version of the Java client is 2.9.2.

在利用 Java 客户端所创建的生产者消费者读者中所提供的方法都是线程安全的。

Javadoc for the Pulsar client is divided into two domains by package as follows.

说明Maven Artifact
org.apache.pulsar.client.api生产者和消费者 APIorg.apache.pulsar:pulsar-client:2.9.2
org.apache.pulsar.client.adminJava 管理APIorg.apache.pulsar:pulsar-client-admin:2.9.2
org.apache.pulsar.client.allIncludes both pulsar-client and pulsar-client-admin

Both pulsar-client and pulsar-client-admin are shaded packages and they shade dependencies independently. 因此,同时使用 pulsar-clientpulsar-client-admin 的应用程序会有冗余的 shade 类。 如果您引入了新的依赖项,但忘记更新 shade 规则,就会引起问题。

In this case, you can use pulsar-client-all, which shades dependencies only one time and reduces the size of dependencies.
org.apache.pulsar:pulsar-client-all:2.9.2

This document focuses only on the client API for producing and consuming messages on Pulsar topics. For how to use the Java admin client, see Pulsar admin interface.

安装

The latest version of the Pulsar Java client library is available via Maven Central. To use the latest version, add the pulsar-client library to your build configuration.

Maven

If you use Maven, add the following information to the pom.xml file.

  1. <!-- in your <properties> block -->
  2. <pulsar.version>2.9.2</pulsar.version>
  3. <!-- in your <dependencies> block -->
  4. <dependency>
  5. <groupId>org.apache.pulsar</groupId>
  6. <artifactId>pulsar-client</artifactId>
  7. <version>${pulsar.version}</version>
  8. </dependency>

Gradle

If you use Gradle, add the following information to the build.gradle file.

  1. def pulsarVersion = '2.9.2'
  2. dependencies {
  3. compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
  4. }

连接 URL

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. The default port is 6650. The following is an example of localhost.

  1. pulsar://localhost:6650

If you have multiple brokers, the URL is as follows.

  1. pulsar://localhost:6550,localhost:6651,localhost:6652

A URL for a production Pulsar cluster is as follows.

  1. pulsar://pulsar.us-west.example.com:6650

If you use TLS authentication, the URL is as follows.

  1. pulsar+ssl://pulsar.us-west.example.com:6651

Client

You can instantiate a PulsarClient object using just a URL for the target Pulsar cluster like this:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();

If you have multiple brokers, you can initiate a PulsarClient like this:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
  3. .build();

默认的broker URL是单机集群。

如果运行的是 standalone 模式的集群,则默认可以通过 pulsar://localhost:6650 这个 URL 来访问 broker。

If you create a client, you can use the loadConf configuration. The following parameters are available in loadConf.

类型配置项
说明
默认值
String

serviceUrl |Service URL provider for Pulsar service | None String | authPluginClassName | Name of the authentication plugin | None String | authParams | String represents parameters for the authentication plugin

Example
key1:val1,key2:val2|None long|operationTimeoutMs|Operation timeout |30000 long|statsIntervalSeconds|Interval between each stats info

Stats is activated with positive statsInterval

Set statsIntervalSeconds to 1 second at least |60 int|numIoThreads| The number of threads used for handling connections to brokers | 1 int|numListenerThreads|The number of threads used for handling message listeners. 所有的消费者和读者共享监听器线程池,并使用 “监听器”模式获取消息。 对于某个消费者来说,为了保证有序性,总在同一线程中调用监听器。 如果想用多线程处理单个 topic,需要采用 shared 订阅模式,并为这个订阅创建多个消费者。 This does not ensure ordering.| 1 boolean|useTcpNoDelay|Whether to use TCP no-delay flag on the connection to disable Nagle algorithm |true boolean |useTls |Whether to use TLS encryption on the connection| false string | tlsTrustCertsFilePath |Path to the trusted TLS certificate file|None boolean|tlsAllowInsecureConnection|Whether the Pulsar client accepts untrusted TLS certificate from broker | false boolean | tlsHostnameVerificationEnable | Whether to enable TLS hostname verification|false int|concurrentLookupRequest|The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on broker|5000 int|maxLookupRequest|The maximum number of lookup requests allowed on each broker connection to prevent overload on broker | 50000 int|maxNumberOfRejectedRequestPerConnection|The maximum number of rejected requests of a broker in a certain time frame (30 seconds) after the current connection is closed and the client creates a new connection to connect to a different broker|50 int|keepAliveIntervalSeconds|Seconds of keeping alive interval for each client broker connection|30 int|connectionTimeoutMs|Duration of waiting for a connection to a broker to be established

If the duration passes without a response from a broker, the connection attempt is dropped|10000 int|requestTimeoutMs|Maximum duration for completing a request |60000 int|defaultBackoffIntervalNanos| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100); long|maxBackoffIntervalNanos|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30) SocketAddress|socks5ProxyAddress|SOCKS5 proxy address | None String|socks5ProxyUsername|SOCKS5 proxy username | None String|socks5ProxyPassword|SOCKS5 proxy password | None

Check out the Javadoc for the PulsarClient class for a full list of configurable parameters.

除了客户端级别的配置,还有应用于 生产者消费者的特定配置,具体如下面章节所述。

生产者(Producer)

在Pulsar中,生产者写消息到主题中。 一旦你实例化一个PulsarClient 客户端对象(在如上z章节),你可以创建一个Producer 生产者用于特定的主题

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic("my-topic")
  3. .create();
  4. // 然后你就可以发送消息到指定的broker 和topic上:
  5. producer.send("My message".getBytes());

By default, producers produce messages that consist of byte arrays. You can produce different types by specifying a message schema.

  1. Producer<String> stringProducer = client.newProducer(Schema.STRING)
  2. .topic("my-topic")
  3. .create();
  4. stringProducer.send("My message");

当不再需要生产者、消费者和客户端时,请确保将其关闭。

  1. producer.close();
  2. consumer.close();
  3. client.close();

关闭操作也可以是异步的:

  1. producer.closeAsync()
  2. .thenRun(() -> System.out.println("Producer closed"))
  3. .exceptionally((ex) -> {
  4. System.err.println("Failed to close producer: " + ex);
  5. return null;
  6. });

配置Producer(生产者)

If you instantiate a Producer object by specifying only a topic name as the example above, use the default configuration for producer.

If you create a producer, you can use the loadConf configuration. The following parameters are available in loadConf.

Type | Name|

说明

| Default |—-|—-|—-|—- String| topicName| Topic name| null| String|producerName|Producer name| null long|sendTimeoutMs|Message send timeout in ms.

If a message is not acknowledged by a server before the sendTimeout expires, an error occurs.|30000 boolean|blockIfQueueFull|If it is set to true, when the outgoing message queue is full, the Send and SendAsync methods of producer block, rather than failing and throwing errors.

If it is set to false, when the outgoing message queue is full, the Send and SendAsync methods of producer fail and ProducerQueueIsFullError exceptions occur.

The MaxPendingMessages parameter determines the size of the outgoing message queue.|false int|maxPendingMessages|The maximum size of a queue holding pending messages.

For example, a message waiting to receive an acknowledgment from a broker.

By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true.|1000 int|maxPendingMessagesAcrossPartitions|The maximum number of pending messages across partitions.

Use the setting to lower the max pending messages for each partition ({@link #setMaxPendingMessages(int)}) if the total number exceeds the configured value.|50000 MessageRoutingMode|messageRoutingMode|Message routing logic for producers on partitioned topics.

Apply the logic only when setting no key on messages.

Available options are as follows:

  • pulsar.RoundRobinDistribution: round robin

  • pulsar.UseSinglePartition: publish all messages to a single partition

  • pulsar.CustomPartition: a custom partitioning scheme|pulsar.RoundRobinDistribution HashingScheme|hashingScheme|Hashing function determining the partition where you publish a particular message (partitioned topics only).

    Available options are as follows:

  • pulsar.JavaStringHash: the equivalent of String.hashCode() in Java

  • pulsar.Murmur3_32Hash: applies the Murmur3 hashing function

  • pulsar.BoostHash: applies the hashing function from C++’s Boost library |HashingScheme.JavaStringHash ProducerCryptoFailureAction|cryptoFailureAction|Producer should take action when encryption fails.

  • FAIL: if encryption fails, unencrypted messages fail to send.

  • SEND: if encryption fails, unencrypted messages are sent. |ProducerCryptoFailureAction.FAIL long|batchingMaxPublishDelayMicros|Batching time period of sending messages.|TimeUnit.MILLISECONDS.toMicros(1) int|batchingMaxMessages|The maximum number of messages permitted in a batch.|1000 boolean|batchingEnabled|Enable batching of messages. |true CompressionType|compressionType|Message data compression type used by a producer.

    Available options:

  • LZ4

  • ZLIB

  • ZSTD

  • SNAPPY| No compression

    You can configure parameters if you do not want to use the default configuration.

    For a full list, see the Javadoc for the ProducerBuilder class. The following is an example.

    1. Producer<byte[]> producer = client.newProducer()
    2. .topic("my-topic")
    3. .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    4. .sendTimeout(10, TimeUnit.SECONDS)
    5. .blockIfQueueFull(true)
    6. .create();

    消息路由

    当使用分区主题时,当你使用生产者发布消息时你可以指定路由模式。 For more information on specifying a routing mode using the Java client, see the Partitioned Topics cookbook.

    异步发送

    You can publish messages asynchronously using the Java client. With async send, the producer puts the message in a blocking queue and returns it immediately. Then the client library sends the message to the broker in the background. If the queue is full (max size configurable), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer.

    The following is an example.

    1. producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
    2. System.out.println("Message with ID " + msgId + " successfully sent");
    3. });

    As you can see from the example above, async send operations return a MessageId wrapped in a CompletableFuture.

    配置消息

    In addition to a value, you can set additional items on a given message:

    1. producer.newMessage()
    2. .key("my-message-key")
    3. .value("my-async-message".getBytes())
    4. .property("my-key", "my-value")
    5. .property("my-other-key", "my-other-value")
    6. .send();

    You can terminate the builder chain with sendAsync() and get a future return.

    消费者(Consumer)

    在Pulsar中,消费者订阅topic主题并处理生产者发布到这些主题的消息。 你可以首先实例化一个PulsarClient 对象并传递给他一个borker(如上所示) URL来实例化一个消费者

    一旦实例化一个PulsarClient 对象,你可以指定一个主题和一个订阅来创建一个 Consumer 消费者。

    1. Consumer consumer = client.newConsumer()
    2. .topic("my-topic")
    3. .subscriptionName("my-subscription")
    4. .subscribe();

    The subscribe method will auto subscribe the consumer to the specified topic and subscription. 一种让消费者监听主题的方法是使用while循环。 In this example loop, the consumer listens for messages, prints the contents of any received message, and then acknowledges that the message has been processed. If the processing logic fails, you can use negative acknowledgement to redeliver the message later.

    1. while (true) {
    2. // Wait for a message
    3. Message msg = consumer.receive();
    4. try {
    5. // Do something with the message
    6. System.out.println("Message received: " + new String(msg.getData()));
    7. // Acknowledge the message so that it can be deleted by the message broker
    8. consumer.acknowledge(msg);
    9. } catch (Exception e) {
    10. // Message failed to process, redeliver later
    11. consumer.negativeAcknowledge(msg);
    12. }
    13. }

    If you don’t want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.

    1. MessageListener myMessageListener = (consumer, msg) -> {
    2. try {
    3. System.out.println("Message received: " + new String(msg.getData()));
    4. consumer.acknowledge(msg);
    5. } catch (Exception e) {
    6. consumer.negativeAcknowledge(msg);
    7. }
    8. }
    9. Consumer consumer = client.newConsumer()
    10. .topic("my-topic")
    11. .subscriptionName("my-subscription")
    12. .messageListener(myMessageListener)
    13. .subscribe();

    配置消费者

    If you instantiate a Consumer object by specifying only a topic and subscription name as in the example above, the consumer uses the default configuration.

    When you create a consumer, you can use the loadConf configuration. The following parameters are available in loadConf.

    Type | Name|

    说明

    | Default |—-|—-|—-|—- Set<String>| topicNames| Topic name| Sets.newTreeSet() Pattern| topicsPattern| Topic pattern |None String| subscriptionName| Subscription name| None SubscriptionType| subscriptionType| Subscription type

    Four subscription types are available:

  • Exclusive
  • Failover(灾备)
  • Shared(共享)
  • Key_Shared |SubscriptionType.Exclusive int | receiverQueueSize | Size of a consumer’s receiver queue.

For example, the number of messages accumulated by a consumer before an application calls Receive.

A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.| 1000 long|acknowledgementsGroupTimeMicros|Group a consumer acknowledgment for a specified time.

By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.

Setting a group time of 0 sends out acknowledgments immediately.

A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.|TimeUnit.MILLISECONDS.toMicros(100) long|negativeAckRedeliveryDelayMicros|Delay to wait before redelivering messages that failed to be processed.

When an application uses {@link Consumer#negativeAcknowledge(Message)}, failed messages are redelivered after a fixed timeout. |TimeUnit.MINUTES.toMicros(1) int |maxTotalReceiverQueueSizeAcrossPartitions|The max total receiver queue size across partitions.

This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.|50000 String|consumerName|Consumer name|null long|ackTimeoutMillis|Timeout of unacked messages|0 long|tickDurationMillis|Granularity of the ack-timeout redelivery.

Using an higher tickDurationMillis reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).|1000 int|priorityLevel|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type.

The broker follows descending priorities. For example, 0=max-priority, 1, 2,…

In shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.

Example 1

If a subscription has consumerA with priorityLevel 0 and consumerB with priorityLevel 1, then the broker only dispatches messages to consumerA until it runs out permits and then starts dispatching messages to consumerB.

Example 2

Consumer Priority, Level, Permits
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.|0 ConsumerCryptoFailureAction|cryptoFailureAction|Consumer should take action when it receives a message that can not be decrypted.

  • FAIL: this is the default option to fail messages until crypto succeeds.

  • DISCARD:silently acknowledge and not deliver message to an application.

  • CONSUME: deliver encrypted messages to applications. It is the application’s responsibility to decrypt the message.

    The decompression of message fails.

    If messages contain batch messages, a client is not be able to retrieve individual messages in batch.

    Delivered encrypted message contains {@link EncryptionContext} which contains encryption and compression information in it using which application can decrypt consumed message payload.|ConsumerCryptoFailureAction.FAIL SortedMap<String, String>|properties|A name or value property of this consumer.

properties is application defined metadata attached to a consumer.

When getting a topic stats, associate this metadata with the consumer stats for easier identification.|new TreeMap<>() boolean|readCompacted|If enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

Only enabling readCompacted on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.|false SubscriptionInitialPosition|subscriptionInitialPosition|Initial position at which to set cursor when subscribing to a topic at first time.|SubscriptionInitialPosition.Latest int|patternAutoDiscoveryPeriod|Topic auto discovery period when using a pattern for topic’s consumer.

The default and minimum value is 1 minute.|1 RegexSubscriptionMode|regexSubscriptionMode|When subscribing to a topic using a regular expression, you can pick a certain type of topics.

  • PersistentOnly: only subscribe to persistent topics.

  • NonPersistentOnly: only subscribe to non-persistent topics.

  • AllTopics: subscribe to both persistent and non-persistent topics. |RegexSubscriptionMode.PersistentOnly DeadLetterPolicy|deadLetterPolicy|Dead letter policy for consumers.

By default, some messages are probably redelivered many times, even to the extent that it never stops.

By using the dead letter mechanism, messages have the max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically.

You can enable the dead letter mechanism by setting deadLetterPolicy.

Example

client.newConsumer() .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build()) .subscribe();

Default dead letter topic name is {TopicName}-{Subscription}-DLQ.

To set a custom dead letter topic name:
client.newConsumer() .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10) .deadLetterTopic("your-topic-name").build()) .subscribe();

When specifying the dead letter policy while not specifying ackTimeoutMillis, you can set the ack timeout to 30000 millisecond.|None boolean|autoUpdatePartitions|If autoUpdatePartitions is enabled, a consumer subscribes to partition increasement automatically.

Note: this is only for partitioned consumers.|true boolean|replicateSubscriptionState|If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters.|false

You can configure parameters if you do not want to use the default configuration. For a full list, see the Javadoc for the ConsumerBuilder class.

The following is an example.

  1. Consumer consumer = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .ackTimeout(10, TimeUnit.SECONDS)
  5. .subscriptionType(SubscriptionType.Exclusive)
  6. .subscribe();

异步接收

The receive method receives messages synchronously (the consumer process is blocked until a message is available). You can also use async receive, which returns a CompletableFuture object immediately once a new message is available.

The following is an example.

  1. CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

Async receive operations return a Message wrapped inside of a CompletableFuture.

批量接收

Use batchReceive to receive multiple messages for each call.

The following is an example.

  1. Messages messages = consumer.batchReceive();
  2. for (Object message : messages) {
  3. // do something
  4. }
  5. consumer.acknowledge(messages)

Note
Batch receive policy limits the number and bytes of messages in a single batch. You can specify a timeout to wait for enough messages.

如果满足以下任一条件,则批量接收完成:足够的消息数量、足够的消息字节数、等待超时。

  1. Consumer consumer = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .batchReceivePolicy(BatchReceivePolicy.builder()
  5. .maxNumMessages(100)
  6. .maxNumBytes(1024 * 1024)
  7. .timeout(200, TimeUnit.MILLISECONDS)
  8. .build())
  9. .subscribe();

默认批量接收策略是:

  1. BatchReceivePolicy.builder()
  2. .maxNumMessage(-1)
  3. .maxNumBytes(10 * 1024 * 1024)
  4. .timeout(100, TimeUnit.MILLISECONDS)
  5. .build();

多主题订阅

消费者除了订阅单个Pulsar主题外,你还可以使用多主题订阅订阅多个主题。 若要使用多主题订阅, 可以提供一个topic正则表达式 (regex) 或 主题List 。 如果通过 regex 选择主题, 则所有主题都必须位于同一Pulsar命名空间中。

The followings are some examples.

  1. import org.apache.pulsar.client.api.Consumer;
  2. import org.apache.pulsar.client.api.PulsarClient;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import java.util.regex.Pattern;
  6. ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
  7. .subscriptionName(subscription);
  8. // Subscribe to all topics in a namespace
  9. Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
  10. Consumer allTopicsConsumer = consumerBuilder
  11. .topicsPattern(allTopicsInNamespace)
  12. .subscribe();
  13. // Subscribe to a subsets of topics in a namespace, based on regex
  14. Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
  15. Consumer allTopicsConsumer = consumerBuilder
  16. .topicsPattern(someTopicsInNamespace)
  17. .subscribe();

In the above example, the consumer subscribes to the persistent topics that can match the topic name pattern. If you want the consumer subscribes to all persistent and non-persistent topics that can match the topic name pattern, set subscriptionTopicsMode to RegexSubscriptionMode.AllTopics.

  1. Pattern pattern = Pattern.compile("public/default/.*");
  2. pulsarClient.newConsumer()
  3. .subscriptionName("my-sub")
  4. .topicsPattern(pattern)
  5. .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
  6. .subscribe();

Note

默认情况下,消费者的 subscriptionTopicsModePersistentOnlysubscriptionTopicsMode 的可用选项为: PersistentOnlyNonPersistentOnly,以及 AllTopics

你还可以订阅明确的主题列表 (如果愿意, 可跨命名空间):

  1. List<String> topics = Arrays.asList(
  2. "topic-1",
  3. "topic-2",
  4. "topic-3"
  5. );
  6. Consumer multiTopicConsumer = consumerBuilder
  7. .topics(topics)
  8. .subscribe();
  9. // Alternatively:
  10. Consumer multiTopicConsumer = consumerBuilder
  11. .topic(
  12. "topic-1",
  13. "topic-2",
  14. "topic-3"
  15. )
  16. .subscribe();

You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. The following is an example.

  1. Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
  2. consumerBuilder
  3. .topics(topics)
  4. .subscribeAsync()
  5. .thenAccept(this::receiveMessageFromConsumer);
  6. private void receiveMessageFromConsumer(Object consumer) {
  7. ((Consumer)consumer).receiveAsync().thenAccept(message -> {
  8. // Do something with the received message
  9. receiveMessageFromConsumer(consumer);
  10. });
  11. }

Subscription types

Pulsar has various subscription types to match different scenarios. A topic can have multiple subscriptions with different subscription types. However, a subscription can only have one subscription type at a time.

A subscription is identical with the subscription name; a subscription name can specify only one subscription type at a time. To change the subscription type, you should first stop all consumers of this subscription.

Different subscription types have different message distribution modes. This section describes the differences of subscription types and how to use them.

In order to better describe their differences, assuming you have a topic named “my-topic”, and the producer has published 10 messages.

  1. Producer<String> producer = client.newProducer(Schema.STRING)
  2. .topic("my-topic")
  3. .enableBatching(false)
  4. .create();
  5. // 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
  6. producer.newMessage().key("key-1").value("message-1-1").send();
  7. producer.newMessage().key("key-1").value("message-1-2").send();
  8. producer.newMessage().key("key-1").value("message-1-3").send();
  9. producer.newMessage().key("key-2").value("message-2-1").send();
  10. producer.newMessage().key("key-2").value("message-2-2").send();
  11. producer.newMessage().key("key-2").value("message-2-3").send();
  12. producer.newMessage().key("key-3").value("message-3-1").send();
  13. producer.newMessage().key("key-3").value("message-3-2").send();
  14. producer.newMessage().key("key-4").value("message-4-1").send();
  15. producer.newMessage().key("key-4").value("message-4-2").send();

Exclusive

Create a new consumer and subscribe with the Exclusive subscription type.

  1. Consumer consumer = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Exclusive)
  5. .subscribe()

Only the first consumer is allowed to the subscription, other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.

Note
If topic is a partitioned topic, the first consumer subscribes to all partitioned topics, other consumers are not assigned with partitions and receive an error.

Failover(灾备)

Create new consumers and subscribe with theFailover subscription type.

  1. Consumer consumer1 = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Failover)
  5. .subscribe()
  6. Consumer consumer2 = client.newConsumer()
  7. .topic("my-topic")
  8. .subscriptionName("my-subscription")
  9. .subscriptionType(SubscriptionType.Failover)
  10. .subscribe()
  11. //conumser1 is the active consumer, consumer2 is the standby consumer.
  12. //consumer1 receives 5 messages and then crashes, consumer2 takes over as an active consumer.

Multiple consumers can attach to the same subscription, yet only the first consumer is active, and others are standby. When the active consumer is disconnected, messages will be dispatched to one of standby consumers, and the standby consumer then becomes active consumer.

If the first active consumer is disconnected after receiving 5 messages, the standby consumer becomes active consumer. Consumer1 will receive:

  1. ("key-1", "message-1-1")
  2. ("key-1", "message-1-2")
  3. ("key-1", "message-1-3")
  4. ("key-2", "message-2-1")
  5. ("key-2", "message-2-2")

consumer2 will receive:

  1. ("key-2", "message-2-3")
  2. ("key-3", "message-3-1")
  3. ("key-3", "message-3-2")
  4. ("key-4", "message-4-1")
  5. ("key-4", "message-4-2")

Note
If a topic is a partitioned topic, each partition has only one active consumer, messages of one partition are distributed to only one consumer, and messages of multiple partitions are distributed to multiple consumers.

Shared(共享)

Create new consumers and subscribe with Shared subscription type.

  1. Consumer consumer1 = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .subscribe()
  6. Consumer consumer2 = client.newConsumer()
  7. .topic("my-topic")
  8. .subscriptionName("my-subscription")
  9. .subscriptionType(SubscriptionType.Shared)
  10. .subscribe()
  11. //Both consumer1 and consumer 2 is active consumers.

In shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round robin distribution across consumers.

If a broker dispatches only one message at a time, consumer1 receives the following information.

  1. ("key-1", "message-1-1")
  2. ("key-1", "message-1-3")
  3. ("key-2", "message-2-2")
  4. ("key-3", "message-3-1")
  5. ("key-4", "message-4-1")

consumer2 receives the following information.

  1. ("key-1", "message-1-2")
  2. ("key-2", "message-2-1")
  3. ("key-2", "message-2-3")
  4. ("key-3", "message-3-2")
  5. ("key-4", "message-4-2")

Shared subscription is different from Exclusive and Failover subscription types. Shared subscription has better flexibility, but cannot provide order guarantee.

Key_shared

This is a new subscription type since 2.4.0 release. Create new consumers and subscribe with Key_Shared subscription type.

  1. Consumer consumer1 = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Key_Shared)
  5. .subscribe()
  6. Consumer consumer2 = client.newConsumer()
  7. .topic("my-topic")
  8. .subscriptionName("my-subscription")
  9. .subscriptionType(SubscriptionType.Key_Shared)
  10. .subscribe()
  11. //Both consumer1 and consumer2 are active consumers.

Key_Shared subscription is like Shared subscription, all consumers can attach to the same subscription. But it is different from Key_Shared subscription, messages with the same key are delivered to only one consumer in order. The possible distribution of messages between different consumers (by default we do not know in advance which keys will be assigned to a consumer, but a key will only be assigned to a consumer at the same time).

consumer1 receives the following information.

  1. ("key-1", "message-1-1")
  2. ("key-1", "message-1-2")
  3. ("key-1", "message-1-3")
  4. ("key-3", "message-3-1")
  5. ("key-3", "message-3-2")

consumer2 receives the following information.

  1. ("key-2", "message-2-1")
  2. ("key-2", "message-2-2")
  3. ("key-2", "message-2-3")
  4. ("key-4", "message-4-1")
  5. ("key-4", "message-4-2")

If batching is enabled at the producer side, messages with different keys are added to a batch by default. The broker will dispatch the batch to the consumer, so the default batch mechanism may break the Key_Shared subscription guaranteed message distribution semantics. The producer needs to use the KeyBasedBatcher.

  1. Producer producer = client.newProducer()
  2. .topic("my-topic")
  3. .batcherBuilder(BatcherBuilder.KEY_BASED)
  4. .create();

Or the producer can disable batching.

  1. Producer producer = client.newProducer()
  2. .topic("my-topic")
  3. .enableBatching(false)
  4. .create();

Note
If the message key is not specified, messages without key are dispatched to one consumer in order by default.

Reader

With the reader interface, Pulsar clients can “manually position” themselves within a topic and reading all messages from a specified message onward. The Pulsar API for Java enables you to create Reader objects by specifying a topic and a MessageId .

The following is an example.

  1. byte[] msgIdBytes = // Some message ID byte array
  2. MessageId id = MessageId.fromByteArray(msgIdBytes);
  3. Reader reader = pulsarClient.newReader()
  4. .topic(topic)
  5. .startMessageId(id)
  6. .create();
  7. while (true) {
  8. Message message = reader.readNext();
  9. // Process message
  10. }

In the example above, a Reader object is instantiated for a specific topic and message (by ID); the reader iterates over each message in the topic after the message is identified by msgIdBytes (how that value is obtained depends on the application).

上面的示例代码展示了Reader对象指向特定的消息(ID),但你也可以使用MessageId.earliest来指向topic上最早可用的消息,使用MessageId.latest指向最新的消息。

配置读者

When you create a reader, you can use the loadConf configuration. The following parameters are available in loadConf.

类型配置项
说明
默认值
String

topicName|Topic name. |None int|receiverQueueSize|Size of a consumer’s receiver queue.

For example, the number of messages that can be accumulated by a consumer before an application calls Receive.

A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.|1000 ReaderListener<T>|readerListener|A listener that is called for message received.|None String|readerName|Reader name.|null String| subscriptionName|Subscription name|When there is a single topic, the default subscription name is "reader-" + 10-digit UUID.
When there are multiple topics, the default subscription name is "multiTopicsReader-" + 10-digit UUID. String|subscriptionRolePrefix|Prefix of subscription role. |null CryptoKeyReader|cryptoKeyReader|Interface that abstracts the access to a key store.|null ConsumerCryptoFailureAction|cryptoFailureAction|Consumer should take action when it receives a message that can not be decrypted.

  • FAIL: this is the default option to fail messages until crypto succeeds.

  • DISCARD: silently acknowledge and not deliver message to an application.

  • CONSUME: deliver encrypted messages to applications. It is the application’s responsibility to decrypt the message.

    The message decompression fails.

    If messages contain batch messages, a client is not be able to retrieve individual messages in batch.

    Delivered encrypted message contains {@link EncryptionContext} which contains encryption and compression information in it using which application can decrypt consumed message payload.|ConsumerCryptoFailureAction.FAIL boolean|readCompacted|If enabling readCompacted, a consumer reads messages from a compacted topic rather than a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (for example, failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.|false boolean|resetIncludeHead|If set to true, the first message to be returned is the one specified by messageId.

If set to false, the first message to be returned is the one next to the message specified by messageId.|false

Sticky key range reader

In sticky key range reader, broker will only dispatch messages which hash of the message key contains by the specified key hash range. Multiple key hash ranges can be specified on a reader.

The following is an example to create a sticky key range reader.

  1. pulsarClient.newReader()
  2. .topic(topic)
  3. .startMessageId(MessageId.earliest)
  4. .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
  5. .create();

Total hash range size is 65536, so the max end of the range should be less than or equal to 65535.

Schema

In Pulsar, all message data consists of byte arrays “under the hood.” Message schemas enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). 如果在不指定schema的情况下构造 生产者,则生产者只能生成类型为 byte[]的消息。 The following is an example.

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(topic)
  3. .create();

The producer above is equivalent to a Producer<byte[]> (in fact, you should always explicitly specify the type). If you’d like to use a producer for a different type of data, you’ll need to specify a schema that informs Pulsar which data type will be transmitted over the topic.

AvroBaseStructSchema example

假设您有一个 SensorReading 类, 你想通过Pulsar主题进行传输:

  1. public class SensorReading {
  2. public float temperature;
  3. public SensorReading(float temperature) {
  4. this.temperature = temperature;
  5. }
  6. // A no-arg constructor is required
  7. public SensorReading() {
  8. }
  9. public float getTemperature() {
  10. return temperature;
  11. }
  12. public void setTemperature(float temperature) {
  13. this.temperature = temperature;
  14. }
  15. }

You could then create a Producer<SensorReading> (or Consumer<SensorReading>) like this:

  1. Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
  2. .topic("sensor-readings")
  3. .create();

以下schema格式目前可用于 Java:

  • 无schema 或者字节数组schema(可以使用Schema.BYTES):

    1. Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
    2. .topic("some-raw-bytes-topic")
    3. .create();

    或者:

    1. Producer<byte[]> bytesProducer = client.newProducer()
    2. .topic("some-raw-bytes-topic")
    3. .create();
  • String for normal UTF-8-encoded string data. Apply the schema using Schema.STRING:

    1. Producer<String> stringProducer = client.newProducer(Schema.STRING)
    2. .topic("some-string-topic")
    3. .create();
  • Create JSON schemas for POJOs using Schema.JSON. The following is an example.

    1. Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
    2. .topic("some-pojo-topic")
    3. .create();
  • Generate Protobuf schemas using Schema.PROTOBUF. The following example shows how to create the Protobuf schema and use it to instantiate a new producer:

    1. Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
    2. .topic("some-protobuf-topic")
    3. .create();
  • Define Avro schemas with Schema.AVRO. The following code snippet demonstrates how to create and use Avro schema.

    1. Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
    2. .topic("some-avro-topic")
    3. .create();

ProtobufNativeSchema example

For example of ProtobufNativeSchema, see SchemaDefinition in Complex type.

认证

Pulsar currently supports three authentication schemes: TLS, Athenz, and Oauth2. You can use the Pulsar Java client with all of them.

TLS 认证

要使用TLS,你需要使用setUseTls方法设置TLS为true,将您的Pulsar客户端指向TLS证书路径,并提供证书和密钥文件的路径。

The following is an example.

  1. Map<String, String> authParams = new HashMap<>();
  2. authParams.put("tlsCertFile", "/path/to/client-cert.pem");
  3. authParams.put("tlsKeyFile", "/path/to/client-key.pem");
  4. Authentication tlsAuth = AuthenticationFactory
  5. .create(AuthenticationTls.class.getName(), authParams);
  6. PulsarClient client = PulsarClient.builder()
  7. .serviceUrl("pulsar+ssl://my-broker.com:6651")
  8. .enableTls(true)
  9. .tlsTrustCertsFilePath("/path/to/cacert.pem")
  10. .authentication(tlsAuth)
  11. .build();

Athenz

要使用Athenz做为身份认证提供者,你需要use TLS并且在hash提供如下四个参数的值:

  • tenantDomain
  • tenantService
  • providerDomain
  • privateKey

You can also set an optional keyId. The following is an example.

  1. Map<String, String> authParams = new HashMap<>();
  2. authParams.put("tenantDomain", "shopping"); // Tenant domain name
  3. authParams.put("tenantService", "some_app"); // Tenant service name
  4. authParams.put("providerDomain", "pulsar"); // Provider domain name
  5. authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
  6. authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
  7. Authentication athenzAuth = AuthenticationFactory
  8. .create(AuthenticationAthenz.class.getName(), authParams);
  9. PulsarClient client = PulsarClient.builder()
  10. .serviceUrl("pulsar+ssl://my-broker.com:6651")
  11. .enableTls(true)
  12. .tlsTrustCertsFilePath("/path/to/cacert.pem")
  13. .authentication(athenzAuth)
  14. .build();

支持的格式:

privateKey参数支持如下三种格式: * file:///path/to/file * file:/path/to/file * data:application/x-pem-file;base64,<base64-encoded value>

Oauth2

The following example shows how to use Oauth2 as an authentication provider for the Pulsar Java client.

你可以在 Pulsar Java 客户端中使用工厂方法配置身份认证操作。

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://broker.example.com:6650/")
  3. .authentication(
  4. AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
  5. .build();

此外,你也可以在 Pulsar 客户端中使用编码参数来配置身份认证。

  1. Authentication auth = AuthenticationFactory
  2. .create(AuthenticationOAuth2.class.getName(), "{"type":"client_credentials","privateKey":"...","issuerUrl":"...","audience":"..."}");
  3. PulsarClient client = PulsarClient.builder()
  4. .serviceUrl("pulsar://broker.example.com:6650/")
  5. .authentication(auth)
  6. .build();