RabbitMQ

Detailed documentation on the RabbitMQ pubsub component

Component format

  apiVersion: dapr.io/v1alpha1
  kind: Component
  metadata:
    name: rabbitmq-pubsub
  spec:
    type: pubsub.rabbitmq
    version: v1
    metadata:
    - name: host
      value: "amqp://localhost:5672"
    - name: consumerID
      value: myapp
    - name: durable
      value: false
    - name: deletedWhenUnused
      value: false
    - name: autoAck
      value: false
    - name: deliveryMode
      value: 0
    - name: requeueInFailure
      value: false
    - name: prefetchCount
      value: 0
    - name: reconnectWait
      value: 0
    - name: concurrencyMode
      value: parallel
    - name: publisherConfirm
      value: false
    - name: backOffPolicy
      value: exponential
    - name: backOffInitialInterval
      value: 100
    - name: backOffMaxRetries
      value: 16
    - name: enableDeadLetter # Optional: whether to enable dead-lettering
      value: true
    - name: maxLen # Optional: maximum number of messages in a queue
      value: 3000
    - name: maxLenBytes # Optional: maximum length of a queue in bytes
      value: 10485760
    - name: exchangeKind
      value: fanout

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.

Spec metadata fields

| Field | Required | Details | Example |
|---|---|---|---|
| host | Y | Connection string for the RabbitMQ host | amqp://user:pass@localhost:5672 |
| consumerID | N | Consumer ID (a.k.a. consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; that is, a message is processed only once by one of the consumers in the group. If the consumer ID is not set, the Dapr runtime sets it to the Dapr application ID. | |
| durable | N | Whether or not to use durable queues. Defaults to “false” | “true”, “false” |
| deletedWhenUnused | N | Whether or not the queue should be configured to auto-delete. Defaults to “true” | “true”, “false” |
| autoAck | N | Whether or not the queue consumer should auto-ack messages. Defaults to “false” | “true”, “false” |
| deliveryMode | N | Persistence mode when publishing messages. Defaults to “0”. RabbitMQ treats “2” as persistent and all other numbers as non-persistent | “0”, “2” |
| requeueInFailure | N | Whether or not to requeue when sending a negative acknowledgement in case of a failure. Defaults to “false” | “true”, “false” |
| prefetchCount | N | Number of messages to prefetch. Consider changing this to a non-zero value for production environments. Defaults to “0”, which means that all available messages are prefetched. | “2” |
| publisherConfirm | N | If enabled, the client waits for publisher confirms after publishing a message. Defaults to “false” | “true”, “false” |
| reconnectWait | N | How long to wait (in seconds) before reconnecting if a connection failure occurs | “0” |
| concurrencyMode | N | parallel is the default, and allows processing multiple messages in parallel (limited by the app-max-concurrency annotation, if configured). Set to single to disable parallel processing. In most situations there’s no reason to change this. | parallel, single |
| backOffPolicy | N | Retry policy. “constant” is a backoff policy that always returns the same backoff delay. “exponential” is a backoff policy that increases the backoff period for each retry attempt using a randomization function that grows exponentially. Defaults to “constant”. | constant, exponential |
| backOffDuration | N | The fixed interval. Only takes effect when the policy is constant. There are two valid formats: a fraction with a unit suffix, or a pure number that is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “5s”. | “5s”, “5000” |
| backOffInitialInterval | N | The initial backoff interval on retry. Only takes effect when the policy is exponential. There are two valid formats: a fraction with a unit suffix, or a pure number that is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “500” | “50” |
| backOffMaxInterval | N | The maximum backoff interval on retry. Only takes effect when the policy is exponential. There are two valid formats: a fraction with a unit suffix, or a pure number that is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “60s” | “60000” |
| backOffMaxRetries | N | The maximum number of retries to process the message before returning an error. Defaults to “0”, which means the component will not retry processing the message. “-1” retries indefinitely until the message is processed or the application is shut down. Any positive number is treated as the maximum retry count. | “3” |
| backOffRandomizationFactor | N | Randomization factor, between 0 and 1, including 0 but not 1. Randomized interval = RetryInterval * (1 ± backOffRandomizationFactor). Defaults to “0.5”. | “0.5” |
| backOffMultiplier | N | Backoff multiplier for the policy. Increments the interval by multiplying it with the multiplier. Defaults to “1.5” | “1.5” |
| backOffMaxElapsedTime | N | After MaxElapsedTime the ExponentialBackOff returns Stop. There are two valid formats: a fraction with a unit suffix, or a pure number that is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “15m” | “15m” |
| enableDeadLetter | N | Enable forwarding messages that cannot be handled to a dead-letter topic. Defaults to “false” | “true”, “false” |
| maxLen | N | The maximum number of messages of a queue and its dead letter queue (if dead letter is enabled). If both maxLen and maxLenBytes are set, both apply; whichever limit is hit first is enforced. Defaults to no limit. | “1000” |
| maxLenBytes | N | Maximum length in bytes of a queue and its dead letter queue (if dead letter is enabled). If both maxLen and maxLenBytes are set, both apply; whichever limit is hit first is enforced. Defaults to no limit. | “1048576” |
| exchangeKind | N | Exchange kind of the RabbitMQ exchange. Defaults to “fanout”. | “fanout”, “topic” |
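
The duration-valued fields (backOffDuration, backOffInitialInterval, backOffMaxInterval, backOffMaxElapsedTime) accept two formats. The Go sketch below illustrates that rule under a simple assumption: a bare integer is read as milliseconds, and anything else is handed to Go's duration parser. The helper name parseInterval is hypothetical and is not part of the component's API; the component's own parsing may differ in edge cases.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseInterval is a hypothetical helper illustrating the two documented formats:
// a bare integer is read as milliseconds, anything else is parsed as a
// unit-suffixed duration ("ns", "us"/"µs", "ms", "s", "m", "h").
func parseInterval(s string) (time.Duration, error) {
	if ms, err := strconv.ParseInt(s, 10, 64); err == nil {
		return time.Duration(ms) * time.Millisecond, nil
	}
	return time.ParseDuration(s)
}

func main() {
	for _, v := range []string{"5s", "5000", "1.5m", "soon"} {
		d, err := parseInterval(v)
		if err != nil {
			fmt.Printf("%q: invalid (%v)\n", v, err)
			continue
		}
		fmt.Printf("%q -> %v\n", v, d)
	}
}
```

Under this reading, “5s” and “5000” describe the same interval, which matches the two examples given for backOffDuration.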

Backoff policy introduction

The backoff retry strategy tells the Dapr sidecar how to redeliver a message. By default, retry is turned off, which means that the sidecar sends a message to the service only once. When the service returns a result, the message is marked as consumed regardless of whether it was processed correctly. This assumes that both autoAck and requeueInFailure are set to false (if requeueInFailure is set to true, the message gets a second chance).

In some cases, however, you may want Dapr to retry pushing the message with an exponential or constant backoff strategy until the message is processed successfully or the number of retries is exhausted. This may be useful when your service crashes but the sidecar keeps running. With a backoff policy, the sidecar retries pushing the message during the service downtime instead of marking these messages as consumed.
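
To make the exponential policy concrete, the sketch below computes the nominal wait before each retry and the range it can fall in after randomization, using the formula and default values from the table (initial interval 500 ms, multiplier 1.5, maximum interval 60 s, randomization factor 0.5). The function backoffBounds is a hypothetical illustration of that arithmetic, not the component's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// backoffBounds returns the nominal interval before retry n (0-based) and the
// bounds of the randomized interval, nominal * (1 ± randomization).
// The nominal interval grows by multiplier on each retry and is capped at maxInterval.
func backoffBounds(n int, initial, maxInterval time.Duration, multiplier, randomization float64) (nominal, low, high time.Duration) {
	interval := float64(initial)
	for i := 0; i < n; i++ {
		interval *= multiplier
		if interval >= float64(maxInterval) {
			interval = float64(maxInterval)
			break
		}
	}
	nominal = time.Duration(interval)
	low = time.Duration(interval * (1 - randomization))
	high = time.Duration(interval * (1 + randomization))
	return nominal, low, high
}

func main() {
	for n := 0; n < 5; n++ {
		nominal, low, high := backoffBounds(n, 500*time.Millisecond, 60*time.Second, 1.5, 0.5)
		fmt.Printf("retry %d: nominal %v, randomized within [%v, %v]\n", n+1, nominal, low, high)
	}
}
```

With these defaults the first retry waits between 250 ms and 750 ms, and the nominal interval stops growing once it reaches backOffMaxInterval.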

Create a RabbitMQ server

You can run a RabbitMQ server locally using Docker:

  docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3

You can then interact with the server using the client port: localhost:5672.
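
Before pointing the component at the broker, it can help to confirm that the client port accepts connections. The sketch below is a plain TCP check using only the Go standard library; brokerReachable is a hypothetical helper, and it assumes the container's port 5672 is published on the host. It does not speak AMQP, so it only shows that something is listening.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// brokerReachable reports whether a TCP connection to addr can be opened
// within timeout. It checks only that the port accepts connections.
func brokerReachable(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	addr := "localhost:5672"
	fmt.Printf("%s reachable: %v\n", addr, brokerReachable(addr, 2*time.Second))
}
```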

The easiest way to install RabbitMQ on Kubernetes is by using the Helm chart:

  helm install rabbitmq stable/rabbitmq

Look at the chart output and get the username and password.

This will install RabbitMQ into the default namespace. To interact with RabbitMQ, find the service with: kubectl get svc rabbitmq.

For example, if installing using the example above, the RabbitMQ server client address would be:

rabbitmq.default.svc.cluster.local:5672
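
The component's host field expects a full connection string rather than a bare address. As a sketch, the value can be assembled with Go's net/url package so that special characters in the password are escaped. The helper amqpURL and the credentials "user" and "p@ss" are placeholders for illustration; substitute the username and password from the chart output.

```go
package main

import (
	"fmt"
	"net/url"
)

// amqpURL builds a connection string suitable for the component's host field.
// Special characters in the credentials are percent-encoded.
func amqpURL(user, password, hostPort string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   hostPort,
	}
	return u.String()
}

func main() {
	// "user" and "p@ss" are placeholder credentials, not values from the chart.
	fmt.Println(amqpURL("user", "p@ss", "rabbitmq.default.svc.cluster.local:5672"))
}
```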

Use topic exchange to route messages

Setting exchangeKind to "topic" makes the component use topic exchanges, which are commonly used for multicast routing of messages. A message published with a routing key is routed to one or more queues whose subscriptions declared a matching routing key. A subscription declares its routing key through the routingKey metadata. For example, if an app is configured with the routing key keyA:

  apiVersion: dapr.io/v1alpha1
  kind: Subscription
  metadata:
    name: order_pub_sub
  spec:
    topic: B
    route: /B
    pubsubname: pubsub
    metadata:
      routingKey: keyA

It will receive messages with routing key keyA, and messages with other routing keys are not received.

  // publish messages with routing key `keyA`, and these will be received by the above example.
  client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is a message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyA"}))
  // publish messages with routing key `keyB`, and these will not be received by the above example.
  client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyB"}))

Bind multiple routing keys

Multiple routing keys can be separated by commas.
The example below binds three routing keys: keyA, keyB, and "" (the empty key). Note that the empty key is bound by leaving a trailing comma.

  apiVersion: dapr.io/v1alpha1
  kind: Subscription
  metadata:
    name: order_pub_sub
  spec:
    topic: B
    route: /B
    pubsubname: pubsub
    metadata:
      routingKey: keyA,keyB,
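
The trailing comma is what yields the empty key. The Go sketch below shows why, assuming the value is split on commas without trimming or dropping empty entries; splitRoutingKeys is a hypothetical helper for illustration, and the component's actual parsing may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRoutingKeys splits a comma-separated routingKey value into individual
// binding keys. A trailing comma produces an empty key.
func splitRoutingKeys(value string) []string {
	return strings.Split(value, ",")
}

func main() {
	for _, key := range splitRoutingKeys("keyA,keyB,") {
		fmt.Printf("bind %q\n", key)
	}
}
```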

For more information, see RabbitMQ exchanges.

Last modified July 27, 2022: Remove namespace element from component examples (#2647) (ff9de5c8)