Apache Kafka Broker

The Apache Kafka Broker is a native Broker implementation, that reduces network hops, supports any Kafka version, and has a better integration with Apache Kafka for the Knative Broker and Trigger model.

Notable features are:

Prerequisites

  1. Installing Eventing using YAML files.
  2. An Apache Kafka cluster (if you’re just getting started you can follow Strimzi Quickstart page).

Installation

  1. Install the Kafka controller by entering the following command:

    1. kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.22.0/eventing-kafka-controller.yaml
  2. Install the Kafka Broker data plane by entering the following command:

    1. kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.22.0/eventing-kafka-broker.yaml
  3. Verify that kafka-controller, kafka-broker-receiver and kafka-broker-dispatcher are running, by entering the following command:

    1. kubectl get deployments.apps -n knative-eventing

    Example output:

    1. NAME READY UP-TO-DATE AVAILABLE AGE
    2. eventing-controller 1/1 1 1 10s
    3. eventing-webhook 1/1 1 1 9s
    4. kafka-controller 1/1 1 1 3s
    5. kafka-broker-dispatcher 1/1 1 1 4s
    6. kafka-broker-receiver 1/1 1 1 5s

Create a Kafka Broker

A Kafka Broker object looks like this:

  1. apiVersion: eventing.knative.dev/v1
  2. kind: Broker
  3. metadata:
  4. annotations:
  5. # case-sensitive
  6. eventing.knative.dev/broker.class: Kafka
  7. name: default
  8. namespace: default
  9. spec:
  10. # Configuration specific to this broker.
  11. config:
  12. apiVersion: v1
  13. kind: ConfigMap
  14. name: kafka-broker-config
  15. namespace: knative-eventing
  16. # Optional dead letter sink, you can specify either:
  17. # - deadLetterSink.ref, which is a reference to a Callable
  18. # - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  19. delivery:
  20. deadLetterSink:
  21. ref:
  22. apiVersion: serving.knative.dev/v1
  23. kind: Service
  24. name: dlq-service

spec.config should reference any ConfigMap that looks like the following:

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: kafka-broker-config
  5. namespace: knative-eventing
  6. data:
  7. # Number of topic partitions
  8. default.topic.partitions: "10"
  9. # Replication factor of topic messages.
  10. default.topic.replication.factor: "1"
  11. # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  12. bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

The above ConfigMap is installed in the cluster. You can edit the configuration or create a new one with the same values depending on your needs.

NOTE: The default.topic.replication.factor value must be less than or equal to the number of Kafka broker instances in your cluster. For example, if you only have one Kafka broker, the default.topic.replication.factor value should not be more than 1.

Set as default broker implementation

To set the Kafka broker as the default implementation for all brokers in the Knative deployment, you can apply global settings by modifying the config-br-defaults ConfigMap in the knative-eventing namespace.

This allows you to avoid configuring individual or per-namespace settings for each broker, such as metadata.annotations.eventing.knative.dev/broker.class or spec.config.

The following YAML is an example of a config-br-defaults ConfigMap using Kafka broker as the default implementation.

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: config-br-defaults
  5. namespace: knative-eventing
  6. data:
  7. default-br-config: |
  8. clusterDefault:
  9. brokerClass: Kafka
  10. apiVersion: v1
  11. kind: ConfigMap
  12. name: kafka-broker-config
  13. namespace: knative-eventing
  14. namespaceDefaults:
  15. namespace1:
  16. brokerClass: Kafka
  17. apiVersion: v1
  18. kind: ConfigMap
  19. name: kafka-broker-config
  20. namespace: knative-eventing
  21. namespace2:
  22. brokerClass: Kafka
  23. apiVersion: v1
  24. kind: ConfigMap
  25. name: kafka-broker-config
  26. namespace: knative-eventing

Security

Apache Kafka supports different security features, Knative supports the followings:

To enable security features, in the ConfigMap referenced by broker.spec.config, we can reference a Secret:

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: kafka-broker-config
  5. namespace: knative-eventing
  6. data:
  7. # Other configurations
  8. # ...
  9. # Reference a Secret called my_secret
  10. auth.secret.ref.name: my_secret

The Secret my_secret must exist in the same namespace of the ConfigMap referenced by broker.spec.config, in this case: knative-eventing.

Note: Certificates and keys must be in PEM format.

Authentication using SASL

Knative supports the following SASL mechanisms:

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

To use a specific SASL mechanism replace <sasl_mechanism> with the mechanism of your choice.

Authentication using SASL without encryption

  1. kubectl create secret --namespace <namespace> generic <my_secret> \
  2. --from-literal=protocol=SASL_PLAINTEXT \
  3. --from-literal=sasl.mechanism=<sasl_mechanism> \
  4. --from-literal=user=<my_user> \
  5. --from-literal=password=<my_password>

Authentication using SASL and encryption using SSL

  1. kubectl create secret --namespace <namespace> generic <my_secret> \
  2. --from-literal=protocol=SASL_SSL \
  3. --from-literal=sasl.mechanism=<sasl_mechanism> \
  4. --from-file=ca.crt=caroot.pem \
  5. --from-literal=user=<my_user> \
  6. --from-literal=password=<my_password>

Encryption using SSL without client authentication

  1. kubectl create secret --namespace <namespace> generic <my_secret> \
  2. --from-literal=protocol=SSL \
  3. --from-file=ca.crt=<my_caroot.pem_file_path> \
  4. --from-literal=user.skip=true

Authentication and encryption using SSL

  1. kubectl create secret --namespace <namespace> generic <my_secret> \
  2. --from-literal=protocol=SSL \
  3. --from-file=ca.crt=<my_caroot.pem_file_path> \
  4. --from-file=user.crt=<my_cert.pem_file_path> \
  5. --from-file=user.key=<my_key.pem_file_path>

NOTE: ca.crt can be omitted to fallback to use system’s root CA set.

Kafka Producer and Consumer configurations

Knative exposes all available Kafka producer and consumer configurations that can be modified to suit your workloads.

You can change these configurations by modifying the config-kafka-broker-data-plane ConfigMap in the knative-eventing namespace.

Documentation for the settings available in this ConfigMap is available on the Apache Kafka website, in particular, Producer configurations and Consumer configurations.

Enable debug logging for data plane components

The following YAML shows the default logging configuration for data plane components, that is created during the installation step:

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: kafka-config-logging
  5. namespace: knative-eventing
  6. data:
  7. config.xml: |
  8. <configuration>
  9. <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
  10. <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
  11. </appender>
  12. <root level="INFO">
  13. <appender-ref ref="jsonConsoleAppender"/>
  14. </root>
  15. </configuration>

To change the logging level to DEBUG, you must:

  1. Apply the following kafka-config-logging ConfigMap or replace level="INFO" with level="DEBUG" to the ConfigMap kafka-config-logging:

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: kafka-config-logging
    5. namespace: knative-eventing
    6. data:
    7. config.xml: |
    8. <configuration>
    9. <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
    10. <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
    11. </appender>
    12. <root level="DEBUG">
    13. <appender-ref ref="jsonConsoleAppender"/>
    14. </root>
    15. </configuration>
  2. Restart the kafka-broker-receiver and the kafka-broker-dispatcher, by entering the following commands:

    1. kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
    2. kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher

Additional information