Add Kafka as a Receiver (i.e. Collector)

You can use Elasticsearch, Kafka, and Fluentd as log receivers in KubeSphere. This tutorial demonstrates how to:

  • Deploy strimzi-kafka-operator and then create a Kafka cluster and a Kafka topic by creating Kafka and KafkaTopic custom resources (CRs).
  • Add Kafka as a log receiver to receive logs sent from Fluent Bit.
  • Verify whether the Kafka cluster is receiving logs using Kafkacat.

Prerequisites

  • You need an account granted a role that includes the Cluster Management permission. For example, you can log in to the console as admin directly, or create a new role with this permission and assign it to an account.
  • Before adding a log receiver, you need to enable any of the logging, events, or auditing components. For more information, see Enable Pluggable Components. This tutorial uses logging as an example.
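
If you are not sure whether the logging component is ready, a quick check of its workloads can help. The sketch below assumes the component's default kubesphere-logging-system namespace; the Pods listed there should be Running before you continue.

     # List the workloads deployed by the logging component (Fluent Bit and its operator)
     kubectl get pods -n kubesphere-logging-system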

Step 1: Create a Kafka Cluster and a Kafka Topic

You can use strimzi-kafka-operator to create a Kafka cluster and a Kafka topic. If you already have a Kafka cluster, you can start from the next step.

  1. Install strimzi-kafka-operator in the default namespace:

     helm repo add strimzi https://strimzi.io/charts/
     helm install kafka-operator strimzi/strimzi-kafka-operator -n default
  2. Create a Kafka cluster and a Kafka topic in the default namespace by running the following commands. The commands create Kafka and ZooKeeper clusters with the ephemeral storage type (i.e. emptyDir) for demonstration purposes. For other storage types in a production environment, refer to kafka-persistent.

     cat <<EOF | kubectl apply -f -
     apiVersion: kafka.strimzi.io/v1beta1
     kind: Kafka
     metadata:
       name: my-cluster
       namespace: default
     spec:
       kafka:
         version: 2.5.0
         replicas: 3
         listeners:
           plain: {}
           tls: {}
         config:
           offsets.topic.replication.factor: 3
           transaction.state.log.replication.factor: 3
           transaction.state.log.min.isr: 2
           log.message.format.version: '2.5'
         storage:
           type: ephemeral
       zookeeper:
         replicas: 3
         storage:
           type: ephemeral
       entityOperator:
         topicOperator: {}
         userOperator: {}
     ---
     apiVersion: kafka.strimzi.io/v1beta1
     kind: KafkaTopic
     metadata:
       name: my-topic
       namespace: default
       labels:
         strimzi.io/cluster: my-cluster
     spec:
       partitions: 3
       replicas: 3
       config:
         retention.ms: 7200000
         segment.bytes: 1073741824
     EOF
  3. Run the following command to check Pod status and wait until all Kafka and ZooKeeper Pods are up and running.

     $ kubectl -n default get pod
     NAME                                         READY   STATUS    RESTARTS   AGE
     my-cluster-entity-operator-f977bf457-s7ns2   3/3     Running   0          69m
     my-cluster-kafka-0                           2/2     Running   0          69m
     my-cluster-kafka-1                           2/2     Running   0          69m
     my-cluster-kafka-2                           2/2     Running   0          69m
     my-cluster-zookeeper-0                       1/1     Running   0          71m
     my-cluster-zookeeper-1                       1/1     Running   1          71m
     my-cluster-zookeeper-2                       1/1     Running   1          71m
     strimzi-cluster-operator-7d6cd6bdf7-9cf6t    1/1     Running   0          104m

    Run the following command to check the metadata of the Kafka cluster:

     kafkacat -L -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092
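
    You can also wait for readiness declaratively instead of polling the Pod list. The sketch below assumes your Strimzi version exposes the standard Ready condition on the Kafka resource, as recent releases do.

     # Block until the Strimzi operator marks the Kafka cluster as Ready (gives up after 5 minutes)
     kubectl -n default wait kafka/my-cluster --for=condition=Ready --timeout=300s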

Step 2: Add Kafka as a Log Receiver

  1. Log in to KubeSphere as admin. Click Platform in the top left corner and select Cluster Management.

  2. If you have enabled the multi-cluster feature, you can select a specific cluster. If you have not enabled the feature, refer to the next step directly.

  3. On the Cluster Management page, go to Log Collections in Cluster Settings.

  4. Click Add Log Collector and select Kafka. Enter the Kafka broker addresses and ports as below, and then click OK to continue.

     my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc 9092
     my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc 9092
     my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc 9092


  5. Run the following commands to verify whether the Kafka cluster is receiving logs sent from Fluent Bit:

     # Start a util container
     kubectl run --rm utils -it --generator=run-pod/v1 --image arunvelsriram/utils bash
     # Install Kafkacat in the util container
     apt-get install kafkacat
     # Run the following command to consume log messages from kafka topic: my-topic
     kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092 -t my-topic
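
For reference, the console does not edit Fluent Bit directly: KubeSphere configures the new receiver through the fluentbit-operator. The sketch below approximates the resulting Output resource using the operator's Kafka output plugin; the labels, match pattern, and topic shown here are assumptions based on a default logging setup, so treat it as illustrative rather than something you need to create by hand.

     apiVersion: logging.kubesphere.io/v1alpha2
     kind: Output
     metadata:
       name: kafka
       namespace: kubesphere-logging-system
       labels:
         logging.kubesphere.io/enabled: "true"    # assumed label; lets the operator pick up this Output
     spec:
       match: kube.*                               # assumed tag pattern for container logs
       kafka:
         brokers: my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092
         topics: my-topic                          # assumed topic; matches the topic consumed above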