Apache Kafka source example

Tutorial on how to build and deploy a KafkaSource event source.

Background

The KafkaSource reads all messages from all partitions and sends them as CloudEvents over HTTP to its configured sink. The KafkaSource supports an ordered consumer delivery guarantee: a per-partition blocking consumer waits for a successful response from the CloudEvent subscriber before it delivers the next message of that partition.
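The ordered delivery behavior described above can be illustrated with a small simulation. This is only a sketch, not the KafkaSource implementation: `deliver_in_order` and the `send` callable are hypothetical names, and a real consumer would retry or apply its delivery policy instead of simply stopping at the first failure.

```python
from collections import defaultdict


def deliver_in_order(messages, send):
    """Deliver (partition, offset, value) tuples, blocking per partition.

    `send` is a hypothetical callable that returns True when the subscriber
    responds successfully. A partition stops at its first failed delivery,
    so later messages of that partition are never sent ahead of it.
    """
    by_partition = defaultdict(list)
    for partition, offset, value in messages:
        by_partition[partition].append((offset, value))

    delivered = {}
    for partition, items in by_partition.items():
        delivered[partition] = []
        for offset, value in sorted(items):
            if not send(partition, offset, value):
                break  # block here; do not skip ahead within this partition
            delivered[partition].append(offset)
    return delivered


messages = [(0, 0, "a"), (1, 0, "x"), (0, 1, "b"), (0, 2, "c"), (1, 1, "y")]
# Simulate a subscriber that rejects offset 1 of partition 0.
result = deliver_in_order(messages, lambda p, o, v: not (p == 0 and o == 1))
print(result)
```

Partition 1 is delivered in full, while partition 0 stops after offset 0, because each partition is blocked independently of the others.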

Note

If you need a more sophisticated Kafka Consumer, with direct access to specific partitions or offsets, you can implement a Kafka Consumer by using one of the available Apache Kafka SDKs.

Install the Kafka event source CRD

  • Set up a Kubernetes cluster with the Kafka event source installed. You can install the Kafka event source by using YAML or the Knative Operator.

Optional: Create a Kafka topic

If you are using Strimzi:

  1. Create a KafkaTopic YAML file:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
  2. Deploy the KafkaTopic YAML file by running the command:

    kubectl apply -f <filename>.yaml

    Where <filename> is the name of your KafkaTopic YAML file.

    Example output:

    kafkatopic.kafka.strimzi.io/knative-demo-topic created
  3. Verify that the KafkaTopic exists by running the command:

    kubectl -n kafka get kafkatopics.kafka.strimzi.io

    Example output:

    NAME                 AGE
    knative-demo-topic   16s

Create a Service

  1. Clone the sample code GitHub repository, and navigate to the local directory of the repository:

    git clone -b "release-1.1" https://github.com/knative/docs knative-docs
    cd knative-docs/code-samples/eventing/kafka/source

  2. Create the event-display Service as a YAML file:

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
  3. Apply the YAML file by running the command:

    kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

    Example output:

    service.serving.knative.dev/event-display created
  4. Verify that the Service Pod is running by running the command:

    kubectl get pods

    The Pod name is prefixed with event-display:

    NAME                                            READY   STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2     Running   0          72s
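The event-display Service logs each CloudEvent it receives over HTTP. As a rough sketch of what any sink has to do with such a request, the following splits the HTTP headers into CloudEvent attributes and data. It assumes the event arrives in the binary content mode of the CloudEvents HTTP binding, where attributes travel as `ce-` prefixed headers; the function name `cloudevent_from_http` and the sample headers are hypothetical.

```python
import json


def cloudevent_from_http(headers, body):
    """Split binary-mode CloudEvent HTTP headers into attributes and data."""
    attributes = {
        name[3:].lower(): value
        for name, value in headers.items()
        if name.lower().startswith("ce-")
    }
    content_type = headers.get("Content-Type", "")
    # Decode JSON payloads; leave any other payload as raw bytes.
    data = json.loads(body) if content_type.startswith("application/json") else body
    return attributes, data


# Sample request values, modeled on the example output later in this tutorial.
headers = {
    "Ce-Specversion": "1.0",
    "Ce-Type": "dev.knative.kafka.event",
    "Ce-Id": "partition:0/offset:564",
    "Content-Type": "application/json",
}
attributes, data = cloudevent_from_http(headers, b'{"msg": "This is a test!"}')
print(attributes["type"], data["msg"])
```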

Kafka event source

  1. Edit source/event-source.yaml to match your environment, setting the bootstrap servers, topics, and other fields as needed:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  2. Deploy the event source:

    kubectl apply -f event-source.yaml

    Example output:

    kafkasource.sources.knative.dev/kafka-source created
  3. Verify that the event source Pod is running:

    kubectl get pods

    The Pod name is prefixed with kafka-source:

    NAME                                  READY   STATUS    RESTARTS   AGE
    kafka-source-xlnhq-5544766765-dnl5s   1/1     Running   0          40m
  4. Ensure that the Kafka event source started with the necessary configuration:

    kubectl logs --selector='knative-eventing-source-name=kafka-source'

    Example output:

    {"level":"info","ts":"2020-05-28T10:39:42.104Z","caller":"adapter/adapter.go:81","msg":"Starting with config: ","Topics":".","ConsumerGroup":"...","SinkURI":"...","Name":".","Namespace":"."}

Verify

  1. Produce a message ({"msg": "This is a test!"}) to the Apache Kafka topic as in the following example:

    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic

    Tip

    If you don’t see a command prompt, try pressing Enter.

  2. Verify that the Kafka event source consumed the message and sent it to its sink. Because these logs are emitted at debug level, edit the level key in the config-logging ConfigMap in the knative-sources namespace so that it looks like this:

    data:
      loglevel.controller: info
      loglevel.webhook: info
      zap-logger-config: |
        {
          "level": "debug",
          "development": false,
          "outputPaths": ["stdout"],
          "errorOutputPaths": ["stderr"],
          "encoding": "json",
          "encoderConfig": {
            "timeKey": "ts",
            "levelKey": "level",
            "nameKey": "logger",
            "callerKey": "caller",
            "messageKey": "msg",
            "stacktraceKey": "stacktrace",
            "lineEnding": "",
            "levelEncoder": "",
            "timeEncoder": "iso8601",
            "durationEncoder": "",
            "callerEncoder": ""
          }
        }
  3. Manually delete the Kafka source deployment and allow the kafka-controller-manager deployment running in the knative-sources namespace to redeploy it. Debug-level logs should then be visible when you run the command:

    kubectl logs --selector='knative-eventing-source-name=kafka-source'

    Example output:

    {"level":"debug","ts":"2020-05-28T10:40:29.400Z","caller":"kafka/consumer_handler.go:77","msg":"Message claimed","topic":".","value":"."}
    {"level":"debug","ts":"2020-05-28T10:40:31.722Z","caller":"kafka/consumer_handler.go:89","msg":"Message marked","topic":".","value":"."}
  4. Verify that the Service received the message from the event source:

    kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container

    Example output:

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
      {
        "msg": "This is a test!"
      }
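The id attribute in the example output encodes the partition and offset of the originating Kafka message. If a subscriber needs those values, a minimal sketch for extracting them could look like the following. The `partition:<n>/offset:<n>` pattern is inferred only from the example output above, and `parse_event_id` is a hypothetical helper.

```python
import re

# Pattern inferred from the example id "partition:0/offset:564".
ID_PATTERN = re.compile(r"^partition:(\d+)/offset:(\d+)$")


def parse_event_id(event_id):
    """Return (partition, offset) parsed from a KafkaSource event id."""
    match = ID_PATTERN.match(event_id)
    if match is None:
        raise ValueError(f"unexpected event id: {event_id!r}")
    return int(match.group(1)), int(match.group(2))


partition, offset = parse_event_id("partition:0/offset:564")
print(partition, offset)
```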

Clean up steps

  1. Delete the Kafka event source:

    kubectl delete -f event-source.yaml

    Example output:

    kafkasource.sources.knative.dev "kafka-source" deleted
  2. Delete the event-display Service:

    kubectl delete -f <filename>.yaml

    Where <filename> is the name of the event-display Service YAML file you created earlier.

    Example output:

    service.serving.knative.dev "event-display" deleted
  3. Remove the Kafka event controller:

    kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml

    Example output:

    serviceaccount "kafka-controller-manager" deleted
    clusterrole.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
    clusterrolebinding.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
    customresourcedefinition.apiextensions.k8s.io "kafkasources.sources.knative.dev" deleted
    service "kafka-controller" deleted
    statefulset.apps "kafka-controller-manager" deleted
  4. Optional: Remove the Apache Kafka topic:

    kubectl delete -f kafka-topic.yaml

    Example output:

    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted

Optional: Specify the key deserializer

When the KafkaSource receives a message from Kafka, it stores the message key in the event extension named key, and stores the Kafka message headers in extensions whose names start with kafkaheader.

You can specify the key deserializer among four types:

  • string (default) for UTF-8 encoded strings
  • int for 32-bit & 64-bit signed integers
  • float for 32-bit & 64-bit floating points
  • byte-array for a Base64 encoded byte array

To specify the key deserializer, add the label kafkasources.sources.knative.dev/key-type to the KafkaSource definition, as shown in the following example:

  apiVersion: sources.knative.dev/v1beta1
  kind: KafkaSource
  metadata:
    name: kafka-source
    labels:
      kafkasources.sources.knative.dev/key-type: int
  spec:
    consumerGroup: knative-group
    bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
    topics:
    - knative-demo-topic
    sink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display
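To make the four key types listed above more concrete, the following sketch shows one way raw key bytes could be interpreted for each type. It is an illustration under stated assumptions, not the KafkaSource code: `deserialize_key` is a hypothetical function, and big-endian byte order for numeric keys is an assumption that this tutorial does not confirm.

```python
import base64
import struct


def deserialize_key(raw, key_type="string"):
    """Interpret raw Kafka key bytes according to a key-type label value."""
    if key_type == "string":
        return raw.decode("utf-8")
    if key_type == "int":
        # 4 bytes -> 32-bit signed, 8 bytes -> 64-bit signed (big-endian assumed).
        fmt = {4: ">i", 8: ">q"}.get(len(raw))
        if fmt is None:
            raise ValueError("int keys must be 4 or 8 bytes long")
        return struct.unpack(fmt, raw)[0]
    if key_type == "float":
        # 4 bytes -> 32-bit float, 8 bytes -> 64-bit float (big-endian assumed).
        fmt = {4: ">f", 8: ">d"}.get(len(raw))
        if fmt is None:
            raise ValueError("float keys must be 4 or 8 bytes long")
        return struct.unpack(fmt, raw)[0]
    if key_type == "byte-array":
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unknown key type: {key_type!r}")


print(deserialize_key(b"order-1"))
print(deserialize_key(b"\x00\x00\x00\x2a", "int"))
print(deserialize_key(b"\x01\x02", "byte-array"))
```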

Optional: Specify the initial offset

By default, the KafkaSource starts consuming from the latest offset in each partition. To consume from the earliest offset instead, set the initialOffset field to earliest, for example:

  apiVersion: sources.knative.dev/v1beta1
  kind: KafkaSource
  metadata:
    name: kafka-source
  spec:
    consumerGroup: knative-group
    initialOffset: earliest
    bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
    topics:
    - knative-demo-topic
    sink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display

Note

The valid values for initialOffset are earliest and latest. Any other value results in a validation error. This field is honored only if there are no prior committed offsets for that consumer group.
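The rules in the note above can be summarized as a small decision function. This is a sketch of the described behavior only: `starting_offset` and its parameters are hypothetical, and for brevity it folds the validation check into the same function.

```python
def starting_offset(initial_offset, committed, earliest, latest):
    """Pick the offset at which a partition consumer would start.

    `committed` is the consumer group's committed offset for the partition,
    or None if the group has never committed one.
    """
    if initial_offset not in ("earliest", "latest"):
        raise ValueError(f"invalid initialOffset: {initial_offset!r}")
    if committed is not None:
        # A prior committed offset always wins; initialOffset is ignored.
        return committed
    return earliest if initial_offset == "earliest" else latest


print(starting_offset("earliest", None, earliest=0, latest=120))
print(starting_offset("earliest", 57, earliest=0, latest=120))
```

The second call returns the committed offset, which shows why changing initialOffset has no effect on a consumer group that has already committed offsets.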

Connecting to a TLS-enabled Kafka Broker

The KafkaSource supports TLS and SASL authentication methods. To enable TLS authentication, you must have the following files:

  • CA Certificate
  • Client Certificate and Key

KafkaSource expects these files to be in PEM format. If they are in another format, such as JKS, convert them to PEM.
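Before creating the secrets, it can help to confirm that each file really is PEM-encoded text. The following is a minimal sketch of such a check; `pem_labels` is a hypothetical helper, and it only looks for matching BEGIN and END marker lines without validating the certificate contents.

```python
import re

# A PEM block is delimited by matching BEGIN/END lines that share a label.
PEM_BLOCK = re.compile(
    r"-----BEGIN ([A-Z0-9 ]+)-----\s.+?\s-----END \1-----", re.DOTALL
)


def pem_labels(text):
    """Return the labels of all PEM blocks found in the given text."""
    return [match.group(1) for match in PEM_BLOCK.finditer(text)]


# Placeholder content for illustration; a real file holds Base64 data.
sample = "-----BEGIN CERTIFICATE-----\nMIIB\n-----END CERTIFICATE-----\n"
print(pem_labels(sample))
```

An empty list suggests that the file is in another format and needs to be converted first.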

  1. Create the certificate files as secrets in the namespace where KafkaSource is going to be set up, by running the commands:

    kubectl create secret generic cacert --from-file=caroot.pem
    kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
  2. Modify the bootstrapServers and topics fields to match your environment, then apply the KafkaSource:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source-with-tls
    spec:
      net:
        tls:
          enable: true
          cert:
            secretKeyRef:
              key: tls.crt
              name: kafka-secret
          key:
            secretKeyRef:
              key: tls.key
              name: kafka-secret
          caCert:
            secretKeyRef:
              key: caroot.pem
              name: cacert
      consumerGroup: knative-group
      bootstrapServers:
      - my-secure-kafka-bootstrap.kafka:443
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display