Architecture Overview

At the highest level, a Pulsar instance is composed of one or more Pulsar clusters. Clusters within an instance can replicate data amongst themselves.

单个 Pulsar 集群由以下三部分组成:

  • 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。
  • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储
  • 一个Zookeeper集群,用来处理多个Pulsar集群之间的协调任务。

下图为一个 Pulsar 集群:

Pulsar架构图

在更细粒度的实例级别, 有一个能访问到全部实例的ZooKeeper群集处理涉及多个pulsar集群的配置协调任务, 例如 异地复制

Brokers

Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:

  • An HTTP server that exposes a REST API for both administrative tasks and topic lookup for producers and consumers. The producers connect to the brokers to publish messages and the consumers connect to the brokers to consume the messages.
  • 一个调度分发器, 它是异步的TCP服务器,通过自定义 二进制协议应用于所有相关的数据传输。

Messages are typically dispatched out of a managed ledger cache for the sake of performance, unless the backlog exceeds the cache size. 如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。

最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重新发布到其他区域

如何管理Pulsar Brokers, 请参考 brokers 指南

集群

A Pulsar instance consists of one or more Pulsar clusters. Clusters, in turn, consist of:

  • 一个或者多个Pulsar brokers
  • 一个ZooKeeper协调器,用于集群级别的配置和协调
  • 一组BookKeeper的Bookies用于消息的 持久化存储

集群间可以通过异地复制进行消息同步

如何管理Pulsar集群,请参考clusters指南

元数据存储

The Pulsar metadata store maintains all the metadata of a Pulsar cluster, such as topic metadata, schema, broker load data, and so on. Pulsar uses Apache ZooKeeper for metadata storage, cluster configuration, and coordination. The Pulsar metadata store can be deployed on a separate ZooKeeper cluster or deployed on an existing ZooKeeper cluster. You can use one ZooKeeper cluster for both Pulsar metadata store and BookKeeper metadata store. If you want to deploy Pulsar brokers connected to an existing BookKeeper cluster, you need to deploy separate ZooKeeper clusters for Pulsar metadata store and BookKeeper metadata store respectively.

In a Pulsar instance:

  • 配置与仲裁存储: 存储租户,命名域和其他需要全局一致的配置项
  • 每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的)等等。

Configuration store

The configuration store maintains all the configurations of a Pulsar instance, such as clusters, tenants, namespaces, partitioned topic related configurations, and so on. A Pulsar instance can have a single local cluster, multiple local clusters, or multiple cross-region clusters. Consequently, the configuration store can share the configurations across multiple clusters under a Pulsar instance. The configuration store can be deployed on a separate ZooKeeper cluster or deployed on an existing ZooKeeper cluster.

持久化存储

Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.

为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达。 This mode of messaging is commonly called persistent messaging. 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储。

Apache BookKeeper

Pulsar用 Apache BookKeeper作为持久化存储。 BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:

  • It enables Pulsar to utilize many independent logs, called ledgers. Multiple ledgers can be created for topics over time.
  • 为按条目复制的顺序数据提供了非常高效的存储。
  • 保证了多系统挂掉时ledgers的读取一致性。
  • 提供不同的Bookies之间均匀的IO分布的特性。
  • It’s horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
  • Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

In addition to message data, cursors are also persistently stored in BookKeeper. Cursors是消费端订阅消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。

At the moment, Pulsar supports persistent message storage. This accounts for the persistent in all topic names. 下面是一个示例:

  1. persistent://tenant/namespace/topic

Pulsar也支持临时消息( (non-persistent) )存储。

下图展示了brokers和bookies是如何交互的

Brokers和bookies

Ledgers

Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:

  • Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。
  • 当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,这个ledger只会以只读模式打开。
  • 最后,当ledger中的条目不再有用的时候,整个legder可以被删除(ledger分布是跨Bookies的)。

Ledger读一致性

BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。

Managed ledgers

Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。

Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:

  1. 在故障之后,原有的某个ledger不能再写了,需要创建一个新的。
  2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.

日志存储

In BookKeeper, journal files contain BookKeeper transaction logs. 在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 journalMaxSizeMB 参数配置)的时候,新的日志文件会被创建。

Pulsar proxy

Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers 。 然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。

The Pulsar proxy provides a solution to this problem by acting as a single gateway for all of the brokers in a cluster. 如果你选择运行Pulsar Proxy(这是可选的),所有的客户端连接将会通过这个代理而不是直接与brokers通信。

为了性能和容错,你可以运行任意个Pulsar proxy。

架构上来看,Pulsar Proxy从ZooKeeper上面读取他所需要的所有信息。 当启动代理时,你只需要提供用于集群独有和实例范围的配置存储的ZooKeeper连接串。 下面是一个示例:

  1. $ bin/pulsar proxy \
  2. --zookeeper-servers zk-0,zk-1,zk-2 \
  3. --configuration-store-servers zk-0,zk-1,zk-2

Pulsar proxy 文档

如何使用Pulsar proxy,参考 Pulsar proxy 管理指南

关于Pulsar proxy有一些比较重要的注意点:

  • Connecting clients don’t need to provide any specific configuration to use the Pulsar proxy. 除了更新用于服务URL的IP之外,你不需要为现有的应用更新客户端配置(例如你在Pulsar proxy上层架设运行了负载均衡器)。
  • Pulsar proxy支持TLS 加密认证

服务发现

客户端 需要能够使用单个 URL 与整个 Pulsar 实例进行通信。 Pulsar内部提供了服务发现的机制,你可以通过 配置Pulsar实例指南设置。

你也可以用你自己的服务发现系统。 If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as http://pulsar.us-west.example.com:8080, the client needs to be redirected to some active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.

下面这张图展示了Pulsar服务发现机制:

alt-text

图中,Pulsar集群可以通过一个DNS名称寻址:pulsar-cluster.acme.com。 例如Python客户端,可以像这样访问这个Pulsar集群:

  1. from pulsar import Client
  2. client = Client('pulsar://pulsar-cluster.acme.com:6650')

Note In Pulsar, each topic is handled by only one broker. 客户端发出的读取,更新或删除主题的初始请求将发送给可能不是处理该主题的 broker 。 如果这个 broker 不能处理该主题的请求,broker 将会把该请求重定向到可以处理主题请求的 broker。