Pulsar Java 客户端可以用于创建Java生产者(Producer)和消费者(Consumer),消息读取器以及执行管理的任务 Java 客户端的当前版本为 2.6.1

Pulsar客户端的Javadoc分成了两个包:

DescriptionMaven Artifact
org.apache.pulsar.client.api生产者和消费者APIorg.apache.pulsar:pulsar-client:2.6.1
org.apache.pulsar.client.adminJava 管理APIorg.apache.pulsar:pulsar-client-admin:2.6.1

本文档仅关注Pulsar主题消息的生产和消费的客户端API. 关于使用 Java 管理客户端的指南, 请参见 Pulsar管理接口

安装

The latest version of the Pulsar Java client library is available via Maven Central. To use the latest version, add the pulsar-client library to your build configuration.

Maven

如果你使用maven,添加以下内容到你的 pom.xml 中:

  1. <!-- 在你的 <properties> 部分-->
  2. <pulsar.version>2.6.1</pulsar.version>
  3. <!-- 在你的 <dependencies> 部分-->
  4. <dependency>
  5. <groupId>org.apache.pulsar</groupId>
  6. <artifactId>pulsar-client</artifactId>
  7. <version>${pulsar.version}</version>
  8. </dependency>

Gradle

如果你使用Gradle,添加以下内容到你的 build.gradle 中:

  1. def pulsarVersion = '2.6.1'
  2. dependencies {
  3. compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
  4. }

连接URL

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here’s an example for localhost:

  1. pulsar://localhost:6650

A URL for a production Pulsar cluster may look something like this:

  1. pulsar://pulsar.us-west.example.com:6650

If you’re using TLS authentication, the URL will look like something like this:

  1. pulsar+ssl://pulsar.us-west.example.com:6651

客户端配置

你可以用一个URL来实例化一个连接到指定的Pulsar 集群PulsarClient 对象,像这样:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();

默认的broker URL是单机集群。

如果你使用单机模式运行一个集群,broker将默认使用pulsar://localhost:6650

完整的配置参数列表参考 PulsarClient 类的javadoc文档 。

除了客户端级别的配置,你还可以使用 生产者消费者特定配置,你将在下面的章节中看到。

Producers

在Pulsar中,生产者写消息到主题中。 一旦你实例化一个PulsarClient 客户端对象(在如上z章节),你可以创建一个Producer 生产者用于特定的主题

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic("my-topic")
  3. .create();
  4. // 然后你就可以发送消息到指定的broker 和topic上:
  5. producer.send("My message".getBytes());

By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message schema.

  1. Producer<String> stringProducer = client.newProducer(Schema.STRING)
  2. .topic("my-topic")
  3. .create();
  4. stringProducer.send("My message");

在不再使用时,你需要确保关闭生产者、消费者和客户端:

producer.close(); consumer.close(); client.close();

  1. 关闭操作也可以是异步的:
  2. ```java
  3. producer.closeAsync()
  4. .thenRun(() -> System.out.println("Producer closed"));
  5. .exceptionally((ex) -> {
  6. System.err.println("Failed to close producer: " + ex);
  7. return ex;
  8. });

生产者配置

如果实例化 生产者 对象时仅指定主题topic名称 (如上面的示例所示), 则生产者将使用默认配置。 要使用非默认配置, 你可以设置多种可配置的参数。 For a full listing, see the Javadoc for the ProducerBuilder class. Here’s an example:

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic("my-topic")
  3. .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
  4. .sendTimeout(10, TimeUnit.SECONDS)
  5. .blockIfQueueFull(true)
  6. .create();

消息路由

当使用分区主题时,当你使用生产者发布消息时你可以指定路由模式。 有关使用 Java 客户端指定路由模式的更多内容, 请参见 分区主题 cookbook。

异步发送

你可以使用Java客户端异步发布消息。 使用异步发送,生产者将消息放入阻塞队列并立即返回。 然后,客户端将在后台将消息发送给broker。 如果队列已满(最大值可配置),则在调用API时,生产者可能会被阻塞或立即失败,具体取决于传递给生产者的参数。

以下是异步发送操作的示例:

  1. producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
  2. System.out.printf("Message with ID %s successfully sent", msgId);
  3. });

As you can see from the example above, async send operations return a MessageId wrapped in a CompletableFuture.

消息配置

除了value之外, 还可以在特定消息上设置其他选项:

  1. producer.newMessage()
  2. .key("my-message-key")
  3. .value("my-async-message".getBytes())
  4. .property("my-key", "my-value")
  5. .property("my-other-key", "my-other-value")
  6. .send();

对于前一种情况,也可以使用sendAsync()来终止构建器链,并获取future返回值。

Consumers

在Pulsar中,消费者订阅topic主题并处理生产者发布到这些主题的消息。 你可以首先实例化一个PulsarClient 对象并传递给他一个borker(如上所示) URL来实例化一个消费者

一旦实例化一个PulsarClient 对象,你可以指定一个主题和一个订阅来创建一个 Consumer 消费者。

  1. Consumer consumer = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .subscribe();

subscribe 方法将自动将订阅消费者指定的主题和订阅。 一种让消费者监听主题的方法是使用while循环。 In this example loop, the consumer listens for messages, prints the contents of any message that’s received, and then acknowledges that the message has been processed. If the processing logic fails, we use negative acknowledgement to have the message redelivered at a later point in time.

  1. while (true) {
  2. // Wait for a message
  3. Message msg = consumer.receive();
  4. try {
  5. // Do something with the message
  6. System.out.printf("Message received: %s", new String(msg.getData()));
  7. // Acknowledge the message so that it can be deleted by the message broker
  8. consumer.acknowledge(msg);
  9. } catch (Exception e) {
  10. // Message failed to process, redeliver later
  11. consumer.negativeAcknowledge(msg);
  12. }
  13. }

消费者配置

如果实例化 消费者 对象, 仅指定主题和订阅名称, 如上面的示例所示, 消费者将采用默认配置。 要使用非默认配置, 你可以设置多种可配置的参数。 有关完整列表, 请参阅 ConsumerBuilder 类javadoc文档。 Here’s an example:

这是一个示例配置:

  1. Consumer consumer = client.newConsumer()
  2. .topic("my-topic")
  3. .subscriptionName("my-subscription")
  4. .ackTimeout(10, TimeUnit.SECONDS)
  5. .subscriptionType(SubscriptionType.Exclusive)
  6. .subscribe();

异步接收

receive方法将异步接受消息(消费者处理器将被阻塞,直到有消息到达)。 你也可以使用异步接收方法,这将在一个新消息到达时立即返回一个CompletableFuture对象。

Here’s an example:

  1. CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

Async receive operations return a Message wrapped inside of a CompletableFuture.

多主题订阅

消费者除了订阅单个Pulsar主题外,你还可以使用多主题订阅订阅多个主题。 若要使用多主题订阅, 可以提供一个topic正则表达式 (regex) 或 主题List 。 如果通过 regex 选择主题, 则所有主题都必须位于同一Pulsar命名空间中。

下面是一些示例:

  1. import org.apache.pulsar.client.api.Consumer;
  2. import org.apache.pulsar.client.api.PulsarClient;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import java.util.regex.Pattern;
  6. ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
  7. .subscriptionName(subscription);
  8. // 订阅命名空间中的所有主题
  9. Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
  10. Consumer allTopicsConsumer = consumerBuilder
  11. .topicsPattern(allTopicsInNamespace)
  12. .subscribe();
  13. // 使用regex订阅命名空间中的主题子集
  14. Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
  15. Consumer allTopicsConsumer = consumerBuilder
  16. .topicsPattern(someTopicsInNamespace)
  17. .subscribe();

你还可以订阅明确的主题列表 (如果愿意, 可跨命名空间):

  1. List<String> topics = Arrays.asList(
  2. "topic-1",
  3. "topic-2",
  4. "topic-3"
  5. );
  6. Consumer multiTopicConsumer = consumerBuilder
  7. .topics(topics)
  8. .subscribe();
  9. // 或者:
  10. Consumer multiTopicConsumer = consumerBuilder
  11. .topics(
  12. "topic-1",
  13. "topic-2",
  14. "topic-3"
  15. )
  16. .subscribe();

You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. Here’s an example:

  1. Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
  2. consumerBuilder
  3. .topics(topics)
  4. .subscribeAsync()
  5. .thenAccept(consumer -> {
  6. do {
  7. try {
  8. Message msg = consumer.receive();
  9. // Do something with the received message
  10. } catch (PulsarClientException e) {
  11. e.printStackTrace();
  12. }
  13. } while (true);
  14. });

Reader 接口

使用 reader 接口, Pulsar客户可以在主题中“手动定位”自己,从指定的消息开始向前读取所有消息。 The Pulsar API for Java enables you to create Reader objects by specifying a topic, a MessageId , and ReaderConfiguration .

Here’s an example:

  1. ReaderConfiguration conf = new ReaderConfiguration();
  2. byte[] msgIdBytes = // 一些消息ID 的字节数组
  3. MessageId id = MessageId.fromByteArray(msgIdBytes);
  4. Reader reader = pulsarClient.newReader()
  5. .topic(topic)
  6. .startMessageId(id)
  7. .create();
  8. while (true) {
  9. Message message = reader.readNext();
  10. // 处理消息
  11. }

在上面的示例中,实例化一个Reader对象对指定的主题和消息(ID); reader将遍历主题中msgIdBytes(取值方式取决于应用程序) 之后的消息。

上面的示例代码展示了Reader对象指向特定的消息(ID),但你也可以使用MessageId.earliest来指向topic上最早可用的消息,使用MessageId.latest指向最新的消息。

Schema

在Pulsar中,所有的消息数据都在字节数组中,消息schema允许在构造和处理消息时使用其他类型的数据(从简单类型(如字符串)到更复杂的特定应用程序的类型)。 如果在不指定schema的情况下构造 生产者,则生产者只能生成类型为 byte[]的消息。 Here’s an example:

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(topic)
  3. .create();

上面的生产者相当于 Producer<byte[]> (实际上, 你应该 总是 显式指定类型)。 如果你想让产生者使用不同类型的数据,你需要指定一个schema来通知Pulsar 在topic上传输哪种类型的数据。

Schema实例

假设您有一个 SensorReading 类, 你想通过Pulsar主题进行传输:

  1. public class SensorReading {
  2. public float temperature;
  3. public SensorReading(float temperature) {
  4. this.temperature = temperature;
  5. }
  6. // A no-arg constructor is required
  7. public SensorReading() {
  8. }
  9. public float getTemperature() {
  10. return temperature;
  11. }
  12. public void setTemperature(float temperature) {
  13. this.temperature = temperature;
  14. }
  15. }

你可以创建一个Producer<SensorReading> (或Consumer<SensorReading>)像这样:

  1. Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
  2. .topic("sensor-readings")
  3. .create();

以下schema格式目前可用于 Java:

  • 无schema 或者字节数组schema(可以使用Schema.BYTES):
  1. Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
  2. .topic("some-raw-bytes-topic")
  3. .create();

Or, equivalently:

  1. Producer<byte[]> bytesProducer = client.newProducer()
  2. .topic("some-raw-bytes-topic")
  3. .create();
  • String for normal UTF-8-encoded string data. This schema can be applied using Schema.STRING:

    1. Producer<String> stringProducer = client.newProducer(Schema.STRING)
    2. .topic("some-string-topic")
    3. .create();
  • JSON schemas can be created for POJOs using the JSONSchema class. Here’s an example:

    1. Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class);
    2. Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema)
    3. .topic("some-pojo-topic")
    4. .create();

Authentication

Pulsar currently supports two authentication schemes: TLS and Athenz. The Pulsar Java client can be used with both.

TLS Authentication

To use TLS, you need to set TLS to true using the setUseTls method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.

Here’s an example configuration:

  1. Map<String, String> authParams = new HashMap<>();
  2. authParams.put("tlsCertFile", "/path/to/client-cert.pem");
  3. authParams.put("tlsKeyFile", "/path/to/client-key.pem");
  4. Authentication tlsAuth = AuthenticationFactory
  5. .create(AuthenticationTls.class.getName(), authParams);
  6. PulsarClient client = PulsarClient.builder()
  7. .serviceUrl("pulsar+ssl://my-broker.com:6651")
  8. .enableTls(true)
  9. .tlsTrustCertsFilePath("/path/to/cacert.pem")
  10. .authentication(tlsAuth)
  11. .build();

Athenz

要使用Athenz做为身份认证提供者,你需要use TLS并且在hash提供如下四个参数的值:

  • tenantDomain
  • tenantService
  • providerDomain
  • privateKey

You can also set an optional keyId. 这是一个示例配置:

  1. Map<String, String> authParams = new HashMap<>();
  2. authParams.put("tenantDomain", "shopping"); // Tenant domain name
  3. authParams.put("tenantService", "some_app"); // Tenant service name
  4. authParams.put("providerDomain", "pulsar"); // Provider domain name
  5. authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
  6. authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
  7. Authentication athenzAuth = AuthenticationFactory
  8. .create(AuthenticationAthenz.class.getName(), authParams);
  9. PulsarClient client = PulsarClient.builder()
  10. .serviceUrl("pulsar+ssl://my-broker.com:6651")
  11. .enableTls(true)
  12. .tlsTrustCertsFilePath("/path/to/cacert.pem")
  13. .authentication(athenzAuth)
  14. .build();

支持的格式:

privateKey参数支持如下三种格式: * file:///path/to/file * file:/path/to/file * data:application/x-pem-file;base64,<base64-encoded value>