Work with consumer

After setting up your clients, you can start working with consumers.

Subscribe to topics

Pulsar has various subscription types to match different scenarios. A topic can have multiple subscriptions with different subscription types. However, a subscription can only have one subscription type at a time.

A subscription is identified by its subscription name, and a subscription name can be associated with only one subscription type at a time. To change the subscription type, first stop all consumers of the subscription.

Different subscription types distribute messages differently. This section describes the differences between subscription types and how to use them.

To illustrate the differences, assume you have a topic named “my-topic” and the producer has published 10 messages.

  • Java
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("my-topic")
            .enableBatching(false)
            .create();
    // 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
    producer.newMessage().key("key-1").value("message-1-1").send();
    producer.newMessage().key("key-1").value("message-1-2").send();
    producer.newMessage().key("key-1").value("message-1-3").send();
    producer.newMessage().key("key-2").value("message-2-1").send();
    producer.newMessage().key("key-2").value("message-2-2").send();
    producer.newMessage().key("key-2").value("message-2-3").send();
    producer.newMessage().key("key-3").value("message-3-1").send();
    producer.newMessage().key("key-3").value("message-3-2").send();
    producer.newMessage().key("key-4").value("message-4-1").send();
    producer.newMessage().key("key-4").value("message-4-2").send();

Exclusive

Create a new consumer and subscribe with the Exclusive subscription type.

  • Java
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();

Only the first consumer is allowed to attach to the subscription; other consumers receive an error. The first consumer receives all 10 messages, in the same order in which they were produced.

Note

If the topic is partitioned, the first consumer subscribes to all partitions; other consumers are not assigned any partitions and receive an error.

Failover

Create new consumers and subscribe with the Failover subscription type.

  • Java
    Consumer consumer1 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .subscribe();
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .subscribe();
    // consumer1 is the active consumer, consumer2 is the standby consumer.
    // consumer1 receives 5 messages and then crashes, consumer2 takes over as the active consumer.

Multiple consumers can attach to the same subscription, but only the first consumer is active and the others are on standby. When the active consumer disconnects, messages are dispatched to one of the standby consumers, which then becomes the active consumer.

If the first active consumer disconnects after receiving 5 messages, the standby consumer becomes the active consumer. consumer1 will receive:

  1. ("key-1", "message-1-1")
  2. ("key-1", "message-1-2")
  3. ("key-1", "message-1-3")
  4. ("key-2", "message-2-1")
  5. ("key-2", "message-2-2")

consumer2 will receive:

  1. ("key-2", "message-2-3")
  2. ("key-3", "message-3-1")
  3. ("key-3", "message-3-2")
  4. ("key-4", "message-4-1")
  5. ("key-4", "message-4-2")

Note

If the topic is partitioned, each partition has only one active consumer: messages from one partition are delivered to a single consumer, while messages from different partitions can be delivered to different consumers.

Shared

Create new consumers and subscribe with the Shared subscription type.

  • Java
    Consumer consumer1 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    // Both consumer1 and consumer2 are active consumers.

With the Shared subscription type, multiple consumers can attach to the same subscription, and messages are delivered across consumers in a round-robin distribution.

If the broker dispatches only one message at a time, consumer1 receives the following messages.

  1. ("key-1", "message-1-1")
  2. ("key-1", "message-1-3")
  3. ("key-2", "message-2-2")
  4. ("key-3", "message-3-1")
  5. ("key-4", "message-4-1")

consumer2 receives the following messages.

  1. ("key-1", "message-1-2")
  2. ("key-2", "message-2-1")
  3. ("key-2", "message-2-3")
  4. ("key-3", "message-3-2")
  5. ("key-4", "message-4-2")

The Shared subscription type differs from the Exclusive and Failover subscription types: it offers more flexibility but cannot provide an ordering guarantee.
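The round-robin distribution listed above can be reproduced with a small simulation. This is only a sketch in plain Java and does not use the Pulsar client: the class `SharedRoundRobinSketch` and its `distribute` method are hypothetical names, and it assumes the broker hands out exactly one message at a time to each consumer in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SharedRoundRobinSketch {
    // Hands out messages one at a time, cycling through the consumers in order.
    static List<List<String>> distribute(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        // The 10 messages from the producer example, in publishing order.
        List<String> published = Arrays.asList(
                "message-1-1", "message-1-2", "message-1-3",
                "message-2-1", "message-2-2", "message-2-3",
                "message-3-1", "message-3-2",
                "message-4-1", "message-4-2");
        List<List<String>> received = distribute(published, 2);
        System.out.println("consumer1: " + received.get(0));
        System.out.println("consumer2: " + received.get(1));
    }
}
```

Because consecutive messages alternate between consumers, messages that share a key end up on different consumers, which is why this subscription type cannot guarantee ordering.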

Key_Shared

This subscription type is available since the 2.4.0 release. Create new consumers and subscribe with the Key_Shared subscription type.

  • Java
    Consumer consumer1 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    // Both consumer1 and consumer2 are active consumers.

As in the Shared subscription type, all consumers in the Key_Shared subscription type can attach to the same subscription. Unlike Shared, however, Key_Shared delivers messages with the same key to only one consumer, in order. By default, you cannot know in advance which keys will be assigned to which consumer, but a key is assigned to only one consumer at any given time. One possible distribution of messages between the consumers is as follows.

consumer1 receives the following messages.

  1. ("key-1", "message-1-1")
  2. ("key-1", "message-1-2")
  3. ("key-1", "message-1-3")
  4. ("key-3", "message-3-1")
  5. ("key-3", "message-3-2")

consumer2 receives the following messages.

  1. ("key-2", "message-2-1")
  2. ("key-2", "message-2-2")
  3. ("key-2", "message-2-3")
  4. ("key-4", "message-4-1")
  5. ("key-4", "message-4-2")
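The idea that each key is owned by exactly one consumer can be sketched in plain Java. This is an illustration only: `KeySharedSketch` and its methods are hypothetical, and the mapping from key to consumer (`hashCode` modulo the consumer count) is an assumption chosen for simplicity, not the hashing scheme the broker actually uses. The resulting assignment may therefore differ from the distribution listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeySharedSketch {
    // Hypothetical key-to-consumer mapping; the real broker uses its own hashing scheme.
    static int consumerFor(String key, int consumerCount) {
        return Math.floorMod(key.hashCode(), consumerCount);
    }

    // Each message is a {key, value} pair; returns the values each consumer receives.
    static List<List<String>> distribute(List<String[]> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        for (String[] message : messages) {
            // All messages with the same key land on the same consumer, in publishing order.
            received.get(consumerFor(message[0], consumerCount)).add(message[1]);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String[]> published = Arrays.asList(
                new String[] {"key-1", "message-1-1"},
                new String[] {"key-2", "message-2-1"},
                new String[] {"key-1", "message-1-2"},
                new String[] {"key-2", "message-2-2"});
        List<List<String>> received = distribute(published, 2);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("consumer" + (i + 1) + ": " + received.get(i));
        }
    }
}
```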

If batching is enabled on the producer side, messages with different keys are added to the same batch by default. The broker dispatches the whole batch to one consumer, so the default batching mechanism may break the message distribution semantics that the Key_Shared subscription type guarantees. To avoid this, the producer needs to use the KeyBasedBatcher.

  • Java
    Producer producer = client.newProducer()
            .topic("my-topic")
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .create();

Or the producer can disable batching.

  • Java
    Producer producer = client.newProducer()
            .topic("my-topic")
            .enableBatching(false)
            .create();

Note

If the message key is not specified, messages without keys are dispatched to one consumer in order by default.

Subscribe to multi-topics

In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using multi-topic subscriptions. To use multi-topic subscriptions you can supply either a regular expression (regex) or a List of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.
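To get a feel for which topic names a regular expression selects, the following sketch applies `java.util.regex` to a list of names. It does not use the Pulsar client: `TopicPatternSketch`, `select`, and the sample topic names are hypothetical, and how the client normalizes topic names (for example the `persistent://` prefix) before matching is not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternSketch {
    // Returns the topic names that the pattern matches in full.
    static List<String> select(Pattern pattern, List<String> topicNames) {
        List<String> selected = new ArrayList<>();
        for (String name : topicNames) {
            if (pattern.matcher(name).matches()) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Sample names in tenant/namespace/topic form (assumed for illustration).
        List<String> topics = Arrays.asList(
                "public/default/foo-1",
                "public/default/foo-2",
                "public/default/bar",
                "public/other/foo-3");
        System.out.println(select(Pattern.compile("public/default/.*"), topics));
        System.out.println(select(Pattern.compile("public/default/foo.*"), topics));
    }
}
```

The topic in the `public/other` namespace is never selected, which mirrors the rule that a regex subscription stays within one namespace.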

The following are some examples.

  • Java
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.ConsumerBuilder;
    import org.apache.pulsar.client.api.PulsarClient;

    ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
            .subscriptionName(subscription);

    // Subscribe to all topics in a namespace
    Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .subscribe();

    // Subscribe to a subset of topics in a namespace, based on regex
    Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
    Consumer someTopicsConsumer = consumerBuilder
            .topicsPattern(someTopicsInNamespace)
            .subscribe();

In the above example, the consumer subscribes to the persistent topics that match the topic name pattern. If you want the consumer to subscribe to all persistent and non-persistent topics that match the topic name pattern, set subscriptionTopicsMode to RegexSubscriptionMode.AllTopics.

    Pattern pattern = Pattern.compile("public/default/.*");
    pulsarClient.newConsumer()
            .subscriptionName("my-sub")
            .topicsPattern(pattern)
            .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
            .subscribe();

Note

By default, the subscriptionTopicsMode of the consumer is PersistentOnly. Available options of subscriptionTopicsMode are PersistentOnly, NonPersistentOnly, and AllTopics.

You can also subscribe to an explicit list of topics (across namespaces if you wish):

    List<String> topics = Arrays.asList(
            "topic-1",
            "topic-2",
            "topic-3"
    );
    Consumer multiTopicConsumer = consumerBuilder
            .topics(topics)
            .subscribe();

    // Alternatively, pass the topic names directly:
    Consumer multiTopicConsumer2 = consumerBuilder
            .topic(
                    "topic-1",
                    "topic-2",
                    "topic-3"
            )
            .subscribe();

You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. The following is an example.

    consumerBuilder
            .topics(topics)
            .subscribeAsync()
            .thenAccept(this::receiveMessageFromConsumer);

    // Receives messages one after another without blocking
    private void receiveMessageFromConsumer(Object consumer) {
        ((Consumer) consumer).receiveAsync().thenAccept(message -> {
            // Do something with the received message
            receiveMessageFromConsumer(consumer);
        });
    }
  • Go
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }

    topics := []string{"topic-1", "topic-2"}
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // filling the `Topics` field creates a multi-topic consumer
        Topics:           topics,
        SubscriptionName: "multi-topic-sub",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
  • Python
    import re
    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'), 'my-subscription')
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)
    client.close()

Unsubscribe from topics

This example shows how a consumer unsubscribes from a topic.

  • Java
    consumer.unsubscribe();
  • C#
    await consumer.Unsubscribe();

Note

Once a consumer unsubscribes from a topic, it is disposed and can no longer be used.

Receive messages

This example shows how a consumer receives messages from a topic.

  • Java
    Message message = consumer.receive();
  • C#
    await foreach (var message in consumer.Messages())
    {
        Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
    }

Receive messages with timeout

  • Java
    consumer.receive(10, TimeUnit.SECONDS);
  • Go
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topic := "test-topic-with-no-messages"
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // create consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "my-sub1",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // receive message with a timeout
    msg, err := consumer.Receive(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Payload())

Async receive messages

The receive method receives messages synchronously (the consumer process is blocked until a message is available). You can also use async receive, which returns a CompletableFuture object immediately. The future completes once a new message is available.

The following is an example.

  • Java
    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

Async receive operations return a Message wrapped inside of a CompletableFuture.
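The non-blocking behavior can be illustrated with a plain `CompletableFuture`, without the Pulsar client. In this sketch, `AsyncReceiveSketch` is a hypothetical stand-in for a consumer and `deliver` simulates a message arriving from the broker; only the `CompletableFuture` calls are real JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncReceiveSketch {
    // The future handed out by the most recent receiveAsync() call.
    private CompletableFuture<String> pending;

    // Returns at once with a future that is not yet completed.
    CompletableFuture<String> receiveAsync() {
        pending = new CompletableFuture<>();
        return pending;
    }

    // Simulates a message arriving: completes the pending future.
    void deliver(String message) {
        pending.complete(message);
    }

    public static void main(String[] args) {
        AsyncReceiveSketch consumer = new AsyncReceiveSketch();
        List<String> handled = new ArrayList<>();

        CompletableFuture<String> future = consumer.receiveAsync();
        // Register a callback instead of blocking the calling thread.
        future.thenAccept(handled::add);
        System.out.println("done before delivery: " + future.isDone());

        consumer.deliver("message-1-1");
        System.out.println("handled after delivery: " + handled);
    }
}
```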

Batch receive messages

Use batchReceive to receive multiple messages for each call.

The following is an example.

  • Java
    Messages messages = consumer.batchReceive();
    for (Object message : messages) {
        // do something
    }
    consumer.acknowledge(messages);

Note

The batch receive policy limits the number and total bytes of messages in a single batch. You can specify a timeout to wait for enough messages. A batch receive completes as soon as any of the following conditions is met: enough messages, enough bytes of messages, or the wait timeout.

    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .batchReceivePolicy(BatchReceivePolicy.builder()
                    .maxNumMessages(100)
                    .maxNumBytes(1024 * 1024)
                    .timeout(200, TimeUnit.MILLISECONDS)
                    .build())
            .subscribe();

The default batch receive policy is:

    BatchReceivePolicy.builder()
            .maxNumMessages(-1)
            .maxNumBytes(10 * 1024 * 1024)
            .timeout(100, TimeUnit.MILLISECONDS)
            .build();
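The count and byte limits can be pictured with a simplified model in plain Java. This sketch is not the client's implementation: `BatchReceiveSketch` and `nextBatchSize` are hypothetical, the timeout condition is left out, non-positive limits are assumed to mean "no limit", and whether the message that crosses the byte limit is still included in the batch is a simplification made here.

```java
class BatchReceiveSketch {
    // Returns how many of the queued messages go into the next batch.
    // Limits <= 0 are treated as "no limit" (assumption); the timeout is not modeled.
    static int nextBatchSize(int[] messageSizes, int maxNumMessages, long maxNumBytes) {
        int count = 0;
        long bytes = 0;
        for (int size : messageSizes) {
            count++;
            bytes += size;
            boolean enoughMessages = maxNumMessages > 0 && count >= maxNumMessages;
            boolean enoughBytes = maxNumBytes > 0 && bytes >= maxNumBytes;
            if (enoughMessages || enoughBytes) {
                break; // the batch completes as soon as either limit is reached
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int[] queued = {400, 400, 400, 400};
        System.out.println("limit 3 messages: " + nextBatchSize(queued, 3, 10L * 1024 * 1024));
        System.out.println("limit 1000 bytes: " + nextBatchSize(queued, 100, 1000));
        System.out.println("no limits: " + nextBatchSize(queued, -1, -1));
    }
}
```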

Acknowledge messages

Messages can be acknowledged individually or cumulatively. For details about message acknowledgment, see acknowledgment.

Acknowledge messages individually

  • Java
    consumer.acknowledge(msg);
  • C#
    await consumer.Acknowledge(message);

Acknowledge messages cumulatively

  • Java
    consumer.acknowledgeCumulative(msg);
  • C#
    await consumer.AcknowledgeCumulative(message);
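The difference between the two acknowledgment modes can be sketched with a sorted set of unacknowledged message IDs. This is a plain-Java illustration, not the Pulsar client: `AckSketch` is hypothetical, and integer IDs stand in for real message IDs.

```java
import java.util.TreeSet;

class AckSketch {
    // IDs of messages that were delivered but not yet acknowledged, in order.
    final TreeSet<Integer> unacked = new TreeSet<>();

    void deliver(int id) {
        unacked.add(id);
    }

    // Individual acknowledgment: only this message is marked as processed.
    void acknowledge(int id) {
        unacked.remove(id);
    }

    // Cumulative acknowledgment: this message and all earlier ones are marked as processed.
    void acknowledgeCumulative(int id) {
        unacked.headSet(id, true).clear();
    }

    public static void main(String[] args) {
        AckSketch consumer = new AckSketch();
        for (int id = 1; id <= 5; id++) {
            consumer.deliver(id);
        }
        consumer.acknowledge(3);
        System.out.println("after acknowledge(3): " + consumer.unacked);
        consumer.acknowledgeCumulative(4);
        System.out.println("after acknowledgeCumulative(4): " + consumer.unacked);
    }
}
```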

Negative acknowledgment redelivery backoff

The RedeliveryBackoff introduces a redelivery backoff mechanism. The delay before a negatively acknowledged message is redelivered depends on how many times the message has already been redelivered.

  • Java
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                    .minDelayMs(1000)
                    .maxDelayMs(60 * 1000)
                    .build())
            .subscribe();

Acknowledgment timeout redelivery backoff

The RedeliveryBackoff introduces a redelivery backoff mechanism. The delay before a message is redelivered after an acknowledgment timeout depends on how many times the message has already been retried.

  • Java
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .ackTimeout(10, TimeUnit.SECONDS)
            .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                    .minDelayMs(1000)
                    .maxDelayMs(60000)
                    .multiplier(2)
                    .build())
            .subscribe();

The message redelivery behavior should be as follows.

Redelivery count    Redelivery delay
1                   10 + 1 seconds
2                   10 + 2 seconds
3                   10 + 4 seconds
4                   10 + 8 seconds
5                   10 + 16 seconds
6                   10 + 32 seconds
7                   10 + 60 seconds
8                   10 + 60 seconds
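The delays in the table follow from the ack timeout (10 seconds) plus a backoff that starts at minDelayMs, is multiplied on each further redelivery, and is capped at maxDelayMs. The sketch below reproduces that arithmetic in plain Java. `RedeliveryDelaySketch` and its methods are hypothetical and model only the table, with the redelivery count starting at 1; they are not the library's internal implementation.

```java
class RedeliveryDelaySketch {
    // Backoff for the n-th redelivery (n starts at 1):
    // minDelayMs * multiplier^(n-1), capped at maxDelayMs.
    static long backoffMs(int redeliveryCount, long minDelayMs, long maxDelayMs, int multiplier) {
        long delay = minDelayMs;
        for (int i = 1; i < redeliveryCount && delay < maxDelayMs; i++) {
            delay *= multiplier;
        }
        return Math.min(delay, maxDelayMs);
    }

    // Total wait before the n-th redelivery: the ack timeout plus the backoff.
    static long totalDelayMs(int redeliveryCount, long ackTimeoutMs,
                             long minDelayMs, long maxDelayMs, int multiplier) {
        return ackTimeoutMs + backoffMs(redeliveryCount, minDelayMs, maxDelayMs, multiplier);
    }

    public static void main(String[] args) {
        // Same values as the consumer configuration above.
        for (int n = 1; n <= 8; n++) {
            long backoff = backoffMs(n, 1000, 60000, 2);
            System.out.println(n + ": 10 + " + (backoff / 1000) + " seconds");
        }
    }
}
```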

Note

  • The negativeAckRedeliveryBackoff does not work with consumer.negativeAcknowledge(MessageId messageId) because you are not able to get the redelivery count from the message ID.
  • If a consumer crashes, it triggers the redelivery of unacked messages. In this case, RedeliveryBackoff does not take effect and the messages might get redelivered earlier than the delay time from the backoff.

Configure chunking

You can limit the maximum number of chunked messages a consumer maintains concurrently by configuring specific parameters. When the configured threshold is reached, the consumer drops pending messages by silently acknowledging them or asking the broker to redeliver them later.

The following is an example of how to configure message chunking.

  • Java
    Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionName("test")
            .autoAckOldestChunkedMessageOnQueueFull(true)
            .maxPendingChunkedMessage(100)
            .expireTimeOfIncompleteChunkedMessage(10, TimeUnit.MINUTES)
            .subscribe();
  • C++
    ConsumerConfiguration conf;
    conf.setAutoAckOldestChunkedMessageOnQueueFull(true);
    conf.setMaxPendingChunkedMessage(100);
    Consumer consumer;
    client.subscribe("my-topic", "my-sub", conf, consumer);
  • Go
    Coming soon…
  • Python
    consumer = client.subscribe(topic, "my-subscription",
        max_pending_chunked_message=10,
        auto_ack_oldest_chunked_message_on_queue_full=False
    )

Create a consumer with a message listener

Instead of running a loop with blocking calls, you can use an event-based style: a message listener is invoked for each message that is received.

  • Java
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic("persistent://my-property/my-ns/my-topic")
            .subscriptionName("my-subscription")
            .messageListener((c, m) -> {
                try {
                    c.acknowledge(m);
                } catch (Exception e) {
                    // Report the failure; replace with your own error handling
                    System.err.println("Failed to acknowledge: " + e);
                }
            })
            .subscribe();

  • C++

This example starts a subscription at the earliest offset and consumes 100 messages.

    #include <pulsar/Client.h>
    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <thread>

    using namespace pulsar;

    std::atomic<uint32_t> messagesReceived;

    void handleAckComplete(Result res) {
        std::cout << "Ack res: " << res << std::endl;
    }

    // Invoked by the client for each received message
    void listener(Consumer consumer, const Message& msg) {
        std::cout << "Got message " << msg << " with content '" << msg.getDataAsString() << "'" << std::endl;
        messagesReceived++;
        consumer.acknowledgeAsync(msg.getMessageId(), handleAckComplete);
    }

    int main() {
        Client client("pulsar://localhost:6650");
        Consumer consumer;
        ConsumerConfiguration config;
        config.setMessageListener(listener);
        config.setSubscriptionInitialPosition(InitialPositionEarliest);
        Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
        if (result != ResultOk) {
            std::cout << "Failed to subscribe: " << result << std::endl;
            return -1;
        }

        // wait for 100 messages to be consumed
        while (messagesReceived < 100) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        std::cout << "Finished consuming asynchronously!" << std::endl;
        client.close();
        return 0;
    }
  • Go
    package main

    import (
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // messages are delivered to this channel
        channel := make(chan pulsar.ConsumerMessage, 100)

        options := pulsar.ConsumerOptions{
            Topic:            "topic-1",
            SubscriptionName: "my-subscription",
            Type:             pulsar.Shared,
            // filling the `MessageChannel` field creates a listener
            MessageChannel: channel,
        }

        consumer, err := client.Subscribe(options)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        // Receive messages from the channel. The channel returns a `ConsumerMessage` struct, which contains the message
        // and the consumer that received it. This is not necessary here since there is a single consumer, but the
        // channel could be shared across multiple consumers as well.
        for cm := range channel {
            consumer := cm.Consumer
            msg := cm.Message
            fmt.Printf("Consumer %s received a message, msgId: %v, content: '%s'\n",
                consumer.Name(), msg.ID(), string(msg.Payload()))
            consumer.Ack(msg)
        }
    }

Intercept messages

ConsumerInterceptors intercept and possibly mutate messages received by the consumer.

The interface has six main events:

  • beforeConsume is triggered before the message is returned by receive() or receiveAsync(). You can modify messages within this event.
  • onAcknowledge is triggered before the consumer sends the acknowledgement to the broker.
  • onAcknowledgeCumulative is triggered before the consumer sends the cumulative acknowledgement to the broker.
  • onNegativeAcksSend is triggered when a redelivery from a negative acknowledgement occurs.
  • onAckTimeoutSend is triggered when a redelivery from an acknowledgement timeout occurs.
  • onPartitionsChange is triggered when the partitions of the (partitioned) topic change.

To intercept messages, you can add one or multiple ConsumerInterceptors when creating a Consumer as follows.

  • Java
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .intercept(new ConsumerInterceptor<String>() {
                @Override
                public void close() {
                    // release any resources held by the interceptor
                }

                @Override
                public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                    // user-defined processing logic
                    return message;
                }

                @Override
                public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                    // user-defined processing logic
                }

                @Override
                public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                    // user-defined processing logic
                }

                @Override
                public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                    // user-defined processing logic
                }

                @Override
                public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                    // user-defined processing logic
                }

                @Override
                public void onPartitionsChange(String topicName, int partitions) {
                    // user-defined processing logic
                }
            })
            .subscribe();

Note

If you are using multiple interceptors, they apply in the order they are passed to the intercept method.
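Why the order matters can be shown with a small chain of message transformations. This sketch is plain Java and does not use the Pulsar interceptor interface: `InterceptorOrderSketch` is hypothetical, and each `UnaryOperator<String>` stands in for the beforeConsume step of one interceptor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class InterceptorOrderSketch {
    // Passes the message through each interceptor in list order.
    static String beforeConsume(String message, List<UnaryOperator<String>> interceptors) {
        String current = message;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        UnaryOperator<String> trim = String::trim;
        UnaryOperator<String> exclaim = s -> s + "!";

        List<UnaryOperator<String>> trimFirst = new ArrayList<>();
        trimFirst.add(trim);
        trimFirst.add(exclaim);

        List<UnaryOperator<String>> exclaimFirst = new ArrayList<>();
        exclaimFirst.add(exclaim);
        exclaimFirst.add(trim);

        // The same two interceptors give different results in a different order.
        System.out.println("[" + beforeConsume(" hi ", trimFirst) + "]");
        System.out.println("[" + beforeConsume(" hi ", exclaimFirst) + "]");
    }
}
```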