Pulsar brokers 负责处理通过Pulsar的消息,包括消息的持久存储。 默认情况下,broker 的策略如下:

  • 立即删除每个订阅上已确认的所有消息。
  • 并且持久性地将所有未确认的消息存储在 backlog 中。

在Pulsar中,你可以在命名空间级别以两种方式覆盖这两种默认行为:

  • 通过设置 保留策略,可以持久地存储已经被使用和确认的消息,并将其保留最少时间。
  • 通过指定[生存时间](#time-to-live- TTL) (TTL),可以自动将未在指定时间范围内得到确认的消息标记为已消费。

Pulsar的 admin interface 使您能够在命名空间级别管理保留策略和TTL(在指定集群或全局集群的租户内)。

保留和TTL解决的是两个不同的问题

  • 消息保留: 保存数据至少X小时(即使消息已被确认消费)
  • 生存时间(TTL):一段时间后丢弃数据(通过自动确认)

在大多数情况下,应用程序会使用其中一个策略(或不使用)。

保留策略

默认情况下,当消息到达broker时,它会被存储起来,直到消费者确认已消费它,此时它才被删除。 可以修改此行为,通过对给定命名空间中的所有主题设置 保留策略 保留已经确认的消息。 设置保留策略时,可以设置大小限制时间限制

当设置一个大小限制,比如10GB,那么命名空间中的所有主题中的消息,甚至已确认的消息 都将被保留,直到达到主题的大小限制为止;如果将时间限制设置为1天,那么命名空间中所有主题的消息将保留24小时。

还可以设置无限的保留时长限制或大小限制,方法是把保留时间或大小值设置为-1

默认情况

下面的两个配置参数可用于设置实例范围的消息保留默认值:defaultRetentionTimeInMinutes=0defaultRetentionSizeInMB=0

这两个参数都在 broker.conf 配置文件中。

设置保留策略

通过指定命名空间上的大小限制时间限制来为命名空间设置保留策略。

pulsar-admin

使用set-retention子命令并指定命名空间,使用-s/--size参数指定大小限制,使用-t/--time参数指定时间限制。

示例

my-tenant/my-ns 命名空间设置10的大小限制和3小时的时间限制:

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \
  2. --size 10G \
  3. --time 3h

设置无限的时间限制和1T的大小限制:

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \
  2. --size 1T \
  3. --time -1

同样地,设置无限的时间限制和无限的大小限制:

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \
  2. --size -1 \
  3. --time -1

REST API

POST /admin/v2/namespaces/:tenant/:namespace/retention

Java

  1. int retentionTime = 10; // 10 minutes
  2. int retentionSize = 500; // 500 megabytes
  3. RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);
  4. admin.namespaces().setRetention(namespace, policies);

获取保留策略

You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes and retentionSizeInMB.

pulsar-admin

使用 get-retention子命令并指定命名空间。

示例
  1. $ pulsar-admin namespaces get-retention my-tenant/my-ns
  2. {
  3. "retentionTimeInMinutes": 10,
  4. "retentionSizeInMB": 0
  5. }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/retention

Java

  1. admin.namespaces().getRetention(namespace);

Backlog quotas

Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.

You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:

  • 命名空间中每个主题允许的大小阈值
  • 保留策略,决定了如果超过阈值,broker 将采取什么操作。

可以使用以下保留策略:

策略触发的操作
producer_request_holdBroker 会持有生产者投递的消息,但并不会把投递的消息进行持久化存储
producer_exceptionBroker 会与客户端断开连接并抛出异常
consumer_backlog_evictionBroker 将开始丢弃backlog的消息

注意保留策略类型之间的区别

您可能已经注意到,Pulsar 中关于“保留策略”的两种定义,一个适用于已确认的消息的持久存储,另一个适用于 backlogs。

Backlog quotas are handled at the namespace level. They can be managed via:

设置大小阈值和 backlog 保留策略

通过指定命名空间的大小限制和策略,为命名空间中的所有主题设置大小阈值和 backlog 保留策略。

pulsar-admin

使用set-backlog-quota子命令,并使用 -l/--limit 参数指定命名空间大小限制 ,以及使用 <code>-p/--policy 参数指定保留策略。

示例
  1. $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
  2. --limit 2G \
  3. --policy producer_request_hold

REST API

POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

  1. long sizeLimit = 2147483648L;
  2. BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
  3. BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
  4. admin.namespaces().setBacklogQuota(namespace, quota);

获取 backlog 大小阈值和 backlog 保留策略

可以查看已对命名空间应用的大小阈值和 backlog 保留策略。

pulsar-admin

Use the get-backlog-quotas subcommand and specify a namespace. Here’s an example:

  1. $ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
  2. {
  3. "destination_storage": {
  4. "limit" : 2147483648,
  5. "policy" : "producer_request_hold"
  6. }
  7. }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap

Java

  1. Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
  2. admin.namespaces().getBacklogQuotas(namespace);

移除 backlog quotas

pulsar-admin

Use the remove-backlog-quota subcommand and specify a namespace. Here’s an example:

  1. $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota

Java

  1. admin.namespaces().removeBacklogQuota(namespace);

清除 backlog

pulsar-admin

使用 clear-backlog 子命令。

示例
  1. $ pulsar-admin namespaces clear-backlog my-tenant/my-ns

By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f/--force flag.

生存时间 (TTL)

默认情况下,Pulsar 会永久存储所有未确认的消息。 在大量消息未得到确认的情况下,可能会导致大量磁盘空间的使用。 如果需要考虑磁盘空间,可以设置生存时间(TTL),以确定未确认的消息将保留多长时间。

为名称空间设置生存时间(TTL)

pulsar-admin

使用 set-message-ttl 子命令并指定命名空间和TTL(以秒为单位,使用-ttl/--messageTTL参数指定)。

示例
  1. $ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
  2. --messageTTL 120 # TTL of 2 minutes

REST API

POST /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);

获取命名空间的生存时间(TTL) 配置

pulsar-admin

使用 get-message-ttl 子命令并指定命名空间。

示例
  1. $ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
  2. 60

REST API

GET /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().getNamespaceMessageTTL(namespace)