消息队列是许多大规模数据架构的基本组件。 If every single work object that passes through your system absolutely must be processed in spite of the slowness or downright failure of this or that system component, there’s a good chance that you’ll need a message queue to step in and ensure that unprocessed data is retained—-with correct ordering—-until the required actions are taken.

对于消息队列而言,Pulsar 是绝佳选择,这是因为:

你可以使用相同的 Pulsar 安装配置来充当实时消息总线和消息队列(也可只使用其中一个)。 你可以为实时目的留出一些主题,为消息队列目的留出其他主题(也可为任一目的使用特定的命名空间)。

客户端配置更改

要将 Pulsar 主题用作消息队列,应将该主题的接收器负载分配到多个消费者(最佳消费者数取决于实时负载量)。 每个消费者必须:

  • 建立共享订阅,使用与其他消费者相同的订阅名称(否则订阅将不共享,消费者集群无法充当处理集合)

  • If you’d like to have tight control over message dispatching across consumers, set the consumers’ receiver queue size very low (potentially even to 0 if necessary). 每个 Pulsar 消费者都有一个接收器队列,用于确定消费者一次尝试获取的消息数量。 例如,接收器队列 1000 (默认值)意味着消费者将尝试在连接时处理来自主题的 1000 条待办消息。 将接收器队列值设置为零实质上意味着确保每个消费者一次只做一件事。

    限制消费者的接收器队列大小的缺点是限制了这些消费者的潜在吞吐量,并且不能与分区主题一起使用。 降低性能以获得更好的控制是否值得取决于你的用例。

Java 客户端

以下是使用共享订阅的 Java 消费者配置示例:

  1. import org.apache.pulsar.client.api.Consumer;
  2. import org.apache.pulsar.client.api.PulsarClient;
  3. import org.apache.pulsar.client.api.SubscriptionType;
  4. String SERVICE_URL = "pulsar://localhost:6650";
  5. String TOPIC = "persistent://public/default/mq-topic-1";
  6. String subscription = "sub-1";
  7. PulsarClient client = PulsarClient.builder()
  8. .serviceUrl(SERVICE_URL)
  9. .build();
  10. Consumer consumer = client.newConsumer()
  11. .topic(TOPIC)
  12. .subscriptionName(subscription)
  13. .subscriptionType(SubscriptionType.Shared)
  14. // If you'd like to restrict the receiver queue size
  15. .receiverQueueSize(10)
  16. .subscribe();

Python 客户端

以下是使用共享订阅的 Python 消费者配置示例:

  1. from pulsar import Client, ConsumerType
  2. SERVICE_URL = "pulsar://localhost:6650"
  3. TOPIC = "persistent://public/default/mq-topic-1"
  4. SUBSCRIPTION = "sub-1"
  5. client = Client(SERVICE_URL)
  6. consumer = client.subscribe(
  7. TOPIC,
  8. SUBSCRIPTION,
  9. # If you'd like to restrict the receiver queue size
  10. receiver_queue_size=10,
  11. consumer_type=ConsumerType.Shared)

C++ 客户端

以下是使用共享订阅的 C++ 消费者配置示例:

  1. #include <pulsar/Client.h>
  2. std::string serviceUrl = "pulsar://localhost:6650";
  3. std::string topic = "persistent://public/defaultmq-topic-1";
  4. std::string subscription = "sub-1";
  5. Client client(serviceUrl);
  6. ConsumerConfiguration consumerConfig;
  7. consumerConfig.setConsumerType(ConsumerType.ConsumerShared);
  8. // If you'd like to restrict the receiver queue size
  9. consumerConfig.setReceiverQueueSize(10);
  10. Consumer consumer;
  11. Result result = client.subscribe(topic, subscription, consumerConfig, consumer);