Add Kafka as Receiver (aka Collector)

KubeSphere supports Elasticsearch, Kafka, and Fluentd as log receivers. This document demonstrates how to:

  • Deploy strimzi-kafka-operator, then create a Kafka cluster and a Kafka topic by creating Kafka and KafkaTopic custom resources.
  • Add a Kafka log receiver to receive logs sent from Fluent Bit.
  • Verify that the Kafka cluster is receiving logs by using Kafkacat.

Prerequisite

Before adding a log receiver, you need to enable at least one of the logging, events, or auditing components by following Enable Pluggable Components. This document uses the logging component as an example.

Step 1: Create a Kafka cluster and a Kafka topic

Note

If you already have a Kafka cluster, you can skip ahead to the step that adds Kafka as a log receiver.

You can use strimzi-kafka-operator to create a Kafka cluster and a Kafka topic:

  1. Install strimzi-kafka-operator to the default namespace (Helm 3 syntax):

      helm repo add strimzi https://strimzi.io/charts/
      helm install kafka-operator strimzi/strimzi-kafka-operator -n default

  2. Create a Kafka cluster and a Kafka topic in the default namespace:

     To deploy a Kafka cluster and create a Kafka topic, open the kubectl console in the KubeSphere Toolbox and run the command shown after the note.

Note

The following command creates Kafka and ZooKeeper clusters with the ephemeral storage type, which is backed by emptyDir and is intended for demo purposes only. For production, use another storage type; refer to kafka-persistent.

      cat <<EOF | kubectl apply -f -
      apiVersion: kafka.strimzi.io/v1beta1
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: default
      spec:
        kafka:
          version: 2.5.0
          replicas: 3
          listeners:
            plain: {}
            tls: {}
          config:
            offsets.topic.replication.factor: 3
            transaction.state.log.replication.factor: 3
            transaction.state.log.min.isr: 2
            log.message.format.version: '2.5'
          storage:
            type: ephemeral
        zookeeper:
          replicas: 3
          storage:
            type: ephemeral
        entityOperator:
          topicOperator: {}
          userOperator: {}
      ---
      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaTopic
      metadata:
        name: my-topic
        namespace: default
        labels:
          strimzi.io/cluster: my-cluster
      spec:
        partitions: 3
        replicas: 3
        config:
          retention.ms: 7200000
          segment.bytes: 1073741824
      EOF

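
The numbers in the manifests above are easier to read once converted to human units. The following is a minimal sketch, not part of the original procedure, that converts the topic's retention and segment size and checks that the replication settings are consistent with a three-broker cluster. The variable names are illustrative; the values are copied from the manifests.

```shell
# Values copied from the Kafka and KafkaTopic manifests above.
broker_replicas=3          # spec.kafka.replicas
topic_replicas=3           # KafkaTopic spec.replicas
txn_replication_factor=3   # transaction.state.log.replication.factor
min_isr=2                  # transaction.state.log.min.isr
retention_ms=7200000       # retention.ms
segment_bytes=1073741824   # segment.bytes

# Convert milliseconds to hours and bytes to GiB.
retention_hours=$((retention_ms / 1000 / 60 / 60))
segment_gib=$((segment_bytes / 1024 / 1024 / 1024))
echo "retention.ms = ${retention_hours} hours, segment.bytes = ${segment_gib} GiB"

# A replication factor cannot exceed the number of brokers, and the minimum
# in-sync replica count should not exceed the replication factor.
if [ "$topic_replicas" -le "$broker_replicas" ] && [ "$min_isr" -le "$txn_replication_factor" ]; then
  replication_ok=yes
else
  replication_ok=no
fi
echo "replication settings consistent: ${replication_ok}"
```

With the values from this document, log messages in my-topic are retained for 2 hours and each log segment can grow to 1 GiB. If you reduce the number of brokers for a smaller demo cluster, lower the replication factors and the topic replicas accordingly.
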
  3. Run the following command and wait until the Kafka and ZooKeeper pods are all up and running:

      kubectl -n default get pod
      NAME                                         READY   STATUS    RESTARTS   AGE
      my-cluster-entity-operator-f977bf457-s7ns2   3/3     Running   0          69m
      my-cluster-kafka-0                           2/2     Running   0          69m
      my-cluster-kafka-1                           2/2     Running   0          69m
      my-cluster-kafka-2                           2/2     Running   0          69m
      my-cluster-zookeeper-0                       1/1     Running   0          71m
      my-cluster-zookeeper-1                       1/1     Running   1          71m
      my-cluster-zookeeper-2                       1/1     Running   1          71m
      strimzi-cluster-operator-7d6cd6bdf7-9cf6t    1/1     Running   0          104m
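
Rather than reading the pod list by eye, the readiness check can be scripted. The sketch below is an illustration only: `all_pods_ready` is a hypothetical helper, not a kubectl or KubeSphere command. It reads `kubectl get pod` output on standard input and succeeds only if every listed pod has the Running status with all of its containers ready. It assumes the default column layout shown above (NAME, READY, STATUS, ...).

```shell
# Hypothetical helper: reads `kubectl get pod` output on stdin.
# Exit status 0 means every pod is Running and READY shows n/n.
all_pods_ready() {
  awk '
    NR == 1 { next }                      # skip the header row
    {
      n++
      split($2, ready, "/")               # READY column, for example 2/2
      if (ready[1] != ready[2] || $3 != "Running") bad = 1
    }
    END { exit (bad || n == 0) ? 1 : 0 }  # also fail when no pods are listed
  '
}

# Usage against a live cluster (not executed here):
#   kubectl -n default get pod | all_pods_ready && echo "all pods are ready"
```

The helper treats an empty pod list as not ready, so it does not report success before the operator has created any pods.
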

Then run the following command to list the metadata of the Kafka cluster:

      kafkacat -L -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092

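
The broker addresses used in this document follow a regular pattern: `<cluster>-kafka-<index>.<cluster>-kafka-brokers.<namespace>.svc:<port>`. As a convenience, the long comma-separated list can be generated instead of typed. The following sketch assumes that pattern, inferred from the addresses above; `kafka_brokers` is a hypothetical helper name, and the default port 9092 is taken from this document.

```shell
# Hypothetical helper: print a comma-separated broker list.
# Arguments: cluster name, namespace, number of brokers, port (default 9092).
# The body runs in a subshell, so its variables do not leak into the caller.
kafka_brokers() (
  cluster=$1
  namespace=$2
  replicas=$3
  port=${4:-9092}
  list=""
  i=0
  while [ "$i" -lt "$replicas" ]; do
    # Append a comma only when the list already has an entry.
    list="${list:+$list,}${cluster}-kafka-${i}.${cluster}-kafka-brokers.${namespace}.svc:${port}"
    i=$((i + 1))
  done
  printf '%s\n' "$list"
)

BROKERS="$(kafka_brokers my-cluster default 3)"
echo "$BROKERS"
```

The resulting value can then be passed to Kafkacat, for example `kafkacat -L -b "$BROKERS"`.
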
  4. Add Kafka as a log receiver: click Add Log Collector, select Kafka, and enter the Kafka broker addresses and port as shown below:

      Address                                                   Port
      my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc   9092
      my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc   9092
      my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc   9092


  5. Run the following commands to verify that the Kafka cluster is receiving logs sent from Fluent Bit:

      # Start a util container
      kubectl run --rm utils -it --generator=run-pod/v1 --image arunvelsriram/utils bash
      # Install Kafkacat in the util container
      apt-get install kafkacat
      # Consume log messages from the Kafka topic my-topic
      kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092 -t my-topic