kafka

Use sink kafka to send log data to downstream Kafka.

Example

  1. sink:
  2. type: kafka
  3. brokers: ["127.0.0.1:6400"]
  4. topic: "log-${fields.topic}"

brokers

fieldtyperequireddefaultdescription
brokersstring arraytruenoneBrokers address for sending logs to Kafka

topic

fieldtyperequireddefaultdescription
topicstringfalseloggieKafka topic

Use${a.b} to get the field value in the event as the specific topic name.

For example, an event is:

  1. {
  2. "topic": "loggie",
  3. "hello": "world"
  4. }

Configure topic: ${topic}. At this time, the topic of Kafka is “loggie”.

Also nested selection is supported:

  1. {
  2. "fields": {
  3. "topic": "loggie"
  4. },
  5. "hello": "world"
  6. }

Configure topic: ${fields.topic}, and the topic of Kafka is “loggie”.

balance

fieldtyperequireddefaultdescription
balancestringfalseroundRobinLoad balancing strategy, which can be hash, roundRobin, leastBytes

compression

fieldtyperequireddefaultdescription
compressionstringfalsegzipCompression strategy for sending logs to Kafka, which can be gzip, snappy, lz4, zstd

maxAttempts

fieldtyperequireddefaultdescription
maxAttemptsintfalse10max retries

batchSize

fieldtyperequireddefaultdescription
batchSizeintfalse100The maximum number of data contained in each batch when sending

batchBytes

fieldtyperequireddefaultdescription
batchBytesintfalse1048576Maximum number of bytes included in each send request

batchTimeout

fieldtyperequireddefaultdescription
batchTimeouttime.Durationfalse1sMaximum time to form each send batch

readTimeout

fieldtyperequireddefaultdescription
readTimeouttime.Durationfalse10sread timeout

writeTimeout

fieldtyperequireddefaultdescription
writeTimeouttime.Durationfalse10swrite timeout

requiredAcks

fieldtyperequireddefaultdescription
requiredAcksintfalse0ack waiting parameter, which can be 0,1, -1
  • 0: don’t ask for ack
  • 1: wait for leader partition ack
  • -1: wait for all replica acks in the ISR

sasl

fieldtyperequireddefaultdescription
saslfalseSASL authentication
sasl.typestringfalseSASL type, which can be plain, scram
sasl.userNamestringfalseusername
sasl.passwordstringfalsepassword
sasl.algorithmstringtrue when type=scramAlgorithm to use when type=scram. sha256 or sha512

partitionKey

fieldtyperequireddefaultdescription
partitionKeystringfalsecontroller which partition of topic to send message

Similar with topic. Use${a.b} to get the field value in the event as the specific partition key.