Load balancing across Pulsar brokers

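Pulsar is a horizontally scalable messaging system, so a core requirement is that the traffic in a logical cluster must be balanced as evenly as possible across all the available Pulsar brokers.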

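You can use multiple settings and tools to control the traffic distribution, which requires a bit of background on how traffic is managed in Pulsar. In most cases, though, the core requirement mentioned above is met out of the box and you do not need to worry about it.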

Pulsar load manager architecture

The following sections introduce the basic architecture of the Pulsar load manager.

Assign topics to brokers dynamically

Pulsar dynamically assigns topics to brokers based on the load of all the brokers in the cluster.

Each new topic triggers a load check that selects the best-suited broker to take ownership of the topic.
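As a rough illustration of choosing the best-suited broker, the sketch below ranks brokers by their most heavily used resource and picks the least loaded one. The BrokerLoad type, its fields, and the ranking rule are hypothetical simplifications; Pulsar's actual placement logic lives in its configurable load manager and considers more inputs.

```python
from dataclasses import dataclass


@dataclass
class BrokerLoad:
    """Hypothetical snapshot of one broker's resource usage, in percent."""
    name: str
    cpu: float
    memory: float
    network: float

    def peak_usage(self) -> float:
        # The most stressed resource decides how busy the broker looks.
        return max(self.cpu, self.memory, self.network)


def pick_owner(brokers: list[BrokerLoad]) -> str:
    """Return the name of the broker whose peak usage is lowest."""
    if not brokers:
        raise ValueError("no available brokers")
    return min(brokers, key=lambda b: b.peak_usage()).name


brokers = [
    BrokerLoad("broker-1", cpu=70, memory=40, network=30),
    BrokerLoad("broker-2", cpu=35, memory=50, network=45),
    BrokerLoad("broker-3", cpu=20, memory=80, network=10),
]
print(pick_owner(brokers))  # broker-2 (peak 50% vs. 70% and 80%)
```

Ranking by the peak resource rather than an average means a broker saturated on a single resource (broker-3 on memory here) is not chosen even though its CPU is mostly idle.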

In case of partitioned topics, different partitions are assigned to different brokers. Here “topic” means either a non-partitioned topic or one partition of a topic.

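The assignment is dynamic and happens quickly, so handovers appear seamless. For example, if a broker crashes, its topics are immediately reassigned to another broker. In another scenario, a broker becomes overloaded, and its topics are reassigned to a less loaded broker.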

The stateless nature of brokers is what makes this usage-based dynamic assignment possible, and it also lets you quickly scale the cluster up or down.

Assignment granularity

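Topics and partitions are not assigned to brokers individually; the assignment happens at the bundle level, which is coarser. This amortizes the amount of information that needs to be tracked. Topics are dynamically assigned to a particular broker based on CPU, memory, traffic load, and other metrics.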

Instead of individual topic or partition assignment, each broker takes ownership of a subset of the topics in a namespace. This subset is called a “bundle”, and it effectively acts as a sharding mechanism.

The namespace is the “administrative” unit: many config knobs or operations are done at the namespace level.

For assignment, a namespace is sharded into a list of “bundles”, with each bundle comprising a portion of the overall hash range of the namespace.

Topics are assigned to a particular bundle by hashing the topic name and checking which bundle's range the hash falls into.
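The following sketch shows the idea: the 32-bit hash space is cut into equal-width bundles, and a topic lands in the bundle whose range contains its hash. It uses zlib.crc32 only as a stand-in hash function (an assumption; Pulsar's actual hash function may differ) and treats each range as half-open [lower, upper) for simplicity.

```python
import bisect
import zlib

FULL_RANGE = 2**32  # size of the 32-bit hash space shared by all bundles


def bundle_boundaries(num_bundles: int) -> list[int]:
    """Cut the hash space into num_bundles equal-width ranges.

    Returns num_bundles + 1 boundaries; bundle i covers [b[i], b[i + 1]).
    """
    step = FULL_RANGE // num_bundles
    bounds = [i * step for i in range(num_bundles)]
    bounds.append(FULL_RANGE)  # the last bundle absorbs any rounding remainder
    return bounds


def bundle_for_topic(topic: str, bounds: list[int]) -> int:
    """Return the index of the bundle whose range contains the topic's hash."""
    h = zlib.crc32(topic.encode("utf-8"))  # stand-in hash, 0 <= h < 2**32
    return bisect.bisect_right(bounds, h) - 1


bounds = bundle_boundaries(4)
# Each partition of a partitioned topic is hashed on its own name,
# so different partitions can land in different bundles.
for i in range(4):
    topic = f"persistent://my-tenant/my-namespace/my-topic-partition-{i}"
    print(topic, "-> bundle", bundle_for_topic(topic, bounds))
```

Because the bundle depends only on the topic name and the bundle boundaries, every broker computes the same mapping without coordination; only the bundle-to-broker ownership has to be tracked.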

Each bundle is independent of the others and thus is independently assigned to different brokers.

Create namespaces and bundles

When you create a new namespace, it uses the default number of bundles. You can set this default in conf/broker.conf:

  # When a namespace is created without specifying the number of bundles, this
  # value is used as the default
  defaultNumberOfNamespaceBundles=4

You can either change the system default, or override it when you create a new namespace:

  $ bin/pulsar-admin namespaces create my-tenant/my-namespace --clusters us-west --bundles 16

This command creates a namespace with 16 initial bundles, so the topics in this namespace can immediately be spread across up to 16 brokers.

In general, if you know the expected traffic and number of topics in advance, you should start with a reasonable number of bundles instead of waiting for the system to auto-correct the distribution.

On the same note, it is beneficial to start with more bundles than the number of brokers, because of the hashing nature of the distribution of topics into bundles. For example, for a namespace with 1000 topics, using something like 64 bundles achieves a good distribution of traffic across 16 brokers.
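To get a feel for why extra bundles help, the sketch below hashes 1000 hypothetical topic names into 16 or 64 bundles and then spreads the bundles over 16 brokers. Placement is a simple greedy rule (largest bundle first, onto the broker with the fewest topics so far), which is an assumption standing in for Pulsar's load-based assignment; zlib.crc32 again stands in for the real hash, and counting topics assumes every topic carries equal traffic.

```python
import zlib
from collections import Counter


def broker_topic_counts(num_topics: int, num_bundles: int, num_brokers: int) -> list[int]:
    """Return how many (hypothetical) topics each broker ends up owning."""
    step = 2**32 // num_bundles
    per_bundle = Counter()
    for i in range(num_topics):
        name = f"persistent://my-tenant/my-namespace/topic-{i}"
        h = zlib.crc32(name.encode("utf-8"))
        # Clamp so a rounding remainder at the top of the range stays in the last bundle.
        per_bundle[min(h // step, num_bundles - 1)] += 1
    loads = [0] * num_brokers
    # Largest bundles first, each placed on the currently least loaded broker.
    for count in sorted(per_bundle.values(), reverse=True):
        loads[loads.index(min(loads))] += count
    return loads


for bundles in (16, 64):
    counts = broker_topic_counts(1000, bundles, 16)
    print(f"{bundles} bundles: min={min(counts)}, max={max(counts)} topics per broker")
```

With 16 bundles each broker owns exactly one bundle (assuming none is empty), so any hash skew between bundles shows up directly as skew between brokers. With 64 smaller bundles, placement can mix large and small bundles, which typically narrows the gap between the busiest and the idlest broker.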

Unload topics and bundles

You can “unload” a topic in Pulsar with an admin operation. Unloading means closing the topics, releasing their ownership, and reassigning them to a new broker based on the current load.

When unloading happens, the client experiences a small latency blip, typically on the order of tens of milliseconds, while the topic is reassigned.

Unloading is the mechanism that the load manager uses to perform load shedding, but you can also trigger unloading manually, for example to correct the assignments and redistribute traffic before any broker becomes overloaded.

Unloading a single topic has no effect on the bundle assignment; it just closes and reopens that particular topic:

  pulsar-admin topics unload persistent://tenant/namespace/topic

To unload all topics for a namespace and trigger reassignments:

  pulsar-admin namespaces unload tenant/namespace

Split namespace bundles

Since the load of the topics in a bundle might change over time, and can be hard to predict upfront, brokers can split a bundle into two. The new, smaller bundles can then be reassigned to different brokers.

Splitting is driven by tunable thresholds: any existing bundle that exceeds any of the thresholds is a candidate to be split. By default, the newly split bundles are also immediately unloaded to other brokers to facilitate traffic distribution.

  # enable/disable namespace bundle auto split
  loadBalancerAutoBundleSplitEnabled=true
  # enable/disable automatic unloading of split bundles
  loadBalancerAutoUnloadSplitBundlesEnabled=true
  # maximum topics in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxTopics=1000
  # maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxSessions=1000
  # maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxMsgRate=30000
  # maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxBandwidthMbytes=100
  # maximum number of bundles in a namespace (for auto-split)
  loadBalancerNamespaceMaximumBundles=128
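A minimal sketch of the split decision described above, assuming a bundle becomes a candidate when it strictly exceeds any one of the thresholds and that a split cuts its hash range at the midpoint. The limits are the defaults from the configuration excerpt; BundleStats and the helper functions are hypothetical, not Pulsar classes.

```python
from dataclasses import dataclass

# Defaults from the configuration excerpt above.
MAX_TOPICS = 1000            # loadBalancerNamespaceBundleMaxTopics
MAX_SESSIONS = 1000          # loadBalancerNamespaceBundleMaxSessions
MAX_MSG_RATE = 30000         # loadBalancerNamespaceBundleMaxMsgRate
MAX_BANDWIDTH_MBYTES = 100   # loadBalancerNamespaceBundleMaxBandwidthMbytes
MAX_BUNDLES = 128            # loadBalancerNamespaceMaximumBundles


@dataclass
class BundleStats:
    """Hypothetical per-bundle statistics; rates combine in + out."""
    lower: int  # inclusive start of the bundle's hash range
    upper: int  # exclusive end of the bundle's hash range
    topics: int
    sessions: int  # producers + consumers
    msg_rate: float
    bandwidth_mbytes: float


def exceeds_threshold(b: BundleStats) -> bool:
    """True if ANY single metric is over its limit."""
    return (b.topics > MAX_TOPICS or b.sessions > MAX_SESSIONS
            or b.msg_rate > MAX_MSG_RATE
            or b.bandwidth_mbytes > MAX_BANDWIDTH_MBYTES)


def can_split(b: BundleStats, bundles_in_namespace: int) -> bool:
    """Auto-split only while the namespace is below its bundle cap."""
    return exceeds_threshold(b) and bundles_in_namespace < MAX_BUNDLES


def split_in_half(b: BundleStats) -> tuple[tuple[int, int], tuple[int, int]]:
    """Cut the hash range at its midpoint into two new ranges."""
    mid = (b.lower + b.upper) // 2
    return (b.lower, mid), (mid, b.upper)


hot = BundleStats(0, 2**30, topics=120, sessions=300,
                  msg_rate=45000, bandwidth_mbytes=40)
print(can_split(hot, bundles_in_namespace=4))  # True: msg_rate > 30000
print(split_in_half(hot))  # ((0, 536870912), (536870912, 1073741824))
```

Only one metric (the message rate) is over its limit in this example, which is enough to make the bundle a split candidate.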

Shed load automatically

Pulsar's load manager supports automatic load shedding. Whenever the system recognizes that a particular broker is overloaded, it forces some traffic to be reassigned to less loaded brokers.

When a broker is identified as overloaded, it is forced to “unload” a subset of its bundles, choosing the ones with the highest traffic, until they make up for the overload percentage.

For example, the default threshold is 85% and if a broker is over quota at 95% CPU usage, then the broker unloads the percent difference plus a 5% margin: (95% - 85%) + 5% = 15%.

Since the selection of bundles to unload is based on traffic (as a proxy measure for CPU, network, and memory), the broker unloads bundles accounting for at least 15% of its traffic.
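The arithmetic and the traffic-based selection can be sketched as follows. The 5% margin and the rule of taking the highest-traffic bundles until the target is covered follow the description above; the function names, the bundle names, and the use of a single message-rate number per bundle are illustrative assumptions, and Pulsar's actual shedding strategy may weigh things differently.

```python
def percent_to_unload(usage: float, threshold: float = 85.0, margin: float = 5.0) -> float:
    """Percentage of traffic an overloaded broker should shed."""
    if usage <= threshold:
        return 0.0  # not overloaded, nothing to shed
    return (usage - threshold) + margin


def select_bundles(traffic: dict[str, float], percent: float) -> list[str]:
    """Pick the busiest bundles until they cover at least `percent` of traffic."""
    target = sum(traffic.values()) * percent / 100
    selected = []
    shed = 0.0
    for name, rate in sorted(traffic.items(), key=lambda kv: kv[1], reverse=True):
        if shed >= target:
            break
        selected.append(name)
        shed += rate
    return selected


# Hypothetical per-bundle message rates (msg/s) on one broker; total 10000.
traffic = {
    "bundle-3": 1350, "bundle-1": 1450, "bundle-6": 1150, "bundle-2": 1400,
    "bundle-8": 1000, "bundle-4": 1300, "bundle-7": 1100, "bundle-5": 1250,
}
pct = percent_to_unload(95.0)              # (95 - 85) + 5 = 15.0
print(pct, select_bundles(traffic, pct))   # 15.0 ['bundle-1', 'bundle-2']
```

Taking the busiest bundles first reaches the 15% target (1500 msg/s here) with as few unloads as possible: the first bundle covers 1450 msg/s, so a second one is needed.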

The automatic load shedding is enabled by default and you can disable the automatic load shedding with this setting:

  # Enable/disable automatic bundle unloading for load-shedding
  loadBalancerSheddingEnabled=true

Additional settings that apply to shedding:

  # Load shedding interval. The broker periodically checks whether some traffic should be offloaded from
  # over-loaded brokers to under-loaded brokers
  loadBalancerSheddingIntervalMinutes=1
  # Prevent the same topics from being shed and moved to other brokers more than once within this timeframe
  loadBalancerSheddingGracePeriodMinutes=30
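The two settings interact roughly like this: every interval the broker looks for bundles to shed, but skips any bundle that was already moved within the grace period, which keeps the same traffic from bouncing between brokers. The helper names and the in-memory recently_unloaded map are assumptions for illustration.

```python
from datetime import datetime, timedelta

SHEDDING_INTERVAL = timedelta(minutes=1)   # loadBalancerSheddingIntervalMinutes
GRACE_PERIOD = timedelta(minutes=30)       # loadBalancerSheddingGracePeriodMinutes


def check_due(last_check: datetime, now: datetime) -> bool:
    """True once a full shedding interval has passed since the last check."""
    return now - last_check >= SHEDDING_INTERVAL


def eligible(candidates: list[str], recently_unloaded: dict[str, datetime],
             now: datetime) -> list[str]:
    """Drop candidates that were already shed within the grace period."""
    return [b for b in candidates
            if b not in recently_unloaded
            or now - recently_unloaded[b] >= GRACE_PERIOD]


now = datetime(2024, 1, 1, 12, 0)
recently_unloaded = {
    "bundle-1": now - timedelta(minutes=10),  # still inside the grace period
    "bundle-2": now - timedelta(minutes=45),  # grace period has expired
}
print(check_due(now - timedelta(seconds=90), now))  # True
print(eligible(["bundle-1", "bundle-2", "bundle-3"], recently_unloaded, now))
# ['bundle-2', 'bundle-3']
```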

Broker overload thresholds

Whether a broker is overloaded is determined by thresholds on CPU, network, and memory usage. Whenever any of those metrics reaches the threshold, the system triggers shedding (if enabled).

By default, the overload threshold is set to 85%:

  # Usage threshold to determine whether a broker is overloaded
  loadBalancerBrokerOverloadedThresholdPercentage=85
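A minimal sketch of that rule, assuming each metric is already expressed as a percentage of its limit and that "reaches" means greater than or equal to the threshold; the function name and the metric keys are hypothetical.

```python
OVERLOAD_THRESHOLD = 85.0  # loadBalancerBrokerOverloadedThresholdPercentage


def overloaded_resources(usage: dict[str, float],
                         threshold: float = OVERLOAD_THRESHOLD) -> list[str]:
    """Names of the resources at or above the threshold (empty if healthy)."""
    return sorted(name for name, pct in usage.items() if pct >= threshold)


usage = {"cpu": 60.0, "memory": 40.0, "network": 90.0}
hot = overloaded_resources(usage)
print(hot, "overloaded" if hot else "ok")  # ['network'] overloaded
```

A single saturated resource is enough: CPU and memory are comfortable here, but the network alone makes the broker a shedding candidate.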

Pulsar gathers the usage stats from the system metrics.

For network utilization, the network interface speed that Linux reports is sometimes incorrect and needs to be overridden manually. This is the case on AWS EC2 instances with a 1 Gbps NIC, for which the OS reports a 10 Gbps speed.

Because of the incorrect max speed, the Pulsar load manager might think the broker has not reached the NIC capacity, while in fact the broker already uses all the bandwidth and the traffic is slowed down.

You can use the following setting to correct the max NIC speed:

  # Override the auto-detection of the network interfaces max speed.
  # This option is useful in some environments (eg: EC2 VMs) where the max speed
  # reported by Linux is not reflecting the real bandwidth available to the broker.
  # Since the network usage is employed by the load manager to decide when a broker
  # is overloaded, it is important to make sure the info is correct or override it
  # with the right value here. The configured value can be a double (eg: 0.8) and that
  # can be used to trigger load-shedding even before hitting on NIC limits.
  loadBalancerOverrideBrokerNicSpeedGbps=

When the value is empty, Pulsar uses the value that the OS reports.
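The effect of a wrong NIC speed is easy to see with numbers. The sketch below computes network usage as a percentage of the link speed, with an optional override playing the role of loadBalancerOverrideBrokerNicSpeedGbps; the function itself is hypothetical. The example uses 0.95 Gbps of traffic on an EC2 instance whose real limit is 1 Gbps but whose OS reports 10 Gbps.

```python
from typing import Optional


def network_usage_percent(traffic_gbps: float, reported_nic_gbps: float,
                          override_gbps: Optional[float] = None) -> float:
    """Network usage as a percentage of the NIC speed.

    When override_gbps is None (setting left empty), the OS-reported speed
    is used, mirroring the behaviour described above.
    """
    limit = override_gbps if override_gbps is not None else reported_nic_gbps
    return traffic_gbps / limit * 100


print(round(network_usage_percent(0.95, reported_nic_gbps=10), 2))                     # 9.5
print(round(network_usage_percent(0.95, reported_nic_gbps=10, override_gbps=1.0), 2))  # 95.0
print(round(network_usage_percent(0.95, reported_nic_gbps=10, override_gbps=0.8), 2))  # 118.75
```

With the OS-reported 10 Gbps the broker looks almost idle on the network (9.5%), far below the default 85% threshold; with the corrected 1 Gbps it sits at 95% and counts as overloaded. An override below the real speed (0.8 here) pushes the computed usage higher still, which is how the setting can trigger shedding before the NIC is actually saturated.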