Topic compaction

Pulsar’s topic compaction feature enables you to create compacted topics in which older, “obscured” entries are pruned from the topic, allowing for faster reads through the topic’s history (which messages are deemed obscured/outdated/irrelevant will depend on your use case).

如要使用压缩:

  • You need to give messages keys, as topic compaction in Pulsar takes place on a per-key basis (i.e. messages are compacted based on their key). 对于股票报价器这个使用场景,股票代码 AAPLGOOG-—都可以用作这里需要的 key (详情请看 下面)。 没有 key 的消息会被压缩过程单独留出。
  • 压缩可以配置为自动运行,也可以通过 Pulsar Admin API 去触发压缩。
  • 你的消费者必须 被配置为 从压缩主题中进行读取 (比如,对于 Java 消费者 来说, readCompacted 设置项必须设置为 true )。 如果没这么配置,消费者将仍能从非压缩主题中进行读取。

压缩只能在带 key 的消息上生效(如股票报价器示例中的股票代码就是每条消息的 key )。 因此,有无 key 可以看作能否采用压缩的标志。 那些没有 key 的消息会直接被压缩过程无视掉。

When should I use compacted topics?

如果要说主题能从压缩中得到什么好处,一个很好的示例就是股票报价器主题,因为消费者们能从中获取到指定股票的最新消息。 想象这样一个场景:包含股票数据的消息使用股票代码作为 key (如 GOOG, AAPL, TWTR)。 压缩这个主题将为消费者在这个主题上提供两个选项:

  • 如果他们需要访问“历史”值(主题的所有信息),他们可以从“原始”非压缩主题读取。
  • 如果他们只想看到最新消息,他们就可以从压缩主题中读取。

因此,如果你在使用一个叫做 stock-values(股票值) 的 Pulsar 主题,即便一些消费者能访问主题中的所有消息(也许因为他们需要对过去几个小时的变化做计算),他们也会习惯性地使用实时股票报价器来仅仅查看压缩主题(从而不会强制处理那些过期的消息)。 要从哪种主题中拉取消息是由消费者的 配置 决定的。

在 Pulsar 中,压缩的好处之一是你无需强制在压缩与非压缩主题之间作出选择, 因为压缩过程会保留原来的主题,只是在此基础上增加了另一个选项。 换句话说,你可以在主题上使用压缩,而那些需要访问非压缩主题的消费者并不会受到负面影响。

Configuring compaction to run automatically

Tenant administrators can configure a policy for compaction at the namespace level. The policy specifies how large the topic backlog can grow before compaction is triggered.

比如,当积压达到 100MB 时触发压缩:

  1. $ bin/pulsar-admin namespaces set-compaction-threshold \
  2. --threshold 100M my-tenant/my-namespace

为命名空间配置的压缩阈值将适用于命名空间中的所有主题。

Triggering compaction manually

为了在主题上执行压缩,你需要在 pulsar-admin 命令行工具使用 topics compact 命令。 下面是一个示例:

  1. $ bin/pulsar-admin topics compact \
  2. persistent://my-tenant/my-namespace/my-topic

pulsar-admin 工具通过 Pulsar REST API 来执行压缩。 To run compaction in its own dedicated process, i.e. not through the REST API, you can use the pulsar compact-topic command. 下面是一个示例:

  1. $ bin/pulsar compact-topic \
  2. --topic persistent://my-tenant-namespace/my-topic

当您想要避免干扰 broker 的性能时,建议在单独的过程中运行压缩。 Broker 的性能应该只会在一个场景下被影响:在拥有庞大 keyspace 的主题(比如主题上有许多的 key)上执行压缩。 压缩过程的第一阶段会为主题中的每个 key 保留一份拷贝,所以当 key 增加时会加大内存压力。 使用 pulsar-admin topics compact 命令来通过 REST API 执行压缩在绝大多数情况下都不会出现问题,使用 pulsar compact-topic 应该相应地被视为一种边缘情况。

pulsar compact-topic 命令会直接与 ZooKeeper 进行通信。 不过,为了与 ZooKeeper 建立通信, pulsar 命令行工具需要有一个可用的 broker 配置。 你可以在 conf/broker.conf 提供一个正确的配置,或在配置中指定一个非默认路径:

  1. $ bin/pulsar compact-topic \
  2. --broker-conf /path/to/broker.conf \
  3. --topic persistent://my-tenant/my-namespace/my-topic
  4. # If the configuration is in conf/broker.conf
  5. $ bin/pulsar compact-topic \
  6. --topic persistent://my-tenant/my-namespace/my-topic

我该在什么时候触发压缩?

触发压缩 的频率会因使用场景而产生很大差异。 如果你想要一个压缩主题有极快的读取速度,那么你应该相当频繁地执行压缩。

Consumer configuration

Pulsar consumers and readers need to be configured to read from compacted topics. The sections below show you how to enable compacted topic reads for Pulsar’s language clients. If the

Java

In order to read from a compacted topic using a Java consumer, the readCompacted parameter must be set to true. Here’s an example consumer for a compacted topic:

  1. Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
  2. .topic("some-compacted-topic")
  3. .readCompacted(true)
  4. .subscribe();

As mentioned above, topic compaction in Pulsar works on a per-key basis. 这意味着您在压缩主题上生成的消息需要有 key(key 的内容将取决于你的用例)。 没有 key 的消息会被压缩过程单独留出。 下面是一个带 key 的 Pulsar 消息示例:

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageBuilder;
  3. Message<byte[]> msg = MessageBuilder.create()
  4. .setContent(someByteArray)
  5. .setKey("some-key")
  6. .build();

下面的示例展示了一个带 key 的消息在 Pulsar 压缩主题上被生产的过程。

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageBuilder;
  3. import org.apache.pulsar.client.api.Producer;
  4. import org.apache.pulsar.client.api.PulsarClient;
  5. PulsarClient client = PulsarClient.builder()
  6. .serviceUrl("pulsar://localhost:6650")
  7. .build();
  8. Producer<byte[]> compactedTopicProducer = client.newProducer()
  9. .topic("some-compacted-topic")
  10. .create();
  11. Message<byte[]> msg = MessageBuilder.create()
  12. .setContent(someByteArray)
  13. .setKey("some-key")
  14. .build();
  15. compactedTopicProducer.send(msg);