Apache Kafka

Detailed documentation on the Apache Kafka pubsub component

Component format

To set up Apache Kafka pubsub, create a component of type pubsub.kafka. See this guide on how to create and apply a pubsub configuration. For details on using secretKeyRef, see the guide on how to reference secrets in components.

  apiVersion: dapr.io/v1alpha1
  kind: Component
  metadata:
    name: kafka-pubsub
    namespace: default
  spec:
    type: pubsub.kafka
    version: v1
    metadata:
    - name: brokers # Required. Kafka broker connection setting
      value: "dapr-kafka.myapp.svc.cluster.local:9092"
    - name: consumerGroup # Optional. Used for input bindings.
      value: "group1"
    - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
      value: "my-dapr-app-id"
    - name: authRequired # Required.
      value: "true"
    - name: saslUsername # Required if authRequired is `true`.
      value: "adminuser"
    - name: saslPassword # Required if authRequired is `true`.
      secretKeyRef:
        name: kafka-secrets
        key: saslPasswordSecret
    - name: maxMessageBytes # Optional.
      value: 1024
    - name: consumeRetryInterval # Optional.
      value: 200ms
    - name: version # Optional.
      value: 0.10.2.0

Spec metadata fields

| Field | Required | Details | Example |
| ----- | -------- | ------- | ------- |
| brokers | Y | A comma-separated list of Kafka brokers. | "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093" |
| consumerGroup | N | A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. | "group1" |
| clientID | N | A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to "sarama". | "my-dapr-app" |
| authRequired | Y | Enable SASL authentication with the Kafka brokers. | "true", "false" |
| saslUsername | N | The SASL username used for authentication. Only required if authRequired is set to "true". | "adminuser" |
| saslPassword | N | The SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authRequired is set to "true". | "", "KeFg23!" |
| initialOffset | N | The initial offset to use if no offset was previously committed. Should be "newest" or "oldest". Defaults to "newest". | "oldest" |
| maxMessageBytes | N | The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. | 2048 |
| consumeRetryInterval | N | The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to 100ms. | 200ms |
| version | N | Kafka cluster version. Defaults to 2.0.0.0. | 0.10.2.0 |
| caCert | N | Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientCert | N | Client certificate, required for using TLS. Can be secretKeyRef to use a secret reference. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientKey | N | Client key, required for using TLS. Can be secretKeyRef to use a secret reference. | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
| skipVerify | N | Skip TLS verification; this is not recommended for use in production. Defaults to "false". | "true", "false" |

Communication using TLS

To configure communication using TLS, ensure the Kafka broker is configured to support certificates. Prerequisites include a certificate authority (CA) certificate, a CA-issued client certificate, and a client private key. Below is an example of a Kafka pubsub component configured to use TLS:

  apiVersion: dapr.io/v1alpha1
  kind: Component
  metadata:
    name: kafka-pubsub
    namespace: default
  spec:
    type: pubsub.kafka
    version: v1
    metadata:
    - name: brokers # Required. Kafka broker connection setting
      value: "dapr-kafka.myapp.svc.cluster.local:9092"
    - name: consumerGroup # Optional. Used for input bindings.
      value: "group1"
    - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
      value: "my-dapr-app-id"
    - name: authRequired # Required.
      value: "true"
    - name: saslUsername # Required if authRequired is `true`.
      value: "adminuser"
    - name: consumeRetryInterval # Optional.
      value: 200ms
    - name: version # Optional.
      value: 0.10.2.0
    - name: saslPassword # Required if authRequired is `true`.
      secretKeyRef:
        name: kafka-secrets
        key: saslPasswordSecret
    - name: maxMessageBytes # Optional.
      value: 1024
    - name: caCert # Certificate authority certificate.
      secretKeyRef:
        name: kafka-tls
        key: caCert
    - name: clientCert # Client certificate.
      secretKeyRef:
        name: kafka-tls
        key: clientCert
    - name: clientKey # Client key.
      secretKeyRef:
        name: kafka-tls
        key: clientKey
  auth:
    secretStore: <SECRET_STORE_NAME>

The secretKeyRef above references a Kubernetes secret store to access the TLS information. See the guide on how to configure a secret store component to learn more.
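
For example, the kafka-tls secret referenced above could be created with kubectl. This is a sketch: the local certificate file names are assumptions, but the secret name and keys must match the secretKeyRef entries in the component:

  kubectl create secret generic kafka-tls \
    --from-file=caCert=./ca.crt \
    --from-file=clientCert=./client.crt \
    --from-file=clientKey=./client.key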

Per-call metadata fields

Partition Key

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the metadata query parameter in the request URL.

The param name is partitionKey.

Example:

  curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
    -H "Content-Type: application/json" \
    -d '{
          "data": {
            "message": "Hi"
          }
        }'

Create a Kafka instance

You can run Kafka locally using Docker; a minimal docker-compose sketch follows below. To run without Docker, see the Kafka getting started guide.
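
The following is a minimal docker-compose sketch for local development, assuming the publicly available wurstmeister images; the image choice and advertised host name are assumptions, not prescribed by Dapr:

  version: "3"
  services:
    zookeeper:
      image: wurstmeister/zookeeper   # assumed image; Kafka needs ZooKeeper for coordination
      ports:
        - "2181:2181"
    kafka:
      image: wurstmeister/kafka       # assumed image
      ports:
        - "9092:9092"
      environment:
        KAFKA_ADVERTISED_HOST_NAME: localhost     # advertise localhost so local Dapr apps can connect
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181   # point Kafka at the zookeeper service above
      depends_on:
        - zookeeper

With this running, set brokers to "localhost:9092" in the component manifest above.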

To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.
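
As an illustration, a minimal Strimzi Kafka resource might look like the sketch below. This assumes the Strimzi operator is already installed; the cluster name, replica counts, and ephemeral storage are placeholder choices for testing:

  apiVersion: kafka.strimzi.io/v1beta2
  kind: Kafka
  metadata:
    name: my-cluster
  spec:
    kafka:
      replicas: 3
      listeners:
        - name: plain
          port: 9092
          type: internal    # cluster-internal listener without TLS
          tls: false
      storage:
        type: ephemeral     # for testing only; use persistent-claim in production
    zookeeper:
      replicas: 3
      storage:
        type: ephemeral
    entityOperator:
      topicOperator: {}     # enables managing topics via KafkaTopic resources

Strimzi exposes a bootstrap service named <cluster-name>-kafka-bootstrap, so brokers in the component would typically be set to something like "my-cluster-kafka-bootstrap.<namespace>.svc:9092".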
