MQTT3

Detailed documentation on the MQTT3 pubsub component

Component format

To set up an MQTT3 pubsub, create a component of type pubsub.mqtt3. See the Dapr guide on how to create and apply a pubsub configuration.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: mqtt-pubsub
    spec:
      type: pubsub.mqtt3
      version: v1
      metadata:
        - name: url
          value: "tcp://[username][:password]@host.domain[:port]"
        # Optional
        - name: retain
          value: "false"
        - name: cleanSession
          value: "false"
        - name: qos
          value: "1"

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for secrets, as described in the Dapr documentation on referencing secrets in components.
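
As a rough sketch of that alternative, the broker URL (which embeds the username and password) can be read from a secret store with secretKeyRef instead of being written inline. The secret name mqtt-secrets, the key brokerUrl, and the store name my-secret-store are hypothetical placeholders, not values from this page. On Kubernetes the auth block can typically be omitted, in which case the built-in Kubernetes secret store is used.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: url
      secretKeyRef:
        name: mqtt-secrets   # hypothetical secret name
        key: brokerUrl       # hypothetical key holding the full tcp:// or ssl:// URL
auth:
  secretStore: my-secret-store   # hypothetical secret store component name
```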

Spec metadata fields

| Field | Required | Details | Example |
|-------|----------|---------|---------|
| url | Y | Address of the MQTT broker. Can be secretKeyRef to use a secret reference. Use the tcp:// URI scheme for non-TLS communication. Use the ssl:// URI scheme for TLS communication. | "tcp://[username][:password]@host.domain[:port]" |
| consumerID | N | The client ID used to connect to the MQTT broker. Defaults to the Dapr app ID. | "myMqttClientApp" |
| retain | N | Defines whether the message is saved by the broker as the last known good value for a specified topic. Defaults to "false". | "true", "false" |
| cleanSession | N | Sets the clean_session flag in the connection message to the MQTT broker if "true". Defaults to "false". | "true", "false" |
| caCert | Required for using TLS | Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. | See example below |
| clientCert | Required for using TLS | TLS client certificate in PEM format. Must be used with clientKey. | See example below |
| clientKey | Required for using TLS | TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. | See example below |
| qos | N | Indicates the Quality of Service (QoS) level of the message. Defaults to 1. | 0, 1, 2 |

Communication using TLS

To configure communication using TLS, ensure that the MQTT broker (for example, emqx) is configured to support certificates, and provide the caCert, clientCert, and clientKey metadata in the component configuration. For example:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: mqtt-pubsub
    spec:
      type: pubsub.mqtt3
      version: v1
      metadata:
        - name: url
          value: "ssl://host.domain[:port]"
        # TLS configuration
        - name: caCert
          value: |
            -----BEGIN CERTIFICATE-----
            ...
            -----END CERTIFICATE-----
        - name: clientCert
          value: |
            -----BEGIN CERTIFICATE-----
            ...
            -----END CERTIFICATE-----
        - name: clientKey
          secretKeyRef:
            name: myMqttClientKey
            key: myMqttClientKey
        # Optional
        - name: retain
          value: "false"
        - name: cleanSession
          value: "false"
        - name: qos
          value: 1

Note that although the caCert and clientCert values are not necessarily secrets, they can also be referenced from a Dapr secret store for convenience.
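
A minimal sketch of that variant is shown here, with all three TLS values pulled through secretKeyRef. The secret name mqtt-tls and the keys ca.crt, tls.crt, and tls.key are hypothetical and only illustrate the shape; substitute whatever names your secret store actually holds.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: url
      value: "ssl://host.domain[:port]"
    # TLS material referenced from a secret store (names below are hypothetical)
    - name: caCert
      secretKeyRef:
        name: mqtt-tls
        key: ca.crt
    - name: clientCert
      secretKeyRef:
        name: mqtt-tls
        key: tls.crt
    - name: clientKey
      secretKeyRef:
        name: mqtt-tls
        key: tls.key
```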

Consuming a shared topic

When consuming a shared topic, each consumer must have a unique identifier. By default, the application ID is used to uniquely identify each consumer and publisher. In self-hosted mode, invoking each dapr run with a different application ID is sufficient to have them consume from the same shared topic. On Kubernetes, however, multiple instances of an application pod share the same application ID, which prevents all instances from consuming the same topic.

To overcome this, configure the component’s consumerID metadata with a {uuid} tag, which gives each instance a randomly generated value on startup, or a {podName} tag, which uses the Pod’s name on Kubernetes. For example:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: mqtt-pubsub
    spec:
      type: pubsub.mqtt3
      version: v1
      metadata:
        - name: consumerID
          value: "{uuid}"
        - name: cleanSession
          value: "true"
        - name: url
          value: "tcp://admin:public@localhost:1883"
        - name: qos
          value: 1
        - name: retain
          value: "false"

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for secrets, as described in the Dapr documentation on referencing secrets in components.

Note that in this case, the value of the consumer ID is random every time Dapr restarts, so you should set cleanSession to true as well.

It is recommended to use StatefulSets with shared subscriptions.
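
One likely reason StatefulSets fit here is that their Pod names are stable across restarts (for example myapp-0, myapp-1), so a {podName}-based consumer ID stays the same for each replica. A minimal sketch under that assumption follows. Keeping cleanSession at "false", so that the broker can retain each replica's session, is an illustrative choice rather than a documented requirement, and the broker address is the in-cluster service address used for the Kubernetes broker on this page.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: consumerID
      value: "{podName}"   # resolves to the Pod's name on Kubernetes
    - name: cleanSession
      value: "false"       # assumption: stable Pod names make persistent sessions usable
    - name: url
      value: "tcp://mqtt-broker.default.svc.cluster.local:1883"
    - name: qos
      value: "1"
```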

Create an MQTT3 broker

You can run an MQTT broker like emqx locally using Docker:

    docker run -d -p 1883:1883 --name mqtt emqx:latest

You can then interact with the server using the client port: tcp://localhost:1883

You can run an MQTT3 broker in Kubernetes using the following YAML:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: mqtt-broker
      labels:
        app-name: mqtt-broker
    spec:
      replicas: 1
      selector:
        matchLabels:
          app-name: mqtt-broker
      template:
        metadata:
          labels:
            app-name: mqtt-broker
        spec:
          containers:
            - name: mqtt
              image: emqx:latest
              imagePullPolicy: IfNotPresent
              ports:
                - name: default
                  containerPort: 1883
                  protocol: TCP
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: mqtt-broker
      labels:
        app-name: mqtt-broker
    spec:
      type: ClusterIP
      selector:
        app-name: mqtt-broker
      ports:
        - port: 1883
          targetPort: default
          name: default
          protocol: TCP

You can then interact with the server using the client port: tcp://mqtt-broker.default.svc.cluster.local:1883

Last modified February 9, 2023: Subscribe with StatefulSet (25355f12)