Pulsar Clients

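Pulsar exposes a client API with language bindings for Java, Go, Python, C++ and C#. The client API optimizes and encapsulates the communication protocol between the client and the broker, and exposes a simple, intuitive API for applications to use.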

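Under the hood, the current official Pulsar client libraries support, transparently to the user, reconnection, connection failover, buffering of unacknowledged messages, and message redelivery.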

Custom client libraries: If you’d like to create your own client library, we recommend consulting the documentation on Pulsar’s custom binary protocol.

Client setup phase

Before an application creates a producer/consumer, the Pulsar client library needs to initiate a setup phase including two steps:

  1. The client attempts to determine the owner of the topic by sending an HTTP lookup request to the broker. The request may reach any of the active brokers, which consults the (cached) ZooKeeper metadata to find out which broker is serving the topic or, if no broker is serving it, tries to assign the topic to the least loaded broker.
  2. Once the client library has the broker address, it creates a TCP connection (or reuses an existing connection from the pool) and authenticates it. Over this connection, the client and broker exchange binary commands from a custom protocol. At this point the client sends a command to create the producer/consumer to the broker, which complies after validating the authorization policy.

Whenever the TCP connection breaks, the client immediately re-initiates this setup phase and keeps trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.
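The retry behavior described above can be sketched in plain Java. This is only an illustration of exponential backoff in general, not the Pulsar client's internal code: the class name `BackoffRetry`, its methods, and the initial and maximum delays are assumptions made for this example.

```java
import java.util.concurrent.Callable;

/** Illustrative sketch only; not the Pulsar client's actual reconnect logic. */
final class BackoffRetry {

    /** Delay before retrying after the given failed attempt (0-based): initial * 2^attempt, capped at max. */
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2; // double the wait after every failure
        }
        return Math.min(delay, maxMillis);
    }

    /** Repeats the setup action until it succeeds, sleeping between failed attempts. */
    static <T> T retryUntilSuccess(Callable<T> setup, long initialMillis, long maxMillis)
            throws InterruptedException {
        for (int attempt = 0; ; attempt++) {
            try {
                return setup.call();
            } catch (InterruptedException e) {
                throw e; // do not swallow interruption
            } catch (Exception e) {
                Thread.sleep(delayMillis(attempt, initialMillis, maxMillis));
            }
        }
    }
}
```

With an initial delay of 100 ms and a cap of 60 s, the waits in this sketch grow as 100, 200, 400, 800 ms and so on until they reach the cap. Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.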

Reader interface

In Pulsar, the “standard” consumer interface involves using consumers to listen on topics, process incoming messages, and finally acknowledge those messages when they are processed. Whenever a new subscription is created, it is initially positioned at the end of the topic (by default), and consumers associated with that subscription begin reading with the first message created afterwards. When a consumer connects to a topic using an existing subscription, it begins reading from the earliest unacknowledged message within that subscription. In short, the consumer interface manages the subscription cursor automatically, based on message acknowledgements.
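To make the cursor behavior concrete, here is a toy in-memory model of an acknowledgement-driven cursor. It is not Pulsar code and uses no Pulsar API: the class `ToySubscription` and its methods are hypothetical names, and a plain list stands in for the topic.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy in-memory model of an acknowledgement-driven cursor. Not part of the Pulsar API. */
final class ToySubscription {
    private final List<String> topicLog;                   // messages on the topic, in publish order
    private final Set<Integer> acked = new HashSet<>();    // indexes acknowledged so far
    private final int createdAt;                           // topic length when the subscription was created

    /** A new subscription is positioned at the end of the topic. */
    ToySubscription(List<String> topicLog) {
        this.topicLog = topicLog;
        this.createdAt = topicLog.size();
    }

    /** Records that the message at this index has been processed. */
    void acknowledge(int index) {
        acked.add(index);
    }

    /** Index a reconnecting consumer would resume from, or -1 if nothing is unacknowledged. */
    int resumeIndex() {
        for (int i = createdAt; i < topicLog.size(); i++) {
            if (!acked.contains(i)) {
                return i;
            }
        }
        return -1;
    }
}
```

In this model the application never sets a position itself. Acknowledging messages is the only way the resume point moves, which is the contrast with the reader interface.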

The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic, rather than a consumer, you need to specify which message the reader begins reading from. The reader interface supports the following start positions:

  • The earliest available message in the topic
  • The latest available message in the topic
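  • Some other message between the earliest and the latest. For this option you need to explicitly provide a message ID. Your application is responsible for “knowing” this message ID in advance, perhaps by fetching it from a persistent data store or cache.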

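The reader interface is helpful for use cases such as providing effectively-once processing semantics for a stream processing system. Being able to rewind a topic to a specific message and begin reading from there is an essential foundation for that use case. The reader interface gives Pulsar clients the low-level abstraction they need to “manually position” themselves within a topic.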

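Internally, the reader interface is implemented as a consumer that uses an exclusive, non-durable subscription to the topic with a randomly assigned name.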

[ IMPORTANT ]

Unlike subscriptions/consumers, readers are non-durable in nature and do not prevent data in a topic from being deleted, so it is strongly advised that data retention be configured. If a topic is not configured with a sufficiently long message retention time, messages may be deleted before the reader has read them. This causes the reader to essentially skip messages. Configuring data retention for a topic guarantees the reader a certain amount of time in which to read a message.

Please also note that a reader can have a “backlog”, but the metric only lets users see how far behind the reader is. It is not considered in any backlog quota calculations.
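The effect of insufficient retention can be illustrated with a toy model. The sketch below is not Pulsar code: `ToyRetainedTopic` and `ToyReader` are hypothetical classes, retention is simplified to "keep the newest N messages", and message IDs are plain counters.

```java
/** Toy topic that retains only the newest maxRetained messages. Not part of the Pulsar API. */
final class ToyRetainedTopic {
    private final int maxRetained;
    private long firstId = 0; // ID of the oldest message still retained
    private long nextId = 0;  // ID the next published message will receive

    ToyRetainedTopic(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Publishing trims old messages without consulting any reader position. */
    void publish() {
        nextId++;
        if (nextId - firstId > maxRetained) {
            firstId++;
        }
    }

    long firstId() { return firstId; }
    long nextId() { return nextId; }
}

/** Toy non-durable reader: its position does not protect messages from deletion. */
final class ToyReader {
    private long position;    // ID of the next message this reader wants
    private long skipped = 0; // messages deleted before this reader reached them

    ToyReader(long startId) {
        this.position = startId;
    }

    /** Returns the ID of the message read, or -1 if the reader has caught up. */
    long readNext(ToyRetainedTopic topic) {
        if (position < topic.firstId()) {
            skipped += topic.firstId() - position; // these messages are gone
            position = topic.firstId();
        }
        if (position >= topic.nextId()) {
            return -1;
        }
        return position++;
    }

    long skipped() { return skipped; }

    /** Informational only: how many retained messages this reader has not read yet. */
    long backlog(ToyRetainedTopic topic) {
        return topic.nextId() - Math.max(position, topic.firstId());
    }
}
```

If five messages are published to a topic that retains three, a reader starting at ID 0 in this model reads IDs 2, 3 and 4 and reports two skipped messages. Raising the retention limit is what closes that gap; the reader's backlog figure changes nothing about what is kept.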

The Pulsar consumer and reader interfaces

Here is a Java example that begins reading from the earliest available message on a topic:

    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Reader;

    // pulsarClient is an existing PulsarClient instance.
    // Create a reader on the topic, starting from the earliest available message.
    Reader<byte[]> reader = pulsarClient.newReader()
        .topic("reader-api-test")
        .startMessageId(MessageId.earliest)
        .create();

    while (true) {
        // readNext() blocks until a message is available
        Message<byte[]> message = reader.readNext();
        // Process the message
    }

To create a reader that reads from the latest available message:

    Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(MessageId.latest)
        .create();

To create a reader that reads from some message between the earliest and the latest:

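    // loadSavedMessageId() is a hypothetical helper that returns bytes
    // saved earlier, for example from MessageId.toByteArray()
    byte[] msgIdBytes = loadSavedMessageId();
    MessageId id = MessageId.fromByteArray(msgIdBytes);
    Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();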