kafka

You can use the Apache Kafka source (kafka) in Data Prepper to read records from one or more Kafka topics. These records hold events that your Data Prepper pipeline can ingest. The kafka source uses Kafka's Consumer API to consume messages from the Kafka broker and then creates Data Prepper events for further processing by the Data Prepper pipeline.

Usage

The following example shows the kafka source in a Data Prepper pipeline:

```yaml
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - 127.0.0.1:9093
      topics:
        - name: Topic1
          group_id: groupID1
        - name: Topic2
          group_id: groupID1
```

Configuration

Use the following configuration options with the kafka source.

| Option | Required | Type | Description |
| :--- | :--- | :--- | :--- |
| `bootstrap_servers` | Yes, when not using Amazon Managed Streaming for Apache Kafka (Amazon MSK) as a cluster | IP address | The host and port for the initial connection to the Kafka cluster. You can configure multiple Kafka brokers by using the IP address and port number for each broker. When using Amazon MSK as your Kafka cluster, the bootstrap server information is obtained from MSK using the MSK Amazon Resource Name (ARN) provided in the configuration. |
| `topics` | Yes | JSON array | The Kafka topics that the Data Prepper `kafka` source uses to read messages. You can configure up to 10 topics. For more information about topics configuration options, see Topics. |
| `schema` | No | JSON object | The schema registry configuration. For more information, see Schema. |
| `authentication` | No | JSON object | Sets the authentication options for both the pipeline and Kafka. For more information, see Authentication. |
| `encryption` | No | JSON object | The encryption configuration. For more information, see Encryption. |
| `aws` | No | JSON object | The AWS configuration. For more information, see AWS. |
| `acknowledgments` | No | Boolean | If `true`, enables the `kafka` source to receive end-to-end acknowledgments when events are received by OpenSearch sinks. Default is `false`. |
| `client_dns_lookup` | Yes, when a DNS alias is used | String | Sets Kafka's `client.dns.lookup` option. Default is `default`. |
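
For example, a hypothetical source that connects to two brokers and enables end-to-end acknowledgments might look like the following sketch. The broker addresses, topic name, and group ID are placeholders, not defaults:

```yaml
kafka-pipeline:
  source:
    kafka:
      # Placeholder broker addresses; list each broker as host:port.
      bootstrap_servers:
        - broker1.example.com:9093
        - broker2.example.com:9093
      topics:
        - name: logs             # hypothetical topic name
          group_id: log-readers  # hypothetical consumer group ID
      # Request end-to-end acknowledgments from OpenSearch sinks.
      acknowledgments: true
```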

Topics

Use the following options in the topics array.

| Option | Required | Type | Description |
| :--- | :--- | :--- | :--- |
| `name` | Yes | String | The name of each Kafka topic. |
| `group_id` | Yes | String | Sets Kafka's `group.id` option. |
| `workers` | No | Integer | The number of multithreaded consumers associated with each topic. Default is `2`. The maximum value is `200`. |
| `serde_format` | No | String | Indicates the serialization and deserialization format of the messages in the topic. Default is `plaintext`. |
| `auto_commit` | No | Boolean | When `false`, the consumer's offset is not periodically committed to Kafka in the background. Default is `false`. |
| `commit_interval` | No | Integer | When `auto_commit` is set to `true`, sets how frequently, in seconds, the consumer offsets are auto-committed to Kafka through Kafka's `auto.commit.interval.ms` option. Default is `5s`. |
| `session_timeout` | No | Integer | The amount of time during which the source detects client failures when using Kafka's group management features, which can be used to balance the data stream. Default is `45s`. |
| `auto_offset_reset` | No | String | Automatically resets the offset to an earlier or the latest offset through Kafka's `auto.offset.reset` option. Default is `latest`. |
| `thread_waiting_time` | No | Integer | The amount of time that threads wait for the preceding thread to complete its task and to signal the next thread. The Kafka consumer API poll timeout value is set to half of this setting. Default is `5s`. |
| `max_partition_fetch_bytes` | No | Integer | Sets the maximum amount of data the server returns per partition through Kafka's `max.partition.fetch.bytes` setting. Default is `1mb`. |
| `heart_beat_interval` | No | Integer | The expected amount of time between heartbeats to the consumer coordinator when using Kafka's group management facilities, set through Kafka's `heartbeat.interval.ms` setting. Default is `5s`. |
| `fetch_max_wait` | No | Integer | The maximum amount of time during which the server blocks a fetch request when there isn't sufficient data to satisfy the `fetch_min_bytes` requirement, set through Kafka's `fetch.max.wait.ms` setting. Default is `500ms`. |
| `fetch_max_bytes` | No | Integer | The maximum record size accepted by the broker, set through Kafka's `fetch.max.bytes` setting. Default is `50mb`. |
| `fetch_min_bytes` | No | Integer | The minimum amount of data the server returns during a fetch request, set through Kafka's `fetch.min.bytes` setting. Default is `1b`. |
| `retry_backoff` | No | Integer | The amount of time to wait before attempting to retry a failed request to a given topic partition, set through Kafka's `retry.backoff.ms` setting. Default is `10s`. |
| `max_poll_interval` | No | Integer | The maximum delay between invocations of `poll()` when using group management, set through Kafka's `max.poll.interval.ms` option. Default is `300s`. |
| `consumer_max_poll_records` | No | Integer | The maximum number of records returned in a single `poll()` call, set through Kafka's `max.poll.records` setting. Default is `500`. |
| `key_mode` | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is `include_as_field`, which includes the key in the event as `kafka_key`. The `include_as_metadata` setting includes the key in the event's metadata. The `discard` setting discards the key. |
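
As a sketch of how these options combine, a hypothetical topic entry that tunes consumer behavior might look like the following. The values are illustrative, not recommended settings:

```yaml
topics:
  - name: Topic1                  # placeholder topic name
    group_id: groupID1            # placeholder consumer group ID
    workers: 4                    # four consumer threads for this topic
    serde_format: json            # assumes the topic carries JSON messages
    auto_commit: true
    commit_interval: 10s          # auto-commit offsets every 10 seconds
    auto_offset_reset: earliest   # new groups start from the earliest offset
    consumer_max_poll_records: 1000
```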

Schema

The following option is required inside the schema configuration.

| Option | Type | Description |
| :--- | :--- | :--- |
| `type` | String | Sets the type of schema based on your registry: either the AWS Glue Schema Registry, `aws_glue`, or the Confluent Schema Registry, `confluent`. When using the `aws_glue` registry, set any AWS configuration options. |

The following configuration options are only required when using a confluent registry.

| Option | Type | Description |
| :--- | :--- | :--- |
| `registry_url` | String | The URL of the Confluent Schema Registry. |
| `version` | String | The version of the schema to retrieve from the registry. |
| `schema_registry_api_key` | String | The schema registry API key. |
| `schema_registry_api_secret` | String | The schema registry API secret. |
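
For example, a hypothetical schema block that points the source at a Confluent Schema Registry might look like the following. The URL, version, and credentials are placeholders:

```yaml
schema:
  type: confluent
  registry_url: https://registry.example.com:8081  # placeholder registry URL
  version: 1                                       # placeholder schema version
  schema_registry_api_key: my-api-key              # placeholder credential
  schema_registry_api_secret: my-api-secret        # placeholder credential
```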

Authentication

The following option is required inside the authentication object.

| Option | Type | Description |
| :--- | :--- | :--- |
| `sasl` | JSON object | The Simple Authentication and Security Layer (SASL) authentication configuration. |

SASL

Use one of the following options when configuring SASL authentication.

| Option | Type | Description |
| :--- | :--- | :--- |
| `plaintext` | JSON object | The PLAINTEXT authentication configuration. |
| `aws_msk_iam` | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to `role`, the `sts_role_arn` set in the `aws` configuration is used. Default is `default`. |

SASL PLAINTEXT

The following options are required when using the SASL PLAINTEXT protocol.

| Option | Type | Description |
| :--- | :--- | :--- |
| `username` | String | The username for PLAINTEXT authentication. |
| `password` | String | The password for PLAINTEXT authentication. |
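
For instance, a hypothetical authentication block using SASL PLAINTEXT might look like the following. The credentials are placeholders; in a real pipeline, avoid hardcoding them in plain text:

```yaml
authentication:
  sasl:
    plaintext:
      username: my-user      # placeholder username
      password: my-password  # placeholder password
```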

Encryption

Use the following options when setting SSL encryption.

| Option | Required | Type | Description |
| :--- | :--- | :--- | :--- |
| `type` | No | String | The encryption type. Use `none` to disable encryption. Default is `ssl`. |
| `insecure` | No | Boolean | A Boolean flag used to turn off SSL certificate verification. If set to `true`, certificate authority (CA) certificate verification is turned off and insecure HTTP requests are sent. Default is `false`. |
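
As a minimal sketch, the following keeps the default SSL encryption but turns off certificate verification, which may be useful when testing against a broker with a self-signed certificate (not recommended for production):

```yaml
encryption:
  type: ssl
  insecure: true  # skips CA certificate verification; testing only
```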

AWS

Use the following options when setting up authentication for AWS services.

| Option | Required | Type | Description |
| :--- | :--- | :--- | :--- |
| `region` | No | String | The AWS Region to use for credentials. Defaults to the standard SDK behavior for determining the Region. |
| `sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Storage Service (Amazon S3). Default is `null`, which uses the standard SDK behavior for credentials. |
| `msk` | No | JSON object | The MSK configuration settings. |

MSK

Use the following options inside the msk object.

| Option | Required | Type | Description |
| :--- | :--- | :--- | :--- |
| `arn` | Yes | String | The MSK ARN to use. |
| `broker_connection_type` | No | String | The type of connection to use with the MSK broker: `public`, `single_vpc`, or `multi_vpc`. Default is `single_vpc`. |
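
Putting the aws and msk options together, a hypothetical source that reads from an MSK cluster using IAM authentication might look like the following sketch. The ARNs, Region, topic name, and group ID are placeholders, and bootstrap_servers is omitted because the source obtains broker information from MSK using the cluster ARN:

```yaml
kafka-pipeline:
  source:
    kafka:
      topics:
        - name: Topic1        # placeholder topic name
          group_id: groupID1  # placeholder consumer group ID
      authentication:
        sasl:
          aws_msk_iam: role   # authenticate using the sts_role_arn below
      aws:
        region: us-east-1     # placeholder Region
        sts_role_arn: arn:aws:iam::123456789012:role/my-msk-role  # placeholder role ARN
        msk:
          arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123  # placeholder cluster ARN
          broker_connection_type: multi_vpc
```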