Work with producer

After setting up your clients, you can explore more to start working with producers.

Create the producer

This example shows how to create a producer.

  • Java
  • C++

Create a producer synchronously:

  1. Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
  2. .topic("my-topic")
  3. .create();

Create a producer asynchronously:

  1. pulsarClient.newProducer(Schema.STRING)
  2. .topic("my-topic")
  3. .createAsync()
  4. .thenAccept(p -> {
  5. log.info("Producer created: {}", p.getProducerName());
  6. });
  1. Producer producer;
  2. Result result = client.createProducer("my-topic", producer);

Publish messages

Pulsar supports both synchronous and asynchronous publishing of messages in most clients. In some language-specific clients, such as Node.js and C#, you can publish messages synchronously based on the asynchronous method using language-specific mechanisms (like await).

With async publishment, the producer puts the message in a blocking queue and returns it immediately. Then the client library sends the message to the broker in the background. If the queue is full (max size configurable), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer.

This example shows how to publish messages using producers. The publish operation is done until the broker tells you the message has been successfully published. The broker returns the message ID after the message is published successfully.

  • Java
  • C++
  • Go
  • Node.js
  • C#

Publish messages synchronously:

  1. MessageId messageId = producer.newMessage()
  2. .value("my-sync-message")
  3. .send();

Publish messages asynchronously:

  1. producer.newMessage()
  2. .value("my-sync-message")
  3. .sendAsync()
  4. .thenAccept(messageId -> {
  5. log.info("Message ID: {}", messageId);
  6. });
  1. Message msg = MessageBuilder()
  2. .setContent("my-sync-message")
  3. .build();
  4. Result res = producer.send(msg);
  1. msg := pulsar.ProducerMessage{
  2. Payload: []byte("my-sync-message"),
  3. }
  4. if _, err := producer.send(msg); err != nil {
  5. log.Fatalf("Could not publish message due to: %v", err)
  6. }

For all methods of the ProducerMessage object, see Go API doc.

  1. const msg = {
  2. data: Buffer.from('my-sync-message'),
  3. }
  4. await producer.send(msg);

The following keys are available for producer message objects:

ParameterDescription
dataThe actual data payload of the message.
propertiesA Object for any application-specific metadata attached to the message.
eventTimestampThe timestamp associated with the message.
sequenceIdThe sequence ID of the message.
partitionKeyThe optional key associated with the message (particularly useful for things like topic compaction).
replicationClustersThe clusters to which this message is replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
deliverAtThe absolute timestamp at or after which the message is delivered.
deliverAfterThe relative delay after which the message is delivered.

Message object operations

In Pulsar Node.js client, you can receive (or read) message objects as consumers (or readers).

The message object has the following methods available:

MethodDescriptionReturn type
getTopicName()Getter method of topic name.String
getProperties()Getter method of properties.Array<Object>
getData()Getter method of message data.Buffer
getMessageId()Getter method of message id object.Object
getPublishTimestamp()Getter method of publish timestamp.Number
getEventTimestamp()Getter method of event timestamp.Number
getRedeliveryCount()Getter method of redelivery count.Number
getPartitionKey()Getter method of partition key.String

Message ID object operations

In Pulsar Node.js client, you can get message id objects from message objects.

The message id object has the following methods available:

MethodDescriptionReturn type
serialize()Serialize the message id into a Buffer for storing.Buffer
toString()Get message id as String.String

The client has a static method of message id object. You can access it as Pulsar.MessageId.someStaticMethod.

The following static methods are available for the message id object:

MethodDescriptionReturn type
earliest()MessageId representing the earliest, or oldest available message stored in the topic.Object
latest()MessageId representing the latest, or last published message in the topic.Object
deserialize(Buffer)Deserialize a message id object from a Buffer.Object
  1. var data = Encoding.UTF8.GetBytes("Hello World");
  2. await producer.Send(data);

Configure messages

You can set various properties of Pulsar’s messages. The values of these properties are stored in the metadata of a message.

  • Java
  • C++
  • C#
  1. producer.newMessage()
  2. .key("my-key") // Set the message key
  3. .eventTime(System.currentTimeMillis()) // Set the event time
  4. .sequenceId(1203) // Set the sequenceId for the deduplication purposes
  5. .deliverAfter(1, TimeUnit.HOURS) // Delay message delivery for 1 hour
  6. .property("my-key", "my-value") // Set the customized metadata
  7. .property("my-other-key", "my-other-value")
  8. .replicationClusters(
  9. Lists.newArrayList("r1", "r2")) // Set the geo-replication clusters for this message.
  10. .value("content")
  11. .send();

For the Java client, you can also use loadConf to configure the message metadata. Here is an example:

  1. Map<String, Object> conf = new HashMap<>();
  2. conf.put("key", "my-key");
  3. conf.put("eventTime", System.currentTimeMillis());
  4. producer.newMessage()
  5. .value("my-message")
  6. .loadConf(conf)
  7. .send();
  1. Message msg = MessageBuilder()
  2. .setContent("content")
  3. .setProperty("my-key", "my-value")
  4. .setProperty("my-other-key", "my-other-value")
  5. .setDeliverAfter(std::chrono::minutes(3)) // Delay message delivery for 3 minutes
  6. .build();
  7. Result res = producer.send(msg);
  1. ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
  2. Payload: []byte(fmt.Sprintf("content")),
  3. DeliverAfter: 3 * time.Second, // Delay message delivery for 3 seconds
  4. })
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  1. var messageId = await producer.NewMessage()
  2. .Property("SomeKey", "SomeValue")
  3. .Send(data);

Publish messages to partitioned topics

By default, Pulsar topics are served by a single broker, which limits the maximum throughput of a topic. Partitioned topics can span multiple brokers and thus allow for higher throughput.

You can publish messages to partitioned topics using Pulsar client libraries. When publishing messages to partitioned topics, you must specify a routing mode. If you do not specify any routing mode when you create a new producer, the round-robin routing mode is used.

Use built-in message router

The routing mode determines which partition (internal topic) each message should be published to.

The following is an example:

  • Java
  • C++
  • Go
  1. Producer<byte[]> producer = pulsarClient.newProducer()
  2. .topic("my-topic")
  3. .messageRoutingMode(MessageRoutingMode.SinglePartition)
  4. .create();
  1. #include "lib/RoundRobinMessageRouter.h" // Make sure include this header file
  2. Producer producer;
  3. Result result = client.createProducer(
  4. "my-topic",
  5. ProducerConfiguration().setMessageRouter(std::make_shared<RoundRobinMessageRouter>(
  6. ProducerConfiguration::BoostHash, true, 1000, 100000, boost::posix_time::seconds(1))),
  7. producer);
  1. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  2. Topic: "my-topic",
  3. MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
  4. fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing message ", msg, " to partition 2.")
  5. // always push msg to partition 2
  6. return 2
  7. },
  8. })

Customize message router

  • Java
  • C++
  • Go

To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:

  1. public interface MessageRouter extends Serializable {
  2. int choosePartition(Message msg);
  3. }

The following router routes every message to partition 10:

  1. public class AlwaysTenRouter implements MessageRouter {
  2. public int choosePartition(Message msg) {
  3. return 10;
  4. }
  5. }

With that implementation, you can send messages to partitioned topics as below.

  1. Producer<byte[]> producer = pulsarClient.newProducer()
  2. .topic("my-topic")
  3. .messageRouter(new AlwaysTenRouter())
  4. .create();
  5. producer.send("Partitioned topic message".getBytes());

To use a custom message router, you need to provide an implementation of the MessageRoutingPolicy interface, which has one getPartition method:

  1. class MessageRouter : public MessageRoutingPolicy {
  2. public:
  3. MessageRouter() : {}
  4. int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
  5. // The implementation of getPartition
  6. }
  7. };

The following router routes every message to partition 10:

  1. class MessageRouter : public MessageRoutingPolicy {
  2. public:
  3. MessageRouter() {}
  4. int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
  5. return 10;
  6. }
  7. };

With that implementation, you can send messages to partitioned topics as below.

  1. Producer producer;
  2. Result result = client.createProducer(
  3. "my-topic",
  4. ProducerConfiguration().setMessageRouter(std::make_shared<MessageRouter>()),
  5. producer);
  6. Message msg = MessageBuilder().setContent("content").build();
  7. result = producer.send(msg);

In the Go client, you can configure a customized message router by passing a function.

  1. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  2. Topic: "my-topic",
  3. MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
  4. fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing message ", msg, " to partition 10.")
  5. // always push msg to partition 10
  6. return 10
  7. },
  8. })

Choose partitions when using a key

If a message has a key, it supersedes the round robin routing policy. The following java example code illustrates how to choose the partition when using a key.

  1. // If the message has a key, it supersedes the round robin routing policy
  2. if (msg.hasKey()) {
  3. return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
  4. }
  5. if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
  6. long currentMs = clock.millis();
  7. return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
  8. } else {
  9. return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
  10. }

Enable chunking

Message chunking enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages on the consumer side.

The message chunking feature is OFF by default. The following is an example of how to enable message chunking when creating a producer.

  • Java
  • C++
  • Go
  • Python
  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(topic)
  3. .enableChunking(true)
  4. .enableBatching(false)
  5. .create();
  1. ProducerConfiguration conf;
  2. conf.setBatchingEnabled(false);
  3. conf.setChunkingEnabled(true);
  4. Producer producer;
  5. client.createProducer("my-topic", conf, producer);
  1. // The message chunking feature is OFF by default.
  2. // By default, a producer chunks the large message based on the max message size (`maxMessageSize`) configured at the broker side (for example, 5MB).
  3. // Client can also configure the max chunked size using the producer configuration `ChunkMaxMessageSize`.
  4. // Note: to enable chunking, you need to disable batching (`DisableBatching=true`) concurrently.
  5. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  6. Topic: "my-topic",
  7. DisableBatching: true,
  8. EnableChunking: true,
  9. })
  10. if err != nil {
  11. log.Fatal(err)
  12. }
  13. defer producer.Close()
  1. producer = client.create_producer(
  2. topic,
  3. chunking_enabled=True
  4. )

By default, producer chunks the large message based on max message size (maxMessageSize) configured at broker (eg: 5MB). However, client can also configure max chunked size using producer configuration chunkMaxMessageSize.

Work with producer - 图1note

To enable chunking, you need to disable batching (enableBatching\=false) concurrently.

Intercept messages

ProducerInterceptor intercepts and possibly mutates messages received by the producer before they are published to the brokers.

The interface has three main events:

  • eligible checks if the interceptor can be applied to the message.
  • beforeSend is triggered before the producer sends the message to the broker. You can modify messages within this event.
  • onSendAcknowledgement is triggered when the message is acknowledged by the broker or the sending failed.

To intercept messages, you can add a ProducerInterceptor or multiple ones when creating a Producer as follows.

  • Java
  • C++
  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(topic)
  3. .intercept(new ProducerInterceptor {
  4. @Override
  5. boolean eligible(Message message) {
  6. return true; // process all messages
  7. }
  8. @Override
  9. Message beforeSend(Producer producer, Message message) {
  10. // user-defined processing logic
  11. }
  12. @Override
  13. void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
  14. // user-defined processing logic
  15. }
  16. })
  17. .create();

Implement the custom interceptor:

  1. class MyInterceptor : public ProducerInterceptor {
  2. public:
  3. MyInterceptor() {}
  4. Message beforeSend(const Producer& producer, const Message& message) override {
  5. // Your implementation code
  6. return message;
  7. }
  8. void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
  9. const MessageId& messageID) override {
  10. // Your implementation code
  11. }
  12. void close() override {
  13. // Your implementation code
  14. }
  15. };

Configue the producer:

  1. ProducerConfiguration conf;
  2. conf.intercept({std::make_shared<MyInterceptor>(),
  3. std::make_shared<MyInterceptor>()}); // You can add multiple interceptors to the same producer
  4. Producer producer;
  5. client.createProducer(topic, conf, producer);

Work with producer - 图2note

Multiple interceptors apply in the order they are passed to the intercept method.

Configure encryption policies

The Pulsar C# client supports four kinds of encryption policies:

  • EnforceUnencrypted: always use unencrypted connections.
  • EnforceEncrypted: always use encrypted connections)
  • PreferUnencrypted: use unencrypted connections, if possible.
  • PreferEncrypted: use encrypted connections, if possible.

This example shows how to set the EnforceUnencrypted encryption policy.

  • C#
  1. using DotPulsar;
  2. var client = PulsarClient.Builder()
  3. .ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
  4. .Build();

Configure access mode

Access mode allows applications to require exclusive producer access on a topic to achieve a “single-writer” situation.

This example shows how to set producer access mode.

  • Java
  • C++

Work with producer - 图3note

This feature is supported in Java client 2.8.0 or later versions.

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(topic)
  3. .accessMode(ProducerAccessMode.Exclusive)
  4. .create();

Work with producer - 图4note

This feature is supported in C++ client 3.1.0 or later versions.

  1. Producer producer;
  2. ProducerConfiguration producerConfiguration;
  3. producerConfiguration.setAccessMode(ProducerConfiguration::Exclusive);
  4. client.createProducer(topicName, producerConfiguration, producer);