Load balancing across a Pulsar broker cluster

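Pulsar is a horizontally scalable messaging system, so a core requirement is that the traffic in a logical cluster must be spread as evenly as possible across all the available Pulsar brokers.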

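In most cases Pulsar handles this out of the box, and you should not need to worry about it. Nonetheless, Pulsar has a number of settings and tools for controlling how traffic is distributed, and understanding how traffic is managed in Pulsar requires some background.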

Pulsar load manager architecture

Dynamic assignment of topics to brokers

Topics are dynamically assigned to brokers based on the load conditions of all the brokers in the cluster.

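When a client starts using a new topic that is not yet assigned to any broker, it triggers a process that chooses the broker best suited to acquire ownership of the topic, according to the load conditions.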

In the case of partitioned topics, different partitions can be assigned to different brokers. Here, “topic” means either a non-partitioned topic or one partition of a topic.

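The assignment is “dynamic” because it can change quickly. For example, if the broker owning the topic crashes, the topic is immediately reassigned to another broker. Another scenario is when the broker owning the topic becomes overloaded. In that case, the topic is reassigned to a less loaded broker.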

The stateless nature of brokers makes dynamic assignment possible, so the cluster can be quickly expanded or shrunk based on usage.
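The assignment behaviour described above can be sketched as a small ownership table. A topic gets an owner on first use, and that ownership is sticky. If the owning broker crashes, the topic moves immediately. If the broker is overloaded, the topic is released so that the next lookup picks a less loaded broker. This is a minimal sketch, not Pulsar's implementation. The `Broker`/`Cluster` classes, the single `load` score, and the "least loaded wins" rule are all hypothetical simplifications. For simplicity, ownership is tracked per topic here.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Broker:
    name: str
    load: float          # hypothetical single load score in [0, 1]
    alive: bool = True


@dataclass
class Cluster:
    brokers: dict[str, Broker]
    owner: dict[str, str] = field(default_factory=dict)  # topic -> broker name

    def least_loaded(self) -> Broker:
        live = [b for b in self.brokers.values() if b.alive]
        if not live:
            raise RuntimeError("no live broker available")
        return min(live, key=lambda b: b.load)

    def lookup(self, topic: str) -> str:
        """Return the topic's owner, assigning one on first use."""
        if topic not in self.owner:
            self.owner[topic] = self.least_loaded().name
        return self.owner[topic]

    def release(self, topic: str) -> None:
        """Drop ownership (e.g. the owner is overloaded); the next lookup reassigns."""
        self.owner.pop(topic, None)

    def broker_crashed(self, name: str) -> None:
        """Mark a broker dead and immediately reassign every topic it owned."""
        self.brokers[name].alive = False
        for topic, owner in list(self.owner.items()):
            if owner == name:
                self.owner[topic] = self.least_loaded().name
```

Because brokers hold no durable state in this model, moving a topic only means rewriting one entry in the ownership table.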

Assignment granularity

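The assignment of topics/partitions to brokers is not done at the topic/partition level but at the bundle level (a higher level). The reason is to amortize the amount of information that needs to be tracked. Topics are dynamically assigned to particular brokers based on CPU, memory, traffic load and other metrics.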

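Instead of individual topic/partition assignment, each broker takes ownership of a subset of the topics for a namespace. This subset is called a “bundle”, and it is effectively a sharding mechanism.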

The namespace is the “administrative” unit: many config knobs and operations are applied at the namespace level.

For assignment, a namespace is sharded into a list of “bundles”, with each bundle comprising a portion of the overall hash range of the namespace.

Topics are assigned to a particular bundle by taking the hash of the topic name and checking which bundle's range the hash falls into.

Each bundle is independent of the others and thus is independently assigned to different brokers.
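Here is a minimal sketch of the topic-to-bundle lookup. The 32-bit hash range is split into equal slices, and a topic belongs to the slice that contains the hash of its name. Bundle ranges are printed in the `lower_upper` hex form. Two points are assumptions made for illustration. First, CRC32 (`zlib.crc32`) stands in for the hash function; Pulsar's actual hash function may differ. Second, the equal-width split models a freshly created namespace.

```python
from __future__ import annotations

import bisect
import zlib

HASH_SPACE = 2**32  # 32-bit hash range: 0x00000000 .. 0xffffffff


def bundle_boundaries(num_bundles: int) -> list[int]:
    """Split the hash range into num_bundles equal slices (num_bundles + 1 bounds)."""
    step = HASH_SPACE // num_bundles
    return [i * step for i in range(num_bundles)] + [HASH_SPACE - 1]


def bundle_name(index: int, bounds: list[int]) -> str:
    """Format a bundle range as lower_upper in hex, e.g. 0x00000000_0x40000000."""
    return f"0x{bounds[index]:08x}_0x{bounds[index + 1]:08x}"


def bundle_for_topic(topic: str, bounds: list[int]) -> int:
    """Index of the bundle whose [lower, upper) range contains hash(topic)."""
    h = zlib.crc32(topic.encode("utf-8"))  # assumption: CRC32 as the hash function
    index = bisect.bisect_right(bounds, h) - 1
    # The last bundle is closed at the top, so 0xffffffff belongs to it.
    return min(index, len(bounds) - 2)


bounds = bundle_boundaries(4)  # mirrors defaultNumberOfNamespaceBundles=4
topic = "persistent://my-tenant/my-namespace/my-topic"
print(topic, "->", bundle_name(bundle_for_topic(topic, bounds), bounds))
```

With 4 bundles the ranges are `0x00000000_0x40000000`, `0x40000000_0x80000000`, `0x80000000_0xc0000000` and `0xc0000000_0xffffffff`. Each of these can be owned by a different broker.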

Creating namespaces and bundles

When a new namespace is created, it uses the default number of bundles, which is set in conf/broker.conf:

  # When a namespace is created without specifying the number of bundle, this
  # value will be used as the default
  defaultNumberOfNamespaceBundles=4

One can either change the system default, or override it when creating a new namespace:

  $ bin/pulsar-admin namespaces create my-tenant/my-namespace --clusters us-west --bundles 16

This command creates a namespace with 16 initial bundles, so the topics for this namespace can immediately be spread across up to 16 brokers.

In general, if the expected traffic and number of topics are known in advance, it’s a good idea to start with a reasonable number of bundles instead of waiting for the system to auto-correct the distribution.

On the same note, it is normally beneficial to start with more bundles than the number of brokers. This is primarily because topics are distributed into bundles by hashing, which does not split them perfectly evenly. For example, for a namespace with 1000 topics, something like 64 bundles will achieve a good distribution of traffic across 16 brokers.
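The benefit of having more bundles than brokers can be explored with a small simulation. It hashes 1000 topic names into equal-width bundles and then packs the bundles onto 16 brokers. With 16 bundles each broker gets exactly one bundle, so any unevenness in the hash carries straight through. With 64 smaller bundles there is more freedom to even things out. This sketch rests on several assumptions. The topic names `topic-0`, `topic-1`, … are hypothetical. CRC32 stands in for Pulsar's hash. Topic count is used as a proxy for load. The greedy "biggest bundle onto the least loaded broker" placement is a simplified stand-in for the real load manager.

```python
from __future__ import annotations

import zlib
from collections import Counter


def topics_per_broker(num_topics: int, num_bundles: int, num_brokers: int) -> list[int]:
    """Hash topics into equal-width bundles, then greedily place bundles on brokers."""
    step = 2**32 // num_bundles
    per_bundle = Counter(
        # assumption: CRC32 of a hypothetical topic name picks the bundle
        min(zlib.crc32(f"topic-{i}".encode()) // step, num_bundles - 1)
        for i in range(num_topics)
    )
    brokers = [0] * num_brokers
    # Largest bundles first, each onto the broker with the fewest topics so far.
    for count in sorted(per_bundle.values(), reverse=True):
        brokers[brokers.index(min(brokers))] += count
    return brokers


for num_bundles in (16, 64):
    load = topics_per_broker(1000, num_bundles, 16)
    print(f"{num_bundles} bundles: busiest broker {max(load)} topics, least busy {min(load)}")
```

Comparing the busiest and least busy broker for the two bundle counts shows how finer-grained bundles narrow the spread.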

Unloading topics and bundles

In Pulsar there is an admin operation for “unloading” a topic. Unloading means closing the topics, releasing ownership, and reassigning the topics to a new broker based on the current load.

When an unload happens, the client will experience a small latency blip, typically on the order of tens of milliseconds, while the topic is reassigned.

Unloading is the mechanism used by the load manager to perform load shedding, but it can also be triggered manually. For example, you can use it to correct the assignments and redistribute traffic before any broker becomes overloaded.

Unloading a single topic has no effect on the assignment; it just closes and reopens that particular topic:

  pulsar-admin topics unload persistent://tenant/namespace/topic

To unload all topics for a namespace and trigger reassignments:

  pulsar-admin namespaces unload tenant/namespace
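The same unload operations can be issued over HTTP, which is useful from scripts. This sketch only builds the requests. It assumes the v2 admin REST paths `PUT /admin/v2/{domain}/{tenant}/{namespace}/{topic}/unload` and `PUT /admin/v2/namespaces/{tenant}/{namespace}[/{bundle}]/unload`, as we understand the admin REST API. Please verify them against the REST reference for your Pulsar version. The `ADMIN_URL` value is a hypothetical local broker address.

```python
from __future__ import annotations

from urllib.request import Request

ADMIN_URL = "http://localhost:8080"  # assumption: broker web service address


def unload_topic_request(topic: str) -> Request:
    """Build the REST call matching `pulsar-admin topics unload <topic>`.

    Expects a full name such as persistent://tenant/namespace/topic.
    """
    domain, _, path = topic.partition("://")
    return Request(f"{ADMIN_URL}/admin/v2/{domain}/{path}/unload", method="PUT")


def unload_namespace_request(namespace: str, bundle: str | None = None) -> Request:
    """Build the REST call matching `pulsar-admin namespaces unload`, optionally for one bundle."""
    suffix = f"/{bundle}" if bundle else ""
    return Request(f"{ADMIN_URL}/admin/v2/namespaces/{namespace}{suffix}/unload", method="PUT")
```

To actually trigger the unload, pass a request to `urllib.request.urlopen` against a running cluster, adding credentials if authentication is enabled. Unloading a single bundle (for example `0x00000000_0x40000000`) moves only the topics in that hash range.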

Namespace bundles splitting

Since the load for the topics in a bundle might change over time, or could just be hard to predict upfront, brokers can split a bundle in two. The new, smaller bundles can then be reassigned to different brokers.

Splitting is driven by a set of tunable thresholds. Any existing bundle that exceeds any of the thresholds is a candidate to be split. By default, the newly split bundles are also immediately unloaded to other brokers to facilitate the traffic distribution.

  # enable/disable namespace bundle auto split
  loadBalancerAutoBundleSplitEnabled=true
  # enable/disable automatic unloading of split bundles
  loadBalancerAutoUnloadSplitBundlesEnabled=true
  # maximum topics in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxTopics=1000
  # maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxSessions=1000
  # maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxMsgRate=30000
  # maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
  loadBalancerNamespaceBundleMaxBandwidthMbytes=100
  # maximum number of bundles in a namespace (for auto-split)
  loadBalancerNamespaceMaximumBundles=128
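The split rule above can be sketched as a threshold check plus a range split. A bundle becomes a split candidate if any metric exceeds its limit, unless the namespace has already reached its maximum bundle count. The check uses the default values from the configuration above. `BundleStats` and its fields are hypothetical names, the bandwidth unit is assumed to be megabytes per second, and halving the hash range at its midpoint is an assumed split strategy chosen for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass

# Defaults from the broker.conf settings above.
MAX_TOPICS = 1000            # loadBalancerNamespaceBundleMaxTopics
MAX_SESSIONS = 1000          # loadBalancerNamespaceBundleMaxSessions
MAX_MSG_RATE = 30000         # loadBalancerNamespaceBundleMaxMsgRate
MAX_BANDWIDTH_MBYTES = 100   # loadBalancerNamespaceBundleMaxBandwidthMbytes
MAX_BUNDLES = 128            # loadBalancerNamespaceMaximumBundles


@dataclass
class BundleStats:  # hypothetical per-bundle statistics
    lower: int               # lower bound of the bundle's hash range
    upper: int               # upper bound of the bundle's hash range
    topics: int
    sessions: int            # producers + consumers
    msg_rate: float          # in + out, messages per second
    bandwidth_mbytes: float  # in + out, assumed megabytes per second


def should_split(b: BundleStats, bundles_in_namespace: int) -> bool:
    """True if any metric exceeds its threshold and the namespace may still grow."""
    if bundles_in_namespace >= MAX_BUNDLES:
        return False
    return (b.topics > MAX_TOPICS or b.sessions > MAX_SESSIONS
            or b.msg_rate > MAX_MSG_RATE or b.bandwidth_mbytes > MAX_BANDWIDTH_MBYTES)


def split_in_two(lower: int, upper: int) -> tuple[tuple[int, int], tuple[int, int]]:
    """Halve a hash range at its midpoint (assumed split strategy)."""
    mid = lower + (upper - lower) // 2
    return (lower, mid), (mid, upper)
```

Each half keeps the topics whose hashes fall into it. With auto-unload enabled, the halves can then be picked up by different brokers.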

Automatic load shedding

Pulsar’s load manager supports automatic load shedding. This means that whenever the system recognizes that a particular broker is overloaded, it forces some traffic to be reassigned to less loaded brokers.

When a broker is identified as overloaded, it is forced to “unload” a subset of its bundles, namely the ones with the highest traffic, that make up for the overload percentage.

For example, the default threshold is 85%. If a broker is over quota at 95% CPU usage, it will unload the percentage difference plus a 5% margin: (95% - 85%) + 5% = 15%.

Since the selection of bundles to unload is based on traffic (as a proxy measure for CPU, network and memory), the broker will unload bundles accounting for at least 15% of its traffic.
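The shedding arithmetic and bundle selection can be sketched as follows. The sketch computes the overload above the threshold plus a 5% margin, then takes the highest-traffic bundles until they cover at least that share of the broker's traffic. This is one simple reading of the rule described above, not the load manager's code. The per-bundle `traffic` values are hypothetical units of throughput.

```python
from __future__ import annotations


def percent_to_shed(usage_pct: float, threshold_pct: float = 85.0,
                    margin_pct: float = 5.0) -> float:
    """Percentage of traffic to unload: overload above the threshold plus a margin."""
    if usage_pct < threshold_pct:
        return 0.0
    return (usage_pct - threshold_pct) + margin_pct


def select_bundles(traffic: dict[str, float], pct_to_shed: float) -> list[str]:
    """Pick highest-traffic bundles until they cover at least pct_to_shed of the total."""
    target = sum(traffic.values()) * pct_to_shed / 100
    chosen: list[str] = []
    moved = 0.0
    for bundle, t in sorted(traffic.items(), key=lambda kv: kv[1], reverse=True):
        if moved >= target:
            break
        chosen.append(bundle)
        moved += t
    return chosen
```

For the example above, `percent_to_shed(95.0)` gives 15.0. On a broker with ten bundles of equal traffic, two bundles (20% of the traffic) are selected, because a single bundle covers only 10%.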

The automatic load shedding is enabled by default and can be disabled with this setting:

  # Enable/disable automatic bundle unloading for load-shedding
  loadBalancerSheddingEnabled=true

There are additional settings that apply to shedding:

  # Load shedding interval. Broker periodically checks whether some traffic should be offloaded from
  # some over-loaded broker to other under-loaded brokers
  loadBalancerSheddingIntervalMinutes=1
  # Prevent the same topics from being shed and moved to other brokers more than once within this timeframe
  loadBalancerSheddingGracePeriodMinutes=30
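The grace period works like a cooldown. Each time the shedding check runs (every `loadBalancerSheddingIntervalMinutes`), candidates that were already moved within `loadBalancerSheddingGracePeriodMinutes` are skipped. This keeps the same traffic from bouncing between brokers. Below is a minimal sketch of that filter. It assumes the last unload time is tracked per bundle in a hypothetical `last_unloaded` map of Unix timestamps.

```python
from __future__ import annotations

import time

GRACE_PERIOD_MINUTES = 30  # loadBalancerSheddingGracePeriodMinutes


def eligible_bundles(candidates: list[str], last_unloaded: dict[str, float],
                     now: float | None = None) -> list[str]:
    """Drop candidates that were already shed within the grace period."""
    now = time.time() if now is None else now
    grace_seconds = GRACE_PERIOD_MINUTES * 60
    # Bundles never unloaded default to -inf, so they are always eligible.
    return [b for b in candidates
            if now - last_unloaded.get(b, float("-inf")) >= grace_seconds]
```

A bundle moved 1 minute ago is skipped, while one moved 31 minutes ago, or never moved, can be shed again.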

Broker overload thresholds

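Whether a broker is overloaded is determined by thresholds on CPU, network and memory usage. If any of these metrics reaches the threshold, shedding is triggered (if enabled).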

By default, the overload threshold is set at 85%:

  # Usage threshold to determine if a broker is overloaded
  loadBalancerBrokerOverloadedThresholdPercentage=85
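The "any metric reaches the threshold" rule can be sketched as a check over per-resource usage percentages. The resource names `cpu`, `memory` and `network` are hypothetical keys used for illustration. The real load manager gathers its own set of usage figures.

```python
from __future__ import annotations

OVERLOAD_THRESHOLD_PCT = 85.0  # loadBalancerBrokerOverloadedThresholdPercentage


def overloaded_resources(usage_pct: dict[str, float],
                         threshold_pct: float = OVERLOAD_THRESHOLD_PCT) -> list[str]:
    """Return the resources whose usage is at or above the threshold."""
    return sorted(r for r, pct in usage_pct.items() if pct >= threshold_pct)


def is_overloaded(usage_pct: dict[str, float]) -> bool:
    """A broker is overloaded as soon as any single resource crosses the threshold."""
    return bool(overloaded_resources(usage_pct))
```

A broker at 40% CPU and 50% memory but 90% network is still overloaded, because network alone crosses 85%.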

The usage stats are gathered by Pulsar from the system metrics.

For network utilization, the network interface speed reported by Linux is sometimes incorrect and needs to be manually overridden. This is the case for AWS EC2 instances with a 1Gbps NIC, for which the OS reports 10Gbps.

Because of the incorrect max speed, the Pulsar load manager might think the broker has not reached the NIC capacity, while in fact it’s already using all the bandwidth and the traffic is being slowed down.

The following setting overrides the NIC max speed:

  # Override the auto-detection of the network interfaces max speed.
  # This option is useful in some environments (eg: EC2 VMs) where the max speed
  # reported by Linux is not reflecting the real bandwidth available to the broker.
  # Since the network usage is employed by the load manager to decide when a broker
  # is overloaded, it is important to make sure the info is correct or override it
  # with the right value here. The configured value can be a double (eg: 0.8) and that
  # can be used to trigger load-shedding even before hitting on NIC limits.
  loadBalancerOverrideBrokerNicSpeedGbps=

When the value is empty, Pulsar uses the value reported by the OS.
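The effect of a wrong NIC speed on the overload decision is plain arithmetic: usage is throughput divided by the speed the load manager believes in. The sketch below models that calculation. The `override_gbps` parameter plays the role of `loadBalancerOverrideBrokerNicSpeedGbps`, and `None` stands for the empty setting. The simple throughput/speed ratio is an assumption about how network usage is computed.

```python
from __future__ import annotations


def network_usage_pct(throughput_gbps: float, reported_nic_gbps: float,
                      override_gbps: float | None = None) -> float:
    """Network usage as a percentage of the NIC speed the load manager uses.

    override_gbps models loadBalancerOverrideBrokerNicSpeedGbps; None (empty
    setting) falls back to the speed reported by the OS.
    """
    nic_gbps = override_gbps if override_gbps is not None else reported_nic_gbps
    return 100.0 * throughput_gbps / nic_gbps
```

Consider an EC2 broker pushing 0.9 Gbps on a real 1 Gbps NIC that the OS reports as 10 Gbps. It looks about 9% utilized, far below the 85% threshold, so no shedding happens. With the override set to 1, the same traffic reads as about 90% and triggers shedding. A value such as 0.8 pushes it above 100%, which makes shedding start before the physical NIC limit is reached.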