Pulsar

Detailed documentation on the Pulsar pubsub component

Component format

To set up Apache Pulsar pub/sub, create a component of type pubsub.pulsar. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

For more information on Apache Pulsar, read the official docs.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pulsar-pubsub
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "localhost:6650"
  11. - name: enableTLS
  12. value: "false"
  13. - name: tenant
  14. value: "public"
  15. - name: token
  16. value: "eyJrZXlJZCI6InB1bHNhci1wajU0cXd3ZHB6NGIiLCJhbGciOiJIUzI1NiJ9.eyJzd"
  17. - name: consumerID
  18. value: "channel1"
  19. - name: namespace
  20. value: "default"
  21. - name: persistent
  22. value: "true"
  23. - name: disableBatching
  24. value: "false"
  25. - name: <topic-name>.jsonschema # sets a json schema validation for the configured topic
  26. value: |
  27. {
  28. "type": "record",
  29. "name": "Example",
  30. "namespace": "test",
  31. "fields": [
  32. {"name": "ID","type": "int"},
  33. {"name": "Name","type": "string"}
  34. ]
  35. }
  36. - name: <topic-name>.avroschema # sets an avro schema validation for the configured topic
  37. value: |
  38. {
  39. "type": "record",
  40. "name": "Example",
  41. "namespace": "test",
  42. "fields": [
  43. {"name": "ID","type": "int"},
  44. {"name": "Name","type": "string"}
  45. ]
  46. }

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets. This component supports storing the token parameter and any other sensitive parameter and data as Kubernetes Secrets.

Spec metadata fields

FieldRequiredDetailsExample
hostYAddress of the Pulsar broker. Default is “localhost:6650”“localhost:6650” OR http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080
enableTLSNEnable TLS. Default: “false”“true”, “false”
tenantNThe topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters. Default: “public”“public”
consumerIDNUsed to set the subscription name or consumer ID.“channel1”
namespaceNThe administrative unit of the topic, which acts as a grouping mechanism for related topics. Default: “default”“default”
persistentNPulsar supports two kinds of topics: persistent and non-persistent. With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks.
disableBatchingNdisable batching.When batching enabled default batch delay is set to 10 ms and default batch size is 1000 messages,Setting disableBatching: true will make the producer to send messages individually. Default: “false”“true”, “false”
batchingMaxPublishDelayNbatchingMaxPublishDelay set the time period within which the messages sent will be batched,if batch messages are enabled. If set to a non zero value, messages will be queued until this time interval or batchingMaxMessages (see below) or batchingMaxSize (see below). There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Default: “10ms”“10ms”, “10”
batchingMaxMessagesNbatchingMaxMessages set the maximum number of messages permitted in a batch.If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxSize (see below) has been reached or the batch interval has elapsed. Default: “1000”“1000”
batchingMaxSizeNbatchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxMessages (see above) has been reached or the batch interval has elapsed. Default: “128KB”“131072”
.jsonschemaNEnforces JSON schema validation for the configured topic.
.avroschemaNEnforces Avro schema validation for the configured topic.
publicKeyNA public key to be used for publisher and consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value
privateKeyNA private key to be used for consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value
keysNA comma delimited string containing names of Pulsar session keys. Used in conjunction with publicKey for publisher encryption
processModeNEnable processing multiple messages at once. Default: “async”“async”, “sync”
subscribeTypeNPulsar supports four kinds of subscription types. Default: “shared”“shared”, “exclusive”, “failover”, “key_shared”
partitionKeyNSets the key of the message for routing policy. Default: “”
maxConcurrentHandlersNDefines the maximum number of concurrent message handlers. Default: 10010

Authenticate using Token

To authenticate to pulsar using a static JWT token, you can use the following metadata field:

FieldRequiredDetailsExample
tokenNToken used for authentication.How to create Pulsar token
  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "pulsar.example.com:6650"
  11. - name: token
  12. secretKeyRef:
  13. name: pulsar
  14. key: token

Authenticate using OIDC

Since v3.0, Pulsar supports OIDC authentication. To enable OIDC authentication, you need to provide the following OAuth2 parameters to the component spec. OAuth2 authentication cannot be used in combination with token authentication. It is recommended that you use a secret reference for the client secret. The pulsar OAuth2 authenticator is not specifically complaint with OIDC so it is your responsibility to ensure fields are compliant. For example, the issuer URL must use the https protocol, the requested scopes include openid, etc. If the oauth2TokenCAPEM field is omitted then the system’s certificate pool is used for connecting to the OAuth2 issuer if using https.

FieldRequiredDetailsExample
oauth2TokenURLNURL to request the OIDC client_credentials token from. Must not be empty.https://oauth.example.com/o/oauth2/token”`
oauth2TokenCAPEMNCA PEM certificate bundle to connect to the OAuth2 issuer. If not defined, the system’s certificate pool will be used.“—-BEGIN CERTIFICATE—-\n…\n—-END CERTIFICATE—-“
oauth2ClientIDNOIDC client ID. Must not be empty.“my-client-id”
oauth2ClientSecretNOIDC client secret. Must not be empty.“my-client-secret”
oauth2AudiencesNComma separated list of audiences to request for. Must not be empty.“my-audience-1,my-audience-2”
oauth2ScopesNComma separated list of scopes to request. Must not be empty.“openid,profile,email”
  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "pulsar.example.com:6650"
  11. - name: oauth2TokenURL
  12. value: https://oauth.example.com/o/oauth2/token
  13. - name: oauth2TokenCAPEM
  14. value: "---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
  15. - name: oauth2ClientID
  16. value: my-client-id
  17. - name: oauth2ClientSecret
  18. secretKeyRef:
  19. name: pulsar-oauth2
  20. key: my-client-secret
  21. - name: oauth2Audiences
  22. value: "my.pulsar.example.com,another.pulsar.example.com"
  23. - name: oauth2Scopes
  24. value: "openid,profile,email"

Enabling message delivery retries

The Pulsar pub/sub component has no built-in support for retry strategies. This means that sidecar sends a message to the service only once and is not retried in case of failures. To make Dapr use more spohisticated retry policies, you can apply a retry resiliency policy to the Pulsar pub/sub component. Note that it will be the same Dapr sidecar retrying the redelivery the message to the same app instance and not other instances.

Delay queue

When invoking the Pulsar pub/sub, it’s possible to provide an optional delay queue by using the metadata query parameters in the request url.

These optional parameter names are metadata.deliverAt or metadata.deliverAfter:

  • deliverAt: Delay message to deliver at a specified time (RFC3339 format); for example, "2021-09-01T10:00:00Z"
  • deliverAfter: Delay message to deliver after a specified amount of time; for example,"4h5m3s"

Examples:

  1. curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAt='2021-09-01T10:00:00Z' \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "data": {
  5. "message": "Hi"
  6. }
  7. }'

Or

  1. curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAfter='4h5m3s' \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "data": {
  5. "message": "Hi"
  6. }
  7. }'

E2E Encryption

Dapr supports setting public and private key pairs to enable Pulsar’s end-to-end encryption feature.

Enabling publisher encryption from file certs

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "localhost:6650"
  11. - name: publicKey
  12. value: ./public.key
  13. - name: keys
  14. value: myapp.key

Enabling consumer encryption from file certs

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "localhost:6650"
  11. - name: publicKey
  12. value: ./public.key
  13. - name: privateKey
  14. value: ./private.key

Enabling publisher encryption from value

Note: It is recommended to reference the public key from a secret.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "localhost:6650"
  11. - name: publicKey
  12. value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  13. - name: keys
  14. value: myapp.key

Enabling consumer encryption from value

Note: It is recommended to reference the public and private keys from a secret.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. spec:
  6. type: pubsub.pulsar
  7. version: v1
  8. metadata:
  9. - name: host
  10. value: "localhost:6650"
  11. - name: publicKey
  12. value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  13. - name: privateKey
  14. value: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n"

Partition Key

When invoking the Pulsar pub/sub, it’s possible to provide an optional partition key by using the metadata query parameter in the request url.

The parameter name is partitionKey.

Example:

  1. curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.partitionKey=key1 \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "data": {
  5. "message": "Hi"
  6. }
  7. }'

Message headers

All other metadata key/value pairs (that are not partitionKey) are set as headers in the Pulsar message. For example, set a correlationId for the message:

  1. curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "data": {
  5. "message": "Hi"
  6. }
  7. }'

Order guarantee

To ensure that messages arrive in order for each consumer subscribed to a specific key, three conditions must be met.

  1. subscribeType should be set to key_shared.
  2. partitionKey must be set.
  3. processMode should be set to sync.

Create a Pulsar instance

  1. docker run -it \
  2. -p 6650:6650 \
  3. -p 8080:8080 \
  4. --mount source=pulsardata,target=/pulsar/data \
  5. --mount source=pulsarconf,target=/pulsar/conf \
  6. apachepulsar/pulsar:2.5.1 \
  7. bin/pulsar standalone

Refer to the following Helm chart Documentation.

Last modified March 21, 2024: Merge pull request #4082 from newbe36524/v1.13 (f4b0938)