Pulsar WebSocket API

Pulsar WebSocket API provides a simple way to interact with Pulsar using languages that do not have an official client library. Through WebSocket, you can publish and consume messages and use features available on the Client Features Matrix page.

You can use Pulsar WebSocket API with any WebSocket client library. See examples for Python and Node.js below.

Running the WebSocket service

The standalone variant of Pulsar that we recommend using for local development already has the WebSocket service enabled.

In non-standalone mode, there are two ways to deploy the WebSocket service:

Embedded with a Pulsar broker

In this mode, the WebSocket service will run within the same HTTP service that’s already running in the broker. To enable this mode, set the webSocketServiceEnabled parameter in the conf/broker.conf configuration file in your installation.

  1. webSocketServiceEnabled=true

As a separate component

In this mode, the WebSocket service will be run from a Pulsar broker as a separate service. Configuration for this mode is handled in the conf/websocket.conf configuration file. You’ll need to set at least the following parameters:

Here’s an example:

  1. configurationMetadataStoreUrl=zk1:2181,zk2:2181,zk3:2181
  2. webServicePort=8080
  3. clusterName=my-cluster

Security settings

To enable TLS encryption on WebSocket service:

  1. tlsEnabled=true
  2. tlsAllowInsecureConnection=false
  3. tlsCertificateFilePath=/path/to/client-websocket.cert.pem
  4. tlsKeyFilePath=/path/to/client-websocket.key-pk8.pem
  5. tlsTrustCertsFilePath=/path/to/ca.cert.pem

Starting the broker

When the configuration is set, you can start the service using the pulsar-daemon tool:

  1. $ bin/pulsar-daemon start websocket

API Reference

Pulsar’s WebSocket API offers three endpoints for producing messages, consuming messages and reading messages.

All exchanges via the WebSocket API use JSON.

Authentication

Browser javascript WebSocket client

Use the query param token transport the authentication token.

  1. ws://broker-service-url:8080/path?token=token

Producer endpoint

The producer endpoint requires you to specify a tenant, namespace, and topic in the URL:

  1. ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
Query param
KeyTypeRequired?Explanation
sendTimeoutMillislongnoSend timeout (default: 30 secs)
batchingEnabledbooleannoEnable batching of messages (default: false)
batchingMaxMessagesintnoMaximum number of messages permitted in a batch (default: 1000)
maxPendingMessagesintnoSet the max size of the internal-queue holding the messages (default: 1000)
batchingMaxPublishDelaylongnoTime period within which the messages will be batched (default: 10ms)
messageRoutingModestringnoMessage routing mode for the partitioned producer: SinglePartition, RoundRobinPartition
compressionTypestringnoCompression type: LZ4, ZLIB
producerNamestringnoSpecify the name for the producer. Pulsar will enforce only one producer with same name can be publishing on a topic
initialSequenceIdlongnoSet the baseline for the sequence ids for messages published by the producer.
hashingSchemestringnoHashing function to use when publishing on a partitioned topic: JavaStringHash, Murmur3_32Hash
tokenstringnoAuthentication token, this is used for the browser javascript client

Publishing a message

  1. {
  2. "payload": "SGVsbG8gV29ybGQ=",
  3. "properties": {"key1": "value1", "key2": "value2"},
  4. "context": "1"
  5. }
KeyTypeRequired?Explanation
payloadstringyesBase-64 encoded payload
propertieskey-value pairsnoApplication-defined properties
contextstringnoApplication-defined request identifier
keystringnoFor partitioned topics, decides which partition to use
replicationClustersarraynoRestrict replication to this list of clusters, specified by name
Example success response
  1. {
  2. "result": "ok",
  3. "messageId": "CAAQAw==",
  4. "context": "1"
  5. }
Example failure response
  1. {
  2. "result": "send-error:3",
  3. "errorMsg": "Failed to de-serialize from JSON",
  4. "context": "1"
  5. }
KeyTypeRequired?Explanation
resultstringyesok if successful or an error message if unsuccessful
messageIdstringyesMessage ID assigned to the published message
contextstringnoApplication-defined request identifier

Consumer endpoint

The consumer endpoint requires you to specify a tenant, namespace, and topic, as well as a subscription, in the URL:

  1. ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
Query param
KeyTypeRequired?Explanation
ackTimeoutMillislongnoSet the timeout for unacked messages (default: 0)
subscriptionTypestringnoSubscription type: Exclusive, Failover, Shared, Key_Shared
receiverQueueSizeintnoSize of the consumer receive queue (default: 1000)
consumerNamestringnoConsumer name
priorityLevelintnoDefine a priority for the consumer
maxRedeliverCountintnoDefine a maxRedeliverCount for the consumer (default: 0). Activates Dead Letter Topic feature.
deadLetterTopicstringnoDefine a deadLetterTopic for the consumer (default: {topic}-{subscription}-DLQ). Activates Dead Letter Topic feature.
pullModebooleannoEnable pull mode (default: false). See “Flow Control” below.
negativeAckRedeliveryDelayintnoWhen a message is negatively acknowledged, the delay time before the message is redelivered (in milliseconds). The default value is 60000.
tokenstringnoAuthentication token, this is used for the browser javascript client

NB: these parameter (except pullMode) apply to the internal consumer of the WebSocket service. So messages will be subject to the redelivery settings as soon as the get into the receive queue, even if the client doesn’t consume on the WebSocket.

Receiving messages

Server will push messages on the WebSocket session:

  1. {
  2. "messageId": "CAMQADAA",
  3. "payload": "hvXcJvHW7kOSrUn17P2q71RA5SdiXwZBqw==",
  4. "properties": {},
  5. "publishTime": "2021-10-29T16:01:38.967-07:00",
  6. "redeliveryCount": 0,
  7. "encryptionContext": {
  8. "keys": {
  9. "client-rsa.pem": {
  10. "keyValue": "jEuwS+PeUzmCo7IfLNxqoj4h7txbLjCQjkwpaw5AWJfZ2xoIdMkOuWDkOsqgFmWwxiecakS6GOZHs94x3sxzKHQx9Oe1jpwBg2e7L4fd26pp+WmAiLm/ArZJo6JotTeFSvKO3u/yQtGTZojDDQxiqFOQ1ZbMdtMZA8DpSMuq+Zx7PqLo43UdW1+krjQfE5WD+y+qE3LJQfwyVDnXxoRtqWLpVsAROlN2LxaMbaftv5HckoejJoB4xpf/dPOUqhnRstwQHf6klKT5iNhjsY4usACt78uILT0pEPd14h8wEBidBz/vAlC/zVMEqiDVzgNS7dqEYS4iHbf7cnWVCn3Hxw==",
  11. "metadata": {}
  12. }
  13. },
  14. "param": "Tfu1PxVm6S9D3+Hk",
  15. "compressionType": "NONE",
  16. "uncompressedMessageSize": 0,
  17. "batchSize": {
  18. "empty": false,
  19. "present": true
  20. }
  21. }
  22. }

Below are the parameters in the WebSocket consumer response.

  • General parameters

    KeyTypeRequired?Explanation
    messageIdstringyesMessage ID
    payloadstringyesBase-64 encoded payload
    publishTimestringyesPublish timestamp
    redeliveryCountnumberyesNumber of times this message was already delivered
    propertieskey-value pairsnoApplication-defined properties
    keystringnoOriginal routing key set by producer
    encryptionContextEncryptionContextnoEncryption context that consumers can use to decrypt received messages
    paramstringnoInitialization vector for cipher (Base64 encoding)
    batchSizestringnoNumber of entries in a message (if it is a batch message)
    uncompressedMessageSizestringnoMessage size before compression
    compressionTypestringnoAlgorithm used to compress the message payload
  • encryptionContext related parameter

    KeyTypeRequired?Explanation
    keyskey-EncryptionKey pairsyesKey in key-EncryptionKey pairs is an encryption key name. Value in key-EncryptionKey pairs is an encryption key object.
  • encryptionKey related parameters

    KeyTypeRequired?Explanation
    keyValuestringyesEncryption key (Base64 encoding)
    metadatakey-value pairsnoApplication-defined metadata

Acknowledging the message

Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.

  1. {
  2. "messageId": "CAAQAw=="
  3. }
KeyTypeRequired?Explanation
messageIdstringyesMessage ID of the processed message

Negatively acknowledging messages

  1. {
  2. "type": "negativeAcknowledge",
  3. "messageId": "CAAQAw=="
  4. }
KeyTypeRequired?Explanation
messageIdstringyesMessage ID of the processed message

Flow control

Push Mode

By default (pullMode=false), the consumer endpoint will use the receiverQueueSize parameter both to size its internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client. In this mode, if you don’t send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching receiverQueueSize unacked messages sent to the WebSocket client.

Pull Mode

If you set pullMode to true, the WebSocket client will need to send permit commands to permit the Pulsar WebSocket service to send more messages.

  1. {
  2. "type": "permit",
  3. "permitMessages": 100
  4. }
KeyTypeRequired?Explanation
typestringyesType of command. Must be permit
permitMessagesintyesNumber of messages to permit

NB: in this mode it’s possible to acknowledge messages in a different connection.

Check if reach end of topic

Consumer can check if it has reached end of topic by sending isEndOfTopic request.

Request

  1. {
  2. "type": "isEndOfTopic"
  3. }
KeyTypeRequired?Explanation
typestringyesType of command. Must be isEndOfTopic

Response

  1. {
  2. "endOfTopic": "true/false"
  3. }

Reader endpoint

The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:

  1. ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
Query param
KeyTypeRequired?Explanation
readerNamestringnoReader name
receiverQueueSizeintnoSize of the consumer receive queue (default: 1000)
messageIdint or enumnoMessage ID to start from, earliest or latest (default: latest)
tokenstringnoAuthentication token, this is used for the browser javascript client
Receiving messages

Server will push messages on the WebSocket session:

  1. {
  2. "messageId": "CAAQAw==",
  3. "payload": "SGVsbG8gV29ybGQ=",
  4. "properties": {"key1": "value1", "key2": "value2"},
  5. "publishTime": "2016-08-30 16:45:57.785",
  6. "redeliveryCount": 4
  7. }
KeyTypeRequired?Explanation
messageIdstringyesMessage ID
payloadstringyesBase-64 encoded payload
publishTimestringyesPublish timestamp
redeliveryCountnumberyesNumber of times this message was already delivered
propertieskey-value pairsnoApplication-defined properties
keystringnoOriginal routing key set by producer

Acknowledging the message

In WebSocket, Reader needs to acknowledge the successful processing of the message to have the Pulsar WebSocket service update the number of pending messages. If you don’t send acknowledgements, Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.

  1. {
  2. "messageId": "CAAQAw=="
  3. }
KeyTypeRequired?Explanation
messageIdstringyesMessage ID of the processed message

Check if reach end of topic

Consumer can check if it has reached end of topic by sending isEndOfTopic request.

Request

  1. {
  2. "type": "isEndOfTopic"
  3. }
KeyTypeRequired?Explanation
typestringyesType of command. Must be isEndOfTopic

Response

  1. {
  2. "endOfTopic": "true/false"
  3. }

Error codes

In case of error the server will close the WebSocket session using the following error codes:

Error CodeError Message
1Failed to create producer
2Failed to subscribe
3Failed to deserialize from JSON
4Failed to serialize to JSON
5Failed to authenticate client
6Client is not authorized
7Invalid payload encoding
8Unknown error

The application is responsible for re-establishing a new WebSocket session after a backoff period.

Client examples

Below you’ll find code examples for the Pulsar WebSocket API in Python and Node.js.

Python

This example uses the websocket-client package. You can install it using pip:

  1. $ pip install websocket-client

You can also download it from PyPI.

Python producer

Here’s an example Python producer that sends a simple message to a Pulsar topic:

  1. import websocket, base64, json
  2. # If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  3. enable_TLS = False
  4. scheme = 'ws'
  5. if enable_TLS:
  6. scheme = 'wss'
  7. TOPIC = scheme + '://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'
  8. ws = websocket.create_connection(TOPIC)
  9. # encode message
  10. s = "Hello World"
  11. firstEncoded = s.encode("UTF-8")
  12. binaryEncoded = base64.b64encode(firstEncoded)
  13. payloadString = binaryEncoded.decode('UTF-8')
  14. # Send one message as JSON
  15. ws.send(json.dumps({
  16. 'payload' : payloadString,
  17. 'properties': {
  18. 'key1' : 'value1',
  19. 'key2' : 'value2'
  20. },
  21. 'context' : 5
  22. }))
  23. response = json.loads(ws.recv())
  24. if response['result'] == 'ok':
  25. print( 'Message published successfully')
  26. else:
  27. print('Failed to publish message:', response)
  28. ws.close()

Python consumer

Here’s an example Python consumer that listens on a Pulsar topic and prints the message ID whenever a message arrives:

  1. import websocket, base64, json
  2. # If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  3. enable_TLS = False
  4. scheme = 'ws'
  5. if enable_TLS:
  6. scheme = 'wss'
  7. TOPIC = scheme + '://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'
  8. ws = websocket.create_connection(TOPIC)
  9. while True:
  10. msg = json.loads(ws.recv())
  11. if not msg: break
  12. print( "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
  13. # Acknowledge successful processing
  14. ws.send(json.dumps({'messageId' : msg['messageId']}))
  15. ws.close()

Python reader

Here’s an example Python reader that listens on a Pulsar topic and prints the message ID whenever a message arrives:

  1. import websocket, base64, json
  2. # If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  3. enable_TLS = False
  4. scheme = 'ws'
  5. if enable_TLS:
  6. scheme = 'wss'
  7. TOPIC = scheme + '://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
  8. ws = websocket.create_connection(TOPIC)
  9. while True:
  10. msg = json.loads(ws.recv())
  11. if not msg: break
  12. print ( "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
  13. # Acknowledge successful processing
  14. ws.send(json.dumps({'messageId' : msg['messageId']}))
  15. ws.close()

Node.js

This example uses the ws package. You can install it using npm:

  1. $ npm install ws

Node.js producer

Here’s an example Node.js producer that sends a simple message to a Pulsar topic:

  1. const WebSocket = require('ws');
  2. // If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  3. const enableTLS = false;
  4. const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/producer/persistent/public/default/my-topic`;
  5. const ws = new WebSocket(topic);
  6. var message = {
  7. "payload" : new Buffer("Hello World").toString('base64'),
  8. "properties": {
  9. "key1" : "value1",
  10. "key2" : "value2"
  11. },
  12. "context" : "1"
  13. };
  14. ws.on('open', function() {
  15. // Send one message
  16. ws.send(JSON.stringify(message));
  17. });
  18. ws.on('message', function(message) {
  19. console.log('received ack: %s', message);
  20. });

Node.js consumer

Here’s an example Node.js consumer that listens on the same topic used by the producer above:

  1. const WebSocket = require('ws');
  2. // If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  3. const enableTLS = false;
  4. const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub`;
  5. const ws = new WebSocket(topic);
  6. ws.on('message', function(message) {
  7. var receiveMsg = JSON.parse(message);
  8. console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
  9. var ackMsg = {"messageId" : receiveMsg.messageId};
  10. ws.send(JSON.stringify(ackMsg));
  11. });

NodeJS reader

  1. const WebSocket = require('ws');
  2. // If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  3. const enableTLS = false;
  4. const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/reader/persistent/public/default/my-topic`;
  5. const ws = new WebSocket(topic);
  6. ws.on('message', function(message) {
  7. var receiveMsg = JSON.parse(message);
  8. console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
  9. var ackMsg = {"messageId" : receiveMsg.messageId};
  10. ws.send(JSON.stringify(ackMsg));
  11. });