Topic Compaction

消息数据高度可扩展的持久存储,是Pulsar构建的主要目标。 Pulsar的topic让你可以持久存储所有你所需要的未被确认消息,同时保留了消息的顺序。 By default, Pulsar stores all unacknowledged/unprocessed messages produced on a topic. 在很多Pulsar的使用案例中,在topic累积大量的未被确认的消息是有必要的。但对于Pulsar的consumer来说,在完整的消息log中回退,将变得非常耗时。

更多topic压缩实践的指南,请参考 Topic compaction cookbook

某些情况下,consumer并不需要完整的topic日志。 他们可能只需要几个值来构造一个更 “浅” 的日志图像, 也许仅仅只是最新的值。 For these kinds of use cases Pulsar offers topic compaction. When you run compaction on a topic, Pulsar goes through a topic’s backlog and removes messages that are obscured by later messages, i.e. it goes through the topic on a per-key basis and leaves only the most recent message associated with that key.

Pulsar的topic压缩特性:

  • 允许通过主题日志更快地 “后退”
  • 仅适用于 持久topic
  • Triggered automatically when the backlog reaches a certain size or can be triggered manually via the command line. See the Topic compaction cookbook
  • 在概念上和操作上与 retention和expiry 是不同的。 Topic compaction does, however, respect retention. 如果retention已经从topic的backlog中移除了消息,那么此条消息在压缩的topic账簿上也是无法被读取的。

Topic压缩示例:股票报价机

Pulsar topic压缩的使用例子,可以看一下股票报价机topic。 在股票报价机topic中,每条消息都有带有时间戳的股票买入价格(包含代表股票符号的消息key,例如AAPL或者GOOG)。 可能你感兴趣的只是股票报价机中最新的股票价格,而对历史数据并不感兴趣(即你不需要构建topic每个key下消息序列的完整图像)。 压缩在这种场景下非常的方便,因为它使得用户不需回退到模糊的消息上。

Topic压缩的工作原理

通过命令行触发topic压缩,Pulsar将会从头到尾迭代整个topic。 对于它碰到的每个key,压缩程序将会只保留这个key最近的事件。

之后,broker将会创建一个新的BookKeeper ledger 然后开始对topic的每条消息进行第二次迭代。 对于每条消息,如果key匹配到它的最新事件,key的数据内容,消息ID,元数据将会被写入最新创建的ledger。 如果key并没有匹配到最新的消息,消息将被跳过。 如果给定的消息,负载是空的,它将被跳过并且视为删除(类似key-value数据库中的 tombstones)概念); At the end of this second iteration through the topic, the newly created BookKeeper ledger is closed and two things are written to the topic’s metadata: the ID of the BookKeeper ledger and the message ID of the last compacted message (this is known as the compaction horizon of the topic). 写入元数据后,压缩就完成了。

初始化压缩操作完成后,将来任何对压缩层位及压缩backlog的修改,都会通知给拥有该topic的Pulsar broker。 当下列更改发生时:

  • 启用读取压缩功能的客户端(consumer和reader),将会尝试从topic中读取消息,或者:
    • 像从正常的主题那样读取(如果消息的ID大于等于压缩层位),或
    • 从压缩层位的开始读取(如果消息ID小于压缩层位)