RocketMQ

Detailed documentation on the RocketMQ pubsub component

Component format

To setup RocketMQ pubsub, create a component of type pubsub.rocketmq. See this guide on how to create and apply a pubsub configuration.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: rocketmq-pubsub
  5. spec:
  6. type: pubsub.rocketmq
  7. version: v1
  8. metadata:
  9. - name: instanceName
  10. value: dapr-rocketmq-test
  11. - name: consumerGroup
  12. value: dapr-rocketmq-test-g-c
  13. - name: producerGroup
  14. value: dapr-rocketmq-test-g-p
  15. - name: nameSpace
  16. value: dapr-test
  17. - name: nameServer
  18. value: "127.0.0.1:9876,127.0.0.2:9876"
  19. - name: retries
  20. value: 3
  21. - name: consumerModel
  22. value: "clustering"
  23. - name: consumeOrderly
  24. value: false

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.

Spec metadata fields

FieldRequiredDetailsdefaultExample
instanceNameNInstance nametime.Now().String()dapr-rocketmq-test
consumerGroupNConsumer group name. Recommend. If producerGroup is nullgroupName is used.dapr-rocketmq-test-g-c
producerGroup (consumerID)NProducer group name. Recommended. If producerGroup is nullconsumerID is used. If consumerID also is null, groupName is used.dapr-rocketmq-test-g-p
groupNameNConsumer/Producer group name. Depreciated.dapr-rocketmq-test-g
nameSpaceNRocketMQ namespacedapr-rocketmq
nameServerDomainNRocketMQ name server domainhttps://my-app.net:8080/nsaddr
nameServerNRocketMQ name server, separated by “,” or “;”127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877
accessKeyNAccess Key (Username)“admin”
secretKeyNSecret Key (Password)“password”
securityTokenNSecurity Token
retriesNNumber of retries to send a message to broker33
producerQueueSelector (queueSelector)NProducer Queue selector. There are five implementations of queue selector: hash, random, manual, roundRobin, dapr.daprhash
consumerModelNMessage model that defines how messages are delivered to each consumer client. RocketMQ supports two message models: clustering and broadcasting.clusteringbroadcasting , clustering
fromWhere (consumeFromWhere)NConsuming point on consumer booting. There are three consuming points: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMPCONSUME_FROM_LAST_OFFSETCONSUME_FROM_LAST_OFFSET
consumeOrderlyNDetermines if it’s an ordered message using FIFO order.falsefalse
consumeMessageBatchMaxSizeNBatch consumption size out of range [1, 1024]51210
maxReconsumeTimesNMax re-consume times. -1 means 16 times. If messages are re-consumed more than {@link maxReconsumeTimes} before success, they’ll be directed to a deletion queue.Orderly message is MaxInt32; Concurrently message is 1616
autoCommitNEnable auto committruefalse
pullIntervalNMessage pull interval100100
pullBatchSizeNThe number of messages pulled from the broker at a time. If pullBatchSize is null, use ConsumerBatchSize. pullBatchSize out of range [1, 1024]3210
content-typeNMessage content type.“text/plain”“application/cloudevents+json; charset=utf-8”, “application/octet-stream”
logLevelNLog levelwarninfo
sendTimeOutNSend message timeout to connect RocketMQ’s broker, measured in nanoseconds. Deprecated.3 seconds10000000000
sendTimeOutSecNTimeout duration for publishing a message in seconds. If sendTimeOutSec is null, sendTimeOut is used.3 seconds3
mspPropertiesNThe RocketMQ message properties in this collection are passed to the APP in Data Separate multiple properties with “,”key,mkey

For backwards-compatibility reasons, the following values in the metadata are supported, although their use is discouraged.

Field (supported but deprecated)RequiredDetailsExample
groupNameNProducer group name for RocketMQ publishers“my_unique_group_name”
sendTimeOutNTimeout duration for publishing a message in nanoseconds0
consumerBatchSizeNThe number of messages pulled from the broker at a time32

Setup RocketMQ

See https://rocketmq.apache.org/docs/quick-start/ to setup a local RocketMQ instance.

Per-call metadata fields

Partition Key

When invoking the RocketMQ pub/sub, it’s possible to provide an optional partition key by using the metadata query param in the request url.

You need to specify rocketmq-tag , "rocketmq-key" , rocketmq-shardingkey , rocketmq-queue in metadata

Example:

  1. curl -X POST http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1 \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "data": {
  5. "message": "Hi"
  6. }
  7. }'

QueueSelector

The RocketMQ component contains a total of five queue selectors. The RocketMQ client provides the following queue selectors:

  • HashQueueSelector
  • RandomQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

To learn more about these RocketMQ client queue selectors, read the RocketMQ documentation.

The Dapr RocketMQ component implements the following queue selector:

  • DaprQueueSelector

This article focuses on the design of DaprQueueSelector.

DaprQueueSelector

DaprQueueSelector integrates three queue selectors:

  • HashQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

DaprQueueSelector gets the queue id from the request parameter. You can set the queue id by running the following:

  1. http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1

The ManualQueueSelector is implemented using the method above.

Next, the DaprQueueSelector tries to:

  • Get a ShardingKey
  • Hash the ShardingKey to determine the queue id.

You can set the ShardingKey by doing the following:

  1. http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key

If the ShardingKey does not exist, the RoundRobin algorithm is used to determine the queue id.

Last modified December 14, 2022: Modified the RocketMQ document based on V1.10 (#2951) (7d5c4bc6)