Pulsar brokers are responsible for handling messages that pass through Pulsar, including persistent storage of messages. By default, for each topic, brokers only retain messages that are in at least one backlog. A backlog is the set of unacknowledged messages for a particular subscription. As a topic can have multiple subscriptions, a topic can have multiple backlogs.

As a consequence, no messages are retained (by default) on a topic that has not had any subscriptions created for it.

(Note that messages that are no longer being stored are not necessarily immediately deleted, and may in fact still be accessible until the next ledger rollover. Because clients cannot predict when rollovers may happen, it is not wise to rely on a rollover not happening at an inconvenient point in time.)
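The default rule can be expressed as a small predicate over one message on one topic. This is a minimal, hypothetical sketch and not Pulsar code. It assumes a message is retained by default exactly when at least one existing subscription has not yet acknowledged it, and it ignores the ledger-rollover timing described in the note.

```java
import java.util.Set;

// Hypothetical model of the default broker behavior; not part of any Pulsar API.
final class DefaultRetentionModel {

    // A message is retained by default while it sits in at least one backlog,
    // i.e. while some existing subscription has not acknowledged it.
    static boolean retainedByDefault(Set<String> subscriptions, Set<String> ackedBy) {
        for (String sub : subscriptions) {
            if (!ackedBy.contains(sub)) {
                return true; // still in this subscription's backlog
            }
        }
        // Acknowledged on every subscription, or the topic has no subscriptions at all.
        return false;
    }
}
```

With no subscriptions the loop never runs and the method returns false. This matches the statement that a topic with no subscriptions retains nothing by default.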

In Pulsar, you can modify this behavior, with namespace granularity, in two ways:

  • You can persistently store messages that are not within a backlog (because they’ve been acknowledged on every existing subscription, or because there are no subscriptions) by setting retention policies.
  • Messages that are not acknowledged within a specified timeframe can be automatically acknowledged, by specifying the time to live (TTL).

Pulsar’s admin interface enables you to manage both retention policies and TTL with namespace granularity (and thus within a specific tenant and either on a specific cluster or in the global cluster).

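Retention and TTL solve two different problems:

  • Message retention: keep the data for at least X hours (even if it has been acknowledged)
  • Time to live (TTL): discard the data after some time (by automatically acknowledging it)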

Most applications will want to use at most one of these.

Retention policies

By default, when a Pulsar message arrives at a broker it will be stored until it has been acknowledged on all subscriptions, at which point it will be marked for deletion. You can override this behavior and retain even messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention policies are either a size limit or a time limit.

Retention policies are particularly useful if you intend to exclusively use the Reader interface. Because the Reader interface does not use acknowledgements, messages will never exist within backlogs. Most realistic Reader-only use cases require that retention be configured.

When you set a size limit of, say, 10 gigabytes, then acknowledged messages in all topics in the namespace will be retained until the size limit for the topic is reached; if you set a time limit of, say, 1 day, then acknowledged messages for all topics in the namespace will be retained for 24 hours. The retention settings apply to all messages on topics that do not have any subscriptions, or if there are subscriptions, to messages that have been acked by all subscriptions. The retention policy settings do not affect unacknowledged messages on topics with subscriptions — these are instead controlled by the backlog quota (see below).

When a retention limit is exceeded, the oldest messages are marked for deletion until the set of retained messages falls within the specified limits again.

It is also possible to set unlimited retention time or size by setting -1 for either time or size retention.
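The interaction of the two limits can be shown with a simplified model of which acknowledged messages survive. This is a hypothetical sketch and not Pulsar's algorithm. It assumes per-message granularity, sizes in MB, publish times in minutes, and -1 meaning "no limit". The real broker deletes data at ledger granularity (see the note on ledger rollover above).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a namespace retention policy; not Pulsar's implementation.
final class RetentionModel {

    // An acknowledged message: when it was published (in minutes) and its size (in MB).
    static final class Msg {
        final long publishMinute;
        final long sizeMb;

        Msg(long publishMinute, long sizeMb) {
            this.publishMinute = publishMinute;
            this.sizeMb = sizeMb;
        }
    }

    // Returns the acknowledged messages still retained at nowMinute.
    // `acked` must be ordered oldest first; -1 disables the corresponding limit.
    static List<Msg> retained(List<Msg> acked, long nowMinute,
                              long retentionTimeInMinutes, long retentionSizeInMB) {
        List<Msg> kept = new ArrayList<>();
        for (Msg m : acked) {
            // Time limit: keep only messages still inside the retention window.
            if (retentionTimeInMinutes == -1 || nowMinute - m.publishMinute <= retentionTimeInMinutes) {
                kept.add(m);
            }
        }
        if (retentionSizeInMB != -1) {
            long total = 0;
            for (Msg m : kept) {
                total += m.sizeMb;
            }
            // Size limit: drop the oldest messages until the remainder fits.
            while (!kept.isEmpty() && total > retentionSizeInMB) {
                total -= kept.remove(0).sizeMb;
            }
        }
        return kept;
    }
}
```

In this model a message survives only if it satisfies both limits. That mirrors the rule that exceeding either limit causes the oldest messages to be marked for deletion.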

Defaults

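You can set instance-wide defaults for message retention with the following two configuration parameters:

  • defaultRetentionTimeInMinutes=0
  • defaultRetentionSizeInMB=0

Both parameters are in the broker.conf configuration file. With the values shown (0), acknowledged messages are not retained unless a namespace-level retention policy is set.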
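broker.conf is a plain key=value file, so you can check which instance-wide defaults a broker will start with by loading it through java.util.Properties. This is an illustrative sketch. The class name is hypothetical, and the fallback of 0 for a missing key simply mirrors the values quoted above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: reads the instance-wide retention defaults from broker.conf content.
final class RetentionDefaults {

    // Returns {defaultRetentionTimeInMinutes, defaultRetentionSizeInMB}; missing keys fall back to 0.
    static long[] read(Reader brokerConf) {
        Properties props = new Properties();
        try {
            props.load(brokerConf); // key=value lines; lines starting with '#' are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        long minutes = Long.parseLong(props.getProperty("defaultRetentionTimeInMinutes", "0").trim());
        long sizeMb = Long.parseLong(props.getProperty("defaultRetentionSizeInMB", "0").trim());
        return new long[] {minutes, sizeMb};
    }
}
```

For example, call RetentionDefaults.read(new FileReader("conf/broker.conf")). The path is an assumption about your installation layout.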

Set retention policy

You can set a retention policy for a namespace by specifying the namespace as well as both a size limit and a time limit.

pulsar-admin

Use the set-retention subcommand and specify a namespace, a size limit using the -s/--size flag, and a time limit using the -t/--time flag.

Example

Set a size limit of 10 GB and a time limit of 3 hours for the my-tenant/my-ns namespace:

    $ pulsar-admin namespaces set-retention my-tenant/my-ns \
      --size 10G \
      --time 3h

To set retention with a size limit but without a time limit:

    $ pulsar-admin namespaces set-retention my-tenant/my-ns \
      --size 1T \
      --time -1

Retention can be configured to be unlimited both in size and time:

    $ pulsar-admin namespaces set-retention my-tenant/my-ns \
      --size -1 \
      --time -1
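The CLI accepts human-readable values such as 10G, 1T, and 3h. The Java RetentionPolicies constructor takes minutes and megabytes instead. The sketch below shows one way to convert between the two. The class is hypothetical, and the rules are assumptions for illustration rather than a description of how pulsar-admin parses its arguments:

- binary multipliers (1G = 1024 MB)
- only the suffixes M, G, T for sizes and m, h, d for times
- -1 means unlimited

```java
// Hypothetical conversion helpers; the suffix rules are assumptions, not pulsar-admin's parser.
final class RetentionUnits {

    // "10G" -> 10240, "500M" -> 500, "-1" -> -1 (unlimited).
    static long sizeToMb(String s) {
        if (s.equals("-1")) return -1;
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        char unit = Character.toUpperCase(s.charAt(s.length() - 1));
        switch (unit) {
            case 'M': return n;
            case 'G': return n * 1024;        // assumes binary units
            case 'T': return n * 1024 * 1024;
            default: throw new IllegalArgumentException("unsupported size unit: " + s);
        }
    }

    // "3h" -> 180, "1d" -> 1440, "-1" -> -1 (unlimited).
    static long timeToMinutes(String s) {
        if (s.equals("-1")) return -1;
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        char unit = Character.toLowerCase(s.charAt(s.length() - 1));
        switch (unit) {
            case 'm': return n;
            case 'h': return n * 60;
            case 'd': return n * 60 * 24;
            default: throw new IllegalArgumentException("unsupported time unit: " + s);
        }
    }
}
```

Under these assumptions, the first example (--size 10G --time 3h) corresponds to 10240 MB and 180 minutes.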

REST API

POST /admin/v2/namespaces/:tenant/:namespace/retention
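Here is a minimal sketch of calling this endpoint with the JDK's built-in java.net.http client (Java 11+). The request is only built here, not sent. It makes three assumptions:

- the broker's admin HTTP service is reachable at a base URL you supply, such as http://localhost:8080
- no authentication is configured
- the request body is a JSON object with the same retentionTimeInMinutes and retentionSizeInMB keys that get-retention returns

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds (but does not send) the retention request. baseUrl is an assumption.
final class RetentionRest {

    static HttpRequest setRetention(String baseUrl, String tenant, String namespace,
                                    int retentionTimeInMinutes, int retentionSizeInMB) {
        // JSON body using the keys reported by get-retention (assumed to be accepted on POST).
        String body = "{\"retentionTimeInMinutes\": " + retentionTimeInMinutes
            + ", \"retentionSizeInMB\": " + retentionSizeInMB + "}";
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/admin/v2/namespaces/" + tenant + "/" + namespace + "/retention"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Add whatever authentication headers your cluster requires.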

Java

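    import org.apache.pulsar.common.policies.data.RetentionPolicies;

    // admin is a PulsarAdmin instance; namespace is a name such as "my-tenant/my-ns"
    int retentionTime = 10; // 10 minutes
    int retentionSize = 500; // 500 megabytes
    RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);
    admin.namespaces().setRetention(namespace, policies);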

Get retention policy

You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes and retentionSizeInMB.

pulsar-admin

Use the get-retention subcommand and specify the namespace.

Example

    $ pulsar-admin namespaces get-retention my-tenant/my-ns
    {
      "retentionTimeInMinutes": 10,
      "retentionSizeInMB": 0
    }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/retention

Java

    RetentionPolicies policies = admin.namespaces().getRetention(namespace);

Backlog quotas

Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.

You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:

  • an allowable size threshold for each topic in the namespace
  • a retention policy that determines which action the broker takes if the threshold is exceeded.

The following retention policies are available:

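  • producer_request_hold: the broker holds the producer's send request but does not persist the message payload
  • producer_exception: the broker disconnects from the client by throwing an exception
  • consumer_backlog_eviction: the broker begins discarding backlog messages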
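The toy model below shows how the three policies differ once a topic's backlog exceeds its quota. The enum constants correspond to the policy names above, but the enum is a hypothetical stand-in and not Pulsar's own type. Sizes are in bytes, and eviction is modeled per message, oldest first, which is a simplification.

```java
import java.util.Deque;

// Toy model of backlog-quota enforcement; all names here are hypothetical.
final class BacklogQuotaModel {

    enum Policy { PRODUCER_REQUEST_HOLD, PRODUCER_EXCEPTION, CONSUMER_BACKLOG_EVICTION }

    // Outcome of a produce attempt against `backlog` (message sizes, oldest first).
    static String onProduce(Deque<Long> backlog, long limitBytes, Policy policy) {
        long size = 0;
        for (long b : backlog) {
            size += b;
        }
        if (size <= limitBytes) {
            return "accepted"; // under quota: no policy is triggered
        }
        switch (policy) {
            case PRODUCER_REQUEST_HOLD:
                return "held"; // request held, payload not persisted
            case PRODUCER_EXCEPTION:
                return "disconnected"; // client disconnected with an exception
            case CONSUMER_BACKLOG_EVICTION:
                // Discard the oldest backlog messages until the quota is respected again.
                while (!backlog.isEmpty() && size > limitBytes) {
                    size -= backlog.pollFirst();
                }
                return "accepted after eviction";
            default:
                throw new IllegalStateException("unknown policy " + policy);
        }
    }
}
```

Only consumer_backlog_eviction changes the backlog itself. The other two policies push back on the producer instead.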

Beware the distinction between retention policy types

As you may have noticed, there are two definitions of the term “retention policy” in Pulsar, one that applies to persistent storage of messages not in backlogs, and one that applies to messages within backlogs.

Backlog quotas are handled at the namespace level. They can be managed via pulsar-admin, the REST API, or the Java admin API, as described in the following sections.

Set size thresholds and backlog retention policies

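You can set a size threshold and backlog retention policy for all of the topics in a namespace by specifying the namespace, a size limit, and a policy.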

pulsar-admin

Use the set-backlog-quota subcommand and specify a namespace, a size limit using the -l/--limit flag, and a retention policy using the -p/--policy flag.

Example

    $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
      --limit 2G \
      --policy producer_request_hold

REST API

POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

    import org.apache.pulsar.common.policies.data.BacklogQuota;

    long sizeLimit = 2147483648L; // 2 GiB (2 * 1024^3 bytes), the same limit as --limit 2G above
    BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
    BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
    admin.namespaces().setBacklogQuota(namespace, quota);

Get backlog threshold and backlog retention policy

You can see which size threshold and backlog retention policy have been applied to a namespace.

pulsar-admin

Use the get-backlog-quotas subcommand and specify a namespace. Here’s an example:

    $ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
    {
      "destination_storage": {
        "limit" : 2147483648,
        "policy" : "producer_request_hold"
      }
    }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap

Java

    Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotas =
        admin.namespaces().getBacklogQuotaMap(namespace);

Remove backlog quotas

pulsar-admin

Use the remove-backlog-quota subcommand and specify a namespace. Here’s an example:

    $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

    admin.namespaces().removeBacklogQuota(namespace);

Clear backlog

pulsar-admin

Use the clear-backlog subcommand.

Example

    $ pulsar-admin namespaces clear-backlog my-tenant/my-ns

By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f/--force flag.

Time to live (TTL)

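By default, Pulsar stores all unacknowledged messages forever. This can lead to heavy disk space usage when a large number of messages go unacknowledged. If disk space is a concern, you can set a time to live (TTL) that determines how long unacknowledged messages will be retained.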
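A TTL means that any unacknowledged message older than the TTL is treated as acknowledged, so it leaves the backlog. This hypothetical model shows that rule per message, with timestamps in seconds. It is not the broker's implementation, which checks for expired messages periodically rather than at the exact instant a TTL elapses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of TTL-based auto-acknowledgement; not Pulsar's implementation.
final class TtlModel {

    // Returns the publish times (epoch seconds) of messages that remain in the backlog
    // after auto-acknowledging everything older than ttlSeconds.
    static List<Long> afterExpiry(List<Long> unackedPublishTimes, long nowSeconds, long ttlSeconds) {
        List<Long> remaining = new ArrayList<>();
        for (long t : unackedPublishTimes) {
            if (nowSeconds - t <= ttlSeconds) {
                remaining.add(t); // still within its TTL, stays unacknowledged
            }
            // otherwise the message has expired: it counts as acknowledged and leaves the backlog
        }
        return remaining;
    }
}
```

With a 120-second TTL (the value used in the set-message-ttl example) evaluated at time 220, a message published at time 0 expires. Messages published at 100 and 200 remain.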

Set the TTL for a namespace

pulsar-admin

Use the set-message-ttl subcommand and specify a namespace and a TTL (in seconds) using the -ttl/--messageTTL flag.

Example

    $ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
      --messageTTL 120 # TTL of 2 minutes

REST API

POST /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

    int ttlInSeconds = 120; // 2 minutes
    admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);

Get the TTL configuration for a namespace

pulsar-admin

Use the get-message-ttl subcommand and specify a namespace.

Example

    $ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
    60

REST API

GET /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

    admin.namespaces().getNamespaceMessageTTL(namespace);