Pulsar 推出了支持 JavaGoPythonC++ 的客户端 API。 Puslar API 优化并封装了客户端到 broker 端的通信协议,暴露出一套简单直观的 API 供应用程序使用。

底层实现上,目前官方版的Pulsar客户端支持对用户透明的连接重连、故障切换、未ack消息的缓冲、消息重传。

自定义客户端库

如果您想创建自己的客户端库, 我们建议参考Pulsar的自定义 二进制协议 的文档。

客户端使用步骤

当应用程序要创建生产者/消费者时, Pulsar客户端库执行按以下两个步骤的工作:

  1. 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询Zookeeper中(缓存)的元数据,来确定这条消息的topic在哪个broker上,如果该topic不在任何一个broker上,则把这个topic分配在负载最少的broker上。
  2. 当客户端获取了broker的地址之后,将会创建一个TCP连接(或复用连接池中的连接)并且进行鉴权。 客户端和broker通过该连接交换基于自定义协议的二进制命令。 同时,客户端会向broker发送一条命令用以在broker上创建生产者/消费者,该命令将会在验证授权策略后生效。

每当 TCP 连接中断时, 客户端将立即重新启动此安装阶段, 并将继续尝试使用指数退避重新建立生产者或使用者, 直到操作成功为止。

Reader 接口

在Pulsar中, “标准” 消费者接口 涉及使用消费者监听 主题, 处理传入消息, 并在处理完这些消息后最终确认它们。 不论任何时候创建的一个新订阅,默认都会定位在 topic 的末尾位置,这意味着使用该订阅的消费者都只能接收在这之后新产生的消息。 如果消费者使用已经存在的订阅来连接 topic 时,它将从该订阅内最早的未确认消息开始读取。 总之,消费者接口是基于消息确认机制来自动管理订阅游标位置。

Pulsar 的 reader 接口允许应用程序手动管理游标。 当您使用 reader(而不是消费者)连接 topic 时,需要指定 reader 在连接到该 topic 时从哪条消息开始消费。 当连接到一个 topic 时,reader 接口支持的开始位置包括:

  • Topic 中最早的可用消息
  • Topic 中最新的可用消息
  • 如果你想开始的位置在最早和最新之间, 则需要显示的指定消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。

Pulsar的读取器接口在流计算场景下,对提供effective-once的语义很有帮助。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。

Pulsar的消费者和读取器接口

仅支持没有分区的 topic

Reader 接口目前无法在有分区主题(partitioned topics)上使用。

下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageId;
  3. import org.apache.pulsar.client.api.Reader;
  4. // Create a reader on a topic and for a specific message (and onward)
  5. Reader<byte[]> reader = pulsarClient.newReader()
  6. .topic("reader-api-test")
  7. .startMessageId(MessageId.earliest)
  8. .create();
  9. while (true) {
  10. Message message = reader.readNext();
  11. // Process the message
  12. }

创建一个从最新可用消息处开始读取消息的读取器

  1. Reader<byte[]> reader = pulsarClient.newReader()
  2. .topic(topic)
  3. .startMessageId(MessageId.latest)
  4. .create();

创建一个从其他位置(非最早可用且非最新可用消息处)读取消息的读取器

  1. byte[] msgIdBytes = // Some byte array
  2. MessageId id = MessageId.fromByteArray(msgIdBytes);
  3. Reader<byte[]> reader = pulsarClient.newReader()
  4. .topic(topic)
  5. .startMessageId(id)
  6. .create();