Pulsar binary protocol specification

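Pulsar uses a custom binary protocol for communication between producers/consumers and brokers. The protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency.

Clients and brokers exchange commands with each other. Commands are formatted as binary protocol buffer (aka protobuf) messages. The format of the protobuf commands is specified in PulsarApi.proto and is also documented in the Protobuf interface section below.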

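Connection sharing

Commands for different producers and consumers can be interleaved and sent over the same connection without restriction.

All commands in the Pulsar protocol are wrapped in a BaseCommand protobuf message, which contains a type enum listing all possible subcommands, each carried as an optional field. A BaseCommand message can specify only one subcommand.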

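Framing

Because protobuf does not provide any kind of message framing, every message in the Pulsar protocol is prefixed with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.

The Pulsar protocol allows two types of commands: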

  1. Simple commands that do not carry a message payload.
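  2. Payload commands that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf metadata and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big-endian integers.

Message payloads are passed in raw format rather than protobuf format for efficiency reasons.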

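Simple commands

Simple (payload-free) commands have this basic structure:

| Component | Description | Size (in bytes) |
|---|---|---|
| totalSize | The size of the frame, counting everything that comes after it (in bytes) | 4 |
| commandSize | The size of the protobuf-serialized command | 4 |
| message | The protobuf command message in its serialized binary form | variable |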
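
As a rough illustration of the layout above, the following Python sketch frames an already-serialized command and unpacks it again. The function names, the reading of 5 MB as 5 * 1024 * 1024 bytes, and the choice to check that limit against totalSize are assumptions made for the example. A real client would obtain the command bytes from the protobuf classes generated from PulsarApi.proto.

```python
import struct

# Assumption: "5 MB" is read as 5 * 1024 * 1024 bytes and checked against totalSize.
MAX_FRAME_SIZE = 5 * 1024 * 1024


def encode_simple_frame(command: bytes) -> bytes:
    """Build [totalSize][commandSize][command] with 4-byte big-endian sizes."""
    command_size = len(command)
    total_size = 4 + command_size  # the commandSize field plus the command itself
    if total_size > MAX_FRAME_SIZE:
        raise ValueError("frame exceeds the maximum frame size")
    return struct.pack(">II", total_size, command_size) + command


def decode_simple_frame(frame: bytes) -> bytes:
    """Return the serialized command carried by a simple frame."""
    total_size, command_size = struct.unpack_from(">II", frame, 0)
    if total_size != len(frame) - 4 or command_size != total_size - 4:
        raise ValueError("size fields do not match the frame length")
    return frame[8:8 + command_size]


# Placeholder bytes standing in for a serialized BaseCommand.
frame = encode_simple_frame(b"\x08\x02")
print(frame.hex())  # 00000006000000020802
```

Note that totalSize does not count its own four bytes, which is why a two-byte command yields a totalSize of 6 and a 10-byte frame.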

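Payload commands

Payload commands have this basic structure:

| Component | Description | Size (in bytes) |
|---|---|---|
| totalSize | The size of the frame, counting everything that comes after it (in bytes) | 4 |
| commandSize | The size of the protobuf-serialized command | 4 |
| message | The protobuf command message in its serialized binary form | variable |
| magicNumber | A 2-byte byte array (0x0e01) identifying the message format version | 2 |
| checksum | A CRC32-C checksum of everything that comes after it (metadataSize + metadata + payload), used to detect lost or corrupted data | 4 |
| metadataSize | The size of the message metadata | 4 |
| metadata | The message metadata, stored as a binary protobuf message | variable |
| payload | Anything left in the frame is considered the payload and can include any sequence of bytes | variable |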
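
The payload frame layout can be sketched in Python as follows. This is a minimal illustration, not the reference implementation: the function names are hypothetical, the metadata and command are treated as opaque bytes that would normally come from the generated protobuf classes, and the CRC32-C routine is a slow bit-by-bit version chosen for readability (Python's standard library has no CRC32-C function).

```python
import struct

MAGIC = b"\x0e\x01"  # magicNumber from the table above


def crc32c(data: bytes) -> int:
    """Bitwise CRC32-C (Castagnoli), reflected polynomial 0x82F63B78."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
    return crc ^ 0xFFFFFFFF


def encode_payload_frame(command: bytes, metadata: bytes, payload: bytes) -> bytes:
    """Build a payload frame; the checksum covers metadataSize + metadata + payload."""
    checked = struct.pack(">I", len(metadata)) + metadata + payload
    body = (struct.pack(">I", len(command)) + command
            + MAGIC + struct.pack(">I", crc32c(checked)) + checked)
    return struct.pack(">I", len(body)) + body


def decode_payload_frame(frame: bytes):
    """Return (command, metadata, payload) after verifying magic number and checksum."""
    total_size, command_size = struct.unpack_from(">II", frame, 0)
    end = 4 + total_size
    pos = 8
    command = frame[pos:pos + command_size]
    pos += command_size
    if frame[pos:pos + 2] != MAGIC:
        raise ValueError("unexpected magic number")
    pos += 2
    (checksum,) = struct.unpack_from(">I", frame, pos)
    pos += 4
    if crc32c(frame[pos:end]) != checksum:
        raise ValueError("checksum mismatch")
    (metadata_size,) = struct.unpack_from(">I", frame, pos)
    pos += 4
    metadata = frame[pos:pos + metadata_size]
    pos += metadata_size
    return command, metadata, frame[pos:end]


print(hex(crc32c(b"123456789")))  # 0xe3069283, the standard CRC32-C check value
```

Because the checksum is computed over the bytes that follow it, a receiver can verify it before attempting to parse the metadata.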

Message metadata

Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer.

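| Field | Description |
|---|---|
| producer_name | The name of the producer that published the message |
| sequence_id | The sequence ID of the message, assigned by the producer |
| publish_time | The publish timestamp in Unix time (i.e. the number of milliseconds since January 1st, 1970) |
| properties | A sequence of key/value pairs (using the KeyValue message). These are application-defined keys and values with no special meaning to Pulsar. |
| replicated_from (optional) | Indicates that the message has been replicated and specifies the name of the cluster where the message was originally published |
| partition_key (optional) | When publishing to a partitioned topic, if the key is present, its hash is used to determine which partition to choose |
| compression (optional) | Signals that the payload has been compressed and with which compression library |
| uncompressed_size (optional) | If compression is used, the producer must fill this field with the original (uncompressed) payload size |
| num_messages_in_batch (optional) | If this message is really a batch of multiple entries, this field must be set to the number of messages in the batch |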

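Batch messages

When using batch messages, the payload contains a list of entries, each with its own individual metadata, defined by the SingleMessageMetadata object.

For a single batch, the payload format looks like this:

| Field | Description |
|---|---|
| metadataSizeN | The size of the single message metadata, serialized as protobuf |
| metadataN | Single message metadata |
| payloadN | Message payload passed by the application |

Each metadata entry has these fields:

| Field | Description |
|---|---|
| properties | Application-defined properties |
| partition key (optional) | Key used for hashing to a particular partition |
| payload_size | Size of the payload for the single message in the batch |

When compression is enabled, the whole batch is compressed at once.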
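
To make the batch layout concrete, here is a small Python sketch that walks a batch payload entry by entry. It assumes the payload has already been decompressed, that metadataSizeN is a 4-byte big-endian integer like the other size fields, and that the entry count comes from num_messages_in_batch. The payload_size_of callback is hypothetical: it stands in for decoding a SingleMessageMetadata with the generated protobuf classes and reading payload_size.

```python
import struct
from typing import Callable, List, Tuple


def split_batch(payload: bytes, num_messages: int,
                payload_size_of: Callable[[bytes], int]) -> List[Tuple[bytes, bytes]]:
    """Split a batch payload into (metadata, payload) pairs."""
    entries = []
    pos = 0
    for _ in range(num_messages):
        (metadata_size,) = struct.unpack_from(">I", payload, pos)
        pos += 4
        metadata = payload[pos:pos + metadata_size]
        pos += metadata_size
        size = payload_size_of(metadata)  # payload_size field of this entry
        entries.append((metadata, payload[pos:pos + size]))
        pos += size
    if pos != len(payload):
        raise ValueError("batch payload has trailing or missing bytes")
    return entries


# Stand-in metadata for the demo only: the payload size as 4 big-endian bytes.
# Real metadata is a protobuf-serialized SingleMessageMetadata.
def fake_metadata(body: bytes) -> bytes:
    return struct.pack(">I", len(body))


bodies = [b"first", b"second"]
batch = b"".join(
    struct.pack(">I", len(fake_metadata(b))) + fake_metadata(b) + b for b in bodies
)
entries = split_batch(batch, len(bodies), lambda m: struct.unpack(">I", m)[0])
print([body for _, body in entries])  # [b'first', b'second']
```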

Interactions

Connection establishment

After opening a TCP connection to a broker, typically on port 6650, the client is responsible for initiating the session.

Connect interaction

After receiving a Connected response from the broker, the client can consider the connection ready to use. Alternatively, if the broker cannot validate the client's authentication, it replies with an Error command and closes the TCP connection.

Example:

  message CommandConnect {
    "client_version" : "Pulsar-Client-Java-v1.15.2",
    "auth_method_name" : "my-authentication-plugin",
    "auth_data" : "my-auth-data",
    "protocol_version" : 6
  }

Fields:

  • client_version → String based identifier. Format is not enforced
  • auth_method_name → (optional) Name of the authentication plugin if auth is enabled
  • auth_data → (optional) Plugin-specific authentication data
  • protocol_version → Indicates the protocol version supported by the client. The broker will not send commands introduced in newer revisions of the protocol. The broker might enforce a minimum protocol version.

  message CommandConnected {
    "server_version" : "Pulsar-Broker-v1.15.2",
    "protocol_version" : 6
  }

Fields:

  • server_version → String identifier of broker version
  • protocol_version → Protocol version supported by the broker. Client must not attempt to send commands introduced in newer revisions of the protocol
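
One way to read the two protocol_version rules together is that each side limits itself to the lower of the two advertised versions. The sketch below expresses that interpretation in Python. The function names and the idea of tagging each command with the revision that introduced it are assumptions for illustration, not part of the protocol definition.

```python
def negotiated_version(client_version: int, broker_version: int) -> int:
    """Highest protocol revision that both peers understand."""
    return min(client_version, broker_version)


def may_send(introduced_in: int, client_version: int, broker_version: int) -> bool:
    """True if a command introduced in revision `introduced_in` is safe to send."""
    return introduced_in <= negotiated_version(client_version, broker_version)


print(may_send(6, client_version=8, broker_version=6))  # True
print(may_send(7, client_version=8, broker_version=6))  # False
```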

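Keep Alive

To identify prolonged network partitions between clients and brokers, or cases in which a machine crashes without the TCP connection being closed on the remote end (e.g. power outage, kernel panic, hard reboot), the protocol includes a mechanism to probe the availability of the remote peer.

Clients and brokers both send Ping commands periodically, and they close the socket if a Pong response is not received within a timeout (the broker's default is 60 seconds).

A valid Pulsar client implementation is not required to send Ping probes itself. It is, however, required to reply promptly after receiving one from the broker, to prevent the broker from forcibly closing the TCP connection.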
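
The keep-alive rules can be sketched as a small state tracker. This is an illustrative outline only: the class and method names are hypothetical, the strings "PING" and "PONG" stand in for the corresponding BaseCommand types, and timestamps are passed in explicitly so the logic stays independent of any particular event loop.

```python
from typing import Optional


class KeepAlive:
    """Tracks one outstanding Ping and answers Pings from the peer."""

    def __init__(self, timeout_s: float = 60.0) -> None:
        self.timeout_s = timeout_s  # 60 s is the broker default mentioned above
        self.ping_sent_at: Optional[float] = None

    def on_ping_sent(self, now: float) -> None:
        # Keep the oldest unanswered Ping as the reference point.
        if self.ping_sent_at is None:
            self.ping_sent_at = now

    def on_command(self, command_type: str) -> Optional[str]:
        """Return the command to send back, if any."""
        if command_type == "PING":
            return "PONG"  # reply promptly so the peer keeps the connection open
        if command_type == "PONG":
            self.ping_sent_at = None
        return None

    def should_close(self, now: float) -> bool:
        """True once a Ping has gone unanswered for longer than the timeout."""
        return self.ping_sent_at is not None and now - self.ping_sent_at > self.timeout_s


keep_alive = KeepAlive()
print(keep_alive.on_command("PING"))  # PONG
```

A client that never sends its own Ping only needs the on_command branch that answers with Pong.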

Producer

In order to send messages, a client needs to establish a producer. When creating a producer, the broker will first verify that this particular client is authorized to publish on the topic.

Once the client receives confirmation that the producer was created, it can publish messages to the broker, referring to the producer id negotiated earlier.

Producer interaction

Command Producer

  message CommandProducer {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "producer_id" : 1,
    "request_id" : 1
  }

Parameters:

  • topic → Complete name of the topic on which to create the producer
  • producer_id → Client generated producer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • producer_name → (optional) If a producer name is specified, that name is used; otherwise the broker generates a unique name. A generated producer name is guaranteed to be globally unique. Implementations are expected to let the broker generate a new producer name when the producer is initially created, then reuse it when recreating the producer after reconnections.

The broker replies with either a ProducerSuccess or an Error command.

Command ProducerSuccess

  message CommandProducerSuccess {
    "request_id" : 1,
    "producer_name" : "generated-unique-producer-name"
  }

Parameters:

  • request_id → Original id of the CreateProducer request
  • producer_name → Generated globally unique producer name or the name specified by the client, if any.
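
Since responses are matched to requests purely by request_id, a client typically keeps a table of in-flight requests per connection. The following Python sketch shows one possible shape for that table. The class and its method names are hypothetical, and starting the counter at 1 is an arbitrary choice.

```python
import itertools
from typing import Any, Dict


class RequestTracker:
    """Hands out request ids and matches responses to pending requests."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: Dict[int, Any] = {}

    def register(self, context: Any) -> int:
        request_id = next(self._ids)  # unique for the lifetime of this connection
        self._pending[request_id] = context
        return request_id

    def resolve(self, request_id: int) -> Any:
        """Return the context stored for request_id; raises KeyError if unknown."""
        return self._pending.pop(request_id)


tracker = RequestTracker()
rid = tracker.register({"command": "Producer", "producer_id": 1})
# Later, a ProducerSuccess or Error arrives carrying the same request_id.
print(tracker.resolve(rid))  # {'command': 'Producer', 'producer_id': 1}
```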
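
Command Send

Command Send is used to publish a new message within the context of an already existing producer. This command is used in a frame that includes the command as well as the message payload, for which the complete format is specified in the payload commands section.

  message CommandSend {
    "producer_id" : 1,
    "sequence_id" : 0,
    "num_messages" : 1
  }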

Parameters:

  • producer_id → id of an existing producer
  • sequence_id → each message has an associated sequence id, which is expected to be implemented with a counter starting at 0. The SendReceipt that acknowledges the effective publishing of a message will refer to it by its sequence id.
  • num_messages → (optional) Used when publishing a batch of messages at once.

Command SendReceipt

After a message has been persisted on the configured number of replicas, the broker sends the acknowledgment receipt to the producer.

  message CommandSendReceipt {
    "producer_id" : 1,
    "sequence_id" : 0,
    "message_id" : {
      "ledgerId" : 123,
      "entryId" : 456
    }
  }

Parameters:

  • producer_id → id of producer originating the send request
  • sequence_id → sequence id of the published message
  • message_id → Message id assigned by the system to the published message. Unique within a single cluster. The message id is composed of two longs, ledgerId and entryId, reflecting that this unique id is assigned when appending to a BookKeeper ledger
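
The pairing of Send and SendReceipt through sequence_id can be sketched as follows. This is a simplified illustration with hypothetical names: it only tracks which payloads are still awaiting a receipt and says nothing about retries or timeouts.

```python
from typing import Dict, Tuple


class PendingSends:
    """Assigns sequence ids from a counter starting at 0 and tracks unacknowledged sends."""

    def __init__(self) -> None:
        self.next_sequence_id = 0
        self.pending: Dict[int, bytes] = {}

    def send(self, payload: bytes) -> int:
        sequence_id = self.next_sequence_id
        self.next_sequence_id += 1
        self.pending[sequence_id] = payload  # kept until the broker confirms it
        return sequence_id

    def on_receipt(self, sequence_id: int, ledger_id: int, entry_id: int) -> Tuple[int, int]:
        """Handle a SendReceipt and return the assigned message id."""
        del self.pending[sequence_id]  # KeyError signals an unexpected receipt
        return (ledger_id, entry_id)


sends = PendingSends()
seq = sends.send(b"hello")
print(seq, sends.on_receipt(seq, ledger_id=123, entry_id=456))  # 0 (123, 456)
```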
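
Command CloseProducer

Note: This command can be sent by either producer or broker.

When receiving a CloseProducer command, the broker stops accepting any more messages for the producer, waits until all pending messages are persisted, and then replies Success to the client.

The broker can also send a CloseProducer command to the client when it is performing a graceful failover (e.g. the broker is being restarted, or the topic is being unloaded by the load balancer to be transferred to a different broker).

When receiving the CloseProducer, the client is expected to go through the service discovery lookup again and recreate the producer. The TCP connection is not affected.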

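Consumer

A consumer is used to attach to a subscription and consume messages from it. After every reconnection, a client needs to subscribe to the topic. If the subscription does not already exist, a new one is created.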

Flow control

After the consumer is ready, the client needs to give permission to the broker to push messages. This is done with the Flow command.

A Flow command gives additional permits to send messages to the consumer. A typical consumer implementation uses a queue to accumulate these messages before the application is ready to consume them.

After the application has dequeued half of the messages in the queue, the consumer sends permits to the broker to ask for more messages (equal to half of the queue size).

For example, if the queue size is 1000 and the application has consumed 500 of the queued messages, the consumer sends permits to the broker to ask for 500 more messages.
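
The half-queue refill rule can be sketched in Python as below. The class is hypothetical and leaves out the network side entirely. It also assumes that the first Flow command grants permits equal to the full queue size, which the text above does not state explicitly.

```python
from collections import deque
from typing import Any, Deque, Tuple


class ReceiverQueue:
    """Consumer-side queue that decides when to grant more permits."""

    def __init__(self, size: int = 1000) -> None:
        self.size = size
        self.messages: Deque[Any] = deque()
        self.dequeued_since_flow = 0

    def initial_permits(self) -> int:
        # Assumption: the first Flow command grants the whole queue size.
        return self.size

    def on_message(self, message: Any) -> None:
        self.messages.append(message)

    def take(self) -> Tuple[Any, int]:
        """Dequeue one message; also return the permits to send in a Flow (0 = none)."""
        message = self.messages.popleft()
        self.dequeued_since_flow += 1
        permits = 0
        if self.dequeued_since_flow >= self.size // 2:
            permits = self.dequeued_since_flow
            self.dequeued_since_flow = 0
        return message, permits


queue = ReceiverQueue(size=1000)
for i in range(queue.initial_permits()):
    queue.on_message(i)
grants = [queue.take()[1] for _ in range(500)]
print(grants[-1], sum(grants))  # 500 500
```

With a queue size of 1000, the first 499 calls to take() grant nothing and the 500th grants 500 permits, matching the example in the text.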

Command Subscribe

  message CommandSubscribe {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "subscription" : "my-subscription-name",
    "subType" : "Exclusive",
    "consumer_id" : 1,
    "request_id" : 1
  }

Parameters:

  • topic → Complete name of the topic on which to create the consumer
  • subscription → Subscription name
  • subType → Subscription type: Exclusive, Shared, Failover, Key_Shared
  • consumer_id → Client generated consumer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • consumer_name → (optional) Clients can specify a consumer name. This name can be used to track a particular consumer in the stats. Also, in the Failover subscription type, the name is used to decide which consumer is elected as master (the one receiving messages): consumers are sorted by their consumer name and the first one is elected master.

Command Flow

  message CommandFlow {
    "consumer_id" : 1,
    "messagePermits" : 1000
  }

Parameters:

  • consumer_id → Id of an already established consumer
  • messagePermits → Number of additional permits to grant to the broker for pushing more messages

Command Message

Command Message is used by the broker to push messages to an existing consumer, within the limits of the given permits.

This command is used in a frame that includes the message payload as well, for which the complete format is specified in the payload commands section.

  message CommandMessage {
    "consumer_id" : 1,
    "message_id" : {
      "ledgerId" : 123,
      "entryId" : 456
    }
  }

Command Ack

An Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

In addition, the broker will also maintain the consumer position based on the acknowledged messages.

  message CommandAck {
    "consumer_id" : 1,
    "ack_type" : "Individual",
    "message_id" : {
      "ledgerId" : 123,
      "entryId" : 456
    }
  }

Parameters:

  • consumer_id → Id of an already established consumer
  • ack_type → Type of acknowledgment: Individual or Cumulative
  • message_id → Id of the message to acknowledge
  • validation_error → (optional) Indicates that the consumer has discarded the message due to: UncompressedSizeCorruption, DecompressionError, ChecksumMismatch, BatchDeSerializeError
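
The difference between the two ack_type values can be illustrated with a small Python sketch. It assumes the usual Pulsar meaning of a cumulative ack (everything up to and including the given message id is acknowledged) and orders message ids as (ledgerId, entryId) tuples, which is a simplification for the example. The function name is hypothetical.

```python
from typing import Set, Tuple

MessageId = Tuple[int, int]  # (ledgerId, entryId)


def apply_ack(unacked: Set[MessageId], ack_type: str, message_id: MessageId) -> Set[MessageId]:
    """Return the ids that are still unacknowledged after applying one Ack."""
    if ack_type == "Individual":
        return unacked - {message_id}
    if ack_type == "Cumulative":
        # Assumption: cumulative covers every id up to and including message_id.
        return {m for m in unacked if m > message_id}
    raise ValueError(f"unknown ack_type: {ack_type}")


unacked = {(123, 455), (123, 456), (123, 457)}
print(sorted(apply_ack(unacked, "Individual", (123, 456))))  # [(123, 455), (123, 457)]
print(sorted(apply_ack(unacked, "Cumulative", (123, 456))))  # [(123, 457)]
```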

Command CloseConsumer

Note: This command can be sent by either consumer or broker.

This command behaves the same as CloseProducer.

Command RedeliverUnacknowledgedMessages

A consumer can ask the broker to redeliver some or all of the pending messages that were pushed to that particular consumer and not yet acknowledged.

The protobuf object accepts a list of message ids that the consumer wants to be redelivered. If the list is empty, the broker will redeliver all the pending messages.

On redelivery, messages can be sent to the same consumer or, in the case of a shared subscription, spread across all available consumers.

Command ReachedEndOfTopic

This is sent by a broker to a particular consumer, whenever the topic has been “terminated” and all the messages on the subscription were acknowledged.

The client should use this command to notify the application that no more messages are coming from the consumer.

Command ConsumerStats

This command is sent by the client to retrieve Subscriber and Consumer level stats from the broker.

Parameters:

  • request_id → Id of the request, used to correlate the request and the response.
  • consumer_id → Id of an already established consumer.

Command ConsumerStatsResponse

This is the broker's response to a ConsumerStats request by the client. It contains the Subscriber and Consumer level stats of the consumer_id sent in the request. If the error_code or the error_message field is set, it indicates that the request has failed.

Command Unsubscribe

This command is sent by the client to unsubscribe the consumer_id from the associated topic.

Parameters:

  • request_id → Id of the request.
  • consumer_id → Id of an already established consumer which needs to unsubscribe.

Service discovery

Topic lookup

Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

Lookup can be done with a REST call as described in the admin API docs.

Since Pulsar-1.16 it is also possible to perform the lookup within the binary protocol.

For the sake of example, let’s assume we have a service discovery component running at pulsar://broker.example.com:6650

Individual brokers will be running at pulsar://broker-1.example.com:6650, pulsar://broker-2.example.com:6650, …

A client can use a connection to the discovery service host to issue a LookupTopic command. The response is either the hostname of a broker to connect to, or the hostname of a broker against which to retry the lookup.

The LookupTopic command has to be used in a connection that has already gone through the Connect / Connected initial handshake.

  message CommandLookupTopic {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "request_id" : 1,
    "authoritative" : false
  }

Fields:

  • topic → Topic name to lookup
  • request_id → Id of the request that will be passed with its response
  • authoritative → Initial lookup request should use false. When following a redirect response, client should pass the same value contained in the response

LookupTopicResponse

Example of response with successful lookup:

  message CommandLookupTopicResponse {
    "request_id" : 1,
    "response" : "Connect",
    "brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
    "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
    "authoritative" : true
  }

Example of lookup response with redirection:

  message CommandLookupTopicResponse {
    "request_id" : 1,
    "response" : "Redirect",
    "brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
    "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
    "authoritative" : true
  }

In this second case, we need to reissue the LookupTopic command request to broker-2.example.com and this broker will be able to give a definitive answer to the lookup request.
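
The redirect handling described above amounts to a short loop, sketched here in Python. The send_lookup callback is hypothetical: it stands in for sending a LookupTopic command over a connection to the given URL and returning the response fields as a dictionary. The max_redirects guard is also an assumption added for safety.

```python
from typing import Any, Callable, Dict

LookupFn = Callable[[str, str, bool], Dict[str, Any]]


def resolve_broker(topic: str, service_url: str, send_lookup: LookupFn,
                   max_redirects: int = 5) -> str:
    """Follow Redirect responses until a broker answers with Connect."""
    url = service_url
    authoritative = False  # the initial lookup request uses false
    for _ in range(max_redirects + 1):
        response = send_lookup(url, topic, authoritative)
        if response["response"] == "Connect":
            return response["brokerServiceUrl"]
        if response["response"] != "Redirect":
            raise RuntimeError(f"lookup failed: {response}")
        # Retry against the indicated broker, passing back its authoritative flag.
        url = response["brokerServiceUrl"]
        authoritative = response["authoritative"]
    raise RuntimeError("too many lookup redirects")


# Canned responses standing in for real brokers.
def fake_lookup(url: str, topic: str, authoritative: bool) -> Dict[str, Any]:
    broker_2 = "pulsar://broker-2.example.com:6650"
    kind = "Connect" if url == broker_2 else "Redirect"
    return {"response": kind, "brokerServiceUrl": broker_2, "authoritative": True}


print(resolve_broker("persistent://my-property/my-cluster/my-namespace/my-topic",
                     "pulsar://broker.example.com:6650", fake_lookup))
# pulsar://broker-2.example.com:6650
```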

Partitioned topics discovery

Partitioned topics metadata discovery is used to find out if a topic is a “partitioned topic” and how many partitions were set up.

If the topic is marked as “partitioned”, the client is expected to create multiple producers or consumers, one for each partition, using the partition-X suffix.

This information only needs to be retrieved the first time a producer or consumer is created. There is no need to do this after reconnections.

The discovery of partitioned topics metadata works very similarly to the topic lookup. The client sends a request to the service discovery address, and the response contains the actual metadata.

Command PartitionedTopicMetadata

  message CommandPartitionedTopicMetadata {
    "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
    "request_id" : 1
  }

Fields:

  • topic → the topic for which to check the partitions metadata
  • request_id → Id of the request that will be passed with its response

Command PartitionedTopicMetadataResponse

Example of response with metadata:

  message CommandPartitionedTopicMetadataResponse {
    "request_id" : 1,
    "response" : "Success",
    "partitions" : 32
  }
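
Given the partition count from the response, the per-partition topic names can be derived as in this short Python sketch. The function name is hypothetical. The sketch assumes that partition indexes start at 0, that the suffix is joined to the topic name with a hyphen, and that a count of 0 means the topic is not partitioned.

```python
from typing import List


def partition_topics(topic: str, partitions: int) -> List[str]:
    """Names of the topics a client would create producers or consumers on."""
    if partitions <= 0:
        return [topic]  # assumption: 0 partitions means a non-partitioned topic
    return [f"{topic}-partition-{index}" for index in range(partitions)]


names = partition_topics("persistent://my-property/my-cluster/my-namespace/my-topic", 32)
print(len(names), names[0])
# 32 persistent://my-property/my-cluster/my-namespace/my-topic-partition-0
```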

Protobuf interface

All of Pulsar's protobuf definitions can be found in PulsarApi.proto.