Flume Channels

Channels are the repositories where events are staged on an agent. A source adds events and a sink removes them.

Memory Channel

The events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need high throughput and are prepared to lose the staged data in the event of an agent failure. Required properties are marked (required).

Property Name | Default | Description
type (required) | – | The component type name, needs to be memory
capacity | 100 | The maximum number of events stored in the channel
transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction
keep-alive | 3 | Timeout in seconds for adding or removing an event
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

Example for agent named a1:

  a1.channels = c1
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 10000
  a1.channels.c1.byteCapacityBufferPercentage = 20
  a1.channels.c1.byteCapacity = 800000
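
As a rough worked example (assuming the buffer percentage is applied as byteCapacity × (1 − byteCapacityBufferPercentage/100)), the settings above leave about 800000 × 0.8 = 640000 bytes for event bodies, with the remaining 20% reserved as headroom for header data.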

JDBC Channel

The events are stored in persistent storage that is backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that is ideal for flows where recoverability is important. Required properties are marked (required).

Property Name | Default | Description
type (required) | – | The component type name, needs to be jdbc
db.type | DERBY | Database vendor, needs to be DERBY.
driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendor's JDBC driver
driver.url | (constructed from other properties) | JDBC connection URL
db.username | "sa" | User id for db connection
db.password | – | Password for db connection
connection.properties.file | – | JDBC connection property file path
create.schema | true | If true, then creates db schema if not there
create.index | true | Create indexes to speed up lookups
create.foreignkey | true | –
transaction.isolation | "READ_COMMITTED" | Isolation level for db session: READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections | 10 | Max connections allowed to db
maximum.capacity | 0 (unlimited) | Max number of events in the channel
sysprop.* | – | DB vendor specific properties
sysprop.user.home | – | Home path to store embedded Derby database

Example for agent named a1:

  a1.channels = c1
  a1.channels.c1.type = jdbc
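
The defaults are sufficient for the embedded Derby backend; a sketch that also overrides a few of the optional properties from the table (the path and limits below are illustrative, not recommendations):

  a1.channels = c1
  a1.channels.c1.type = jdbc
  # store the embedded Derby database under an explicit home path
  a1.channels.c1.sysprop.user.home = /var/lib/flume
  # cap the number of staged events and database connections
  a1.channels.c1.maximum.capacity = 100000
  a1.channels.c1.maximum.connections = 10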

Kafka Channel

The events are stored in a Kafka cluster (which must be installed separately). Kafka provides high availability and replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.

The Kafka channel can be used for multiple scenarios:

  • With Flume source and sink - it provides a reliable and highly available channel for events
  • With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
  • With Flume sink, but no source - it is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr (see the sketch after this list)
    This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1, which was the highest available version at the time of the release.
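
For instance, a minimal sketch of the sink-only scenario, where other applications produce raw messages into a Kafka topic and Flume drains them to HDFS (the topic name, broker addresses and HDFS path are hypothetical):

  a1.channels = c1
  a1.sinks = k1
  a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
  a1.channels.c1.kafka.topic = app-events
  # other producers write plain messages, not Avro FlumeEvents
  a1.channels.c1.parseAsFlumeEvent = false
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /flume/app-events
  a1.sinks.k1.channel = c1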

The configuration parameters are organized as follows:

  • Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type =
  • Configuration values related to Kafka or how the channel operates are prefixed with "kafka." (these are analogous to the CommonClient Configs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
  • Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
  • Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks
    This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below and a warning message is logged on startup when they are present in the configuration file.

Required properties are marked (required).

Property Name | Default | Description
type (required) | – | The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers (required) | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port
kafka.topic | flume-channel | Kafka topic which the channel will use
kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data. Note that having non-channel consumers with the same ID can lead to data loss.
parseAsFlumeEvent | true | Expecting Avro datums with FlumeEvent schema in the channel. This should be true if a Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
pollTimeout | 500 | The amount of time (in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader | – | When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.
kafka.consumer.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw exception to the consumer if no previous offset is found for the consumer's group; anything else: throw exception to the consumer.
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
kafka.consumer.security.protocol | PLAINTEXT | Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
more producer/consumer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer.

Deprecated Properties

Property Name | Default | Description
brokerList | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port
topic | flume-channel | Use kafka.topic
groupId | flume | Use kafka.consumer.group.id
readSmallestOffset | false | Use kafka.consumer.auto.offset.reset
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.
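
For example, a legacy configuration written with the deprecated names maps onto the current names like this (the broker and topic values are illustrative):

  # deprecated style (still accepted, but logs a warning on startup)
  a1.channels.c1.brokerList = kafka-1:9092,kafka-2:9092
  a1.channels.c1.topic = channel1
  a1.channels.c1.groupId = flume

  # equivalent current style
  a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
  a1.channels.c1.kafka.topic = channel1
  a1.channels.c1.kafka.consumer.group.id = flume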

Note

Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.

Example for agent named a1:

  a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
  a1.channels.channel1.kafka.topic = channel1
  a1.channels.channel1.kafka.consumer.group.id = flume-consumer

Security and Kafka Channel:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer|consumer.security.protocol to any of the following values means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the JIRA for tracking this issue: KAFKA-2561

TLS and Kafka Channel:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

  a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  a1.channels.channel1.kafka.topic = channel1
  a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  a1.channels.channel1.kafka.producer.security.protocol = SSL
  # optional, the global truststore can be used alternatively
  a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
  a1.channels.channel1.kafka.consumer.security.protocol = SSL
  # optional, the global truststore can be used alternatively
  a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
  a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Specifying the truststore is optional here; the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties:

  a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
  a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  • Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  • Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
If client side authentication is also required then additionally the following needs to be added to the Flume agent configuration, or the global SSL setup can be used (see the SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate with a single Root CA which in turn is trusted by Kafka brokers.

  # optional, the global keystore can be used alternatively
  a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
  a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
  # optional, the global keystore can be used alternatively
  a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
  a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

If the keystore and key use different password protection then the ssl.key.password property will provide the required additional secret for both consumer and producer keystores:

  a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
  a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

Kerberos and Kafka Channel:

To use the Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for the producer and/or consumer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. See the Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

  JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
  JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

  a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  a1.channels.channel1.kafka.topic = channel1
  a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
  a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
  a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
  a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
  a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
  a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

  a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  a1.channels.channel1.kafka.topic = channel1
  a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
  a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
  a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
  # optional, the global truststore can be used alternatively
  a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
  a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
  a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
  a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
  # optional, the global truststore can be used alternatively
  a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
  a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference on its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka channel may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. This won't be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

  Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/keytabs/flume.keytab"
    principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  };

  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/keytabs/flume.keytab"
    principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  };

File Channel

Required properties are marked (required).

Property Name | Default | Description
type (required) | – | The component type name, needs to be file.
checkpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint file will be stored
useDualCheckpoints | false | Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir | – | The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs | ~/.flume/file-channel/data | Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance
transactionCapacity | 10000 | The maximum size of transaction supported by the channel
checkpointInterval | 30000 | Amount of time (in millis) between checkpoints
maxFileSize | 2146435071 | Max size (in bytes) of a single log file
minimumRequiredSpace | 524288000 | Minimum required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity | 1000000 | Maximum capacity of the channel
keep-alive | 3 | Amount of time (in sec) to wait for a put operation
use-log-replay-v1 | false | Expert: Use old replay logic
use-fast-replay | false | Expert: Replay without using queue
checkpointOnClose | true | Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey | – | Key name used to encrypt new data
encryption.cipherProvider | – | Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider | – | Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile | – | Path to the keystore file
encryption.keyProvider.keyStorePasswordFile | – | Path to the keystore password file
encryption.keyProvider.keys | – | List of all keys (e.g. history of the activeKey setting)
encryption.keyProvider.keys.*.passwordFile | – | Path to the optional key password file

Note

By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result, if you have more than one File Channel instance active within the agent, only one will be able to lock the directories, causing the other channel's initialization to fail. It is therefore necessary that you provide explicit paths for all the configured channels, preferably on different disks. Furthermore, as the file channel syncs to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.

Example for agent named a1:

  a1.channels = c1
  a1.channels.c1.type = file
  a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
  a1.channels.c1.dataDirs = /mnt/flume/data
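
If an agent hosts more than one file channel, give each channel its own checkpoint and data directories, as in this sketch (the paths below are illustrative):

  a1.channels = c1 c2
  a1.channels.c1.type = file
  a1.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
  a1.channels.c1.dataDirs = /mnt/disk1/flume/data
  a1.channels.c2.type = file
  a1.channels.c2.checkpointDir = /mnt/disk2/flume/checkpoint
  a1.channels.c2.dataDirs = /mnt/disk2/flume/data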

Encryption

Below are a few sample configurations:

Generating a key with a password separate from the key store password:

  keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
    -keysize 128 -validity 9000 -keystore test.keystore \
    -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

  keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
    -keystore src/test/resources/test.keystore -storetype jceks \
    -storepass keyStorePassword

  a1.channels.c1.encryption.activeKey = key-0
  a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
  a1.channels.c1.encryption.keyProvider = JCEKSFILE
  a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
  a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
  a1.channels.c1.encryption.keyProvider.keys = key-0

Let’s say you have aged key-0 out and new files should be encrypted with key-1:

  a1.channels.c1.encryption.activeKey = key-1
  a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
  a1.channels.c1.encryption.keyProvider = JCEKSFILE
  a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
  a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
  a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenario as above; however, key-0 has its own password:

  a1.channels.c1.encryption.activeKey = key-1
  a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
  a1.channels.c1.encryption.keyProvider = JCEKSFILE
  a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
  a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
  a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
  a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink side outages or drops in drain rates. The throughput will reduce approximately to file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and not recommended for use in production.

Required properties are marked (required). Please refer to the file channel for additional required properties.

Property Name | Default | Description
type (required) | – | The component type name, needs to be SPILLABLEMEMORY
memoryCapacity | 10000 | Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero.
overflowCapacity | 100000000 | Maximum number of events stored in overflow disk (i.e. File channel). To disable use of overflow, set this to zero.
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel
<file channel properties> | see file channel | Any file channel property with the exception of 'keep-alive' and 'capacity' can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use 'overflowCapacity' to set the File channel's capacity.

In-memory queue is considered full if either memoryCapacity or byteCapacity limit is reached.

Example for agent named a1:

  a1.channels = c1
  a1.channels.c1.type = SPILLABLEMEMORY
  a1.channels.c1.memoryCapacity = 10000
  a1.channels.c1.overflowCapacity = 1000000
  a1.channels.c1.byteCapacity = 800000
  a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
  a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the in-memory queue and function like a file channel:

  a1.channels = c1
  a1.channels.c1.type = SPILLABLEMEMORY
  a1.channels.c1.memoryCapacity = 0
  a1.channels.c1.overflowCapacity = 1000000
  a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
  a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of overflow disk and function purely as an in-memory channel:

  a1.channels = c1
  a1.channels.c1.type = SPILLABLEMEMORY
  a1.channels.c1.memoryCapacity = 100000
  a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

Warning

The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are marked (required).

Property Name | Default | Description
type (required) | – | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity | 50 | The max number of events stored in the channel
keep-alive | 3 | Timeout in seconds for adding or removing an event
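
A minimal configuration for unit-test use follows the same pattern as the other channels (the values shown are the defaults from the table):

  a1.channels = c1
  a1.channels.c1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
  a1.channels.c1.capacity = 50
  a1.channels.c1.keep-alive = 3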

Custom Channel

A custom channel is your own implementation of the Channel interface. A custom channel's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are marked (required).

Property Name | Default | Description
type (required) | – | The component type name, needs to be a FQCN

Example for agent named a1:

  a1.channels = c1
  a1.channels.c1.type = org.example.MyChannel