在最高级别,单个 Pulsar 实例由一个或多个 Pulsar 集群组成。实例中的集群之间可以相互复制数据。

单个 Pulsar 集群由以下三部分组成:

  • 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。
  • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储
  • A ZooKeeper cluster specific to that cluster handles coordination tasks between Pulsar clusters.

下图为一个 Pulsar 集群:

Pulsar架构图

在更细粒度的实例级别, 有一个能访问到全部实例的ZooKeeper群集处理涉及多个pulsar集群的配置协调任务, 例如 异地复制

Brokers

Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:

  • 一个 HTTP 服务器, 它暴露了 REST 系统管理接口以及在生产者和消费者之间进行 Topic查找的API。
  • 一个调度分发器, 它是异步的TCP服务器,通过自定义 二进制协议应用于所有相关的数据传输。

出于性能的考虑, 通常从 managed ledger (ledger是Pulsar底层存储BookKeeper中的概念,相当于一种记录的集合) 缓存中调度消息, 除非 积压的消息超过这个缓存的大小。 如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。

最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重新发布到其他区域

如何管理Pulsar Brokers, 请参考 brokers 指南

集群

一个Pulsar实例包含一个或者多个Pulsar集群。集群包括:

  • 一个或者多个Pulsar brokers
  • 一个ZooKeeper协调器,用于集群级别的配置和协调
  • 一组BookKeeper的Bookies用于消息的 持久化存储

集群间可以通过异地复制进行消息同步

如何管理Pulsar集群,请参考clusters指南

元数据存储

Pulsar利用Apache Zookeeper进行元数据存储,集群配置和协调。在一个Pulsar实例中:

  • 配置与仲裁存储: 存储租户,命名域和其他需要全局一致的配置项
  • 每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的),等等

持久化存储

Pulsar提供有保证的应用消息运输。如果消息成功送达了Pulsar Broker,它一定会被送达至它的目的地

为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达 这种消息推送方式通常叫做持久化消息推送 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储

Apache BookKeeper

Pulsar用 Apache BookKeeper作为持久化存储。 BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:

  • 能让Pulsar创建多个独立的日志,这种独立的日志就是ledgers. 随着时间的推移,Pulsar会为Topic创建多个ledgers。
  • 为按条目复制的顺序数据提供了非常高效的存储。
  • 保证了多系统挂掉时ledgers的读取一致性。
  • 提供不同的Bookies之间均匀的IO分布的特性。
  • 容量和吞吐量都能水平扩展。并且容量可以通过在集群内添加更多的Bookies立刻提升。
  • Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

除了消息数据,cursors也会被持久化入BookKeeper。 Cursors是消费端订阅消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。

目前,Pulsar支持持久消息存储。这可以通过topic名称中的persistent字眼实现。这里有一个例子:

  1. persistent://tenant/namespace/topic

Pulsar也支持临时消息( (non-persistent) )存储。

下图展示了brokers和bookies是如何交互的

Brokers和bookies

Ledgers

Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:

  • Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。
  • 当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,这个ledger只会以只读模式打开。
  • 最后,当ledger中的条目不再有用的时候,整个legder可以被删除(ledger分布是跨Bookies的)。

Ledger读一致性

BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。

Managed ledgers

由于BookKeeper Ledgers提供了单一的日志抽象,在ledger的基础上我们开发了一个叫managed ledger的库,用以表示单个topic的存储层。 managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。

一个managed ledger在内部用多个BookKeeper ledgers保存数据,这么做有两个原因:

  1. 在故障之后,原有的某个ledger不能再写了,需要创建一个新的。
  2. ledger在所有cursors消费完它所保存的消息之后就可以被删除,这样可以实现ledgers的定期翻滚从头写。

日志存储

BookKeeper的日志文件包含事务日志。 在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 journalMaxSizeMB 参数配置)的时候,新的日志文件会被创建。

Pulsar proxy

Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers 。 然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。

Pulsar proxy提供了解决这个问题的方案,它可以作为集群中的所有brokers的统一网关。 如果你选择运行Pulsar Proxy(这是可选的),所有的客户端连接将会通过这个代理而不是直接与brokers通信。

为了性能和容错,你可以运行任意个Pulsar proxy。

架构上来看,Pulsar Proxy从ZooKeeper上面读取他所需要的所有信息。 当启动代理时,你只需要提供用于集群独有和实例范围的配置存储的ZooKeeper连接串。 Here’s an example:

  1. $ bin/pulsar proxy \
  2. --zookeeper-servers zk-0,zk-1,zk-2 \
  3. --configuration-store-servers zk-0,zk-1,zk-2

Pulsar proxy 文档

如何使用Pulsar proxy,参考 Pulsar proxy 管理指南

关于Pulsar proxy有一些比较重要的注意点:

  • 连接客户端不需要为使用Pulsar proxy提供任何特定配置。 除了更新用于服务URL的IP之外,你不需要为现有的应用更新客户端配置(例如你在Pulsar proxy上层架设运行了负载均衡器)。
  • Pulsar proxy支持TLS 加密认证

Service discovery

Clients connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar内部提供了服务发现的机制,你可以通过 配置Pulsar实例指南设置。

你也可以用你自己的服务发现系统。 如果你用你自己的系统,只需满足一个需求:当客户端发送一个HTTP请求,例如发到http://pulsar.us-west.example.com:8080,客户端需要被重定向到某些所需的集群中活跃的broker,或者通过DNS,或者通过HTTP和IP重定向,或者其他机制。

下面这张图展示了Pulsar服务发现机制:

alt-text

图中,Pulsar集群可以通过一个DNS名称寻址:pulsar-cluster.acme.com。 例如Python客户端,可以像这样访问这个Pulsar集群:

  1. from pulsar import Client
  2. client = Client('pulsar://pulsar-cluster.acme.com:6650')