Apache Kafka

Detailed documentation on the Apache Kafka pubsub component

Component format

To set up Apache Kafka pub/sub, create a component of type pubsub.kafka. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

All component metadata field values can carry templated metadata values, which are resolved on Dapr sidecar startup. For example, you can use {namespace} as the consumerGroup so that the same appId in different namespaces can use the same topics, as described in this article.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "{namespace}"
      - name: consumerID # Optional. If not supplied, runtime will create one.
        value: "channel1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "password"
      - name: saslUsername # Required if authType is `password`.
        value: "adminuser"
      - name: saslPassword # Required if authType is `password`.
        secretKeyRef:
          name: kafka-secrets
          key: saslPasswordSecret
      - name: saslMechanism
        value: "SHA-512"
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 2.0.0
      - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
        value: "true"
      - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
        value: http://localhost:8081
      - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
        value: XYAXXAZ
      - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
        value: "ABCDEFGMEADFF"
      - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
        value: true
      - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
        value: 5m

For details on using secretKeyRef, see the guide on how to reference secrets in components.
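
To make the templating idea concrete, here is a minimal Python sketch of how a placeholder such as {namespace} or {appID} could be expanded into a concrete value. It only models the behavior described above and is not Dapr's implementation; the function name resolve_templates and the context dictionary are hypothetical.

```python
import re


def resolve_templates(value: str, context: dict) -> str:
    """Replace each {key} placeholder with context[key].

    Placeholders whose key is not in the context are left untouched.
    (Hypothetical helper, for illustration only.)
    """
    def substitute(match):
        key = match.group(1)
        return context.get(key, match.group(0))

    return re.sub(r"\{(\w+)\}", substitute, value)


# Assumed example values for the sidecar's namespace and app ID.
context = {"namespace": "production", "appID": "orders"}
print(resolve_templates("{namespace}", context))
print(resolve_templates("{appID}-topic-one", context))
```

With the assumed context, the same component file yields a different consumerGroup in each namespace, which is what lets identical app IDs share topics without sharing a consumer group.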

Spec metadata fields

| Field | Required | Details | Example |
| --- | --- | --- | --- |
| brokers | Y | A comma-separated list of Kafka brokers. | "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093" |
| consumerGroup | N | A Kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. If a value for consumerGroup is provided, any value for consumerID is ignored - a combination of the consumer group and a random unique identifier will be set for the consumerID instead. | "group1" |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. If a value for consumerGroup is provided, any value for consumerID is ignored - a combination of the consumer group and a random unique identifier will be set for the consumerID instead. | "channel1" |
| clientID | N | A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to "namespace.appID" for Kubernetes mode or "appID" for Self-Hosted mode. | "my-namespace.my-dapr-app", "my-dapr-app" |
| authRequired | N | Deprecated. Enable SASL authentication with the Kafka brokers. | "true", "false" |
| authType | Y | Configure or disable authentication. Supported values: none, password, mtls, oidc or awsiam | "password", "none" |
| saslUsername | N | The SASL username used for authentication. Only required if authType is set to "password". | "adminuser" |
| saslPassword | N | The SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authType is set to "password". | "", "KeFg23!" |
| saslMechanism | N | The SASL Authentication Mechanism you wish to use. Only required if authType is set to "password". Defaults to PLAINTEXT | "SHA-512", "SHA-256", "PLAINTEXT" |
| initialOffset | N | The initial offset to use if no offset was previously committed. Should be "newest" or "oldest". Defaults to "newest". | "oldest" |
| maxMessageBytes | N | The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. | 2048 |
| consumeRetryInterval | N | The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to 100ms. | 200ms |
| consumeRetryEnabled | N | Disable consume retry by setting "false" | "true", "false" |
| version | N | Kafka cluster version. Defaults to 2.0.0. Note that this must be set to 1.0.0 if you are using Azure EventHubs with Kafka. | 0.10.2.0 |
| caCert | N | Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientCert | N | Client certificate, required for authType mtls. Can be secretKeyRef to use a secret reference | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientKey | N | Client key, required for authType mtls. Can be secretKeyRef to use a secret reference | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
| skipVerify | N | Skip TLS verification, this is not recommended for use in production. Defaults to "false" | "true", "false" |
| disableTls | N | Disable TLS for transport security. To disable, you're not required to set value to "true". This is not recommended for use in production. Defaults to "false". | "true", "false" |
| oidcTokenEndpoint | N | Full URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidc | "https://identity.example.com/v1/token" |
| oidcClientID | N | The OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc | dapr-kafka |
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider. Required when authType is set to oidc | "KeFg23!" |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to "openid" | "openid,kafka-prod" |
| oidcExtensions | N | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | |
| awsRegion | N | The AWS region where the Kafka cluster is deployed to. Required when authType is set to awsiam | us-west-1 |
| awsAccessKey | N | AWS access key associated with an IAM account. | "accessKey" |
| awsSecretKey | N | The secret key associated with the access key. | "secretKey" |
| awsSessionToken | N | AWS session token to use. A session token is only required if you are using temporary security credentials. | "sessionToken" |
| awsIamRoleArn | N | IAM role that has access to AWS Managed Streaming for Apache Kafka (MSK). This is another option to authenticate with MSK aside from the AWS Credentials. | "arn:aws:iam::123456789:role/mskRole" |
| awsStsSessionName | N | Represents the session name for assuming a role. | "MSKSASLDefaultSession" |
| schemaRegistryURL | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | http://localhost:8081 |
| schemaRegistryAPIKey | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | XYAXXAZ |
| schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | ABCDEFGMEADFF |
| schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is true | true |
| schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min | 5m |
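
The consumeRetryInterval entry says that numbers without a suffix are treated as milliseconds. The following Python sketch shows one way such a rule could be interpreted. It is an illustration rather than the component's actual parser: the helper name parse_interval is hypothetical, and the set of accepted suffixes (ms, s, m, h) is an assumption.

```python
import re
from datetime import timedelta

# Assumed suffixes; the component may accept more or fewer than these.
_UNITS = {"ms": "milliseconds", "s": "seconds", "m": "minutes", "h": "hours"}


def parse_interval(value) -> timedelta:
    """Parse values such as 200, "200", "200ms" or "5m".

    A bare number is interpreted as milliseconds, mirroring the rule
    stated for consumeRetryInterval. (Hypothetical helper.)
    """
    text = str(value).strip()
    match = re.fullmatch(r"(\d+)(ms|s|m|h)?", text)
    if match is None:
        raise ValueError(f"unsupported interval: {value!r}")
    amount = int(match.group(1))
    unit = match.group(2) or "ms"  # no suffix means milliseconds
    return timedelta(**{_UNITS[unit]: amount})


print(parse_interval("200ms"))
print(parse_interval(200))
print(parse_interval("5m"))
```

Under this reading, 200 and "200ms" describe the same retry interval, while "5m" is the kind of value used for schemaLatestVersionCacheTTL.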

The secretKeyRef above references a Kubernetes secret store to access the TLS information. See the secret store documentation to learn more about how to configure a secret store component.

Note

The metadata version must be set to 1.0.0 when using Azure EventHubs with Kafka.

Authentication

Kafka supports a variety of authentication schemes and Dapr supports several: SASL password, mTLS, OIDC/OAuth2, and AWS IAM. With the added authentication methods, the authRequired field has been deprecated as of the v1.6 release and the authType field should be used instead. If authRequired is set to true, Dapr will attempt to configure authType correctly based on the value of saslPassword. The valid values for authType are:

  • none
  • password
  • certificate
  • mtls
  • oidc
  • awsiam

Note

authType is authentication only. Authorization is still configured within Kafka, except for awsiam, which can also drive authorization decisions configured in AWS IAM.
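
As a quick way to sanity-check a component before applying it, the sketch below maps each authType value to the fields this page marks as required for it and reports which ones are missing. The mapping is assembled from the spec metadata table and the sections that follow; the helper name missing_fields is hypothetical and this is not how Dapr itself validates metadata.

```python
# Fields documented on this page as required for each authType.
# "certificate" is listed as a valid value, but no extra required
# fields are documented for it here, so it maps to an empty list.
REQUIRED_BY_AUTH_TYPE = {
    "none": [],
    "password": ["saslUsername", "saslPassword"],
    "certificate": [],
    "mtls": ["clientCert", "clientKey"],
    "oidc": ["oidcTokenEndpoint", "oidcClientID", "oidcClientSecret"],
    "awsiam": ["awsRegion"],
}


def missing_fields(metadata: dict) -> list:
    """Return the documented required fields absent from metadata."""
    auth_type = metadata.get("authType")
    if auth_type not in REQUIRED_BY_AUTH_TYPE:
        raise ValueError(f"unknown or missing authType: {auth_type!r}")
    required = ["brokers"] + REQUIRED_BY_AUTH_TYPE[auth_type]
    return [name for name in required if not metadata.get(name)]


component = {
    "brokers": "dapr-kafka.myapp.svc.cluster.local:9092",
    "authType": "password",
    "saslUsername": "adminuser",
}
print(missing_fields(component))
```

For the example dictionary, the only field reported is saslPassword, which in a real component would typically be supplied through secretKeyRef.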

None

Setting authType to none will disable any authentication. This is NOT recommended in production.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-noauth
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "none"
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: disableTls
        value: "true"

SASL Password

Setting authType to password enables SASL authentication. This requires setting the saslUsername and saslPassword fields.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-sasl
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "password"
      - name: saslUsername # Required if authType is `password`.
        value: "adminuser"
      - name: saslPassword # Required if authType is `password`.
        secretKeyRef:
          name: kafka-secrets
          key: saslPasswordSecret
      - name: saslMechanism
        value: "SHA-512"
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: caCert
        secretKeyRef:
          name: kafka-tls
          key: caCert

Mutual TLS

Setting authType to mtls uses an x509 client certificate (the clientCert field) and key (the clientKey field) to authenticate. Note that mTLS as an authentication mechanism is distinct from using TLS to secure the transport layer via encryption. mTLS requires TLS transport (meaning disableTls must be false), but securing the transport layer does not require using mTLS. See Communication using TLS for configuring underlying TLS transport.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-mtls
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "mtls"
      - name: caCert
        secretKeyRef:
          name: kafka-tls
          key: caCert
      - name: clientCert
        secretKeyRef:
          name: kafka-tls
          key: clientCert
      - name: clientKey
        secretKeyRef:
          name: kafka-tls
          key: clientKey
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0

OAuth2 or OpenID Connect

Setting authType to oidc enables SASL authentication via the OAUTHBEARER mechanism. This supports specifying a bearer token from an external OAuth2 or OIDC identity provider. Currently, only the client_credentials grant is supported.

Configure oidcTokenEndpoint to the full URL for the identity provider access token endpoint.

Set oidcClientID and oidcClientSecret to the client credentials provisioned in the identity provider.

If caCert is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider certificate. Similarly, if skipVerify is specified in the component configuration, verification will also be skipped when accessing the identity provider.

By default, the only scope requested for the token is openid; it is highly recommended that additional scopes be specified via oidcScopes in a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token, a compromised Kafka broker could replay the token to access other services as the Dapr clientID.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "oidc"
      - name: oidcTokenEndpoint # Required if authType is `oidc`.
        value: "https://identity.example.com/v1/token"
      - name: oidcClientID # Required if authType is `oidc`.
        value: "dapr-myapp"
      - name: oidcClientSecret # Required if authType is `oidc`.
        secretKeyRef:
          name: kafka-secrets
          key: oidcClientSecret
      - name: oidcScopes # Recommended if authType is `oidc`.
        value: "openid,kafka-dev"
      - name: caCert # Also applied to verifying OIDC provider certificate
        secretKeyRef:
          name: kafka-tls
          key: caCert
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
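
For readers unfamiliar with the client_credentials grant, the Python sketch below builds (but does not send) the kind of token request an OAuth2 client makes to the oidcTokenEndpoint. It follows the general OAuth2 form-encoded request shape and is not taken from Dapr's code. Two details are assumptions: the credentials are placed in the form body (some identity providers expect HTTP Basic authentication instead), and the comma-delimited oidcScopes value is converted to the space-delimited scope parameter that OAuth2 uses. The function name build_token_request is hypothetical.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_token_request(token_endpoint, client_id, client_secret,
                        oidc_scopes="openid"):
    """Build a client_credentials token request without sending it."""
    # oidcScopes is comma-delimited; OAuth2 expects space-delimited scopes.
    scopes = [s.strip() for s in oidc_scopes.split(",") if s.strip()]
    form = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,  # assumption: sent in the body
        "scope": " ".join(scopes),
    })
    return Request(
        token_endpoint,
        data=form.encode("ascii"),
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


request = build_token_request(
    "https://identity.example.com/v1/token",
    "dapr-myapp",
    "example-secret",  # placeholder; use a secret store in practice
    "openid,kafka-dev",
)
print(request.get_method(), request.full_url)
print(request.data.decode("ascii"))
```

Requesting a narrow scope such as kafka-dev in addition to openid is what allows the broker to reject tokens that were issued for other purposes.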

AWS IAM

Authenticating with AWS IAM is supported with MSK. Setting authType to awsiam uses the AWS SDK to generate auth tokens to authenticate.

Note

The only required metadata field is awsRegion. If no awsAccessKey and awsSecretKey are provided, you can use AWS IAM roles for service accounts to have password-less authentication to your Kafka cluster.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-awsiam
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "awsiam"
      - name: awsRegion # Required.
        value: "us-west-1"
      - name: awsAccessKey # Optional.
        value: <AWS_ACCESS_KEY>
      - name: awsSecretKey # Optional.
        value: <AWS_SECRET_KEY>
      - name: awsSessionToken # Optional.
        value: <AWS_SESSION_KEY>
      - name: awsIamRoleArn # Optional.
        value: "arn:aws:iam::123456789:role/mskRole"
      - name: awsStsSessionName # Optional.
        value: "MSKSASLDefaultSession"

Communication using TLS

By default, TLS is enabled to secure the transport layer to Kafka. To disable TLS, set disableTls to true. When TLS is enabled, you can control server certificate verification using skipVerify to disable verification (NOT recommended in production environments) and caCert to specify a trusted TLS certificate authority (CA). If no caCert is specified, the system CA trust will be used. To also configure mTLS authentication, see the section under Authentication. Below is an example of a Kafka pubsub component configured to use transport layer TLS:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "certificate"
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: caCert # Certificate authority certificate.
        secretKeyRef:
          name: kafka-tls
          key: caCert
    auth:
      secretStore: <SECRET_STORE_NAME>
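
To show how caCert and skipVerify interact conceptually, the following Python sketch builds a client-side TLS context with the standard ssl module. It is an analogy in Python, not the component's implementation. The function name build_tls_context is hypothetical, and the sketch assumes that a supplied CA certificate is added on top of the system CA trust.

```python
import ssl


def build_tls_context(ca_cert_pem=None, skip_verify=False):
    """Create a client TLS context mirroring caCert / skipVerify."""
    # Starts from the system CA trust with verification enabled.
    context = ssl.create_default_context()
    if ca_cert_pem:
        # Assumption: the extra CA is trusted in addition to system CAs.
        context.load_verify_locations(cadata=ca_cert_pem)
    if skip_verify:
        # Hostname checking must be turned off before CERT_NONE is set.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return context


strict = build_tls_context()
relaxed = build_tls_context(skip_verify=True)
print(strict.verify_mode == ssl.CERT_REQUIRED)
print(relaxed.verify_mode == ssl.CERT_NONE)
```

The relaxed context still encrypts traffic but accepts any server certificate, which is why skipVerify is discouraged outside of development.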

Consuming from multiple topics

When consuming from multiple topics using a single pub/sub component, there is no guarantee about how the consumers in your consumer group are balanced across the topic partitions.

For instance, let’s say you are subscribing to two topics with 10 partitions per topic and you have 20 replicas of your service consuming from the two topics. There is no guarantee that 10 will be assigned to the first topic and 10 to the second topic. Instead, the partitions could be divided unequally, with more than 10 assigned to the first topic and the rest assigned to the second topic.

This can result in idle consumers listening to the first topic and over-extended consumers on the second topic, or vice versa. This same behavior can be observed when using auto-scalers such as HPA or KEDA.
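
The imbalance can be reproduced with a small simulation. The Python sketch below models a range-style assignment, in which each topic's partitions are handed out to the sorted list of consumers independently of the other topics. This is a simplified model chosen for illustration; the assignment you actually observe depends on the strategy used by the Kafka client, and the function and replica names are hypothetical.

```python
def range_assign(consumers, topics):
    """Assign partitions topic by topic to the sorted consumer list.

    topics maps a topic name to its partition count. Returns a dict of
    consumer -> list of (topic, partition) tuples.
    """
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for topic, partition_count in topics.items():
        per_consumer, extra = divmod(partition_count, len(ordered))
        start = 0
        for index, consumer in enumerate(ordered):
            # The first `extra` consumers receive one additional partition.
            count = per_consumer + (1 if index < extra else 0)
            assignment[consumer].extend(
                (topic, partition) for partition in range(start, start + count)
            )
            start += count
    return assignment


replicas = [f"replica-{i:02d}" for i in range(20)]
result = range_assign(replicas, {"topic-one": 10, "topic-two": 10})
idle = [name for name, parts in result.items() if not parts]
print("idle replicas:", len(idle))
print("replica-00 owns:", result["replica-00"])
```

In this model, ten replicas each own one partition from both topics while the other ten own nothing, even though there are exactly as many replicas as partitions.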

If you run into this particular issue, it is recommended that you configure a single pub/sub component per topic with uniquely defined consumer groups per component. This guarantees that all replicas of your service are fully allocated to the unique consumer group, where each consumer group targets one specific topic.

For example, you may define two Dapr components with the following configuration:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-topic-one
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: consumerGroup
        value: "{appID}-topic-one"
    ---
    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-topic-two
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: consumerGroup
        value: "{appID}-topic-two"

Sending and receiving multiple messages

The Apache Kafka component supports sending and receiving multiple messages in a single operation using the bulk pub/sub API.

Configuring bulk subscribe

When subscribing to a topic, you can configure bulkSubscribe options. Refer to Subscribing messages in bulk for more details. Learn more about the bulk subscribe API.

Apache Kafka supports the following bulk metadata options:

| Configuration | Default |
| --- | --- |
| maxBulkAwaitDurationMs | 10000 (10s) |
| maxBulkSubCount | 80 |

Per-call metadata fields

Partition Key

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the metadata query parameter in the request URL.

The param name is partitionKey.

Example:

    curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1" \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'

Message headers

All other metadata key/value pairs (that are not partitionKey) are set as headers in the Kafka message. Here is an example setting a correlationId for the message.

    curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1" \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'
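
When building these URLs in code rather than by hand, it helps to let a library handle the query-string encoding. The Python sketch below assembles the same publish URL used in the curl examples above; the helper name publish_url and its base parameter are hypothetical, and the default base simply mirrors the localhost:3500 address from the examples.

```python
from urllib.parse import quote, urlencode


def publish_url(pubsub, topic, metadata=None, base="http://localhost:3500"):
    """Build a publish URL with per-call metadata as query parameters."""
    url = f"{base}/v1.0/publish/{quote(pubsub, safe='')}/{quote(topic, safe='')}"
    if metadata:
        # Each metadata key becomes a "metadata.<key>" query parameter.
        query = urlencode({f"metadata.{key}": value
                           for key, value in metadata.items()})
        url = f"{url}?{query}"
    return url


print(publish_url(
    "myKafka",
    "myTopic",
    {"correlationId": "myCorrelationID", "partitionKey": "key1"},
))
```

Here partitionKey selects the partition, while correlationId, like any other key, would be carried as a Kafka message header.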

Avro Schema Registry serialization/deserialization

You can configure pub/sub to publish or consume data encoded using Avro binary serialization, leveraging an Apache Schema Registry (for example, Confluent Schema Registry, Apicurio).

Configuration

Important

Currently, only message value serialization/deserialization is supported. Since cloud events are not supported, the rawPayload=true metadata must be passed.

When configuring the Kafka pub/sub component metadata, you must define:

  • The schema registry URL
  • The API key/secret, if applicable

Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named my-topic, the schema subject will be my-topic-value.

When interacting with the message payload within the service, it is in JSON format. The payload is transparently serialized/deserialized within the Dapr component. Date/Datetime fields must be passed as their Epoch Unix timestamp equivalent (rather than the typical ISO 8601 format). For example:

  • 2024-01-10T04:36:05.986Z should be passed as 1704861365986 (the number of milliseconds since Jan 1st, 1970)
  • 2024-01-10 should be passed as 19732 (the number of days since Jan 1st, 1970)
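
The two conversions can be done with Python's standard datetime module. The sketch below is one way to compute them; the helper names are hypothetical, and to_epoch_millis assumes it is given a timezone-aware datetime.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Milliseconds since Jan 1st, 1970 (UTC) for an aware datetime."""
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def to_epoch_days(day: date) -> int:
    """Whole days since Jan 1st, 1970."""
    return (day - EPOCH.date()).days


created = datetime(2024, 1, 10, 4, 36, 5, 986000, tzinfo=timezone.utc)
print(to_epoch_millis(created))         # 1704861365986
print(to_epoch_days(date(2024, 1, 10)))  # 19732
```

The resulting integers are what would go into the JSON payload, for instance as the created_date value of an order message.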

Publishing Avro messages

In order to indicate to the Kafka pub/sub component that the message should be using Avro serialization, the valueSchemaType metadata must be set to Avro.

    curl -X "POST" "http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro" -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'

    import json

    from dapr.clients import DaprClient

    with DaprClient() as d:
        req_data = {
            'order_number': '345',
            'created_date': 1704861365986
        }
        # Publish the JSON payload as a raw message to be serialized with Avro
        resp = d.publish_event(
            pubsub_name='pubsub',
            topic_name='my-topic',
            data=json.dumps(req_data),
            publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
        )
        # Print the request
        print(req_data, flush=True)

Subscribing to Avro topics

In order to indicate to the Kafka pub/sub component that the message should be deserialized using Avro, the valueSchemaType metadata must be set to Avro in the subscription metadata.

    from fastapi import APIRouter, Body, FastAPI, Response, status

    app = FastAPI()
    router = APIRouter()

    # Dapr calls this endpoint to discover the app's subscriptions
    @router.get('/dapr/subscribe')
    def subscribe():
        subscriptions = [{'pubsubname': 'pubsub',
                          'topic': 'my-topic',
                          'route': 'my_topic_subscriber',
                          'metadata': {
                              'rawPayload': 'true',
                              'valueSchemaType': 'Avro',
                          }}]
        return subscriptions

    # Receives the deserialized message payload as JSON
    @router.post('/my_topic_subscriber')
    def my_topic_subscriber(event_data=Body()):
        print(event_data, flush=True)
        return Response(status_code=status.HTTP_200_OK)

    app.include_router(router)

Create a Kafka instance

You can run Kafka locally using this Docker image. To run without Docker, see the getting started guide here.

To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.

Last modified March 21, 2024: Merge pull request #4082 from newbe36524/v1.13 (f4b0938)