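By default, Pulsar topics are served by a single broker. Using only a single broker, however, limits a topic’s maximum throughput. Partitioned topics are a special type of topic that can span multiple brokers and thus allow for higher throughput. For an explanation of how partitioned topics work, see the Partitioned Topics concepts.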

You can publish to partitioned topics using Pulsar’s client libraries and you can create and manage partitioned topics using Pulsar’s admin API.

Publishing to partitioned topics

When publishing to partitioned topics, the only difference from non-partitioned topics is that you need to specify a routing mode when you create a new producer. Examples for Java are below.

Java

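Publishing messages to partitioned topics in the Java client works much like publishing to normal topics. The difference is that you need to specify a message router, either by choosing one of the existing routers or by implementing a custom one.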

Routing mode

You can specify the routing mode when you configure your producer, using the messageRoutingMode method of the producer builder. You have three options:

  • SinglePartition
  • RoundRobinPartition
  • CustomPartition

Here’s an example:

  String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  String topic = "persistent://my-tenant/my-namespace/my-topic";

  // Connect to the broker, then build a producer with an explicit routing mode.
  PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  Producer<byte[]> producer = pulsarClient.newProducer()
          .topic(topic)
          .messageRoutingMode(MessageRoutingMode.SinglePartition)
          .create();
  producer.send("Partitioned topic message".getBytes());
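
To make the difference between SinglePartition and RoundRobinPartition more concrete, here is a small stand-alone simulation of how each mode might pick a partition for messages that carry no key. It is a simplified model rather than Pulsar’s implementation: the class RoutingModeSketch, the PartitionChooser interface, and both factory methods are hypothetical names introduced for illustration, and the sketch ignores details such as batching.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of partition selection for messages WITHOUT a key.
// All names here are hypothetical; this is not Pulsar's implementation.
class RoutingModeSketch {

    /** Picks a partition index for each outgoing message. */
    interface PartitionChooser {
        int next();
    }

    /** SinglePartition-style: pick one partition up front and keep using it. */
    static PartitionChooser singlePartition(int numPartitions) {
        int chosen = ThreadLocalRandom.current().nextInt(numPartitions);
        return () -> chosen;
    }

    /** RoundRobinPartition-style: cycle through partitions 0..numPartitions-1. */
    static PartitionChooser roundRobin(int numPartitions) {
        AtomicInteger counter = new AtomicInteger();
        // floorMod keeps the index non-negative even if the counter overflows.
        return () -> Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionChooser single = singlePartition(4);
        PartitionChooser rr = roundRobin(4);
        for (int i = 0; i < 6; i++) {
            System.out.println("message " + i
                    + ": single=" + single.next()
                    + " roundRobin=" + rr.next());
        }
    }
}
```

In this model, the single-partition chooser trades throughput for per-producer ordering on one partition, while the round-robin chooser spreads load evenly at the cost of ordering across messages.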

Custom message router

To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:

  public interface MessageRouter extends Serializable {
      int choosePartition(Message msg);
  }

Here’s a (not very useful!) router that routes every message to partition 10:

  public class AlwaysTenRouter implements MessageRouter {
      public int choosePartition(Message msg) {
          return 10;
      }
  }

With that implementation in hand, you can send messages like this:

  String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  String topic = "persistent://my-tenant/my-namespace/my-topic";

  // Build a producer that delegates partition selection to the custom router.
  PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  Producer<byte[]> producer = pulsarClient.newProducer()
          .topic(topic)
          .messageRouter(new AlwaysTenRouter())
          .create();
  producer.send("Partitioned topic message".getBytes());
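
A router that always returns 10 only makes sense for a topic with at least 11 partitions, assuming partition indexes start at 0. A more typical router derives the partition from something in the message, such as its key. The sketch below shows only that decision logic in plain Java, with no Pulsar dependency: the class KeyHashRouterSketch and its static choosePartition(String, int) helper are hypothetical stand-ins, and in a real MessageRouter the same logic would sit inside choosePartition. Sending keyless messages to partition 0 is an arbitrary choice made for this example.

```java
// Stand-in sketch of the decision a custom router makes.
// Hypothetical names, no Pulsar dependency.
class KeyHashRouterSketch {

    /**
     * Maps a message key to a partition index in [0, numPartitions).
     * Messages without a key fall back to partition 0 (arbitrary for this sketch).
     */
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0;
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + choosePartition(key, 4));
        }
    }
}
```

Because the result depends only on the key and the partition count, messages with the same key land on the same partition as long as the number of partitions does not change.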

Managing partitioned topics

You can use Pulsar’s admin API to create and manage partitioned topics.