Partitioned topics

默认情况下,Pulsar主题在单个broker上运行。 仅仅使用单个 broker 会限制主题的最大吞吐量。 Partitioned topics are a special type of topic that can span multiple brokers and thus allow for much higher throughput. 有关分区主题如何工作,请参见 分区主题的概念。

可以使用 Pulsar 客户端库生产消息到分区主题,也可以通过 Pulsar admin API创建和管理分区主题。

发布到分区主题

发布消息到主题,创建生产者的时候不需要特意指定路由模式。 如果不特意指定路由模式,则默认使用轮询路由策略。 这是一个Java的示例。

在Java客户端中将消息发布到分区主题,和发布到普通主题非常类似。 不同之处在于您需要指定一个消息路器(从已有的路由器中选择一个或实现一个自定义的路由器)。

Routing mode

You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. Three options are available:

  • SinglePartition
  • RoundRobinPartition
  • CustomPartition

如下所示:

  1. String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  2. String topic = "persistent://my-tenant/my-namespace/my-topic";
  3. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  4. Producer<byte[]> producer = pulsarClient.newProducer()
  5. .topic(topic)
  6. .messageRoutingMode(MessageRoutingMode.SinglePartition)
  7. .create();
  8. producer.send("Partitioned topic message".getBytes());

Custom message router

要使用自定义消息路由器,您需要提供MessageRouter 接口的实现,该接口只有一个choosePartition方法:

  1. public interface MessageRouter extends Serializable {
  2. int choosePartition(Message msg);
  3. }

下面的路由模式表示所有的消息都会被发送到分区10:

  1. public class AlwaysTenRouter implements MessageRouter {
  2. public int choosePartition(Message msg) {
  3. return 10;
  4. }
  5. }

有了这个实现,你可以这样发送消息:

  1. String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  2. String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
  3. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  4. Producer<byte[]> producer = pulsarClient.newProducer()
  5. .topic(topic)
  6. .messageRouter(new AlwaysTenRouter())
  7. .create();
  8. producer.send("Partitioned topic message".getBytes());

使用 Key 时如何选择分区

If a message has a key, it supersedes the round robin routing policy. The following example illustrates how to choose partition when you use a key.

  1. // 如果消息存在key,轮询路由策略将被替换
  2. if (msg.hasKey()) {
  3. return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
  4. }
  5. if (isBatchingEnabled) { // 如果开启批处理,请在 `partitionSwitchMs` 边界上选择分区。
  6. long currentMs = clock.millis();
  7. return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartiations());
  8. other.
  9. return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartiations());
  10. }

管理分区主题

你能够使用 Pulsar admin API 创建和管理分区主题