The Pulsar Node.js client

Pulsar Node.js 客户端可用于在 Node.js 中创建 Pulsar producerconsumerreader

Node.js 客户端中 producerconsumerreader 中的所有方法都对线程可用。

对于1.3.0或者更高版本,TypeScript中定义的类型同样可以使用。

安装

You can install the pulsar-client library via npm.

要求

Pulsar Node.js client library is based on the C++ client library. Follow these instructions and install the Pulsar C++ client library.

兼容性

Compatibility between each version of the Node.js client and the C++ client is as follows:

Node.js 客户端C++ client
1.0.02.3.0 及以上
1.1.02.4.0 及以上
1.2.02.5.0 及以上

If an incompatible version of the C++ client is installed, you may fail to build or run this library.

使用 npm 安装

Install the pulsar-client library via npm:

  1. $ npm install pulsar-client

Note

因为这个库使用 node-addon-api 模块来包装 C++ 库,因此只能在 Node.js 10.x 或更高版本中运行。

连接 URL

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here is an example for localhost:

  1. pulsar://localhost:6650

A URL for a production Pulsar cluster may look something like this:

  1. pulsar://pulsar.us-west.example.com:6650

如果使用TLS加密TLS认证,URL地址如下:

  1. pulsar+ssl://pulsar.us-west.example.com:6651

创建客户端

与Pulsar服务端交互之前,需要建立Pulsar客户端。 You can create a client instance using a new operator and the Client method, passing in a client options object (more on configuration below).

Here is an example:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. const client = new Pulsar.Client({
  4. serviceUrl: 'pulsar://localhost:6650',
  5. });
  6. await client.close();
  7. })();

客户端配置

The following configurable parameters are available for Pulsar clients:

参数名说明默认值
serviceUrlThe connection URL for the Pulsar cluster. 更多详细信息,参阅这里
authenticationConfigure the authentication provider. (default: no authentication). 更多详细信息,参阅 TLS 认证
operationTimeoutSecondsNode.js 客户端操作的超时时间(创建 producer、订阅/取消订阅 topic)。 未超时时会重试,超时后执行失败30
ioThreads用于处理 Pulsar broker 连接的线程数。1
messageListenerThreadsconsumerreaders 监听消息使用的线程数。1
concurrentLookupRequestThe number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. 只有当客户端需要生产及/或订阅数千 Pulsar topic 时,才需要修改预设的默认值:50000 。50000
tlsTrustCertsFilePath信任的 TLS 证书的文件路径。
tlsValidateHostname设置是否启用 TLS 主机名验证。false
tlsAllowInsecureConnection设置是否让 Pulsar 客户端从 broker 接受不信任的 TLS 证书。false
statsIntervalInSeconds两次信息统计之间的时间间隔。 当 statsInterval 为正数时启用信息统计。 最小值为 1 秒。600
log记录日志console.log

Producers

Pulsar producers publish messages to Pulsar topics. You can configure Node.js producers using a producer configuration object.

Here is an example:

  1. const producer = await client.createProducer({
  2. topic: 'my-topic',
  3. });
  4. await producer.send({
  5. data: Buffer.from("Hello, Pulsar"),
  6. });
  7. await producer.close();

Promise 操作

When you create a new Pulsar producer, the operation returns Promise object and get producer instance or an error through executor function.
In this example, using await operator instead of executor function.

Producer operations

Pulsar Node.js producers have the following methods available:

Method说明Return type
send(Object)Publishes a message to the producer’s topic. 当消息被broker成功确认或者出错抛出异常后,Promise对象运行执行器方法,该对象的返回值是消息ID。Promise<Object>
flush()将消息从发送队列发送到Pulsar broker。 当消息被broker成功确认或者出错抛出异常后,Promise对象运行执行器方法,该对象的返回值是消息ID。Promise<null>
close()Closes the producer and releases all resources allocated to it. close()被调用以后,不再接受来自publisher的消息。 调用后,返回Promise对象。 等所有待发布的请求被持久化到Pulsar后,会运行执行器函数。 如果出错,写操作不会重试。Promise<null>
getProducerName()获取producer名称的方法string
getTopic()获取topic名称的方法string

Producer 配置

参数名说明默认值
topicPulsar topic接收从producer发送过来的消息。 Topic格式是<topic-name>或者<tenant-name>/<namespace-name>/<topic-name>。 例如:sample/ns1/my-topic
producerNameA name for the producer. 如果topic名称没有指定,Pulsar会自动地生成全局唯一的名称。 当明确地给topic指定名称的时候,该名称需要在所有的Pulsar集群中唯一,否则会出错。
sendTimeoutMsProducer发送消息给topic后,会等待存放该消息的Pulsarbroker的确认信息。 如果在这个参数设定的阈值内没有确认消息,将抛出错误。 如果将 sendTimeoutMs 设为 -1,那么超时时间将为无穷大(因而移除超时限定)。 Removing the send timeout is recommended when using Pulsar’s message de-duplication feature.30000
initialSequenceId消息的初始序列号 Producer 在发送消息时,将序列号添加到消息中。 只要发送消息,序列号就会相应增加。
maxPendingMessagesThe maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the broker). 默认情况下,当队列已满时,调用send 方法都会失败,除非blockIfQueueFull 设置为 true1000
maxPendingMessagesAcrossPartitions所有分区等待队列空间总和的最大值。50000
blockIfQueueFull如果参数值设置为 true,当消息传出队列已满时,producer 的 send 方法会进行等待,而不是抛出异常(该队列大小由 maxPendingMessages 参数决定);如果设置为 false (默认值),当队列已满时,不能调用 send,并抛出异常。false
messageRoutingMode消息路由逻辑(分区 topic 上的 producer 使用)。 This logic is applied only when no key is set on messages. 可以设置为循环(RoundRobinDistribution),或是发布所有消息到单个分区 (UseSinglePartition,默认值)。UseSinglePartition
hashingSchemeThe hashing function that determines the partition on which a particular message is published (partitioned topics only). 可以设置为 JavaStringHash(等同于 Java 中的 String.hashCode())、Murmur3_32Hash(应用了 Murmur3 散列函数)、或 BoostHash (应用了 C++ 中 Boost 库的散列函数)。BoostHash
compressionTypeThe message data compression type used by the producer. 可用的选项有:LZ4ZlibZSTD,和SNAPPYCompression None
batchingEnabled如果设置为 true,producer 将批量发送消息。true
batchingMaxPublishDelayMs批量发送消息的最大延迟。10
batchingMaxMessages批量发送消息时,消息大小的最大值。1000
propertiesProducer 的元数据。

生产者示例

This example creates a Node.js producer for the my-topic topic and sends 10 messages to that topic:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. });
  7. // Create a producer
  8. const producer = await client.createProducer({
  9. topic: 'my-topic',
  10. });
  11. // Send messages
  12. for (let i = 0; i < 10; i += 1) {
  13. const msg = `my-message-${i}`;
  14. producer.send({
  15. data: Buffer.from(msg),
  16. });
  17. console.log(`Sent message: ${msg}`);
  18. }
  19. await producer.flush();
  20. await producer.close();
  21. await client.close();
  22. })();

消费者

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Node.js consumers using a consumer configuration object.

Here is an example:

  1. const consumer = await client.subscribe({
  2. topic: 'my-topic',
  3. subscription: 'my-subscription',
  4. });
  5. const msg = await consumer.receive();
  6. console.log(msg.getData().toString());
  7. consumer.acknowledge(msg);
  8. await consumer.close();

Promise 操作

When you create a new Pulsar consumer, the operation returns Promise object and get consumer instance or an error through executor function.
In this example, using await operator instead of executor function.

消费者操作

Pulsar Node.js consumers have the following methods available:

Method说明Return type
receive()Receives a single message from the topic. 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。Promise<Object>
receive(Number)在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。Promise<Object>
acknowledge(Object)通过消息对象向 Pulsar broker 确认收到一条消息。void
acknowledgeId(Object)通过消息 ID 向 Pulsar broker 确认收到一条消息。void
acknowledgeCumulative(Object)确认收到流中到指定消息为止的所有消息,包括指定消息。 acknowledgeCumulative 方法返回 void,并向 broker 异步发送确认信息。 此后,这些消息不会重新发送给 consumer。 共享订阅类型不支持累积确认。void
acknowledgeCumulativeId(Object)确认收到流中到指定消息为止所有消息的 ID,包括指定消息的 ID。void
negativeAcknowledge(Message)通过消息对象向 Pulsar broker 确认一条消息有异常。void
negativeAcknowledgeId(MessageId)通过消息ID对象向 Pulsar broker 确认 一条消息有异常。void
close()关闭 consumer,使 consumer 不再从 broker 接收消息。Promise<null>
unsubscribe()取消订阅Promise<null>

Consumer configuration

参数名说明默认值
topic使用Pulsar topic的consumer可以订阅或者监听消息。
topicsTopics数组
topicsPatternTopics的正则表达式
subscriptionConsumer 的订阅名称。
subscriptionTypeAvailable options are Exclusive, Shared, Key_Shared, and Failover.Exclusive
subscriptionInitialPositionInitial position at which to set cursor when subscribing to a topic at first time.SubscriptionInitialPosition.Latest
ackTimeoutMs消息确认的超时时间(以毫秒为单位)。0
nAckRedeliverTimeoutMsDelay to wait before redelivering messages that failed to be processed.60000
receiverQueueSize设置 consumer 接收队列的大小,即在应用程序调用 receive 之前允许堆积的消息数。 A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization.1000
receiverQueueSizeAcrossPartitionsSet the max total receiver queue size across partitions. 当接收队列大小超过此值时,使用该参数减小每个分区接收队列的大小。50000
consumerNameConsumer 的名称。 当前(v2.4.1)版本中, 灾备模式按照 consumer 名称排序。
propertiesConsumer 的元数据。
listenerA listener that is called for a message received.
readCompactedIf enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.
false

消费者示例

This example creates a Node.js consumer with the my-subscription subscription on the my-topic topic, receives messages, prints the content that arrive, and acknowledges each message to the Pulsar broker for 10 times:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. });
  7. // Create a consumer
  8. const consumer = await client.subscribe({
  9. topic: 'my-topic',
  10. subscription: 'my-subscription',
  11. subscriptionType: 'Exclusive',
  12. });
  13. // Receive messages
  14. for (let i = 0; i < 10; i += 1) {
  15. const msg = await consumer.receive();
  16. console.log(msg.getData().toString());
  17. consumer.acknowledge(msg);
  18. }
  19. await consumer.close();
  20. await client.close();
  21. })();

Instead a consumer can be created with listener to process messages.

  1. // Create a consumer
  2. const consumer = await client.subscribe({
  3. topic: 'my-topic',
  4. subscription: 'my-subscription',
  5. subscriptionType: 'Exclusive',
  6. listener: (msg, msgConsumer) => {
  7. console.log(msg.getData().toString());
  8. msgConsumer.acknowledge(msg);
  9. },
  10. })

Reader

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recently unacked message). You can configure Node.js readers using a reader configuration object.

Here is an example:

  1. const reader = await client.createReader({
  2. topic: 'my-topic',
  3. startMessageId: Pulsar.MessageId.earliest(),
  4. });
  5. const msg = await reader.readNext();
  6. console.log(msg.getData().toString());
  7. await reader.close();

Reader operations

Pulsar Node.js readers have the following methods available:

Method说明Return type
readNext()接收 topic 中的下一条消息(类似于 consumer 中的 receive 方法)。 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。Promise<Object>
readNext(Number)在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。Promise<Object>
hasNext()返回 broker 是否在目标 topic 中存有下一条消息。Boolean
close()关闭 reader,使 reader 不再从 broker 接收消息。Promise<null>

Reader configuration

参数名说明默认值
topic在Pulsartopic上,用户可以订阅或者监听消息。
startMessageIdReader 的初始位置,即 reader 处理的第一条消息的位置。 可以设置为 Pulsar.MessageId.earliest(topic 中最早的可用消息),或 Pulsar.MessageId.latest (topic 中最新的可用消息),或是任一其他消息 ID。
receiverQueueSize设置 reader 接收队列的大小,即在应用程序调用 readNext 之前允许堆积的消息数。 A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization.1000
readerNameReader 的名称。
subscriptionRolePrefixThe subscription role prefix.
readCompactedIf enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.
false

Reader 示例

This example creates a Node.js reader with the my-topic topic, reads messages, and prints the content that arrive for 10 times:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. operationTimeoutSeconds: 30,
  7. });
  8. // Create a reader
  9. const reader = await client.createReader({
  10. topic: 'my-topic',
  11. startMessageId: Pulsar.MessageId.earliest(),
  12. });
  13. // read messages
  14. for (let i = 0; i < 10; i += 1) {
  15. const msg = await reader.readNext();
  16. console.log(msg.getData().toString());
  17. }
  18. await reader.close();
  19. await client.close();
  20. })();

消息

In Pulsar Node.js client, you have to construct producer message object for producer.

Here is an example message:

  1. const msg = {
  2. data: Buffer.from('Hello, Pulsar'),
  3. partitionKey: 'key1',
  4. properties: {
  5. 'foo': 'bar',
  6. },
  7. eventTimestamp: Date.now(),
  8. replicationClusters: [
  9. 'cluster1',
  10. 'cluster2',
  11. ],
  12. }
  13. await producer.send(msg);

The following keys are available for producer message objects:

参数名说明
data消息数据的实际有效负载。
properties用于存储应用程序指定消息附带的所有元数据。
eventTimestamp与消息相关联的时间戳。
sequenceId消息的序列号。
partitionKey与消息关联的可选键值(对于 topic 压缩等尤其有用)。
replicationClusters使用该参数后,消息会被复制到指定集群。 Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
deliverAt延迟消息在确定时间或该时间之后发送
deliverAfter延迟多久后发送消息

消息对象操作

In Pulsar Node.js client, you can receive (or read) message object as consumer (or reader).

The message object have the following methods available:

Method说明Return type
getTopicName()Topic 名称的 Getter 方法。String
getProperties()属性的 Getter 方法。数组<Object>
getData()消息数据的 Getter方法。Buffer
getMessageId()消息 id 对象的 Getter 方法。对象
getPublishTimestamp()发布时间戳的 Getter 方法。数值
getEventTimestamp()事件时间戳的 Getter 方法。数值
getRedeliveryCount()获取重新投递数量的Getter方法数值
getPartitionKey()分区键的 Getter 方法。String

消息 ID 对象操作

In Pulsar Node.js client, you can get message id object from message object.

The message id object have the following methods available:

Method说明Return type
serialize()序列化消息 id 到缓存中进行存储。Buffer
toString()获取消息 id 字符串。String

The client has static method of message id object. You can access it as Pulsar.MessageId.someStaticMethod too.

The following static methods are available for the message id object:

Method说明Return type
earliest()MessageId 表示存储在 topic 中最早/最旧的可用消息。对象
latest()MessageId 表示存储在 topic 中最晚/最新的可用消息。对象
deserialize(Buffer)从缓存中反序列化出消息 id。对象

端到端加密

End-to-end encryption allows applications to encrypt messages at producers and decrypt at consumers.

Configuration

如果想在Node.js客户端配置端到端加密功能,那么需要在producer端和consumer端都配置publicKeyPathprivateKeyPath

  1. publicKeyPath: "./public.pem"
  2. privateKeyPath: "./private.pem"

教程

本节提供详细步骤,说明在 Node.js客户端中如何使用端到端加密功能。

前提条件

  • Pulsar C++ 客户端版本为2.7.1或者更高

步骤

  1. 创建公钥和私钥密钥对。

    输入

    1. openssl genrsa -out private.pem 2048
    2. openssl rsa -in private.pem -pubout -out public.pem
  2. 创建一个生产者用来发送加密消息。

    输入

    1. const Pulsar = require('pulsar-client');
    2. (async () => {
    3. // Create a client
    4. const client = new Pulsar.Client({
    5. serviceUrl: 'pulsar://localhost:6650',
    6. operationTimeoutSeconds: 30,
    7. });
    8. // Create a producer
    9. const producer = await client.createProducer({
    10. topic: 'persistent://public/default/my-topic',
    11. sendTimeoutMs: 30000,
    12. batchingEnabled: true,
    13. publicKeyPath: "./public.pem",
    14. privateKeyPath: "./private.pem",
    15. encryptionKey: "encryption-key"
    16. });
    17. console.log(producer.ProducerConfig)
    18. // Send messages
    19. for (let i = 0; i < 10; i += 1) {
    20. const msg = `my-message-${i}`;
    21. producer.send({
    22. data: Buffer.from(msg),
    23. });
    24. console.log(`Sent message: ${msg}`);
    25. }
    26. await producer.flush();
    27. await producer.close();
    28. await client.close();
    29. })();
  3. 创建消费者接收加密消息。

    输入

    1. const Pulsar = require('pulsar-client');
    2. (async () => {
    3. // Create a client
    4. const client = new Pulsar.Client({
    5. serviceUrl: 'pulsar://172.25.0.3:6650',
    6. operationTimeoutSeconds: 30
    7. });
    8. // Create a consumer
    9. const consumer = await client.subscribe({
    10. topic: 'persistent://public/default/my-topic',
    11. subscription: 'sub1',
    12. subscriptionType: 'Shared',
    13. ackTimeoutMs: 10000,
    14. publicKeyPath: "./public.pem",
    15. privateKeyPath: "./private.pem"
    16. });
    17. console.log(consumer)
    18. // Receive messages
    19. for (let i = 0; i < 10; i += 1) {
    20. const msg = await consumer.receive();
    21. console.log(msg.getData().toString());
    22. consumer.acknowledge(msg);
    23. }
    24. await consumer.close();
    25. await client.close();
    26. })();
  4. 运行消费者接收加密消息。

    输入

    1. node consumer.js
  5. 在一个新的终端窗口中,运行生产者来生成加密消息。

    输入

    1. node producer.js

    现在你可以看到生产者发送消息,消费者成功收到消息。

    输出

    生产端:

    1. Sent message: my-message-0
    2. Sent message: my-message-1
    3. Sent message: my-message-2
    4. Sent message: my-message-3
    5. Sent message: my-message-4
    6. Sent message: my-message-5
    7. Sent message: my-message-6
    8. Sent message: my-message-7
    9. Sent message: my-message-8
    10. Sent message: my-message-9

    消费端:

    1. my-message-0
    2. my-message-1
    3. my-message-2
    4. my-message-3
    5. my-message-4
    6. my-message-5
    7. my-message-6
    8. my-message-7
    9. my-message-8
    10. my-message-9