Pulsar binary protocol specification

Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency.

Clients and brokers exchange commands with each other. Commands are formatted as binary protocol buffer (aka protobuf) messages. The format of protobuf commands is specified in the PulsarApi.proto file and also documented in the Protobuf interface section below.

Connection sharing

Commands for different producers and consumers can be interleaved and sent through the same connection without restriction.

All commands associated with Pulsar's protocol are contained in a BaseCommand protobuf message that includes a Type enum with all possible subcommands as optional fields. BaseCommand messages can specify only one subcommand.

Framing

Since protobuf doesn't provide any sort of message frame, all messages in the Pulsar protocol are prepended with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.

The Pulsar protocol allows for two types of commands:

  • Simple commands that do not carry a message payload.
  • Payload commands that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf metadata and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big endian integers.

Message payloads are passed in raw format rather than protobuf format for efficiency reasons.

Simple commands

Simple (payload-free) commands have this basic structure:

Component | Description | Size (in bytes)
totalSize | The size of the frame, counting everything that comes after it (in bytes) | 4
commandSize | The size of the protobuf-serialized command | 4
message | The protobuf message serialized in a raw binary format (rather than in protobuf format) |
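
As an illustration of the layout above, the following Python sketch frames and unframes a simple command. It treats the serialized BaseCommand as opaque bytes. The function names encode_simple_frame and decode_simple_frame are hypothetical and are not part of any Pulsar client API.

```python
import struct


def encode_simple_frame(command_bytes: bytes) -> bytes:
    """Wrap already-serialized BaseCommand bytes in a simple-command frame."""
    command_size = len(command_bytes)
    # totalSize counts everything after itself: the commandSize field (4 bytes)
    # plus the command bytes.
    total_size = 4 + command_size
    # ">II" packs two 4-byte unsigned big-endian integers.
    return struct.pack(">II", total_size, command_size) + command_bytes


def decode_simple_frame(frame: bytes) -> bytes:
    """Return the serialized command bytes contained in a simple-command frame."""
    total_size, command_size = struct.unpack_from(">II", frame, 0)
    if total_size != len(frame) - 4:
        raise ValueError("totalSize does not match the frame length")
    if command_size != total_size - 4:
        raise ValueError("a simple command must not carry extra bytes")
    return frame[8:8 + command_size]
```

Here the two placeholder bytes `b"\x08\x02"` would be framed as `00 00 00 06 00 00 00 02 08 02`: a total size of 6, a command size of 2, then the command bytes themselves.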

Payload commands

Payload commands have this basic structure:

Component | Description | Size (in bytes)
totalSize | The size of the frame, counting everything that comes after it (in bytes) | 4
commandSize | The size of the protobuf-serialized command | 4
message | The protobuf message serialized in a raw binary format (rather than in protobuf format) |
magicNumber | A 2-byte byte array (0x0e01) identifying the current format | 2
checksum | A CRC32-C checksum of everything that comes after it | 4
metadataSize | The size of the message metadata | 4
metadata | The message metadata stored as a binary protobuf message |
payload | Anything left in the frame is considered the payload and can include any sequence of bytes |
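
The layout above can be sketched in Python as follows. This is a minimal illustration, not a reference implementation: the command and metadata are assumed to be already-serialized protobuf bytes, the name encode_payload_frame is hypothetical, and CRC32-C is computed with a small pure-Python table because the Python standard library does not provide it.

```python
import struct

MAGIC_NUMBER = b"\x0e\x01"


def _make_crc32c_table():
    # Reflected form of the Castagnoli polynomial used by CRC32-C.
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table


_CRC32C_TABLE = _make_crc32c_table()


def crc32c(data: bytes) -> int:
    """Compute the CRC32-C (Castagnoli) checksum of data."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _CRC32C_TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def encode_payload_frame(command: bytes, metadata: bytes, payload: bytes) -> bytes:
    """Build a payload-command frame from its three variable-length parts."""
    # The checksum covers everything after it: metadataSize, metadata, payload.
    checksummed = struct.pack(">I", len(metadata)) + metadata + payload
    body = (
        struct.pack(">I", len(command))
        + command
        + MAGIC_NUMBER
        + struct.pack(">I", crc32c(checksummed))
        + checksummed
    )
    # totalSize counts everything that comes after it.
    return struct.pack(">I", len(body)) + body
```

A receiver would reverse the steps: read totalSize, read commandSize and the command, check the magic number, recompute the checksum over the remaining bytes, and treat whatever follows the metadata as the payload.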

Message metadata

Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer.

Field | Description
producer_name | The name of the producer that published the message
sequence_id | The sequence ID of the message, assigned by the producer
publish_time | The publish timestamp in Unix time (i.e. as the number of milliseconds since January 1st, 1970 in UTC)
properties | A sequence of key/value pairs (using the KeyValue message). These are application-defined keys and values with no special meaning to Pulsar.
replicated_from (optional) | Indicates that the message has been replicated and specifies the name of the cluster where the message was originally published
partition_key (optional) | While publishing on a partitioned topic, if the key is present, the hash of the key is used to determine which partition to choose
compression (optional) | Signals that the payload has been compressed and with which compression library
uncompressed_size (optional) | If compression is used, the producer must fill the uncompressed size field with the original payload size
num_messages_in_batch (optional) | If this message is really a batch of multiple entries, this field must be set to the number of messages in the batch

Batch messages

When using batch messages, the payload contains a list of entries, each with its own metadata, defined by the SingleMessageMetadata object.

For a single batch, the payload format will look like this:

Field | Description
metadataSizeN | The size of the serialized Protobuf single message metadata
metadataN | Single message metadata
payloadN | Message payload passed by the application

Each metadata field looks like this:

Field | Description
properties | Application-defined properties
partition_key (optional) | Key to indicate the hashing to a particular partition
payload_size | Size of the payload for the single message in the batch

When compression is enabled, the whole batch will be compressed at once.
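
The batch layout described above can be sketched as follows. This is a hedged illustration under stated assumptions: each metadata size is assumed to be a 4-byte unsigned big-endian integer like the other sizes in the protocol, each metadata blob is treated as already-serialized SingleMessageMetadata bytes, and payload_size_of is a hypothetical caller-supplied function that decodes the metadata and returns its payload_size field. The names encode_batch and decode_batch are also hypothetical.

```python
import struct
from typing import Callable, Iterable, List, Tuple

Entry = Tuple[bytes, bytes]  # (serialized metadata, payload)


def encode_batch(entries: Iterable[Entry]) -> bytes:
    """Concatenate entries as metadataSize, metadata, payload."""
    out = bytearray()
    for metadata, payload in entries:
        out += struct.pack(">I", len(metadata))
        out += metadata
        out += payload
    return bytes(out)


def decode_batch(
    data: bytes,
    num_messages: int,
    payload_size_of: Callable[[bytes], int],
) -> List[Entry]:
    """Split an uncompressed batch payload back into its entries."""
    entries = []
    offset = 0
    for _ in range(num_messages):
        (metadata_size,) = struct.unpack_from(">I", data, offset)
        offset += 4
        metadata = data[offset:offset + metadata_size]
        offset += metadata_size
        # The payload length is only known from the entry's own metadata.
        payload_size = payload_size_of(metadata)
        payload = data[offset:offset + payload_size]
        offset += payload_size
        entries.append((metadata, payload))
    return entries
```

If compression is enabled, a consumer would decompress the whole batch first and only then split it, since the batch is compressed as a single unit.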

Interactions

Connection establishment

After opening a TCP connection to a broker, typically on port 6650, the client is responsible for initiating the session.

[Figure: Connect interaction]

After receiving a Connected response from the broker, the client can consider the connection ready to use. Alternatively, if the broker cannot validate the client's authentication, it will reply with an Error command and close the TCP connection.

Example:

  message CommandConnect {
    "client_version" : "Pulsar-Client-Java-v1.15.2",
    "auth_method_name" : "my-authentication-plugin",
    "auth_data" : "my-auth-data",
    "protocol_version" : 6
  }

Fields:

  • client_version → String-based identifier. Format is not enforced
  • auth_method_name → (optional) Name of the authentication plugin if auth is enabled
  • auth_data → (optional) Plugin-specific authentication data
  • protocol_version → Indicates the protocol version supported by the client. The broker will not send commands introduced in newer revisions of the protocol. The broker might enforce a minimum version

  message CommandConnected {
    "server_version" : "Pulsar-Broker-v1.15.2",
    "protocol_version" : 6
  }

Fields:

  • server_version → String identifier of broker version
  • protocol_version → Protocol version supported by the broker. The client must not attempt to send commands introduced in newer revisions of the protocol
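
One way to read the two protocol_version rules is that each side limits itself to the lower of the two advertised versions. The sketch below expresses that interpretation. The function names and the idea of tagging each command with the revision that introduced it are assumptions made for illustration, not something defined by the protocol.

```python
def effective_protocol_version(client_version: int, broker_version: int) -> int:
    """Highest protocol revision that both peers understand."""
    return min(client_version, broker_version)


def may_send(command_introduced_in: int, client_version: int, broker_version: int) -> bool:
    """Whether a command introduced in the given revision is safe to send."""
    return command_introduced_in <= effective_protocol_version(
        client_version, broker_version
    )
```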

Keep Alive

To identify prolonged network partitions between clients and brokers, or cases in which a machine crashes without interrupting the TCP connection on the remote end (e.g. power outage, kernel panic, hard reboot…), the protocol includes a mechanism to probe the availability of the remote peer.

Both clients and brokers send Ping commands periodically and close the socket if a Pong response is not received within a timeout (the broker's default is 60 seconds).

A valid implementation of a Pulsar client is not required to send the Ping probe, though it is required to reply promptly after receiving one from the broker in order to prevent the remote side from forcibly closing the TCP connection.
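
The timeout side of this mechanism can be sketched as a small state tracker. This is an illustrative sketch only: the class name KeepAliveMonitor and its methods are hypothetical, the 60-second default mirrors the broker default mentioned above, and sending the actual Ping and Pong commands is left to the caller.

```python
import time


class KeepAliveMonitor:
    """Tracks whether an outstanding Ping has gone unanswered for too long."""

    def __init__(self, timeout_s: float = 60.0, clock=time.monotonic):
        self._timeout_s = timeout_s
        self._clock = clock  # injectable so the logic can be tested
        self._ping_sent_at = None

    def on_ping_sent(self) -> None:
        # Keep the timestamp of the oldest unanswered Ping.
        if self._ping_sent_at is None:
            self._ping_sent_at = self._clock()

    def on_pong_received(self) -> None:
        self._ping_sent_at = None

    def should_close(self) -> bool:
        """True when a Ping has been waiting for a Pong past the timeout."""
        if self._ping_sent_at is None:
            return False
        return self._clock() - self._ping_sent_at > self._timeout_s
```

A connection loop would call should_close periodically and close the socket when it returns True. Independently of this, a client has to answer every Ping it receives with a Pong.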

Producer

In order to send messages, a client needs to establish a producer. When creating a producer, the broker will first verify that this particular client is authorized to publish on the topic.

Once the client gets confirmation of the producer creation, it can publish messages to the broker, referring to the producer id negotiated before.

[Figure: Producer interaction]

Command Producer
  message CommandProducer {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "producer_id" : 1,
    "request_id" : 1
  }

Parameters:

  • topic → Complete name of the topic on which to create the producer
  • producer_id → Client-generated producer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • producer_name → (optional) If a producer name is specified, that name will be used; otherwise the broker will generate a unique name. A generated producer name is guaranteed to be globally unique. Implementations are expected to let the broker generate a new producer name when the producer is initially created, then reuse it when recreating the producer after reconnections.

The broker will reply with either a ProducerSuccess or an Error command.

Command ProducerSuccess
  message CommandProducerSuccess {
    "request_id" : 1,
    "producer_name" : "generated-unique-producer-name"
  }

Parameters:

  • request_id → Original id of the CreateProducer request
  • producer_name → Generated globally unique producer name, or the name specified by the client, if any.

Command Send

Command Send is used to publish a new message within the context of an already existing producer. This command is used in a frame that includes the command as well as the message payload, for which the complete format is specified in the payload commands section.

  message CommandSend {
    "producer_id" : 1,
    "sequence_id" : 0,
    "num_messages" : 1
  }

Parameters:

  • producer_id → id of an existing producer
  • sequence_id → each message has an associated sequence id, which is expected to be implemented with a counter starting at 0. The SendReceipt that acknowledges the effective publishing of a message will refer to it by its sequence id.
  • num_messages → (optional) Used when publishing a batch of messages at once.

Command SendReceipt

After a message has been persisted on the configured number of replicas, the broker will send the acknowledgment receipt to the producer.
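
On the client side this implies keeping track of in-flight sends by sequence id until their receipt arrives. The following is a minimal sketch of that bookkeeping, assuming a per-producer counter starting at 0. The class PendingSends and its methods are hypothetical names, not part of any Pulsar client.

```python
class PendingSends:
    """Tracks published messages that have not yet been acknowledged."""

    def __init__(self):
        self._next_sequence_id = 0
        self._pending = {}  # sequence_id -> payload awaiting a receipt

    def register(self, payload: bytes) -> int:
        """Assign the next sequence id to a message about to be sent."""
        sequence_id = self._next_sequence_id
        self._next_sequence_id += 1
        self._pending[sequence_id] = payload
        return sequence_id

    def on_send_receipt(self, sequence_id: int, ledger_id: int, entry_id: int):
        """Resolve a pending send; raises KeyError for an unknown sequence id."""
        payload = self._pending.pop(sequence_id)
        return payload, (ledger_id, entry_id)

    def pending_count(self) -> int:
        return len(self._pending)
```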

  message CommandSendReceipt {
    "producer_id" : 1,
    "sequence_id" : 0,
    "message_id" : {
      "ledgerId" : 123,
      "entryId" : 456
    }
  }

Parameters:

  • producer_id → id of the producer originating the send request
  • sequence_id → sequence id of the published message
  • message_id → message id assigned by the system to the published message. Unique within a single cluster. The message id is composed of two longs, ledgerId and entryId, reflecting that this unique id is assigned when appending to a BookKeeper ledger

Command CloseProducer

Note: This command can be sent by either producer or broker.

When receiving a CloseProducer command, the broker will stop accepting any more messages for the producer, wait until all pending messages are persisted, and then reply Success to the client.

The broker can send a CloseProducer command to the client when it is performing a graceful failover (e.g. the broker is being restarted, or the topic is being unloaded by the load balancer to be transferred to a different broker).

When receiving the CloseProducer, the client is expected to go through the service discovery lookup again and recreate the producer. The TCP connection is not affected.

Consumer

A consumer is used to attach to a subscription and consume messages from it. After every reconnection, a client needs to subscribe to the topic. If a subscription is not already there, a new one will be created.

[Figure: Consumer]

Flow control

After the consumer is ready, the client needs to give permission to the broker to push messages. This is done with the Flow command.

A Flow command gives additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accumulate these messages before the application is ready to consume them.

After the application has dequeued a number of messages, the consumer will send an additional number of permits to allow the broker to push more messages.
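
The permit accounting described above can be sketched with a queue-backed consumer. This is a hedged sketch: the class ConsumerFlow, the send_flow callback (standing in for sending a Flow command with the given messagePermits), and the policy of refilling once half the queue has been drained are all assumptions chosen for illustration.

```python
from collections import deque


class ConsumerFlow:
    """Receiver queue that grants permits back to the broker as it drains."""

    def __init__(self, queue_size: int, send_flow):
        self._queue = deque()
        self._send_flow = send_flow  # hypothetical: callable(message_permits)
        # Assumed policy: refill after half of the queue has been consumed.
        self._refill_threshold = max(1, queue_size // 2)
        self._dequeued_since_flow = 0
        # Initial permits: allow the broker to fill the whole queue.
        self._send_flow(queue_size)

    def on_message(self, message) -> None:
        """Called when the broker pushes a message to this consumer."""
        self._queue.append(message)

    def receive(self):
        """Hand one message to the application; raises IndexError if empty."""
        message = self._queue.popleft()
        self._dequeued_since_flow += 1
        if self._dequeued_since_flow >= self._refill_threshold:
            # Grant exactly as many permits as slots were freed.
            self._send_flow(self._dequeued_since_flow)
            self._dequeued_since_flow = 0
        return message
```

Granting permits in chunks rather than one per message keeps the number of Flow commands low while still bounding how many messages can be in the queue at once.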

Command Subscribe
  message CommandSubscribe {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "subscription" : "my-subscription-name",
    "subType" : "Exclusive",
    "consumer_id" : 1,
    "request_id" : 1
  }

Parameters:

  • topic → Complete name of the topic on which to create the consumer
  • subscription → Subscription name
  • subType → Subscription type: Exclusive, Shared, Failover
  • consumer_id → Client-generated consumer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • consumer_name → (optional) Clients can specify a consumer name. This name can be used to track a particular consumer in the stats. Also, in the Failover subscription type, the name is used to decide which consumer is elected as master (the one receiving messages): consumers are sorted by their consumer name and the first one is elected master.

Command Flow
  message CommandFlow {
    "consumer_id" : 1,
    "messagePermits" : 1000
  }

Parameters:

  • consumer_id → Id of an already established consumer
  • messagePermits → Number of additional permits to grant to the broker for pushing more messages

Command Message

Command Message is used by the broker to push messages to an existing consumer, within the limits of the given permits.

This command is used in a frame that includes the message payload as well, for which the complete format is specified in the payload commands section.

  message CommandMessage {
    "consumer_id" : 1,
    "message_id" : {
      "ledgerId" : 123,
      "entryId" : 456
    }
  }

Command Ack

An Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

In addition, the broker will also maintain the consumer position based on the acknowledged messages.

  message CommandAck {
    "consumer_id" : 1,
    "ack_type" : "Individual",
    "message_id" : {
      "ledgerId" : 123,
      "entryId" : 456
    }
  }

Parameters:

  • consumer_id → Id of an already established consumer
  • ack_type → Type of acknowledgment: Individual or Cumulative
  • message_id → Id of the message to acknowledge
  • validation_error → (optional) Indicates that the consumer has discarded the messages due to: UncompressedSizeCorruption, DecompressionError, ChecksumMismatch, BatchDeSerializeError

Command CloseConsumer

Note: This command can be sent by either consumer or broker.

This command behaves the same as CloseProducer.

Command RedeliverUnacknowledgedMessages

A consumer can ask the broker to redeliver some or all of the pending messages that were pushed to that particular consumer and not yet acknowledged.

The protobuf object accepts a list of message ids that the consumer wants to be redelivered. If the list is empty, the broker will redeliver all the pending messages.

On redelivery, messages can be sent to the same consumer or, in the case of a shared subscription, spread across all available consumers.

Command ReachedEndOfTopic

This is sent by a broker to a particular consumer whenever the topic has been "terminated" and all the messages on the subscription were acknowledged.

The client should use this command to notify the application that no more messages are coming from the consumer.

Command ConsumerStats

This command is sent by the client to retrieve Subscriber and Consumer level stats from the broker.

Parameters:

  • request_id → Id of the request, used to correlate the request and the response.
  • consumer_id → Id of an already established consumer.

Command ConsumerStatsResponse

This is the broker's response to a ConsumerStats request from the client. It contains the Subscriber and Consumer level stats of the consumer_id sent in the request. If the error_code or the error_message field is set, the request has failed.

Command Unsubscribe

This command is sent by the client to unsubscribe the consumer_id from the associated topic.

Parameters:

  • request_id → Id of the request.
  • consumer_id → Id of an already established consumer which needs to unsubscribe.

Service discovery

Topic lookup

Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

Lookup can be done with a REST call as described in the admin API docs.

Since Pulsar-1.16 it is also possible to perform the lookup within the binary protocol.

For the sake of example, let's assume we have a service discovery component running at pulsar://broker.example.com:6650

Individual brokers will be running at pulsar://broker-1.example.com:6650, pulsar://broker-2.example.com:6650, …

A client can use a connection to the discovery service host to issue a LookupTopic command. The response can either be a broker hostname to connect to, or a broker hostname to which to retry the lookup.

The LookupTopic command has to be used in a connection that has already gone through the Connect / Connected initial handshake.

[Figure: Topic lookup]

  message CommandLookupTopic {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "request_id" : 1,
    "authoritative" : false
  }

Fields:

  • topic → Topic name to look up
  • request_id → Id of the request that will be passed with its response
  • authoritative → The initial lookup request should use false. When following a redirect response, the client should pass the same value contained in the response

LookupTopicResponse

Example of response with successful lookup:

  message CommandLookupTopicResponse {
    "request_id" : 1,
    "response" : "Connect",
    "brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
    "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
    "authoritative" : true
  }

Example of lookup response with redirection:

  message CommandLookupTopicResponse {
    "request_id" : 1,
    "response" : "Redirect",
    "brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
    "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
    "authoritative" : true
  }

In this second case, we need to reissue the LookupTopic command request to broker-2.example.com and this broker will be able to give a definitive answer to the lookup request.
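
The redirect handling can be sketched as a loop. This is an illustrative sketch under assumptions: send_lookup is a hypothetical function that sends a LookupTopic command over an already-handshaken connection to the given URL and returns the response fields as a dictionary, and the max_redirects limit is a safety guard added here rather than something the protocol specifies.

```python
from typing import Callable, Dict


def lookup_topic(
    topic: str,
    service_url: str,
    send_lookup: Callable[[str, str, bool], Dict],
    max_redirects: int = 5,
) -> Dict:
    """Follow Redirect responses until a broker answers with Connect."""
    url = service_url
    authoritative = False  # the initial lookup request uses false
    for _ in range(max_redirects + 1):
        response = send_lookup(url, topic, authoritative)
        kind = response["response"]
        if kind == "Connect":
            return response
        if kind == "Redirect":
            # Retry against the indicated broker, passing back the
            # authoritative value contained in the redirect response.
            url = response["brokerServiceUrl"]
            authoritative = response["authoritative"]
            continue
        raise RuntimeError(f"unexpected lookup response: {kind}")
    raise RuntimeError("too many lookup redirects")
```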

Partitioned topics discovery

Partitioned topics metadata discovery is used to find out if a topic is a "partitioned topic" and how many partitions were set up.

If the topic is marked as "partitioned", the client is expected to create multiple producers or consumers, one for each partition, using the partition-X suffix.
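
Deriving the per-partition topic names can be sketched as below. The exact naming is an assumption here: the suffix is taken to be joined to the topic name with a dash and numbered from 0, and a partition count of 0 is taken to mean the topic is not partitioned. The function name partition_topic_names is hypothetical.

```python
from typing import List


def partition_topic_names(topic: str, partitions: int) -> List[str]:
    """Topic names a client would create producers or consumers on."""
    if partitions <= 0:
        # Assumed: no partitions reported means a regular topic.
        return [topic]
    return [f"{topic}-partition-{i}" for i in range(partitions)]
```

With the metadata response example of 32 partitions, this would yield 32 names, from `...my-topic-partition-0` through `...my-topic-partition-31`.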

This information only needs to be retrieved the first time a producer or consumer is created. There is no need to do this after reconnections.

The discovery of partitioned topics metadata works very similarly to the topic lookup. The client sends a request to the service discovery address and the response will contain the actual metadata.

Command PartitionedTopicMetadata
  message CommandPartitionedTopicMetadata {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "request_id" : 1
  }

Fields:

  • topic → The topic for which to check the partitions metadata
  • request_id → Id of the request that will be passed with its response

Command PartitionedTopicMetadataResponse

Example of response with metadata:

  message CommandPartitionedTopicMetadataResponse {
    "request_id" : 1,
    "response" : "Success",
    "partitions" : 32
  }

Protobuf interface

All of Pulsar's Protobuf definitions can be found in the PulsarApi.proto file.