Kafka binding spec

Detailed documentation on the Kafka binding component

Component format

To setup Kafka binding create a component of type bindings.kafka. See this guide on how to create and apply a binding configuration. For details on using secretKeyRef, see the guide on how to reference secrets in components.

All component metadata field values can carry templated metadata values, which are resolved on Dapr sidecar startup. For example, you can choose to use {namespace} as the consumerGroup, to enable using the same appId in different namespaces using the same topics as described in this article.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: kafka-binding
  5. spec:
  6. type: bindings.kafka
  7. version: v1
  8. metadata:
  9. - name: topics # Optional. Used for input bindings.
  10. value: "topic1,topic2"
  11. - name: brokers # Required.
  12. value: "localhost:9092,localhost:9093"
  13. - name: consumerGroup # Optional. Used for input bindings.
  14. value: "group1"
  15. - name: publishTopic # Optional. Used for output bindings.
  16. value: "topic3"
  17. - name: authRequired # Required.
  18. value: "true"
  19. - name: saslUsername # Required if authRequired is `true`.
  20. value: "user"
  21. - name: saslPassword # Required if authRequired is `true`.
  22. secretKeyRef:
  23. name: kafka-secrets
  24. key: "saslPasswordSecret"
  25. - name: saslMechanism
  26. value: "SHA-512"
  27. - name: initialOffset # Optional. Used for input bindings.
  28. value: "newest"
  29. - name: maxMessageBytes # Optional.
  30. value: "1024"
  31. - name: version # Optional.
  32. value: "2.0.0"
  33. - name: direction
  34. value: "input, output"
  35. - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
  36. value: http://localhost:8081
  37. - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
  38. value: XYAXXAZ
  39. - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
  40. value: "ABCDEFGMEADFF"
  41. - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
  42. value: true
  43. - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
  44. value: 5m

Spec metadata fields

FieldRequiredBinding supportDetailsExample
topicsNInputA comma-separated string of topics.“mytopic1,topic2”
brokersYInput/OutputA comma-separated string of Kafka brokers.“localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093”
clientIDNInput/OutputA user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.“my-dapr-app”
consumerGroupNInputA kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic.“group1”
consumeRetryEnabledNInput/OutputEnable consume retry by setting to “true”. Default to false in Kafka binding component.“true”, “false”
publishTopicYOutputThe topic to publish to.“mytopic”
authRequiredNDeprecatedEnable SASL authentication with the Kafka brokers.“true”, “false”
authTypeYInput/OutputConfigure or disable authentication. Supported values: none, password, mtls, or oidc“password”, “none”
saslUsernameNInput/OutputThe SASL username used for authentication. Only required if authRequired is set to “true”.“adminuser”
saslPasswordNInput/OutputThe SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authRequired is set to “true”.“”, “KeFg23!”
saslMechanismNInput/OutputThe SASL authentication mechanism you’d like to use. Only required if authtype is set to “password”. If not provided, defaults to PLAINTEXT, which could cause a break for some services, like Amazon Managed Service for Kafka.“SHA-512”, “SHA-256”, “PLAINTEXT”
initialOffsetNInputThe initial offset to use if no offset was previously committed. Should be “newest” or “oldest”. Defaults to “newest”.“oldest”
maxMessageBytesNInput/OutputThe maximum size in bytes allowed for a single Kafka message. Defaults to 1024.“2048”
oidcTokenEndpointNInput/OutputFull URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidchttps://identity.example.com/v1/token”
oidcClientIDNInput/OutputThe OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc“dapr-kafka”
oidcClientSecretNInput/OutputThe OAuth2 client secret that has been provisioned in the identity provider: Required when authType is set to oidc“KeFg23!”
oidcScopesNInput/OutputComma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to “openid”“openid,kafka-prod”
versionNInput/OutputKafka cluster version. Defaults to 2.0.0. Please note that this needs to be mandatorily set to 1.0.0 for EventHubs with Kafka.“1.0.0”
directionNInput/OutputThe direction of the binding.“input”, “output”, “input, output”
oidcExtensionsNInput/OutputString containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token{“cluster”:”kafka”,”poolid”:”kafkapool”}
schemaRegistryURLNRequired when using Schema Registry Avro serialization/deserialization. The Schema Registry URL.http://localhost:8081
schemaRegistryAPIKeyNWhen using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key.XYAXXAZ
schemaRegistryAPISecretNWhen using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.ABCDEFGMEADFF
schemaCachingEnabledNWhen using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is truetrue
schemaLatestVersionCacheTTLNWhen using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min5m

Note

The metadata version must be set to 1.0.0 when using Azure EventHubs with Kafka.

Binding support

This component supports both input and output binding interfaces.

This component supports output binding with the following operations:

  • create

Authentication

Kafka supports a variety of authentication schemes and Dapr supports several: SASL password, mTLS, OIDC/OAuth2. Learn more about Kafka’s authentication method for both the Kafka binding and Kafka pub/sub components.

Specifying a partition key

When invoking the Kafka binding, its possible to provide an optional partition key by using the metadata section in the request body.

The field name is partitionKey.

Example:

  1. curl -X POST http://localhost:3500/v1.0/bindings/myKafka \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "data": {
  5. "message": "Hi"
  6. },
  7. "metadata": {
  8. "partitionKey": "key1"
  9. },
  10. "operation": "create"
  11. }'

Response

An HTTP 204 (No Content) and empty body will be returned if successful.

Last modified March 21, 2024: Merge pull request #4082 from newbe36524/v1.13 (f4b0938)