Apache Kafka Sink

This page shows how to install and configure an Apache Kafka sink (KafkaSink).

Prerequisites

You must have access to a Kubernetes cluster with Knative Eventing installed.
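
If you are unsure whether this prerequisite is met, one quick check is to confirm that the core Eventing Deployments exist, for example:

  kubectl get deployments.apps -n knative-eventing eventing-controller eventing-webhook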

Installation

  1. Install the Kafka controller:

     kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.24.0/eventing-kafka-controller.yaml

  2. Install the KafkaSink data plane:

     kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.24.0/eventing-kafka-sink.yaml

  3. Verify that kafka-controller and kafka-sink-receiver Deployments are running:

     kubectl get deployments.apps -n knative-eventing

     Example output:

     NAME                  READY   UP-TO-DATE   AVAILABLE   AGE
     eventing-controller   1/1     1            1           10s
     eventing-webhook      1/1     1            1           9s
     kafka-controller      1/1     1            1           3s
     kafka-sink-receiver   1/1     1            1           5s
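
If you prefer to block until the data plane is ready instead of polling, the same check can be expressed with kubectl wait. This is a minimal sketch that assumes the default Deployment names shown in the example output above:

  # Wait until both Deployments report the Available condition.
  kubectl wait --namespace knative-eventing \
    --for=condition=Available \
    --timeout=120s \
    deployment/kafka-controller deployment/kafka-sink-receiver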

KafkaSink example

A KafkaSink object looks similar to the following:

  apiVersion: eventing.knative.dev/v1alpha1
  kind: KafkaSink
  metadata:
    name: my-kafka-sink
    namespace: default
  spec:
    topic: mytopic
    bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
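
Because a KafkaSink is addressable, it can be used as the sink of any Knative event source. The following is a minimal sketch that assumes the my-kafka-sink object above exists and that PingSource is available in your Knative Eventing installation (older releases may expose it as sources.knative.dev/v1beta2 rather than v1); the PingSource name is a placeholder:

  apiVersion: sources.knative.dev/v1
  kind: PingSource
  metadata:
    name: ping-to-kafka-sink    # hypothetical name, choose your own
    namespace: default
  spec:
    schedule: "*/1 * * * *"     # send an event every minute
    contentType: "application/json"
    data: '{"message": "Hello, KafkaSink!"}'
    sink:
      ref:
        apiVersion: eventing.knative.dev/v1alpha1
        kind: KafkaSink
        name: my-kafka-sink

Each event produced by the PingSource is then written by the KafkaSink receiver to the mytopic topic.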

Security

Knative supports the following Apache Kafka security features:

  • Authentication using SASL without encryption
  • Authentication using SASL and encryption using SSL
  • Encryption using SSL without client authentication
  • Authentication and encryption using SSL

Enabling security features

To enable security features, reference a secret in the KafkaSink spec:

  apiVersion: eventing.knative.dev/v1alpha1
  kind: KafkaSink
  metadata:
    name: my-kafka-sink
    namespace: default
  spec:
    topic: mytopic
    bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
    auth:
      secret:
        ref:
          name: my_secret

Note

The secret my_secret must exist in the same namespace as the KafkaSink. Certificates and keys must be in PEM format.

Authentication using SASL

Knative supports the following SASL mechanisms:

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

To use a specific SASL mechanism, replace <sasl_mechanism> with the mechanism of your choice.

Authentication using SASL without encryption

  kubectl create secret --namespace <namespace> generic <my_secret> \
    --from-literal=protocol=SASL_PLAINTEXT \
    --from-literal=sasl.mechanism=<sasl_mechanism> \
    --from-literal=user=<my_user> \
    --from-literal=password=<my_password>
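
For example, the following sketch creates a secret for SCRAM-SHA-512 authentication in the default namespace; the secret name, user, and password are placeholder values:

  kubectl create secret --namespace default generic my_secret \
    --from-literal=protocol=SASL_PLAINTEXT \
    --from-literal=sasl.mechanism=SCRAM-SHA-512 \
    --from-literal=user=my-sasl-user \
    --from-literal=password='my-sasl-password'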

Authentication using SASL and encryption using SSL

  kubectl create secret --namespace <namespace> generic <my_secret> \
    --from-literal=protocol=SASL_SSL \
    --from-literal=sasl.mechanism=<sasl_mechanism> \
    --from-file=ca.crt=caroot.pem \
    --from-literal=user=<my_user> \
    --from-literal=password=<my_password>

Encryption using SSL without client authentication

  kubectl create secret --namespace <namespace> generic <my_secret> \
    --from-literal=protocol=SSL \
    --from-file=ca.crt=<my_caroot.pem_file_path> \
    --from-literal=user.skip=true

Authentication and encryption using SSL

  kubectl create secret --namespace <namespace> generic <my_secret> \
    --from-literal=protocol=SSL \
    --from-file=ca.crt=<my_caroot.pem_file_path> \
    --from-file=user.crt=<my_cert.pem_file_path> \
    --from-file=user.key=<my_key.pem_file_path>
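
If your Kafka cluster is managed by Strimzi (as the my-cluster-kafka-bootstrap address in the examples suggests), the PEM material typically already exists as Kubernetes secrets. The following sketch assumes a Strimzi cluster named my-cluster in the kafka namespace and a TLS KafkaUser named my-user; adjust the secret names to your installation:

  # Cluster CA certificate (Strimzi stores it in <cluster-name>-cluster-ca-cert).
  kubectl get secret my-cluster-cluster-ca-cert --namespace kafka \
    -o jsonpath='{.data.ca\.crt}' | base64 -d > caroot.pem

  # Client certificate and key for the TLS KafkaUser named my-user.
  kubectl get secret my-user --namespace kafka \
    -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
  kubectl get secret my-user --namespace kafka \
    -o jsonpath='{.data.user\.key}' | base64 -d > user.key

The resulting caroot.pem, user.crt, and user.key files can then be passed to the kubectl create secret command shown above.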

Note

The ca.crt key can be omitted to fall back to the system's root CA set.

Kafka Producer configurations

A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. You can change the configuration for Kafka Producers in your cluster by modifying the config-kafka-sink-data-plane ConfigMap in the knative-eventing namespace.

Documentation for the settings available in this ConfigMap is available on the Apache Kafka website, in particular, Producer configurations.
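
As a sketch of the workflow, you can inspect the ConfigMap to find the producer properties key used by your release, then edit it to set producer options such as acks, compression.type, or batch.size:

  # Show the current data plane configuration for the KafkaSink receiver.
  kubectl describe configmap config-kafka-sink-data-plane -n knative-eventing

  # Edit the producer settings under the producer properties key.
  kubectl edit configmap config-kafka-sink-data-plane -n knative-eventing

Depending on your release, you might need to restart the kafka-sink-receiver Deployment for the new settings to take effect.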

Enable debug logging for data plane components

To enable debug logging for data plane components, change the logging level to DEBUG in the kafka-config-logging ConfigMap.

  1. Apply the kafka-config-logging ConfigMap by running the command:

     kubectl apply -f - <<EOF
     apiVersion: v1
     kind: ConfigMap
     metadata:
       name: kafka-config-logging
       namespace: knative-eventing
     data:
       config.xml: |
         <configuration>
           <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
             <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
           </appender>
           <root level="DEBUG">
             <appender-ref ref="jsonConsoleAppender"/>
           </root>
         </configuration>
     EOF

  2. Restart the kafka-sink-receiver:

     kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
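
After the rollout completes, the debug output can be followed directly from the receiver's pods:

  kubectl logs --namespace knative-eventing deployment/kafka-sink-receiver --follow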