RocketMQ

Detailed documentation on the RocketMQ pubsub component

Component format

To set up RocketMQ pub/sub, create a component of type pubsub.rocketmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: rocketmq-pubsub
    spec:
      type: pubsub.rocketmq
      version: v1
      metadata:
        - name: instanceName
          value: dapr-rocketmq-test
        - name: consumerGroup
          value: dapr-rocketmq-test-g-c
        - name: producerGroup
          value: dapr-rocketmq-test-g-p
        - name: consumerID
          value: topic
        - name: nameSpace
          value: dapr-test
        - name: nameServer
          value: "127.0.0.1:9876,127.0.0.2:9876"
        - name: retries
          value: 3
        - name: consumerModel
          value: "clustering"
        - name: consumeOrderly
          value: false

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.

Spec metadata fields

| Field | Required | Details | Default | Example |
|---|---|---|---|---|
| instanceName | N | Instance name | time.Now().String() | dapr-rocketmq-test |
| consumerGroup | N | Consumer group name. Recommended. If producerGroup is null, groupName is used. | | dapr-rocketmq-test-g-c |
| producerGroup (consumerID) | N | Producer group name. Recommended. If producerGroup is null, consumerID is used. If consumerID is also null, groupName is used. | | dapr-rocketmq-test-g-p |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. | | “channel1” |
| groupName | N | Consumer/Producer group name. Deprecated. | | dapr-rocketmq-test-g |
| nameSpace | N | RocketMQ namespace | | dapr-rocketmq |
| nameServerDomain | N | RocketMQ name server domain | | https://my-app.net:8080/nsaddr |
| nameServer | N | RocketMQ name server addresses, separated by “,” or “;” | | 127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877 |
| accessKey | N | Access Key (Username) | | “admin” |
| secretKey | N | Secret Key (Password) | | “password” |
| securityToken | N | Security Token | | |
| retries | N | Number of retries to send a message to the broker | 3 | 3 |
| producerQueueSelector (queueSelector) | N | Producer queue selector. There are five implementations of queue selector: hash, random, manual, roundRobin, dapr. | dapr | hash |
| consumerModel | N | Message model that defines how messages are delivered to each consumer client. RocketMQ supports two message models: clustering and broadcasting. | clustering | broadcasting, clustering |
| fromWhere (consumeFromWhere) | N | Consuming point on consumer booting. There are three consuming points: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP | CONSUME_FROM_LAST_OFFSET | CONSUME_FROM_LAST_OFFSET |
| consumeTimestamp | N | Backtracks consumption time with second precision. Time format is yyyymmddhhmmss. For example, 20131223171201 implies the time of 17:12:01 and date of December 23, 2013 | time.Now().Add(time.Minute * (-30)).Format(“20060102150405”) | 20131223171201 |
| consumeOrderly | N | Determines if it’s an ordered message using FIFO order. | false | false |
| consumeMessageBatchMaxSize | N | Batch consumption size. Range: [1, 1024] | 512 | 10 |
| consumeConcurrentlyMaxSpan | N | Concurrently max span offset. This has no effect on sequential consumption. Range: [1, 65535] | 1000 | 1000 |
| maxReconsumeTimes | N | Max re-consume times. -1 means 16 times. If messages are re-consumed more than maxReconsumeTimes times before success, they’ll be directed to a deletion queue. | Orderly message is MaxInt32; Concurrently message is 16 | 16 |
| autoCommit | N | Enable auto commit | true | false |
| consumeTimeout | N | Maximum amount of time a message may block the consuming thread. Time unit: minute | 15 | 15 |
| consumerPullTimeout | N | The socket timeout in milliseconds | | |
| pullInterval | N | Message pull interval | 100 | 100 |
| pullBatchSize | N | The number of messages pulled from the broker at a time. If pullBatchSize is null, consumerBatchSize is used. Range: [1, 1024] | 32 | 10 |
| pullThresholdForQueue | N | Flow control threshold on queue level. Each message queue will cache a maximum of 1000 messages by default. Consider the pullBatchSize - the instantaneous value may exceed the limit. Range: [1, 65535] | 1024 | 1000 |
| pullThresholdForTopic | N | Flow control threshold on topic level. The value of pullThresholdForQueue will be overwritten and calculated based on pullThresholdForTopic if it isn’t unlimited. For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, then pullThresholdForQueue will be set to 100. Range: [1, 6553500] | -1 (Unlimited) | 10 |
| pullThresholdSizeForQueue | N | Limit the cached message size on queue level. Consider the pullBatchSize - the instantaneous value may exceed the limit. The size of a message is only measured by message body, so it’s not accurate. Range: [1, 1024] | 100 | 100 |
| pullThresholdSizeForTopic | N | Limit the cached message size on topic level. The value of pullThresholdSizeForQueue will be overwritten and calculated based on pullThresholdSizeForTopic if it isn’t unlimited. For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB. Range: [1, 102400] | -1 | 100 |
| content-type | N | Message content type. | “text/plain” | “application/cloudevents+json; charset=utf-8”, “application/octet-stream” |
| logLevel | N | Log level | warn | info |
| sendTimeOut | N | Send message timeout to connect RocketMQ’s broker, measured in nanoseconds. Deprecated. | 3 seconds | 10000000000 |
| sendTimeOutSec | N | Timeout duration for publishing a message in seconds. If sendTimeOutSec is null, sendTimeOut is used. | 3 seconds | 3 |
| mspProperties | N | The RocketMQ message properties in this collection are passed to the app in Data. Separate multiple properties with “,” | | key,mkey |
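
A few of the value formats in the table can be checked with a short sketch. The Python below is illustrative only and is not the component's code: the helper names are hypothetical, and the integer division and the floor of 1 in `per_queue_threshold` are assumptions, since the table only gives the 1000 / 10 = 100 example.

```python
import re
from datetime import datetime


def split_name_servers(value: str) -> list[str]:
    """Split a nameServer value on "," or ";" (both separators are accepted)."""
    return [part.strip() for part in re.split(r"[;,]", value) if part.strip()]


def parse_consume_timestamp(value: str) -> datetime:
    """Parse a consumeTimestamp string in yyyymmddhhmmss form."""
    return datetime.strptime(value, "%Y%m%d%H%M%S")


def per_queue_threshold(topic_threshold: int, assigned_queues: int,
                        queue_threshold: int) -> int:
    """Hypothetical helper: derive the per-queue limit from the topic limit.

    -1 means the topic-level limit is unlimited, so the queue-level value
    is kept. Integer division and the minimum of 1 are assumptions.
    """
    if topic_threshold == -1:
        return queue_threshold
    return max(1, topic_threshold // assigned_queues)


if __name__ == "__main__":
    print(split_name_servers("127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877"))
    print(parse_consume_timestamp("20131223171201"))
    print(per_queue_threshold(1000, 10, 1024))
```

The same division applies to pullThresholdSizeForTopic and pullThresholdSizeForQueue, with sizes in MiB instead of message counts.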

For backwards-compatibility reasons, the following values in the metadata are supported, although their use is discouraged.

| Field (supported but deprecated) | Required | Details | Example |
|---|---|---|---|
| groupName | N | Producer group name for RocketMQ publishers | “my_unique_group_name” |
| sendTimeOut | N | Timeout duration for publishing a message in nanoseconds | 0 |
| consumerBatchSize | N | The number of messages pulled from the broker at a time | 32 |

Set up RocketMQ

See https://rocketmq.apache.org/docs/quick-start/ to set up a local RocketMQ instance.

Per-call metadata fields

Partition Key

When invoking the RocketMQ pub/sub, you can provide an optional partition key by using the metadata query parameter in the request URL.

The recognized metadata keys are rocketmq-tag, rocketmq-key, rocketmq-shardingkey, and rocketmq-queue.

Example:

    curl -X POST "http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1" \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'
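
The same request can be assembled programmatically. The following is a minimal Python sketch using only the standard library; `build_publish_url` and `publish` are hypothetical helper names, and the sidecar address `http://localhost:3500` is taken from the curl example above. Each per-call key is prefixed with `metadata.` and URL-encoded.

```python
import json
import urllib.request
from urllib.parse import urlencode


def build_publish_url(pubsub_name: str, topic: str, metadata: dict,
                      base: str = "http://localhost:3500") -> str:
    """Build a Dapr publish URL with per-call metadata as query parameters."""
    url = f"{base}/v1.0/publish/{pubsub_name}/{topic}"
    # Each per-call key travels as metadata.<key>=<value>.
    query = urlencode({f"metadata.{key}": value for key, value in metadata.items()})
    return f"{url}?{query}" if query else url


def publish(pubsub_name: str, topic: str, payload: dict, metadata: dict) -> int:
    """POST a JSON payload to the sidecar and return the HTTP status code.

    Requires a running Dapr sidecar; it is not called in this sketch.
    """
    request = urllib.request.Request(
        build_publish_url(pubsub_name, topic, metadata),
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    print(build_publish_url("myRocketMQ", "myTopic",
                            {"rocketmq-shardingkey": "key", "rocketmq-queue": 1}))
```

Building the query with `urlencode` avoids the shell-quoting problems that `?` and `&` cause when the URL is typed by hand.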

QueueSelector

The RocketMQ component supports five queue selectors in total. Four of them are provided by the RocketMQ client:

  • HashQueueSelector
  • RandomQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

To learn more about these RocketMQ client queue selectors, read the RocketMQ documentation.

The Dapr RocketMQ component implements the following queue selector:

  • DaprQueueSelector

This article focuses on the design of DaprQueueSelector.

DaprQueueSelector

DaprQueueSelector integrates three queue selectors:

  • HashQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

DaprQueueSelector gets the queue id from the request parameter. You can set the queue id by running the following:

    http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1

The ManualQueueSelector is implemented using the method above.

Next, the DaprQueueSelector tries to:

  • Get a ShardingKey
  • Hash the ShardingKey to determine the queue id.

You can set the ShardingKey by doing the following:

    http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key

If the ShardingKey does not exist, the RoundRobin algorithm is used to determine the queue id.
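
The selection order described above (manual queue id, then sharding-key hash, then round robin) can be sketched as follows. This is an illustration in Python, not the component's implementation: the class name `QueueSelectorSketch` is hypothetical, CRC32 stands in for whatever hash the component actually uses, and falling through when the queue id is missing, non-numeric, or out of range is an assumption.

```python
import itertools
import zlib


class QueueSelectorSketch:
    """Illustrative fallback chain: manual id, then hash, then round robin."""

    def __init__(self) -> None:
        # Monotonic counter used only by the round-robin fallback.
        self._counter = itertools.count()

    def select(self, queue_count: int, metadata: dict) -> int:
        # 1. Manual: use rocketmq-queue when it names a valid queue.
        raw_queue_id = metadata.get("rocketmq-queue")
        if raw_queue_id is not None:
            try:
                queue_id = int(raw_queue_id)
            except ValueError:
                queue_id = -1  # assumption: ignore a non-numeric id
            if 0 <= queue_id < queue_count:
                return queue_id

        # 2. Hash: map rocketmq-shardingkey to a queue deterministically.
        #    CRC32 is a stand-in; the real hash function is not specified here.
        sharding_key = metadata.get("rocketmq-shardingkey")
        if sharding_key:
            return zlib.crc32(sharding_key.encode("utf-8")) % queue_count

        # 3. Round robin: cycle through the queues in order.
        return next(self._counter) % queue_count


if __name__ == "__main__":
    selector = QueueSelectorSketch()
    print(selector.select(4, {"rocketmq-queue": "1"}))
    print(selector.select(4, {"rocketmq-shardingkey": "key"}))
    print([selector.select(3, {}) for _ in range(4)])
```

Because the hash step is deterministic, messages that share a sharding key land on the same queue, which is what ordered consumption relies on.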

Last modified October 12, 2023: Update config.toml (#3826) (0ffc2e7)