Message retention and expiry

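Pulsar brokers are responsible for handling messages that pass through Pulsar, including the persistent storage of messages. By default, for each topic, brokers only retain messages that are in at least one backlog. A backlog is the set of unacknowledged messages for a particular subscription. Because a topic can have multiple subscriptions, a topic can have multiple backlogs.

As a consequence, no messages are retained (by default) on a topic that has no subscriptions created for it.

(Note that messages that are no longer being stored are not necessarily deleted immediately, and may in fact remain accessible until the next ledger rollover. Because clients cannot predict when a rollover happens, it is unwise to rely on a rollover happening at a convenient time.)

In Pulsar, you can modify this behavior at the namespace level in two ways:

  • You can persistently store messages that are not within a backlog (because they have been acknowledged on every existing subscription, or because there are no subscriptions) by setting a retention policy.
  • You can have messages that are not acknowledged within a specified time automatically acknowledged by setting a time to live (TTL).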

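You can manage both retention policies and TTL at the namespace level (that is, within a specific tenant, and either on a specific cluster or on the whole cluster) through the Pulsar admin interface.

Message retention and TTL differ as follows:

  • Message retention: keep the data of messages (including acknowledged ones) for at least X hours.
  • TTL: discard data after a period of time (by automatically acknowledging it).

Most applications will want to use at most one of these.

Retention policies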

By default, when a Pulsar message arrives at a broker, the message is stored until it has been acknowledged on all subscriptions, at which point it is marked for deletion. You can override this behavior and retain messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention is based on both a size limit and a time limit.

Retention policies are useful when you use the Reader interface. The Reader interface does not use acknowledgements, and messages do not exist within backlogs. It is required to configure retention for Reader-only use cases.

When you set a retention policy on topics in a namespace, you must set both a size limit and a time limit. You can refer to the following table to set retention policies in pulsar-admin and Java.

Time limit | Size limit | Message retention
-1         | -1         | Infinite retention
-1         | >0         | Based on the size limit
>0         | -1         | Based on the time limit
0          | 0          | Disable message retention (by default)
0          | >0         | Invalid
>0         | 0          | Invalid
>0         | >0         | Acknowledged messages or messages with no active subscription will not be retained when either time or size reaches the limit.
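The table above can be read as a small decision function. The following Java sketch is illustrative only: `RetentionTable` and `describe` are hypothetical names, not part of the Pulsar API. Treating combinations that the table does not list (for example a time limit of 0 with a size limit of -1) as invalid is an assumption, based on the rule that 0 is only valid when both limits are 0.

```java
// Illustrative sketch of the retention table; not Pulsar code.
final class RetentionTable {
    private RetentionTable() {}

    /**
     * Returns the behaviour listed in the table for a (time limit, size limit) pair.
     * A value of -1 means "unlimited", 0 means "disabled", positive values are limits.
     */
    static String describe(long timeLimit, long sizeLimit) {
        if (timeLimit < -1 || sizeLimit < -1) {
            throw new IllegalArgumentException("limits must be -1, 0, or positive");
        }
        if (timeLimit == 0 && sizeLimit == 0) {
            return "Retention disabled";
        }
        // Assumption: any other combination containing a 0 is invalid.
        if (timeLimit == 0 || sizeLimit == 0) {
            return "Invalid";
        }
        if (timeLimit == -1 && sizeLimit == -1) {
            return "Infinite retention";
        }
        if (timeLimit == -1) {
            return "Based on the size limit";
        }
        if (sizeLimit == -1) {
            return "Based on the time limit";
        }
        return "Based on whichever limit is reached first";
    }
}
```

For example, `RetentionTable.describe(180, 10240)` corresponds to the last row of the table, where either limit can end retention.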

The retention settings apply to all messages on topics that do not have any subscriptions, or to messages that have been acknowledged by all subscriptions. The retention policy settings do not affect unacknowledged messages on topics with subscriptions. The unacknowledged messages are controlled by the backlog quota.

When a retention limit on a topic is exceeded, the oldest message is marked for deletion until the set of retained messages falls within the specified limits again.
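The trimming behaviour described above can be modelled with a queue ordered from oldest to newest. The sketch below is a simplified, message-level model written for illustration; `RetentionTrimSketch`, `Msg`, and `trim` are hypothetical names, and the granularity at which a real broker removes data is not claimed here.

```java
import java.util.Deque;

// Simplified model: drop the oldest retained messages until both limits hold again.
final class RetentionTrimSketch {
    private RetentionTrimSketch() {}

    /** Hypothetical message record: publish time in minutes and size in MB. */
    static final class Msg {
        final long publishedAtMinute;
        final long sizeMb;

        Msg(long publishedAtMinute, long sizeMb) {
            this.publishedAtMinute = publishedAtMinute;
            this.sizeMb = sizeMb;
        }
    }

    /**
     * Removes messages from the head (oldest end) of the queue while the time or
     * size limit is exceeded. A negative limit means "unlimited".
     * Returns the number of messages removed.
     */
    static int trim(Deque<Msg> retained, long nowMinute, long timeLimitMinutes, long sizeLimitMb) {
        long totalMb = 0;
        for (Msg m : retained) {
            totalMb += m.sizeMb;
        }
        int removed = 0;
        while (!retained.isEmpty()) {
            Msg oldest = retained.peekFirst();
            boolean tooOld = timeLimitMinutes >= 0
                    && nowMinute - oldest.publishedAtMinute > timeLimitMinutes;
            boolean tooBig = sizeLimitMb >= 0 && totalMb > sizeLimitMb;
            if (!tooOld && !tooBig) {
                break; // the retained set is within both limits again
            }
            retained.removeFirst();
            totalMb -= oldest.sizeMb;
            removed++;
        }
        return removed;
    }
}
```

With three 100 MB messages published at minutes 0, 60, and 120, a call at minute 130 with a 100-minute time limit and a 150 MB size limit removes the first message because of its age and the second because of the size limit.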

Defaults

You can set message retention at instance level with the following two parameters: defaultRetentionTimeInMinutes and defaultRetentionSizeInMB. Both parameters are set to 0 by default.

For more information about the two parameters, refer to the broker.conf configuration file.

Set retention policy

You can set a retention policy for a namespace by specifying the namespace, a size limit and a time limit in pulsar-admin, REST API and Java.

pulsar-admin

You can use the set-retention subcommand and specify a namespace, a size limit using the -s/--size flag, and a time limit using the -t/--time flag.

In the following example, the size limit is set to 10 GB and the time limit is set to 3 hours for each topic within the my-tenant/my-ns namespace.

  • When the size of messages reaches 10 GB on a topic within 3 hours, the acknowledged messages will not be retained.
  • After 3 hours, even if the message size is less than 10 GB, the acknowledged messages will not be retained.

  $ pulsar-admin namespaces set-retention my-tenant/my-ns \
    --size 10G \
    --time 3h

In the following example, the time is not limited and the size limit is set to 1 TB. The size limit determines the retention.

  $ pulsar-admin namespaces set-retention my-tenant/my-ns \
    --size 1T \
    --time -1

In the following example, the size is not limited and the time limit is set to 3 hours. The time limit determines the retention.

  $ pulsar-admin namespaces set-retention my-tenant/my-ns \
    --size -1 \
    --time 3h

To achieve infinite retention, set both values to -1.

  $ pulsar-admin namespaces set-retention my-tenant/my-ns \
    --size -1 \
    --time -1

To disable the retention policy, set both values to 0.

  $ pulsar-admin namespaces set-retention my-tenant/my-ns \
    --size 0 \
    --time 0

REST API

POST /admin/v2/namespaces/:tenant/:namespace/retention

Note
To disable the retention policy, you need to set both the size limit and the time limit to 0. Setting only one of them to 0 is invalid.

Java

  // The constructor takes the time limit (minutes) first, then the size limit (MB).
  int retentionTime = 10; // 10 minutes
  int retentionSize = 500; // 500 megabytes
  RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);
  admin.namespaces().setRetention(namespace, policies);

Get retention policy

You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes and retentionSizeInMB.

pulsar-admin

Use the get-retention subcommand and specify the namespace.

Example

  $ pulsar-admin namespaces get-retention my-tenant/my-ns
  {
    "retentionTimeInMinutes": 10,
    "retentionSizeInMB": 500
  }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/retention

Java

  admin.namespaces().getRetention(namespace);

Backlog quotas

Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.

You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:

  • an allowable size threshold for each topic in the namespace
  • a retention policy that determines which action the broker takes if the threshold is exceeded.

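The following retention policies are available:

Policy                    | Action
producer_request_hold     | The broker holds the messages sent by the producer but does not persist them
producer_exception        | The broker disconnects from the client and throws an exception
consumer_backlog_eviction | The broker begins discarding backlog messages

Beware the distinction between retention policy types

As you may have noticed, the term "retention policy" has two definitions in Pulsar: one applies to the persistent storage of acknowledged messages, and the other applies to backlogs.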

Backlog quotas are handled at the namespace level. You can manage them with pulsar-admin, the REST API, or Java, as described in the following sections.

Set size/time thresholds and backlog retention policies

You can set a size and/or time threshold and a backlog retention policy for all of the topics in a namespace by specifying the namespace, a size limit and/or a time limit in seconds, and a policy by name.

pulsar-admin

Use the set-backlog-quota subcommand and specify a namespace, a size limit using the -l/--limit flag, and a retention policy using the -p/--policy flag.

Example

  $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
    --limit 2G \
    --limitTime 36000 \
    --policy producer_request_hold

REST API

POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

  long sizeLimit = 2147483648L;
  BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
  BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
  admin.namespaces().setBacklogQuota(namespace, quota);
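The literal 2147483648L in the Java example is 2 GiB expressed in bytes, which lines up with the `--limit 2G` value used with pulsar-admin and the `"limit" : 2147483648` value reported for that quota. The following sketch shows the arithmetic. `SizeSketch` and `parseBytes` are hypothetical helpers written for this illustration, and the binary (1024-based) meaning of the K/M/G/T suffixes is an assumption inferred from those two values, not a description of how pulsar-admin parses its arguments.

```java
import java.util.Locale;

// Hypothetical helper: converts a size string such as "2G" into a byte count.
final class SizeSketch {
    private SizeSketch() {}

    static long parseBytes(String text) {
        String s = text.trim().toUpperCase(Locale.ROOT);
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty size");
        }
        long multiplier;
        switch (s.charAt(s.length() - 1)) {
            case 'K': multiplier = 1024L; break;
            case 'M': multiplier = 1024L * 1024; break;
            case 'G': multiplier = 1024L * 1024 * 1024; break;
            case 'T': multiplier = 1024L * 1024 * 1024 * 1024; break;
            default:
                // No recognised suffix: treat the whole string as a byte count.
                return Long.parseLong(s);
        }
        long value = Long.parseLong(s.substring(0, s.length() - 1));
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(value, multiplier);
    }
}
```

Under that assumption, `SizeSketch.parseBytes("2G")` yields the same number as the `sizeLimit` literal above.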

Get backlog threshold and backlog retention policy

You can see which size threshold and backlog retention policy have been applied to a namespace.

pulsar-admin

Use the get-backlog-quotas subcommand and specify a namespace. Here is an example:

  $ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
  {
    "destination_storage": {
      "limit" : 2147483648,
      "policy" : "producer_request_hold"
    }
  }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap

Java

  Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
      admin.namespaces().getBacklogQuotas(namespace);

Remove backlog quotas

pulsar-admin

Use the remove-backlog-quota subcommand and specify a namespace. Here is an example:

  $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

  admin.namespaces().removeBacklogQuota(namespace);

Clear backlog

pulsar-admin

Use the clear-backlog subcommand.

Example

  $ pulsar-admin namespaces clear-backlog my-tenant/my-ns

By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f/--force flag.

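Time to live (TTL)

By default, Pulsar stores all unacknowledged messages forever. When a large number of messages go unacknowledged, this can lead to heavy disk space usage. If disk space is a concern, you can set a time to live (TTL) that determines how long unacknowledged messages are retained.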
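The effect of a TTL on a backlog can be pictured as a filter over the unacknowledged messages. The sketch below is a simplified model for illustration only: `TtlSketch` and `expire` are hypothetical names, and treating a message whose age equals the TTL as expired is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of TTL expiry on a single backlog.
final class TtlSketch {
    private TtlSketch() {}

    /**
     * Takes the publish times (in seconds) of unacknowledged messages and returns
     * those that are still in the backlog after the TTL check. Messages whose age
     * has reached the TTL are treated as automatically acknowledged and dropped.
     */
    static List<Long> expire(List<Long> backlogPublishTimes, long nowSeconds, long ttlSeconds) {
        List<Long> remaining = new ArrayList<>();
        for (long publishedAt : backlogPublishTimes) {
            long age = nowSeconds - publishedAt;
            if (age < ttlSeconds) {
                remaining.add(publishedAt);
            }
        }
        return remaining;
    }
}
```

With a TTL of 120 seconds and messages published at seconds 0, 100, and 200, a check at second 220 leaves only the message published at second 200 in the backlog.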

Set the TTL for a namespace

pulsar-admin

Use the set-message-ttl subcommand and specify a namespace and a TTL (in seconds) using the -ttl/--messageTTL flag.

Example

  $ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
    --messageTTL 120 # TTL of 2 minutes

REST API

POST /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);

Get the TTL configuration for a namespace

pulsar-admin

Use the get-message-ttl subcommand and specify a namespace.

Example

  $ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
  60

REST API

GET /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  admin.namespaces().getNamespaceMessageTTL(namespace);

Remove the TTL configuration for a namespace

pulsar-admin

Use the remove-message-ttl subcommand and specify a namespace.

Example

  $ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  admin.namespaces().removeNamespaceMessageTTL(namespace);

Delete messages from namespaces

If you have no retention period configured and never accumulate much of a backlog, the upper limit on how long acknowledged messages are kept equals the Pulsar segment rollover period + entry log rollover period + (garbage collection interval * garbage collection ratios).

  • Segment rollover period: basically, the segment rollover period is how often a new segment is created. Once a new segment is created, the old segment will be deleted. By default, this happens either when you have written 50,000 entries (messages) or have waited 240 minutes. You can tune this in your broker.

  • Entry log rollover period: multiple ledgers in BookKeeper are interleaved into an entry log. For a deleted ledger to be removed from disk, every entry log that contains it must be rolled over. The entry log rollover period is configurable, but is purely based on the entry log size. For details, see here. Once the entry log is rolled over, the entry log can be garbage collected.

  • Garbage collection interval: because entry logs have interleaved ledgers, the entry logs need to be rewritten to free up space. The garbage collection interval is how often BookKeeper performs garbage collection, which is related to minor compaction and major compaction of entry logs. For details, see here.
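The upper limit described in this section is a simple sum, which the following sketch evaluates. `DeletionDelaySketch` and `upperBound` are hypothetical names. Only the 240-minute segment rollover default comes from the text above; the entry log rollover period, garbage collection interval, and ratio used in the usage note are made-up inputs, and modelling the "garbage collection ratios" term as a single multiplier is an assumption.

```java
import java.time.Duration;

// Evaluates: segment rollover + entry log rollover + (GC interval * GC ratio).
final class DeletionDelaySketch {
    private DeletionDelaySketch() {}

    static Duration upperBound(Duration segmentRollover,
                               Duration entryLogRollover,
                               Duration gcInterval,
                               double gcRatio) {
        // Scale the GC interval by the ratio, rounding to whole milliseconds.
        long scaledGcMillis = Math.round(gcInterval.toMillis() * gcRatio);
        return segmentRollover.plus(entryLogRollover).plusMillis(scaledGcMillis);
    }
}
```

For instance, `upperBound(Duration.ofMinutes(240), Duration.ofMinutes(30), Duration.ofMinutes(15), 2.0)` gives 300 minutes: 240 for the segment rollover, 30 for the assumed entry log rollover, and 30 for the assumed garbage collection term.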