The Pulsar Node.js client

Pulsar Node.js 客户端可用于在 Node.js 中创建 Pulsar producerconsumerreader

Node.js 客户端中 producerconsumerreader 中的所有方法都对线程可用。

For 1.3.0 or later versions, type definitions used in TypeScript are available.

安装

You can install the pulsar-client library via npm.

要求

Pulsar Node.js client library is based on the C++ client library. Follow these instructions and install the Pulsar C++ client library.

兼容性

Compatibility between each version of the Node.js client and the C++ client is as follows:

Node.js 客户端C++ client
1.0.02.3.0 及以上
1.1.02.4.0 or later
1.2.02.5.0 or later

If an incompatible version of the C++ client is installed, you may fail to build or run this library.

使用 npm 安装

Install the pulsar-client library via npm:

  1. $ npm install pulsar-client

Note

因为这个库使用 node-addon-api 模块来包装 C++ 库,因此只能在 Node.js 10.x 或更高版本中运行。

连接 URL

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here is an example for localhost:

  1. pulsar://localhost:6650

A URL for a production Pulsar cluster may look something like this:

  1. pulsar://pulsar.us-west.example.com:6650

If you are using TLS encryption or TLS Authentication, the URL looks like this:

  1. pulsar+ssl://pulsar.us-west.example.com:6651

创建客户端

In order to interact with Pulsar, you first need a client object. You can create a client instance using a new operator and the Client method, passing in a client options object (more on configuration below).

Here is an example:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. const client = new Pulsar.Client({
  4. serviceUrl: 'pulsar://localhost:6650',
  5. });
  6. await client.close();
  7. })();

客户端配置

The following configurable parameters are available for Pulsar clients:

参数名说明默认值
serviceUrlThe connection URL for the Pulsar cluster. 更多详细信息,参阅这里
authenticationConfigure the authentication provider. (default: no authentication). 更多详细信息,参阅 TLS 认证
operationTimeoutSecondsNode.js 客户端操作的超时时间(创建 producer、订阅/取消订阅 topic)。 Retries occur until this threshold is reached, at which point the operation fails.30
ioThreads用于处理 Pulsar broker 连接的线程数。1
messageListenerThreadsconsumerreaders 监听消息使用的线程数。1
concurrentLookupRequestThe number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. 只有当客户端需要生产及/或订阅数千 Pulsar topic 时,才需要修改预设的默认值:50000 。50000
tlsTrustCertsFilePath信任的 TLS 证书的文件路径。
tlsValidateHostname设置是否启用 TLS 主机名验证。false
tlsAllowInsecureConnection设置是否让 Pulsar 客户端从 broker 接受不信任的 TLS 证书。false
statsIntervalInSeconds两次信息统计之间的时间间隔。 当 statsInterval 为正数时启用信息统计。 最小值为 1 秒。600
logA function that is used for logging.console.log

Producers

Pulsar producers publish messages to Pulsar topics. You can configure Node.js producers using a producer configuration object.

Here is an example:

  1. const producer = await client.createProducer({
  2. topic: 'my-topic',
  3. });
  4. await producer.send({
  5. data: Buffer.from("Hello, Pulsar"),
  6. });
  7. await producer.close();

Promise 操作

When you create a new Pulsar producer, the operation returns Promise object and get producer instance or an error through executor function.
In this example, using await operator instead of executor function.

Producer operations

Pulsar Node.js producers have the following methods available:

Method说明Return type
send(Object)Publishes a message to the producer’s topic. When the message is successfully acknowledged by the Pulsar broker, or an error is thrown, the Promise object whose result is the message ID runs executor function.Promise<Object>
flush()从发送队列发送消息给 Pulsar broker。 When the message is successfully acknowledged by the Pulsar broker, or an error is thrown, the Promise object runs executor function.Promise<null>
close()Closes the producer and releases all resources allocated to it. Once close() is called, no more messages are accepted from the publisher. This method returns a Promise object. It runs the executor function when all pending publish requests are persisted by Pulsar. If an error is thrown, no pending writes are retried.Promise<null>
getProducerName()Getter method of the producer name.string
getTopic()Getter method of the name of the topic.string

Producer 配置

参数名说明默认值
topicThe Pulsar topic to which the producer publishes messages.
producerNameA name for the producer. If you do not explicitly assign a name, Pulsar automatically generates a globally unique name. If you choose to explicitly assign a name, it needs to be unique across all Pulsar clusters, otherwise the creation operation throws an error.
sendTimeoutMsWhen publishing a message to a topic, the producer waits for an acknowledgment from the responsible Pulsar broker. If a message is not acknowledged within the threshold set by this parameter, an error is thrown. If you set sendTimeoutMs to -1, the timeout is set to infinity (and thus removed). Removing the send timeout is recommended when using Pulsar’s message de-duplication feature.30000
initialSequenceId消息的初始序列号 Producer 在发送消息时,将序列号添加到消息中。 只要发送消息,序列号就会相应增加。
maxPendingMessagesThe maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the broker). By default, when the queue is full all calls to the send method fails unless blockIfQueueFull is set to true.1000
maxPendingMessagesAcrossPartitions所有分区等待队列空间总和的最大值。50000
blockIfQueueFullIf set to true, the producer’s send method waits when the outgoing message queue is full rather than failing and throwing an error (the size of that queue is dictated by the maxPendingMessages parameter); if set to false (the default), send operations fails and throw a error when the queue is full.false
messageRoutingMode消息路由逻辑(分区 topic 上的 producer 使用)。 This logic is applied only when no key is set on messages. 可以设置为循环(RoundRobinDistribution),或是发布所有消息到单个分区 (UseSinglePartition,默认值)。UseSinglePartition
hashingSchemeThe hashing function that determines the partition on which a particular message is published (partitioned topics only). 可以设置为 JavaStringHash(等同于 Java 中的 String.hashCode())、Murmur3_32Hash(应用了 Murmur3 散列函数)、或 BoostHash (应用了 C++ 中 Boost 库的散列函数)。BoostHash
compressionTypeThe message data compression type used by the producer. The available options are LZ4, and Zlib, ZSTD, SNAPPY.Compression None
batchingEnabled如果设置为 true,producer 将批量发送消息。true
batchingMaxPublishDelayMs批量发送消息的最大延迟。10
batchingMaxMessages批量发送消息时,消息大小的最大值。1000
propertiesProducer 的元数据。

生产者示例

This example creates a Node.js producer for the my-topic topic and sends 10 messages to that topic:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. });
  7. // Create a producer
  8. const producer = await client.createProducer({
  9. topic: 'my-topic',
  10. });
  11. // Send messages
  12. for (let i = 0; i < 10; i += 1) {
  13. const msg = `my-message-${i}`;
  14. producer.send({
  15. data: Buffer.from(msg),
  16. });
  17. console.log(`Sent message: ${msg}`);
  18. }
  19. await producer.flush();
  20. await producer.close();
  21. await client.close();
  22. })();

Consumers

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Node.js consumers using a consumer configuration object.

Here is an example:

  1. const consumer = await client.subscribe({
  2. topic: 'my-topic',
  3. subscription: 'my-subscription',
  4. });
  5. const msg = await consumer.receive();
  6. console.log(msg.getData().toString());
  7. consumer.acknowledge(msg);
  8. await consumer.close();

Promise 操作

When you create a new Pulsar consumer, the operation returns Promise object and get consumer instance or an error through executor function.
In this example, using await operator instead of executor function.

消费者操作

Pulsar Node.js consumers have the following methods available:

Method说明Return type
receive()Receives a single message from the topic. 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。Promise<Object>
receive(Number)在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。Promise<Object>
acknowledge(Object)通过消息对象向 Pulsar broker 确认收到一条消息。void
acknowledgeId(Object)通过消息 ID 向 Pulsar broker 确认收到一条消息。void
acknowledgeCumulative(Object)确认收到流中到指定消息为止的所有消息,包括指定消息。 The acknowledgeCumulative method returns void, and send the ack to the broker asynchronously. After that, the messages are not redelivered to the consumer. 共享订阅类型不支持累积确认。void
acknowledgeCumulativeId(Object)确认收到流中到指定消息为止所有消息的 ID,包括指定消息的 ID。void
negativeAcknowledge(Message)Negatively acknowledges a message to the Pulsar broker by message object.void
negativeAcknowledgeId(MessageId)Negatively acknowledges a message to the Pulsar broker by message ID object.void
close()关闭 consumer,使 consumer 不再从 broker 接收消息。Promise<null>
unsubscribe()Unsubscribes the subscription.Promise<null>

Consumer configuration

参数名说明默认值
topicThe Pulsar topic on which the consumer establishes a subscription and listen for messages.
topicsThe array of topics.
topicsPatternThe regular expression for topics.
subscriptionConsumer 的订阅名称。
subscriptionTypeAvailable options are Exclusive, Shared, Key_Shared, and Failover.Exclusive
subscriptionInitialPositionInitial position at which to set cursor when subscribing to a topic at first time.SubscriptionInitialPosition.Latest
ackTimeoutMs消息确认的超时时间(以毫秒为单位)。0
nAckRedeliverTimeoutMsDelay to wait before redelivering messages that failed to be processed.60000
receiverQueueSize设置 consumer 接收队列的大小,即在应用程序调用 receive 之前允许堆积的消息数。 A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization.1000
receiverQueueSizeAcrossPartitionsSet the max total receiver queue size across partitions. This setting is used to reduce the receiver queue size for individual partitions if the total exceeds this value.50000
consumerNameConsumer 的名称。 当前(v2.4.1)版本中, 灾备模式按照 consumer 名称排序。
propertiesConsumer 的元数据。
listenerA listener that is called for a message received.
readCompactedIf enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.
false

消费者示例

This example creates a Node.js consumer with the my-subscription subscription on the my-topic topic, receives messages, prints the content that arrive, and acknowledges each message to the Pulsar broker for 10 times:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. });
  7. // Create a consumer
  8. const consumer = await client.subscribe({
  9. topic: 'my-topic',
  10. subscription: 'my-subscription',
  11. subscriptionType: 'Exclusive',
  12. });
  13. // Receive messages
  14. for (let i = 0; i < 10; i += 1) {
  15. const msg = await consumer.receive();
  16. console.log(msg.getData().toString());
  17. consumer.acknowledge(msg);
  18. }
  19. await consumer.close();
  20. await client.close();
  21. })();

Instead a consumer can be created with listener to process messages.

  1. // Create a consumer
  2. const consumer = await client.subscribe({
  3. topic: 'my-topic',
  4. subscription: 'my-subscription',
  5. subscriptionType: 'Exclusive',
  6. listener: (msg, msgConsumer) => {
  7. console.log(msg.getData().toString());
  8. msgConsumer.acknowledge(msg);
  9. },
  10. });

Reader

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recently unacked message). You can configure Node.js readers using a reader configuration object.

Here is an example:

  1. const reader = await client.createReader({
  2. topic: 'my-topic',
  3. startMessageId: Pulsar.MessageId.earliest(),
  4. });
  5. const msg = await reader.readNext();
  6. console.log(msg.getData().toString());
  7. await reader.close();

Reader operations

Pulsar Node.js readers have the following methods available:

Method说明Return type
readNext()接收 topic 中的下一条消息(类似于 consumer 中的 receive 方法)。 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。Promise<Object>
readNext(Number)在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。Promise<Object>
hasNext()返回 broker 是否在目标 topic 中存有下一条消息。Boolean
close()关闭 reader,使 reader 不再从 broker 接收消息。Promise<null>

Reader configuration

参数名说明默认值
topicThe Pulsar topic on which the reader establishes a subscription and listen for messages.
startMessageIdReader 的初始位置,即 reader 处理的第一条消息的位置。 可以设置为 Pulsar.MessageId.earliest(topic 中最早的可用消息),或 Pulsar.MessageId.latest (topic 中最新的可用消息),或是任一其他消息 ID。
receiverQueueSize设置 reader 接收队列的大小,即在应用程序调用 readNext 之前允许堆积的消息数。 A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization.1000
readerNameReader 的名称。
subscriptionRolePrefixThe subscription role prefix.
readCompactedIf enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.
false

Reader 示例

This example creates a Node.js reader with the my-topic topic, reads messages, and prints the content that arrive for 10 times:

  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. operationTimeoutSeconds: 30,
  7. });
  8. // Create a reader
  9. const reader = await client.createReader({
  10. topic: 'my-topic',
  11. startMessageId: Pulsar.MessageId.earliest(),
  12. });
  13. // read messages
  14. for (let i = 0; i < 10; i += 1) {
  15. const msg = await reader.readNext();
  16. console.log(msg.getData().toString());
  17. }
  18. await reader.close();
  19. await client.close();
  20. })();

消息

In Pulsar Node.js client, you have to construct producer message object for producer.

Here is an example message:

  1. const msg = {
  2. data: Buffer.from('Hello, Pulsar'),
  3. partitionKey: 'key1',
  4. properties: {
  5. 'foo': 'bar',
  6. },
  7. eventTimestamp: Date.now(),
  8. replicationClusters: [
  9. 'cluster1',
  10. 'cluster2',
  11. ],
  12. }
  13. await producer.send(msg);

The following keys are available for producer message objects:

参数名说明
data消息数据的实际有效负载。
properties用于存储应用程序指定消息附带的所有元数据。
eventTimestamp与消息相关联的时间戳。
sequenceId消息的序列号。
partitionKey与消息关联的可选键值(对于 topic 压缩等尤其有用)。
replicationClustersThe clusters to which this message is replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
deliverAtThe absolute timestamp at or after which the message is delivered.
deliverAfterThe relative delay after which the message is delivered.

消息对象操作

In Pulsar Node.js client, you can receive (or read) message object as consumer (or reader).

The message object have the following methods available:

Method说明Return type
getTopicName()Topic 名称的 Getter 方法。String
getProperties()属性的 Getter 方法。数组<Object>
getData()消息数据的 Getter方法。Buffer
getMessageId()消息 id 对象的 Getter 方法。对象
getPublishTimestamp()发布时间戳的 Getter 方法。数值
getEventTimestamp()事件时间戳的 Getter 方法。数值
getRedeliveryCount()Getter method of redelivery count.数值
getPartitionKey()分区键的 Getter 方法。String

消息 ID 对象操作

In Pulsar Node.js client, you can get message id object from message object.

The message id object have the following methods available:

Method说明Return type
serialize()序列化消息 id 到缓存中进行存储。Buffer
toString()获取消息 id 字符串。String

The client has static method of message id object. You can access it as Pulsar.MessageId.someStaticMethod too.

The following static methods are available for the message id object:

Method说明Return type
earliest()MessageId 表示存储在 topic 中最早/最旧的可用消息。对象
latest()MessageId 表示存储在 topic 中最晚/最新的可用消息。对象
deserialize(Buffer)从缓存中反序列化出消息 id。对象

End-to-end encryption

End-to-end encryption allows applications to encrypt messages at producers and decrypt at consumers.

Configuration

If you want to use the end-to-end encryption feature in the Node.js client, you need to configure publicKeyPath and privateKeyPath for both producer and consumer.

  1. publicKeyPath: "./public.pem"
  2. privateKeyPath: "./private.pem"

教程

This section provides step-by-step instructions on how to use the end-to-end encryption feature in the Node.js client.

前提条件

  • Pulsar C++ client 2.7.1 or later

步骤

  1. Create both public and private key pairs.

    Input

    1. openssl genrsa -out private.pem 2048
    2. openssl rsa -in private.pem -pubout -out public.pem
  2. Create a producer to send encrypted messages.

    Input

    1. const Pulsar = require('pulsar-client');
    2. (async () => {
    3. // Create a client
    4. const client = new Pulsar.Client({
    5. serviceUrl: 'pulsar://localhost:6650',
    6. operationTimeoutSeconds: 30,
    7. });
    8. // Create a producer
    9. const producer = await client.createProducer({
    10. topic: 'persistent://public/default/my-topic',
    11. sendTimeoutMs: 30000,
    12. batchingEnabled: true,
    13. publicKeyPath: "./public.pem",
    14. privateKeyPath: "./private.pem",
    15. encryptionKey: "encryption-key"
    16. });
    17. console.log(producer.ProducerConfig)
    18. // Send messages
    19. for (let i = 0; i < 10; i += 1) {
    20. const msg = `my-message-${i}`;
    21. producer.send({
    22. data: Buffer.from(msg),
    23. });
    24. console.log(`Sent message: ${msg}`);
    25. }
    26. await producer.flush();
    27. await producer.close();
    28. await client.close();
    29. })();
  3. Create a consumer to receive encrypted messages.

    Input

    1. const Pulsar = require('pulsar-client');
    2. (async () => {
    3. // Create a client
    4. const client = new Pulsar.Client({
    5. serviceUrl: 'pulsar://172.25.0.3:6650',
    6. operationTimeoutSeconds: 30
    7. });
    8. // Create a consumer
    9. const consumer = await client.subscribe({
    10. topic: 'persistent://public/default/my-topic',
    11. subscription: 'sub1',
    12. subscriptionType: 'Shared',
    13. ackTimeoutMs: 10000,
    14. publicKeyPath: "./public.pem",
    15. privateKeyPath: "./private.pem"
    16. });
    17. console.log(consumer)
    18. // Receive messages
    19. for (let i = 0; i < 10; i += 1) {
    20. const msg = await consumer.receive();
    21. console.log(msg.getData().toString());
    22. consumer.acknowledge(msg);
    23. }
    24. await consumer.close();
    25. await client.close();
    26. })();
  4. Run the consumer to receive encrypted messages.

    Input

    1. node consumer.js
  5. In a new terminal tab, run the producer to produce encrypted messages.

    Input

    1. node producer.js

    Now you can see the producer sends messages and the consumer receives messages successfully.

    输出

    This is from the producer side.

    1. Sent message: my-message-0
    2. Sent message: my-message-1
    3. Sent message: my-message-2
    4. Sent message: my-message-3
    5. Sent message: my-message-4
    6. Sent message: my-message-5
    7. Sent message: my-message-6
    8. Sent message: my-message-7
    9. Sent message: my-message-8
    10. Sent message: my-message-9

    This is from the consumer side.

    1. my-message-0
    2. my-message-1
    3. my-message-2
    4. my-message-3
    5. my-message-4
    6. my-message-5
    7. my-message-6
    8. my-message-7
    9. my-message-8
    10. my-message-9