Apache Kafka

Detailed documentation on the Apache Kafka pubsub component

Component format

To set up Apache Kafka pubsub, create a component of type pubsub.kafka. See this guide on how to create and apply a pubsub configuration. For details on using secretKeyRef, see the guide on how to reference secrets in components.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "password"
      - name: saslUsername # Required if authType is `password`.
        value: "adminuser"
      - name: saslPassword # Required if authType is `password`.
        secretKeyRef:
          name: kafka-secrets
          key: saslPasswordSecret
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0

Spec metadata fields

| Field | Required | Details | Example |
|---|---|---|---|
| brokers | Y | A comma-separated list of Kafka brokers. | "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093" |
| consumerGroup | N | A Kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. | "group1" |
| clientID | N | A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to "sarama". | "my-dapr-app" |
| authRequired | N | Deprecated. Enable SASL authentication with the Kafka brokers. | "true", "false" |
| authType | Y | Configure or disable authentication. Supported values: none, password, mtls, or oidc. | "password", "none" |
| saslUsername | N | The SASL username used for authentication. Only required if authType is set to "password". | "adminuser" |
| saslPassword | N | The SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authType is set to "password". | "", "KeFg23!" |
| initialOffset | N | The initial offset to use if no offset was previously committed. Should be "newest" or "oldest". Defaults to "newest". | "oldest" |
| maxMessageBytes | N | The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. | 2048 |
| consumeRetryInterval | N | The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to 100ms. | 200ms |
| version | N | Kafka cluster version. Defaults to 2.0.0.0. | 0.10.2.0 |
| caCert | N | Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientCert | N | Client certificate, required for authType mtls. Can be secretKeyRef to use a secret reference. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientKey | N | Client key, required for authType mtls. Can be secretKeyRef to use a secret reference. | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
| skipVerify | N | Skip TLS verification. This is not recommended for use in production. Defaults to "false". | "true", "false" |
| disableTls | N | Disable TLS for transport security. This is not recommended for use in production. Defaults to "false". | "true", "false" |
| oidcTokenEndpoint | N | Full URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidc. | "https://identity.example.com/v1/token" |
| oidcClientID | N | The OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc. | "dapr-kafka" |
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider. Required when authType is set to oidc. | "KeFg23!" |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to "openid". | "openid,kafka-prod" |

The secretKeyRef above references a Kubernetes secret store to access the TLS information. Visit here to learn more about how to configure a secret store component.
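As an illustration of what the secretKeyRef entries on this page point at, the following is a minimal sketch of a Kubernetes Secret that would satisfy the kafka-secrets reference. The secret name and key are taken from the component example; the namespace is assumed to match the component, and the password value is a placeholder you would replace.

```yaml
# Sketch only: a Secret backing the saslPassword secretKeyRef.
apiVersion: v1
kind: Secret
metadata:
  name: kafka-secrets        # matches secretKeyRef.name in the component
  namespace: default         # assumed to be the component's namespace
type: Opaque
stringData:
  # matches secretKeyRef.key; the value is a placeholder
  saslPasswordSecret: "<YOUR_SASL_PASSWORD>"
```

Using stringData lets you write the value in plain text; Kubernetes stores it base64-encoded under data.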

Authentication

Kafka supports a variety of authentication schemes, and Dapr supports several of them: SASL password, mTLS, and OIDC/OAuth2. With these added authentication methods, the authRequired field has been deprecated as of the v1.6 release; use the authType field instead. If authRequired is set to true, Dapr attempts to configure authType correctly based on the value of saslPassword. There are four valid values for authType: none, password, mtls, and oidc. Note that this covers authentication only; authorization is still configured within Kafka.
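As a rough illustration of moving off the deprecated field: a component that previously set authRequired together with saslUsername and saslPassword would presumably now declare authType explicitly as password. The fragment below is a sketch of the relevant spec metadata entries only, reusing the secret name and key from the examples on this page.

```yaml
# Sketch only: explicit replacement for the deprecated authRequired field.
# Before (deprecated as of v1.6):
#   - name: authRequired
#     value: "true"
# After:
- name: authType
  value: "password"
- name: saslUsername
  value: "adminuser"
- name: saslPassword
  secretKeyRef:
    name: kafka-secrets
    key: saslPasswordSecret
```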

None

Setting authType to none will disable any authentication. This is NOT recommended in production.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-noauth
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "none"
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: disableTls
        value: "true"

SASL Password

Setting authType to password enables SASL authentication using the PLAIN mechanism. This requires setting the saslUsername and saslPassword fields.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-sasl
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "password"
      - name: saslUsername # Required if authType is `password`.
        value: "adminuser"
      - name: saslPassword # Required if authType is `password`.
        secretKeyRef:
          name: kafka-secrets
          key: saslPasswordSecret
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: caCert
        secretKeyRef:
          name: kafka-tls
          key: caCert

Mutual TLS

Setting authType to mtls uses an x509 client certificate (the clientCert field) and key (the clientKey field) to authenticate. Note that mTLS as an authentication mechanism is distinct from using TLS to secure the transport layer via encryption. mTLS requires TLS transport (meaning disableTls must be false), but securing the transport layer does not require using mTLS. See Communication using TLS for configuring the underlying TLS transport.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-mtls
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "mtls"
      - name: caCert
        secretKeyRef:
          name: kafka-tls
          key: caCert
      - name: clientCert
        secretKeyRef:
          name: kafka-tls
          key: clientCert
      - name: clientKey
        secretKeyRef:
          name: kafka-tls
          key: clientKey
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
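The mTLS example above pulls three values from a secret named kafka-tls. A minimal sketch of such a Kubernetes Secret might look like the following; the key names come from the component example, the namespace is assumed to match the component, and the PEM bodies are placeholders rather than real material.

```yaml
# Sketch only: a Secret backing the caCert, clientCert, and clientKey references.
apiVersion: v1
kind: Secret
metadata:
  name: kafka-tls            # matches secretKeyRef.name in the component
  namespace: default         # assumed to be the component's namespace
type: Opaque
stringData:
  caCert: |
    -----BEGIN CERTIFICATE-----
    <base64-encoded DER>
    -----END CERTIFICATE-----
  clientCert: |
    -----BEGIN CERTIFICATE-----
    <base64-encoded DER>
    -----END CERTIFICATE-----
  clientKey: |
    -----BEGIN RSA PRIVATE KEY-----
    <base64-encoded PKCS8>
    -----END RSA PRIVATE KEY-----
```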

OAuth2 or OpenID Connect

Setting authType to oidc enables SASL authentication via the OAUTHBEARER mechanism. This supports specifying a bearer token from an external OAuth2 or OIDC identity provider. Currently, only the client_credentials grant is supported.

Configure oidcTokenEndpoint to the full URL for the identity provider access token endpoint. Set oidcClientID and oidcClientSecret to the client credentials provisioned in the identity provider. If caCert is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider certificate. Similarly, if skipVerify is specified in the component configuration, verification is also skipped when accessing the identity provider.

By default, the only scope requested for the token is openid; it is highly recommended that additional scopes be specified via oidcScopes in a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token, a compromised Kafka broker could replay the token to access other services as the Dapr clientID.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "oidc"
      - name: oidcTokenEndpoint # Required if authType is `oidc`.
        value: "https://identity.example.com/v1/token"
      - name: oidcClientID # Required if authType is `oidc`.
        value: "dapr-myapp"
      - name: oidcClientSecret # Required if authType is `oidc`.
        secretKeyRef:
          name: kafka-secrets
          key: oidcClientSecret
      - name: oidcScopes # Recommended if authType is `oidc`.
        value: "openid,kafka-dev"
      - name: caCert # Also applied to verifying OIDC provider certificate
        secretKeyRef:
          name: kafka-tls
          key: caCert
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0

Communication using TLS

By default, TLS is enabled to secure the transport layer to Kafka. To disable TLS, set disableTls to true. When TLS is enabled, you can control server certificate verification using skipVerify to disable verification (NOT recommended in production environments) and caCert to specify a trusted TLS certificate authority (CA). If no caCert is specified, the system CA trust will be used. To also configure mTLS authentication, see the section under Authentication. Below is an example of a Kafka pubsub component configured to use transport layer TLS:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "password"
      - name: saslUsername # Required if authType is `password`.
        value: "adminuser"
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: saslPassword # Required if authType is `password`.
        secretKeyRef:
          name: kafka-secrets
          key: saslPasswordSecret
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: caCert # Certificate authority certificate.
        secretKeyRef:
          name: kafka-tls
          key: caCert
    auth:
      secretStore: <SECRET_STORE_NAME>
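For a local development cluster, the transport settings described in this section might be adjusted as in the fragment below. This is a sketch of spec metadata entries only; the two options are alternatives rather than a pair to set together, the self-signed certificate scenario is an assumption for illustration, and neither option is recommended in production.

```yaml
# Option A (sketch): keep TLS on but skip server certificate verification,
# for example when a development broker presents a self-signed certificate.
- name: skipVerify
  value: "true"

# Option B (sketch): turn TLS off entirely for a plaintext listener.
# - name: disableTls
#   value: "true"
```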

Per-call metadata fields

Partition Key

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the metadata query param in the request URL.

The param name is partitionKey.

Example:

    curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1" \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'

Message headers

All other metadata key/value pairs (that are not partitionKey) are set as headers in the Kafka message. Here is an example setting a correlationId for the message.

    curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1" \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'

Create a Kafka instance

You can run Kafka locally using this Docker image. To run without Docker, see the getting started guide here.

To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.

Last modified February 18, 2022: Update setup-jetstream.md (#2200) (428d8c2)