Pulsar 采用发布-订阅的设计模式,也称作 pub-sub。 该设计模式中,producer 发布消息到 topicConsumer 订阅 topic、处理发布的消息,并在处理完成后发送确认。

一旦创建订阅,即使 consumer 断开连接,Pulsar 仍然可以保存所有消息。 在 consumer 确认消息已处理成功后,才会删除消息。

Messages

消息是 Pulsar 的基础“单元”。 消息指 producer 发布到 topic的内容,也指 consumer 从 topic 中 consume 的内容(并在消息处理完成后发送确认)。 消息类似于邮政服务系统中的信件。

Component作用
Value / data payloadThe data carried by the message. All Pulsar messages carry raw bytes, although message data can also conform to data schemas.
Key为消息添加密钥(key)标签(可选),这在 topic 压缩时很有用。
属性用户自定义属性的键值对(可选)。
Producer 名称Producer 名称(producer 有默认名称,但也可以自定义其名称)。
序列 IDEach Pulsar message belongs to an ordered sequence on its topic. A message’s sequence ID is its ordering in that sequence.
发布时间消息发布的时间戳(由 producer 自动添加)。
事件时间可选的时间戳,应用程序可以将某些事情发生的时间添加到消息上,例如消息处理的时间。 The event time of a message is 0 if none is explicitly set.
TypedMessageBuilderTypedMessageBuilder 用于构造消息。 您可以使用 TypedMessageBuilder 设置消息的键值对属性。
在设置 TypedMessageBuilder 时,最佳实践是将 key 设置为字符串。 如果将 key 设置为其他类型(例如,AVRO 对象),则 key 会以字节形式发送,这时 consumer 就很难使用了。

更多关于 Pulsar 消息的信息,参阅二进制协议

Producers

生产者是连接 topic 的程序,它将消息发布到一个 Pulsar broker 上。

发送模式

Producer可以以同步(sync)或者异步(async)的方式发布消息到broker。

发送模式Description
同步发送The producer will wait for acknowledgement from the broker after sending each message. If acknowledgment isn’t received then the producer will consider the send operation a failure.
异步发送Producer 将消息发送到一个阻塞队列(blocking queue)后就立刻返回。 然后,客户端将在后台将消息发送给broker。 如果队列满了(大小可配) ,生产者在调用 API 时就可能立即失败或被禁止调用(这取决于传递给生产者的参数)。

压缩

Messages published by producers can be compressed during transportation in order to save bandwidth. Pulsar currently supports the following types of compression:

批量处理

If batching is enabled, the producer will accumulate and send a batch of messages in a single request. Batch size is defined by the maximum number of messages and maximum publish latency.

Pulsar 跟踪和存储批处理的粒度也是批而不是单条消息。 由消费者来将批分解成单条的消息。 当消息以批的形式存储,backlog 的大小也将表示批的总数,而不是消息的总条数。

调度消息(使用 deliverAtdeliverAfter)即使启用了批处理,也总是作为单条消息发送,

注意:由于批是作为跟踪的单元,因此只有当批的所有消息都被消费者确认后,才会考虑确认该批次。 这意味着当发生不可预料的失败、负确认(negative acknowledgements)或确认超时,都可能导致批中的所有消息都被重新发送,即使其中一些消息已经被确认了。

Consumers

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Consumer 向 broker 发送消息流获取申请(flow permit request)以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 队列大小可以通过 receiverQueueSize 进行配置(默认:1000)。 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。

接收模式

Messages can be received from brokers either synchronously (sync) or asynchronously (async).

发送模式Description
同步接收同步方式会在获取到消息前呈阻塞状态。
异步接收异步方式会立即返回 future 值(如java中的 CompletableFuture),一旦有新消息,会立刻完成。

监听

Client libraries provide listener implementation for consumers. For example, the Java client provides a MesssageListener interface. 在这个接口中,一旦接受到新的消息,received方法将被调用。

确认

当 consumer 成功消费一条消息后会向 broker 发送一个确认请求(acknowledgement request)。 仅当所有订阅都完成确认后,消息才会被删除,在这之前消息都是被永久保存 的。 如果希望消息被 Consumer 确认后仍然保留下来,可配置 消息保留策略实现。

消息的确认可以一个接一个,也可以累积一起。 累积确认时,消费者只需要确认最后一条他收到的消息。 所有之前(包含此条)的消息,都不会被再次重发给那个消费者。

Cumulative acknowledgement cannot be used with shared subscription mode, because shared mode involves multiple consumers having access to the same subscription.

In the shared subscription mode, messages can be acknowledged individually.

取消确认

When a consumer does not consume a message successfully at a time, and wants to consume the message again, the consumer can send a negative acknowledgement to the broker, and then the broker will redeliver the message.

Messages can be negatively acknowledged one by one or cumulatively, which depends on the consumption subscription mode.

In the exclusive and failover subscription modes, consumers only negatively acknowledge the last message they have received.

In the shared and Key_Shared subscription modes, you can negatively acknowledge messages individually.

注意:如果开启批处理,在确认取消后,同一批中的其他消息都会被重新发送给 Consumer。

确认超时

When a message is not consumed successfully, and you want to trigger the broker to redeliver the message automatically, you can adopt the unacknowledged message automatic re-delivery mechanism. Client will track the unacknowledged messages within the entire acktimeout time range, and send a redeliver unacknowledged messages request to the broker automatically when the acknowledgement timeout is specified.

注意: 如果启用批处理,同一批次中的其他消息就跟未确认消息一样可能会重新发送给消费者。

Note
Prefer negative acknowledgements over acknowledgement timeout. 确认取消是以更高的精度在控制单条消息的重新传递。当消息处理时间超过确认超时时间时,要避免无效的消息重传。

死信主题

Dead letter topic enables you to consume new messages when some messages cannot be consumed successfully by a consumer. In this mechanism, messages that are failed to be consumed are stored in a separate topic, which is called dead letter topic. You can decide how to handle messages in the dead letter topic.

The following example shows how to enable dead letter topic in a Java client using the default dead letter topic:

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2. .topic(topic)
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .deadLetterPolicy(DeadLetterPolicy.builder()
  6. .maxRedeliverCount(maxRedeliveryCount)
  7. .build())
  8. .subscribe();

The default dead letter topic uses this format:

  1. <topicname>-<subscriptionname>-DLQ

If you want to specify the name of the dead letter topic, use this Java client example:

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2. .topic(topic)
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .deadLetterPolicy(DeadLetterPolicy.builder()
  6. .maxRedeliverCount(maxRedeliveryCount)
  7. .deadLetterTopic("your-topic-name")
  8. .build())
  9. .subscribe();

Dead letter topic depends on message re-delivery. Messages are redelivered either due to acknowledgement timeout or negative acknowledgement. If you are going to use negative acknowledgement on a message, make sure it is negatively acknowledged before the acknowledgement timeout.

Note
Currently, dead letter topic is enabled only in the shared subscription mode.

Topic

和其他的发布订阅系统一样,Pulsar 中的 topic 是被命名的通道,用做从producerconsumer传输消息。 Topic的名称为符合良好结构的URL:

  1. {persistent|non-persistent}://tenant/namespace/topic
Topic名称组成Description
persistent / non-persistent用来标识 topic 的类型。 Pulsar 支持两种不同 topic:持久化非持久 型(如果你没有明确指定,topic 将会是默认的持久化类型)。 持久化 topic 的所有消息都会存储到硬盘上(除非是单机模式的broker,否则都是会在多块磁盘上)。非持久 topic 的数据将不会存储到硬盘上。
租户The topic’s tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters.
命名空间将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。 大多数对 topic 的管理都是对命名空间的一项配置。 每个租户可以有多个命名空间。
topicThe final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance.

不需要显式的创建topic

你并不需要显式的创建topic。 如果客户端尝试从一个还不存在的topic写或者接受消息,pulsar将会按在topic名称提供的namespace下自动创建topic。 If no tenant or namespace is specified when a client creates a topic, the topic is created in the default tenant and namespace. You can also create a topic in a specified tenant and namespace, such as persistent://my-tenant/my-namespace/my-topic. persistent://my-tenant/my-namespace/my-topic means the my-topic topic is created in the my-namespace namespace of the my-tenant tenant.

命名空间

命名空间是租户内部逻辑上的命名术语。 一个租户可以通过admin API创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1 ,它的namespace是app1这个应用,对应的租户是 my-tenant。 你可以在namespace下创建任意数量的topic

Subscriptions

订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar 中有 4 种可用的订阅模式: 独占(exclusive)共享(shared)灾备(failover)键共享(key_shared)。 下图展示了这三种模式:

订阅模型

订阅-发布,队列,亦或两者都是

订阅可以灵活组合出很多可能性:
- 如果你想实现传统的 “发布-订阅消息”形式 ,你可以让每个消费者都有一个唯一的订阅名称(独占)
- 如果你想实现传统的“消息队列” 形式,可以使多个消费者使用同一个的订阅名称(共享、灾备、键共享)
- 如果你想同时做到这两点,你可以让一些消费者使用独占方式,剩余消费者使用其他方式。

Exclusive

In exclusive mode, only a single consumer is allowed to attach to the subscription. 如果多于一个消费者尝试以同样方式去订阅主题,消费者将会收到错误。

In the diagram below, only Consumer A-0 is allowed to consume messages.

Exclusive模式为默认订阅模式。

独占订阅

Failover(灾备)

In failover mode, multiple consumers can attach to the same subscription. A master consumer is picked for non-partitioned topic or each partition of partitioned topic and receives messages. When the master consumer disconnects, all (non-acknowledged and subsequent) messages are delivered to the next consumer in line.

For partitioned topics, broker will sort consumers by priority level and lexicographical order of consumer name. Then broker will try to evenly assigns topics to consumers with the highest priority level.

For non-partitioned topic, broker will pick consumer in the order they subscribe to the non partitioned topic.

In the diagram below, Consumer-B-0 is the master consumer while Consumer-B-1 would be the next consumer in line to receive messages if Consumer-B-0 is disconnected.

灾备订阅

Shared(共享)

In shared or round robin mode, multiple consumers can attach to the same subscription. 消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

In the diagram below, Consumer-C-1 and Consumer-C-2 are able to subscribe to the topic, but Consumer-C-3 and others could as well.

Shared模式的限制

When using shared mode, be aware that: * Message ordering is not guaranteed. * You cannot use cumulative acknowledgment with shared mode.

共享订阅

Key_Shared

In Key_Shared mode, multiple consumers can attach to the same subscription. Messages are delivered in a distribution across consumers and message with same key or same ordering key are delivered to only one consumer. No matter how many times the message is re-delivered, it is delivered to the same consumer. When a consumer connected or disconnected will cause served consumer change for some key of message.

Limitations of Key_Shared mode

使用 Key_Shared 模式时需要注意两点:
* 需要为消息指定一个 key 或 orderingKey
* Key_Shared 模式不能使用累积确认

Key_Shared subscriptions

可以在 broker.config 中禁用 Key_Shared 模式。

多主题订阅

当consumer订阅pulsar的主题时,它默认指定订阅了一个主题,例如:persistent://public/default/my-topic。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:

  • On the basis of a regular expression (regex), for example persistent://public/default/finance-.*
  • 通过明确指定的topic列表

通过正则订阅多主题时,所有的主题必须在同一个namespace

当订阅多主题时,Pulsar客户端会自动调用Pulsar的API来发现匹配表达式或者列表的所有topic,然后全部订阅。 如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。

No ordering guarantees across multiple topics

When a producer sends messages to a single topic, all messages are guaranteed to be read from that topic in the same order. However, these guarantees do not hold across multiple topics. So when a producer sends message to multiple topics, the order in which messages are read from those topics is not guaranteed to be the same.

下面是多主题订阅在java中的例子:

  1. import java.util.regex.Pattern;
  2. import org.apache.pulsar.client.api.Consumer;
  3. import org.apache.pulsar.client.api.PulsarClient;
  4. PulsarClient pulsarClient = // Instantiate Pulsar client object
  5. // Subscribe to all topics in a namespace
  6. Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
  7. Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
  8. .topicsPattern(allTopicsInNamespace)
  9. .subscriptionName("subscription-1")
  10. .subscribe();
  11. // Subscribe to a subsets of topics in a namespace, based on regex
  12. Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
  13. Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
  14. .topicsPattern(someTopicsInNamespace)
  15. .subscriptionName("subscription-1")
  16. .subscribe();

代码例子,请见:

分区 topic

通常一个topic仅被一个broker服务,这限制了topic的最大吞吐量。 Partitioned topics are a special type of topic that be handled by multiple brokers, which allows for much higher throughput.

其实在背后,分区的topic通过N个内部topic实现,N是分区的数量。 当向分区的topic发送消息,每条消息被路由到其中一个broker。 Pulsar自动处理跨broker的分区分布。

下图对此做了阐明:

Messaging - 图6

Here, the topic Topic1 has five partitions (P0 through P4) split across three brokers. 因为分区多于broker数量,其中有两个broker要处理两个分区。第三个broker则只处理一个。(再次强调,分区的分布是Pulsar自动处理的)。

这个topic的消息被广播给两个consumer。 路由模式确定每条消息该发往哪个分区,而订阅模式确定消息传递给哪个消费者。

大多数境况下,路由和订阅模式可以分开制定。 通常来讲,吞吐能力的要求,决定了 分区/路由 的方式。订阅模式则应该由应用的语义来做决定。

分区topic和普通topic,对于订阅模式如何工作,没有任何不同。分区只是决定了从生产者生产消息到消费者处理及确认消息过程中发生的事情。

Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.

路由模式

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—-that is, which internal topic—-each message should be published to.

有三种 MessageRoutingMode 可用:

发送模式Description
RoundRobinPartition如果消息没有指定 key,为了达到最大吞吐量,消息会以 round-robin 方式被路由所有分区。 请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果为消息指定了key,发往分区的消息会被分区生产者根据 key 做 hash,然后分散到对应的分区上。 这是默认的模式。
SinglePartition如果消息没有指定 key,生产者将会随机选择一个分区,并发送所有消息。 如果为消息指定了key,发往分区的消息会被分区生产者根据 key 做 hash,然后分散到对应的分区上。
CustomPartition使用自定义消息路由,可以定制消息如何进入特定的分区。 可以使用 Java client 或实现MessageRouter 接口来实现自定义的路由模式。

顺序保证

The ordering of messages is related to MessageRoutingMode and Message Key. Usually, user would want an ordering of Per-key-partition guarantee.

当使用 SinglePartition或者RoundRobinPartition模式时,如果消息有key,消息将会被路由到匹配的分区,这是基于ProducerBuilderHashingScheme 指定的散列shema。

顺序保证Description路由策略与消息Key
每个 key 分区所有具有相同 key 的消息将按顺序排列并放置在相同的分区(Partition)中。使用 SinglePartitionRoundRobinPartition 模式,每条消息都需要有key。
同一个生产者来自同一生产者的所有消息都是有序的路由策略为SinglePartition, 且每条消息都没有key。

散列scheme

HashingScheme 是代表一组标准散列函数的枚举。为一个指定消息选择分区时使用。

有两种可用的散列函数: JavaStringHashMurmur3_32Hash. The default hashing function for producer is JavaStringHash. 请注意,当producer可能来自于不同语言客户端时,JavaStringHash是不起作用的。建议使用Murmur3_32Hash

非持久topic

By default, Pulsar persistently stores all unacknowledged messages on multiple BookKeeper bookies (storage nodes). 因此,持久性主题上的消息数据可以在 broker 重启和订阅者故障转移之后继续存在。

Pulsar also, however, supports non-persistent topics, which are topics on which messages are never persisted to disk and live only in memory. Pulsar也提供了非持久topic。非持久topic的消息不会被保存在硬盘上,只存活于内存中。当使用非持久topic分发时,杀掉Pulsar的broker或者关闭订阅者,此topic( non-persistent))上所有的瞬时消息都会丢失,意味着客户端可能会遇到消息缺失。

非持久性主题具有这种形式的名称(注意名称中的 non-persistent):

  1. non-persistent://tenant/namespace/topic

如何使用非持久topic的更多信息,请参考 Non-persistent messaging cookbook

In non-persistent topics, brokers immediately deliver messages to all connected subscribers without persisting them in BookKeeper. 如果有一个订阅者断开连接,broker将无法重发这些瞬时消息,订阅者将永远也不能收到这些消息了。 去掉持久化存储的步骤,在某些情况下,使得非持久topic的消息比持久topic稍微变快。但是同时,Pulsar的一些核心优势也丧失掉了。

非持久topic,消息数据仅存活在内存。 如果broker挂掉或者因其他情况不能从内存取到,你的消息数据就可能丢失。 Use non-persistent topics only if you’re certain that your use case requires it and can sustain it.

默认非持久topic在broker上是开启的。 你可以通过broker的配置关闭。 你可以通过使用pulsar-admin-topics接口管理非持久topic。

性能

Non-persistent messaging is usually faster than persistent messaging because brokers don’t persist messages and immediately send acks back to the producer as soon as that message is delivered to connected brokers. 非持久topic让producer有更低的发布延迟。

客户端API

Producer和consumer以连接持久topic同样的方式连接到非持久topic。重要的区别是,topic的名称必须以non-persistent开头。 三种订阅模式—exclusivesharedfailover对于非持久topic都是支持的。

下面是一个非持久topic的java consumer例子:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();
  4. String npTopic = "non-persistent://public/default/my-topic";
  5. String subscriptionName = "my-subscription-name";
  6. Consumer<byte[]> consumer = client.newConsumer()
  7. .topic(npTopic)
  8. .subscriptionName(subscriptionName)
  9. .subscribe();

这里还有一个非持久topic的java producer例子:

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(npTopic)
  3. .create();

消息保留和到期

Pulsar broker默认如下:

  • immediately delete all messages that have been acknowledged by a consumer, and
  • 以消息backlog的形式,持久保存所有的未被确认消息

Pulsar有两个特性,让你可以覆盖上面的默认行为。

  • Message retention enables you to store messages that have been acknowledged by a consumer
  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

All message retention and expiry is managed at the namespace level. For a how-to, see the Message retention and expiry cookbook.

下图说明了这两种概念:

消息保留和到期

图中上面的是消息存留,存留规则会被用于某namespace下所有的topic,指明哪些消息会被持久存储,即使已经被确认过。 没有被留存规则覆盖的消息将会被删除。 Without a retention policy, all of the acknowledged messages would be deleted.

图中下面的是消息过期,有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)

Message deduplication

Message duplication occurs when a message is persisted by Pulsar more than once. Message deduplication is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, even if the message is received more than once.

下图展示了开启和关闭消息去重的场景:

Pulsar消息去重

最上面的场景中,消息去重被关闭。 Producer发布消息1到一个topic,消息到达broker后,被持久化到BookKeeper。 然后producer又发送了消息1(可能因为某些重试逻辑),然后消息被接收后又持久化在BookKeeper,这意味着消息重复发生了。

在第二个场景中,producer发送了消息1,消息被broker接收然后持久化,和第一个场景是一样的。 当producer再次发送消息时,broker知道已经收到个消息1,所以不会再持久化消息1.

Message deduplication is handled at the namespace level. For more instructions, see the message deduplication cookbook.

生产者幂等

The other available approach to message deduplication is to ensure that each message is only produced once. This approach is typically called producer idempotency. 这种方式的缺点是,把消息去重的工作推给了应用去做。 在Pulsar中,去重被broker处理的,这意味着你不需要修改你的客户端代码。 你只需要做一些管理上的变化(参考Managing message deduplication

去重和实际一次语义

消息去重,使Pulsar成为与流处理引擎(SPE)或者其他寻求实际一次处理语义的系统连接的完美消息系统。 消息系统若不提供自动消息去重,则需要SPE或者其他系统保证去重。这意味着严格的消息顺序来自于让程序承担额外的去重工作。 使用Pulsar,严格的顺序保证不会带来任何应用层面的代价。

更深入的信息可以参考 Streamlio blog上的此篇博文

消息延迟传递

Delayed message delivery enables you to consume a message later rather than immediately. In this mechanism, a message is stored in BookKeeper, DelayedDeliveryTracker maintains the time index(time -> messageId) in memory after published to a broker, and it is delivered to a consumer once the specific delayed time is passed.

Delayed message delivery only works well in Shared subscription mode. In Exclusive and Failover subscription mode, the delayed message is dispatched immediately.

The diagram below illustrates the concept of delayed message delivery:

Delayed Message Delivery

A broker saves a message without any check. When a consumer consumes a message, if the message is set to delay, then the message is added to DelayedDeliveryTracker. A subscription checks and gets timeout messages from DelayedDeliveryTracker.

Broker

Delayed message delivery is enabled by default. You can change it in the broker configuration file as below:

  1. # Whether to enable the delayed delivery for messages.
  2. # If disabled, messages are immediately delivered and there is no tracking overhead.
  3. delayedDeliveryEnabled=true
  4. # Control the ticking time for the retry of delayed message delivery,
  5. # affecting the accuracy of the delivery time compared to the scheduled time.
  6. # Default is 1 second.
  7. delayedDeliveryTickTimeMillis=1000

Producer

The following is an example of delayed message delivery for a producer in Java:

  1. // message to be delivered at the configured delay interval
  2. producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();