Message retention and expiry

Pulsar brokers 负责处理通过 Pulsar 的消息,包括消息的存储持久化。 默认情况下,对于每个主题,broker 会至少保留一个 backlog 消息。 backlog 是特定订阅的未确认的消息的集合。 每个主题可以有多个订阅者,所以每个主题可以有多个 backlog。

因此,在一个没有创建任何订阅的主题上不会保留任何消息(默认情况下)。

(注意,不再被存储的消息不需要立即删除,事实上可能仍然可以被访问,直到下一个 ledger 滚动创建。 因为客户端无法预测 ledger 什么时候发生滚动,所以依赖滚动发生在合适的时间是不明智的。)

在 Pulsar 中,你有两种方式在命名空间的级别去修改这种行为:

  • 你可以通过设置消息保留策略持久化存储不在 backlog 内的消息(因为他们已经在每个现有的订阅上被确认,或者并没有被订阅)。
  • 可以通过设置time to live(TTL),设置消息在指定的时间内不被确认的话,自动确认。

你可以通过 Pulsar admin 接口 在命名空间(租户、集群或整个集群) 级别管理消息保留策略和TTL。

消息保留和 TTL 有以下两点不同

  • 消息保留: 保留消息(包括已确认的) 数据至少X小时
  • TTL:一段时间后丢弃数据(自动确认)

大多数应用程序最多只想使用其中的一个。

消息保留策略

默认情况下,当消息到达 broker 时,会一直保留这条消息直到消费者确认已消费到这条消息,此时它才被删除。 You can override this behavior and retain messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention is based on both a size limit and a time limit.

当使用 Reader 接口时,保留策略是有用的。 因为 Reader 接口不使用消息确认机制,消息将永远不会存在 backlog 中。 在只读的使用场景中,保留时间是必须配置的。

When you set a retention policy on topics in a namespace, you must set both a size limit and a time limit. 您可以参考以下表格来设置 pulsar-admin 和 Java 中的保留策略。

时间限制大小限制:消息保留策略
-1-1无限保留
-1>0基于存储大小限制
>0-1基于存储时间限制
00不开启消息保留机制(默认值)
0>0无效
>00无效
>0>0当消息保留时间或大小达到限制时,不会保留已确认的消息或没有活跃订阅者的消息。

消息保留策略配置会对没有订阅的主题的所有消息生效,或者对有订阅的主题并已被所有消费者确认的消息生效。 保留策略设置不影响订阅主题的未确认消息。 未确认的信息受积压配额控制。

当超过保留限额时,最老的消息会被标记删除,直到保留的消息在指定的限制范围之内。

默认值

你可以通过以下两个参数来配置实例级别的消息保留策略:: defaultRetentionTimeInminutes and defaultReintentionSizeInMB 默认情况下,这两个参数都设置为 0

如果需要了解两个参数的更多信息,请参阅 broker.conf 配置文件。

设置保留策略

你可以通过在 pulsar-admin、REST API 和 Java 中自定命名空间、大小限制和时间限制来设置命名空间的保留策略。

pulsar-admin

REST API

Java

使用set-retention子命令并指定命名空间,使用-s/--size参数指定大小限制,使用-t/--time参数指定时间限制。

在下面的例子中, my-tenant/my-ns 命名空间中的每一个主题大小限制设置为 10 GB,时间限制设置为 3 小时。

  • When the size of messages reaches 10 GB on a topic within 3 hours, the acknowledged messages will not be retained.
  • After 3 hours, even if the message size is less than 10 GB, the acknowledged messages will not be retained.
  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 10G \ --time 3h

In the following example, the time is not limited and the size limit is set to 1 TB. The size limit determines the retention.

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 1T \ --time -1

In the following example, the size is not limited and the time limit is set to 3 hours. The time limit determines the retention.

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time 3h

要实现无限留存,将两个值都设置为 -1

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time -1

要禁用消息保留恶略配置,将值设置为 0

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 0 \ --time 0

POST /admin/v2/namespaces/:tenant/:namespace/retention

Note
To disable the retention policy, you need to set both the size and time limit to 0. Set either size or time limit to 0 is invalid.

  1. int retentionTime = 10; // 10 minutesint retentionSize = 500; // 500 megabytesRetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);admin.namespaces().setRetention(namespace, policies);

获取保留策略

You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes and retentionSizeInMB.

pulsar-admin

使用 get-retention子命令并指定命名空间。

示例
  1. $ pulsar-admin namespaces get-retention my-tenant/my-ns
  2. {
  3. "retentionTimeInMinutes": 10,
  4. "retentionSizeInMB": 500
  5. }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/retention

Java

  1. admin.namespaces().getRetention(namespace);

积压配额

Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.

You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:

待办:如何对每个主题或每个 backlog 进行扩展?

  • an allowable size threshold for each topic in the namespace
  • a retention policy that determines which action the broker takes if the threshold is exceeded.

可以使用以下保留策略:

策略触发的操作
producer_request_holdBroker 会持有生产者投递的消息,但并不会把投递的消息进行持久化存储
producer_exceptionBroker 会与客户端断开连接并抛出异常
consumer_backlog_evictionBroker 将开始丢弃backlog的消息

注意保留策略类型之间的区别

你可能已经注意到,Pulsar 中关于“保留策略”有两种定义,一种适用于已确认消息的持久存储,一种适用于 backlog。

Backlog quotas are handled at the namespace level. They can be managed via:

设定大小/时间阈值和积压保留政策

您可以通过指定命名空间、大小限制和/或时间限制(以秒为单位)以及按名称指定的策略,为命名空间中的所有主题设置大小或时间阈值(可以同时设置大小和时间阈值或者只设置其中一个阈值)以及积压保留策略。

pulsar-admin

使用set-backlog-quota子命令,并使用 -l/--limit 参数指定命名空间大小限制 ,以及使用 <code>-p/--policy 参数指定保留策略。

示例
  1. $ pulsar-admin namespaces set-backlog-quot-tenant/my-ns \
  2. --limit 2G
  3. --limitTime 36000 \
  4. --policy producer_request_hold

REST API

POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

  1. long sizeLimit = 2147483648L;
  2. BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
  3. BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
  4. admin.namespaces().setBacklogQuota(namespace, quota);

获取积压阈值和积压保留政策

可以查看已对命名空间应用的大小阈值和 backlog 保留策略。

pulsar-admin

Use the get-backlog-quotas subcommand and specify a namespace. 下面是一个示例:

  1. $ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
  2. {
  3. "destination_storage": {
  4. "limit" : 2147483648,
  5. "policy" : "producer_request_hold"
  6. }
  7. }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap

Java

  1. Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
  2. admin.namespaces().getBacklogQuotas(namespace);

移除 backlog quota 策略

pulsar-admin

Use the remove-backlog-quota subcommand and specify a namespace. 下面是一个示例:

  1. $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

  1. admin.namespaces().removeBacklogQuota(namespace);

清除积压消息

pulsar-admin

使用 clear-backlog 子命令。

示例
  1. $ pulsar-admin namespaces clear-backlog my-tenant/my-ns

By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f/--force flag.

生存时间 (TTL)

默认情况下,Pulsar 会永久存储所有未确认的消息。 在大量消息未得到确认的情况下,可能会导致大量磁盘空间的使用。 如果需要考虑磁盘空间,可以设置生存时间(TTL),以确定未确认的消息将保留多长时间。

设置命名空间的TTL

pulsar-admin

使用set-message-ttl子命令并指定命名空间和TTL(以秒为单位,使用-ttl/--messageTTL参数指定)。

示例
  1. $ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
  2. --messageTTL 120 # TTL of 2 minutes

REST API

POST /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);

获取命名空间的 TTL 配置

pulsar-admin

使用 get-message-ttl 子命令并指定命名空间。

示例
  1. $ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
  2. 60

REST API

GET /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().getNamespaceMessageTTL(namespace)

删除命名空间的TTL配置

pulsar-admin

使用 get-message-ttl 子命令并指定命名空间。

示例
  1. $ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().removeNamespaceMessageTTL(namespace)

从命名空间中删除消息

如果没有任何保留期,且从未有过很多积压, 已被确认消息的保留时间等于 Pulsar segment 滚动周期 + entry 日志滚动期 + (garbage collection interval * 垃圾收集比率)。

  • Segment rollover period: basically, the segment rollover period is how often a new segment is created. 一旦创建新的片段,旧的片段会被删除。 默认情况下,这种情况会在你已写入50,000条条目 (消息) 时或者等待240分钟之后发生。 你可以在broker服务端调整这个策略。

  • Entry log rollover period: multiple ledgers in BookKeeper are interleaved into an entry log. 对于已删除的 ledger ,必须将条目日志 entry log 全部滚动。 Entry log 滚动周期是可配置的,但完全基于条目日志的大小。 详情参阅 这里。 Entry log 一经滚动,就可以进行垃圾回收 entry log。

  • Garbage collection interval: because entry logs have interleaved ledgers, to free up space, the entry logs need to be rewritten. 垃圾收集间隔是 BookKeeper 执行垃圾收集的频率 这与 entry log 的小压缩和大压缩有关。 详情参阅这里