Flume Sources

Avro Source

Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are marked (required).

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be avro
bind (required) | – | Hostname or IP address to listen on
port (required) | – | Port number to bind to
threads | – | Maximum number of worker threads to spawn
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of the upstream Avro Sink (or client) that sends to this source.
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component-level parameters (see below) or as global SSL parameters (see the SSL/TLS support section).
keystore | – | The path to a Java keystore file. If not specified here, the global keystore is used (if defined; otherwise this is a configuration error).
keystore-password | – | The password for the Java keystore. If not specified here, the global keystore password is used (if defined; otherwise this is a configuration error).
keystore-type | JKS | The type of the Java keystore: “JKS” or “PKCS12”. If not specified here, the global keystore type is used (if defined; otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded in addition to the protocols specified.
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols are the included protocols minus the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude.
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites are the included cipher suites minus the excluded cipher suites. If include-cipher-suites is empty, every supported cipher suite is included.
ipFilter | false | Set this to true to enable IP filtering for Netty.
ipFilterRules | – | Defines N Netty IP filter pattern rules (see below).
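
The include/exclude rules for protocols and cipher suites reduce to a set difference. The Python sketch below models them under stated assumptions. The function name enabled_items is illustrative and not part of Flume. SUPPORTED_PROTOCOLS is a hypothetical stand-in for whatever the JVM running Flume actually supports.

```python
def enabled_items(supported, include=None, exclude=None, always_exclude=()):
    """Return the protocols or cipher suites left enabled by include/exclude settings.

    supported      -- everything the runtime supports, in preference order
    include        -- space-separated include list; empty or None means "all supported"
    exclude        -- space-separated exclude list
    always_exclude -- items excluded regardless of configuration (SSLv3 for protocols)
    """
    include_list = include.split() if include else []
    included = set(include_list) if include_list else set(supported)
    excluded = set(exclude.split() if exclude else []) | set(always_exclude)
    # Enabled = included minus excluded, keeping the order of the supported list.
    return [item for item in supported if item in included and item not in excluded]


# Hypothetical list; the real one depends on the JVM running Flume.
SUPPORTED_PROTOCOLS = ["SSLv3", "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"]

print(enabled_items(SUPPORTED_PROTOCOLS, include="", exclude="TLSv1 TLSv1.1",
                    always_exclude=("SSLv3",)))
# ['TLSv1.2', 'TLSv1.3']
```

Listing SSLv3 in include-protocols does not re-enable it here, because always_exclude is applied last.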

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = avro
  a1.sources.r1.channels = c1
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 4141

Example of ipFilterRules

ipFilterRules defines N Netty IP filters separated by commas. Each pattern rule must be in this format:

<’allow’ or ’deny’>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern

Example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

Note that the first rule to match will apply, as the examples below show for a client on localhost:

  • “allow:name:localhost,deny:ip:*” allows the client on localhost and denies clients from any other IP.
  • “deny:name:localhost,allow:ip:*” denies the client on localhost and allows clients from any other IP.
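
The first-match behaviour of ipFilterRules can be modelled as below. This is an illustrative Python sketch, not Flume's or Netty's code. It assumes shell-style wildcards (* and ?) in patterns, matched here with fnmatch. It also assumes that a client matching no rule is accepted. The function names are hypothetical.

```python
from fnmatch import fnmatchcase


def parse_ip_filter_rules(spec):
    """Parse 'allow:ip:127.*,deny:ip:*' into (is_allow, kind, pattern) tuples."""
    rules = []
    for rule in spec.split(","):
        # maxsplit=2 keeps any ':' inside the pattern (e.g. IPv6) intact.
        action, kind, pattern = rule.strip().split(":", 2)
        if action not in ("allow", "deny") or kind not in ("ip", "name"):
            raise ValueError(f"bad ipFilter rule: {rule!r}")
        rules.append((action == "allow", kind, pattern))
    return rules


def is_allowed(rules, client_ip, client_name):
    """Apply rules in order; the first matching rule decides."""
    for is_allow, kind, pattern in rules:
        target = client_ip if kind == "ip" else client_name
        if fnmatchcase(target, pattern):
            return is_allow
    return True  # assumption: a client matching no rule is accepted


rules = parse_ip_filter_rules("allow:name:localhost,deny:ip:*")
print(is_allowed(rules, "127.0.0.1", "localhost"))  # True
print(is_allowed(rules, "10.0.0.7", "build-host"))  # False
```

Swapping the two rules in the spec flips both results, which is the ordering effect described above.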

Thrift Source

Listens on a Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. The Thrift source can be configured to start in secure mode by enabling Kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the Kerberos KDC. Required properties are marked (required).

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be thrift
bind (required) | – | Hostname or IP address to listen on
port (required) | – | Port number to bind to
threads | – | Maximum number of worker threads to spawn
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component-level parameters (see below) or as global SSL parameters (see the SSL/TLS support section).
keystore | – | The path to a Java keystore file. If not specified here, the global keystore is used (if defined; otherwise this is a configuration error).
keystore-password | – | The password for the Java keystore. If not specified here, the global keystore password is used (if defined; otherwise this is a configuration error).
keystore-type | JKS | The type of the Java keystore: “JKS” or “PKCS12”. If not specified here, the global keystore type is used (if defined; otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded in addition to the protocols specified.
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols are the included protocols minus the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude.
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites are the included cipher suites minus the excluded cipher suites.
kerberos | false | Set to true to enable Kerberos authentication. In Kerberos mode, agent-principal and agent-keytab are required for successful authentication. In secure mode, the Thrift source accepts connections only from Thrift clients that have Kerberos enabled and are successfully authenticated to the Kerberos KDC.
agent-principal | – | The Kerberos principal used by the Thrift Source to authenticate to the Kerberos KDC.
agent-keytab | – | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the Kerberos KDC.

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = thrift
  a1.sources.r1.channels = c1
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 4141

Exec Source

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless the property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results, whereas date will probably not: the former two commands produce streams of data, whereas the latter produces a single event and exits.

Required properties are marked (required).

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be exec
command (required) | – | The command to execute
shell | – | A shell invocation used to run the command, e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes, etc.
restartThrottle | 10000 | Amount of time (in milliseconds) to wait before attempting a restart
restart | false | Whether the executed command should be restarted if it dies
logStdErr | false | Whether the command’s stderr should be logged
batchSize | 20 | The maximum number of lines to read and send to the channel at a time
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – |

Warning

The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that, if there is a failure to put the event into the Channel, the client knows about it. In such cases, the data will be lost. For instance, one of the most commonly requested features is the tail -F [file]-like use case, where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem: what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log, or that the event hasn’t been sent for some reason. If this doesn’t make sense, you need only know this: your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning, and to be completely clear, there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, the Taildir Source, or direct integration with Flume via the SDK.

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = exec
  a1.sources.r1.command = tail -F /var/log/secure
  a1.sources.r1.channels = c1

The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals, etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’: ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.

  a1.sources.tailsource-1.type = exec
  a1.sources.tailsource-1.shell = /bin/bash -c
  a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
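
The way ‘shell’ and ‘command’ combine into the launched process’s argument list can be sketched as follows. This is a hypothetical Python model of the behaviour described above, not Flume code. It assumes that both the shell string and a shell-less command are split on whitespace. It only builds the argument list and does not start a process.

```python
def build_exec_argv(command, shell=None):
    """Build the argument list for an exec-style source.

    Without a shell, the command is split on whitespace and run directly, so
    wildcards, pipes and loops are passed through literally, not interpreted.
    With a shell such as '/bin/bash -c', the shell string is split on
    whitespace and the whole command is appended as ONE extra argument.
    """
    if shell:
        return shell.split() + [command]
    return command.split()


print(build_exec_argv("tail -F /var/log/secure"))
# ['tail', '-F', '/var/log/secure']
print(build_exec_argv("for i in /path/*.txt; do cat $i; done", shell="/bin/bash -c"))
# ['/bin/bash', '-c', 'for i in /path/*.txt; do cat $i; done']
```

In Python, such a list could be handed to subprocess.Popen(argv, stdout=subprocess.PIPE) to start the process and read its standard output.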

JMS Source

JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application, it should work with any JMS provider, but it has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), --classpath on the command line, or via the FLUME_CLASSPATH variable in flume-env.sh.

Required properties are marked (required).

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be jms
initialContextFactory (required) | – | Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory (required) | – | The JNDI name the connection factory should appear as
providerURL (required) | – | The JMS provider URL
destinationName (required) | – | Destination name
destinationType (required) | – | Destination type (queue or topic)
messageSelector | – | Message selector to use when creating the consumer
userName | – | Username for the destination/provider
passwordFile | – | File containing the password for the destination/provider
batchSize | 100 | Number of messages to consume in one batch
converter.type | DEFAULT | Class to use to convert messages to flume events. See below.
converter.* | – | Converter properties.
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription | false | Whether to create a durable subscription. A durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified.
clientId | – | JMS client identifier set on the Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions.

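
The durable-subscription constraints in the table above can be checked before starting an agent. The Python sketch below is illustrative and is not Flume's own validation code. The function name and the dict-of-strings input are assumptions. Values are compared case-insensitively, since the example below writes QUEUE in upper case.

```python
def durable_subscription_problems(props):
    """Return a list of problems with the durable-subscription settings (empty if none)."""
    if str(props.get("createDurableSubscription", "false")).strip().lower() != "true":
        return []  # non-durable consumers need none of the extra settings
    problems = []
    if str(props.get("destinationType", "")).strip().lower() != "topic":
        problems.append("durable subscriptions require destinationType = topic")
    for key in ("clientId", "durableSubscriptionName"):
        if not props.get(key):
            problems.append(f"{key} is required when createDurableSubscription = true")
    return problems


print(durable_subscription_problems({
    "createDurableSubscription": "true",
    "destinationType": "QUEUE",
    "clientId": "flume-agent-1",
}))
```
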
JMS message converter

The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.

  • BytesMessage: Bytes of the message are copied to the body of the FlumeEvent. Cannot convert more than 2GB of data per message.
  • TextMessage: Text of the message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default, but this is configurable.
  • ObjectMessage: The object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream, and the resulting array is copied to the body of the FlumeEvent.
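
The Bytes and Text cases of the default converter can be modelled in a few lines. This Python sketch is a stand-in and not the JMS API. The kind/payload arguments and the 2**31 - 1 byte limit are assumptions: the limit is read here as Java's maximum array length. ObjectMessage is deliberately not modelled because it relies on Java object serialization.

```python
MAX_BYTES_MESSAGE = 2**31 - 1  # assumption: the "2GB" limit is Java's maximum array length


def convert_jms_message(kind, payload, properties, charset="UTF-8"):
    """Return (headers, body) for a BytesMessage or TextMessage stand-in.

    kind       -- "bytes" or "text"; ObjectMessage is not modelled
    payload    -- bytes for "bytes", str for "text"
    properties -- JMS message properties, copied into headers as strings
    """
    headers = {str(name): str(value) for name, value in properties.items()}
    if kind == "bytes":
        if len(payload) > MAX_BYTES_MESSAGE:
            raise ValueError("BytesMessage larger than 2GB cannot be converted")
        body = bytes(payload)
    elif kind == "text":
        body = payload.encode(charset)  # converter.charset, UTF-8 by default
    else:
        raise NotImplementedError(f"message kind {kind!r} is not modelled")
    return headers, body


print(convert_jms_message("text", "héllo", {"priority": 4}))
```
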

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = jms
  a1.sources.r1.channels = c1
  a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
  a1.sources.r1.connectionFactory = GenericConnectionFactory
  a1.sources.r1.providerURL = tcp://mqserver:61616
  a1.sources.r1.destinationName = BUSINESS_DATA
  a1.sources.r1.destinationType = QUEUE

SSL and JMS Source

JMS client implementations typically support configuring SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). When these system properties are specified for Flume’s JVM, the JMS Source (or more precisely, the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course, only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.

The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions of the server-side setup of the different JMS providers, and also full working configuration examples, on the Flume Wiki.

SSL transport / server authentication:

If the JMS server uses a self-signed certificate, or its certificate is signed by a non-trusted CA (e.g. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. This can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.

Some JMS providers require SSL-specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (e.g. ActiveMQ uses the ssl:// URL prefix instead of tcp://). In this case, the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.

Client certificate authentication (two-way SSL):

JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).

The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.

The keystore should contain only one key (if multiple keys are present, the first one will be used). The key password must be the same as the keystore password.

In case of client certificate authentication, there is no need to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.

Please note:

Unlike other components, the JMS Source has no component-level SSL configuration parameters, and no enable-SSL flag either. SSL setup is controlled by the JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence or absence of the truststore / keystore.

Spooling Directory Source

This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion is by default indicated by renaming the file. Alternatively, the file can be deleted, or the trackerDir can be used to keep track of processed files.

Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:

  • If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
  • If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.

Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be spooldir.
spoolDir (required) | – | The directory from which to read files.
fileSuffix | .COMPLETED | Suffix to append to completely ingested files
deletePolicy | never | When to delete completed files: never or immediate
fileHeader | false | Whether to add a header storing the absolute path filename.
fileHeaderKey | file | Header key to use when appending absolute path filename to event header.
basenameHeader | false | Whether to add a header storing the basename of the file.
basenameHeaderKey | basename | Header key to use when appending basename of file to event header.
includePattern | ^.*$ | Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both the ignorePattern and includePattern regexes, the file is ignored.
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both the ignorePattern and includePattern regexes, the file is ignored.
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
trackingPolicy | rename | The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename”: after processing, files are renamed according to the fileSuffix parameter. “tracker_dir”: files are not renamed, but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix.
consumeOrder | oldest | The order in which files in the spooling directory will be consumed: oldest, youngest or random. For oldest and youngest, the last modified time of the files is used to compare the files. In case of a tie, the file with the smallest lexicographical order is consumed first. For random, any file is picked randomly. When using oldest and youngest, the whole directory is scanned to pick the oldest/youngest file, which might be slow if there are a large number of files. Using random may cause old files to be consumed very late if new files keep coming into the spooling directory.
pollDelay | 500 | Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch | false | Whether to monitor subdirectories for new files to read.
maxBackoff | 4000 | The maximum time (in milliseconds) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source starts at a low backoff and increases it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter.
batchSize | 100 | Granularity at which to batch transfer to the channel
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text.
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: drop the unparseable character sequence.
deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder.
deserializer.* | – | Varies per event deserializer.
bufferMaxLines | – | (Obsolete) This option is now ignored.
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – |
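
How includePattern, ignorePattern and consumeOrder interact when picking the next file can be sketched as below. This is a hypothetical Python model, not the Spooling Directory Source’s code. It assumes each regex must match the whole file name. It takes (name, last-modified) pairs instead of scanning a real directory.

```python
import random
import re


def next_spool_file(files, consume_order="oldest",
                    include_pattern=r"^.*$", ignore_pattern=r"^$", rng=random):
    """Pick the next file to ingest, or None if nothing qualifies.

    files -- iterable of (file_name, last_modified) pairs
    A name must fully match include_pattern and must not match ignore_pattern;
    a name matching both is ignored. Ties on last_modified go to the
    lexicographically smallest name.
    """
    include = re.compile(include_pattern)
    ignore = re.compile(ignore_pattern)
    candidates = [(name, mtime) for name, mtime in files
                  if include.fullmatch(name) and not ignore.fullmatch(name)]
    if not candidates:
        return None
    if consume_order == "oldest":
        return min(candidates, key=lambda c: (c[1], c[0]))[0]
    if consume_order == "youngest":
        return min(candidates, key=lambda c: (-c[1], c[0]))[0]
    if consume_order == "random":
        return rng.choice(candidates)[0]
    raise ValueError(f"unknown consumeOrder: {consume_order!r}")


files = [("b.log", 100), ("a.log", 100), ("c.log", 200), ("d.tmp", 50)]
print(next_spool_file(files, ignore_pattern=r".*\.tmp"))  # a.log (tie on 100, "a" < "b")
print(next_spool_file(files, "youngest"))                  # c.log
```

The oldest and youngest branches look at every candidate, which mirrors the note above that these orders require a full directory scan.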

Example for an agent named a1:

  a1.channels = ch-1
  a1.sources = src-1

  a1.sources.src-1.type = spooldir
  a1.sources.src-1.channels = ch-1
  a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
  a1.sources.src-1.fileHeader = true

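
The completion step governed by deletePolicy, trackingPolicy, fileSuffix and trackerDir can be modelled with pathlib as below. This is an illustrative Python sketch, not Flume’s implementation. It assumes the tracker file is named after the file’s base name, and it ignores subdirectories created by recursiveDirectorySearch.

```python
from pathlib import Path


def mark_completed(path, spool_dir, delete_policy="never", tracking_policy="rename",
                   file_suffix=".COMPLETED", tracker_dir=".flumespool"):
    """Record that a fully ingested file is done; return the marker path (or None)."""
    path = Path(path)
    if delete_policy not in ("never", "immediate"):
        raise ValueError(f"unknown deletePolicy: {delete_policy!r}")
    if delete_policy == "immediate":
        path.unlink()  # trackingPolicy has no effect in this case
        return None
    if tracking_policy == "rename":
        target = path.with_name(path.name + file_suffix)
        path.rename(target)
        return target
    if tracking_policy == "tracker_dir":
        tracker = Path(tracker_dir)
        if not tracker.is_absolute():
            tracker = Path(spool_dir) / tracker  # relative trackerDir lives under spoolDir
        tracker.mkdir(parents=True, exist_ok=True)
        marker = tracker / (path.name + file_suffix)
        marker.touch()  # empty file; the ingested file itself is left untouched
        return marker
    raise ValueError(f"unknown trackingPolicy: {tracking_policy!r}")
```

With the defaults, /var/log/apache/flumeSpool/app.log would become app.log.COMPLETED. With tracking_policy="tracker_dir", app.log stays in place and an empty .flumespool/app.log.COMPLETED marker appears instead.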
Event Deserializers

The following event deserializers ship with Flume.

LINE

This deserializer generates one event per line of text input.

Property Name | Default | Description
deserializer.maxLineLength | 2048 | Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event.
deserializer.outputCharset | UTF-8 | Charset to use for encoding events put into the channel.

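
The maxLineLength rule (truncate, then carry the rest into subsequent events) can be sketched for a single line as follows. This is a simplified Python model, not the LINE deserializer itself. It works on one already-split line and does not model how line terminators are handled.

```python
def line_to_events(line, max_line_length=2048, output_charset="UTF-8"):
    """Split one line of text into event bodies of at most max_line_length characters.

    The first chunk is the truncated line; each remaining chunk becomes a
    subsequent event. Bodies are encoded with output_charset.
    """
    if max_line_length <= 0:
        raise ValueError("max_line_length must be positive")
    chunks = [line[i:i + max_line_length] for i in range(0, len(line), max_line_length)]
    return [chunk.encode(output_charset) for chunk in chunks or [""]]


print(line_to_events("abcdefgh", max_line_length=3))
# [b'abc', b'def', b'gh']
```
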
AVRO

This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.

Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.

Property Name | Default | Description
deserializer.schemaType | HASH | How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode.

BlobDeserializer

This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.

Property Name | Default | Description
deserializer (required) | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request

Taildir Source

Note

This source is provided as a preview feature. It does not work on Windows.

Watches the specified files and tails them in nearly real time once new lines are detected as appended to each file. If new lines are still being written, this source will retry reading them, waiting for the write to complete.

This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of each file to the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written in the existing position file.

In another use case, this source can also start tailing from an arbitrary position for each file using the given position file. When there is no position file at the specified path, it will start tailing from the first line of each file by default.
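
Resuming from a position file can be sketched as below. This is a hypothetical Python model, not Taildir’s implementation. It assumes the position file is a JSON array of records with "inode", "pos" and "file" keys, and that positions are looked up by inode, so a renamed file keeps its offset. Only complete lines are emitted; a trailing partial line waits for its newline.

```python
import json


def load_positions(position_json):
    """Map inode -> last committed byte offset from a position-file body."""
    records = json.loads(position_json) if position_json.strip() else []
    return {record["inode"]: record["pos"] for record in records}


def resume(data, inode, positions, skip_to_end=False):
    """Return (complete_lines, new_offset) for a file whose current bytes are `data`.

    Known inode: continue from the recorded offset, even if the file was renamed.
    Unknown inode: start at the beginning, or at EOF when skip_to_end is set.
    """
    if inode in positions:
        start = positions[inode]
    else:
        start = len(data) if skip_to_end else 0
    pending = data[start:]
    # Emit only complete lines; a trailing partial line waits for its newline.
    end = pending.rfind(b"\n") + 1
    return pending[:end].splitlines(), start + end


positions = load_positions('[{"inode": 42, "pos": 6, "file": "/var/log/test1/example.log"}]')
print(resume(b"line1\nline2\npartial", 42, positions))
# ([b'line2'], 12)
```

The skip_to_end flag plays the role of the skipToEnd property for files not found in the position file.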

Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.

This source does not rename, delete, or otherwise modify the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be TAILDIR.
filegroups (required) | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> (required) | – | Absolute path of the file group. Regular expressions (and not file system patterns) can be used for the filename only.
positionFile | ~/.flume/taildir_position.json | File in JSON format to record the inode, the absolute path and the last position of each tailed file.
headers.<filegroupName>.<headerKey> | – | Header value which is set with the header key. Multiple headers can be specified for one file group.
byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.
skipToEnd | false | Whether to skip the position to EOF for files not recorded in the position file.
idleTimeout | 120000 | Time (ms) to close inactive files. If new lines are appended to a closed file, this source will automatically re-open it.
writePosInterval | 3000 | Interval time (ms) to write the last position of each file to the position file.
batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine.
maxBatchCount | Long.MAX_VALUE | Controls the number of batches being read consecutively from the same file. If the source is tailing multiple files and one of them is written at a fast rate, it can prevent other files from being processed, because the busy file would be read in an endless loop. In this case, lower this value.
backoffSleepIncrement | 1000 | The increment for the time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep | 5000 | The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
cachePatternMatching | true | Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.
fileHeader | false | Whether to add a header storing the absolute path filename.
fileHeaderKey | file | Header key to use when appending absolute path filename to event header.
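
The rule that only the filename part of a file group may be a regular expression can be illustrated with the Python sketch below. It is an assumption-labelled model, not Taildir’s matcher. The directory part is taken literally, and the final component is assumed to match the whole file name. Names are passed in as a list instead of being read from disk.

```python
import posixpath
import re


def match_filegroup(filegroup, names_in_directory):
    """Return the absolute paths in a file group.

    The directory part of `filegroup` is taken literally; only the final
    component is a regular expression, and it must match the whole file name.
    """
    directory, name_regex = posixpath.split(filegroup)
    pattern = re.compile(name_regex)
    # Sorted only to make the output deterministic; Taildir itself consumes
    # files in order of modification time.
    return sorted(posixpath.join(directory, name)
                  for name in names_in_directory if pattern.fullmatch(name))


names = ["app.log", "app.log.1", "catalog", "error.txt"]
print(match_filegroup("/var/log/test2/.*log.*", names))
# ['/var/log/test2/app.log', '/var/log/test2/app.log.1', '/var/log/test2/catalog']
```

The output shows a common pitfall: .*log.* also picks up catalog. An anchored pattern such as .*\.log(\.\d+)? is stricter.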

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.channels = c1
  a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
  a1.sources.r1.headers.f1.headerKey1 = value1
  a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
  a1.sources.r1.headers.f2.headerKey1 = value2
  a1.sources.r1.headers.f2.headerKey2 = value2-2
  a1.sources.r1.fileHeader = true
  a1.sources.r1.maxBatchCount = 1000

Twitter 1% firehose Source (experimental)

Warning

This source is highly experimental and may change between minor versions of Flume. Use at your own risk.

Experimental source that connects via the Streaming API to the 1% sample Twitter firehose, continuously downloads tweets, converts them to Avro format, and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are marked (required).

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be org.apache.flume.source.twitter.TwitterSource
consumerKey (required) | – | OAuth consumer key
consumerSecret (required) | – | OAuth consumer secret
accessToken (required) | – | OAuth access token
accessTokenSecret (required) | – | OAuth token secret
maxBatchSize | 1000 | Maximum number of Twitter messages to put in a single batch
maxBatchDurationMillis | 1000 | Maximum number of milliseconds to wait before closing a batch

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
  a1.sources.r1.channels = c1
  a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
  a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
  a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
  a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
  a1.sources.r1.maxBatchSize = 10
  a1.sources.r1.maxBatchDurationMillis = 200
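
maxBatchSize and maxBatchDurationMillis together close a batch on whichever limit is hit first. The Python sketch below models that with timestamped arrivals. It is illustrative only, and close_batches is a hypothetical name. As a simplification, time is checked only when a message arrives, whereas a real source would also close an idle batch on a timer.

```python
def close_batches(arrivals, max_batch_size=1000, max_batch_duration_ms=1000):
    """Group (arrival_ms, message) pairs into batches.

    A batch closes when it holds max_batch_size messages or when a message
    arrives max_batch_duration_ms or more after the batch was opened,
    whichever comes first. Simplification: time is only checked when a
    message arrives; a real source would also close an idle batch on a timer.
    """
    batches, current, opened_at = [], [], 0
    for arrival_ms, message in arrivals:
        if current and arrival_ms - opened_at >= max_batch_duration_ms:
            batches.append(current)  # time limit reached first
            current = []
        if not current:
            opened_at = arrival_ms
        current.append(message)
        if len(current) >= max_batch_size:
            batches.append(current)  # size limit reached first
            current = []
    if current:
        batches.append(current)  # flush whatever is left
    return batches


# Settings from the example above: maxBatchSize = 10, maxBatchDurationMillis = 200.
arrivals = [(0, "t1"), (50, "t2"), (210, "t3"), (300, "t4")]
print(close_batches(arrivals, max_batch_size=10, max_batch_duration_ms=200))
# [['t1', 't2'], ['t3', 't4']]
```
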

Kafka Source

Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1, which was the highest available version at the time of the release.

Property Name | Default | Description
channels (required) | – |
type (required) | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers (required) | – | List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id | flume | Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group
kafka.topics (required unless kafka.topics.regex is set) | – | Comma-separated list of topics the Kafka consumer will read messages from.
kafka.topics.regex | – | Regex that defines the set of topics the source is subscribed to. This property has higher priority than kafka.topics and overrides kafka.topics if it exists.
batchSize | 1000 | Maximum number of messages written to the Channel in one batch
batchDurationMillis | 1000 | Maximum time (in ms) before a batch will be written to the Channel. The batch is written as soon as either the size limit or the time limit is reached, whichever comes first.
backoffSleepIncrement | 1000 | Initial and incremental wait time that is triggered when a Kafka topic appears to be empty. The wait period reduces aggressive pinging of an empty Kafka topic. One second is ideal for ingestion use cases, but a lower value may be required for low-latency operations with interceptors.
maxBackoffSleep | 5000 | Maximum wait time that is triggered when a Kafka topic appears to be empty. Five seconds is ideal for ingestion use cases, but a lower value may be required for low-latency operations with interceptors.
useFlumeEventFormat | false | By default, events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink, or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve any Flume headers sent on the producing side.
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader | topic | Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property, so as to avoid sending the message back to the same topic in a loop.
kafka.consumer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if reading from Kafka using some level of security. See below for additional info on secure setup.
more consumer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to Kafka security for additional properties that need to be set on the consumer.
Other Kafka Consumer Properties | – | These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset
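
backoffSleepIncrement and maxBackoffSleep describe a growing wait while a topic stays empty. The Python sketch below assumes a linear schedule: the number of consecutive empty polls times the increment, capped at the maximum. It also assumes the counter resets once a poll returns data. Both are assumptions about the polling loop, not statements of Flume’s exact code.

```python
def backoff_delay_ms(consecutive_empty_polls, backoff_sleep_increment=1000, max_backoff_sleep=5000):
    """Delay before the next poll after N consecutive polls that found no data.

    Assumed linear schedule: N * increment, capped at the maximum. The counter
    is assumed to reset to zero as soon as a poll returns data.
    """
    if consecutive_empty_polls <= 0:
        return 0
    return min(consecutive_empty_polls * backoff_sleep_increment, max_backoff_sleep)


print([backoff_delay_ms(n) for n in range(1, 8)])
# [1000, 2000, 3000, 4000, 5000, 5000, 5000]
```

With the defaults, the wait reaches the five-second ceiling after five empty polls. Lowering both values trades broker load for latency, as the table notes.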

Note

The Kafka Source overrides two Kafka consumer parameters: enable.auto.commit is set to “false” by the source, and every batch is committed. The Kafka source guarantees an at-least-once strategy of message retrieval, so duplicates can be present when the source starts. The Kafka Source also provides defaults for the key.deserializer (org.apache.kafka.common.serialization.StringDeserializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modification of these parameters is not recommended.

Deprecated Properties

Property Name | Default | Description
topic | – | Use kafka.topics
groupId | flume | Use kafka.consumer.group.id
zookeeperConnect | – | No longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish a connection with the Kafka cluster.
migrateZookeeperOffsets | true | When no Kafka-stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated, this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check the Kafka documentation for details.

Example for topic subscription by comma-separated topic list.

  tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  tier1.sources.source1.channels = channel1
  tier1.sources.source1.batchSize = 5000
  tier1.sources.source1.batchDurationMillis = 2000
  tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
  tier1.sources.source1.kafka.topics = test1, test2
  tier1.sources.source1.kafka.consumer.group.id = custom.g.id

Example for topic subscription by regex

  tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  tier1.sources.source1.channels = channel1
  tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
  tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
  # the default kafka.consumer.group.id=flume is used
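To see which topic names the example pattern selects, it can be tried out with java.util.regex. This is only an illustration: the candidate topic list is made up, and the sketch uses Matcher.matches(), which tests the whole name (the ^ and $ anchors make that explicit anyway).

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch: which topic names the example pattern ^topic[0-9]$ would select.
final class TopicRegexSketch {
    static List<String> matching(String regex, List<String> topics) {
        Pattern p = Pattern.compile(regex);
        // matches() requires the entire topic name to fit the pattern.
        return topics.stream()
            .filter(t -> p.matcher(t).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic1", "topic9", "topic10", "mytopic1", "topic");
        System.out.println(matching("^topic[0-9]$", topics)); // prints [topic1, topic9]
    }
}
```

Note that topic10 is not selected: [0-9] matches exactly one digit.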

Security and Kafka Source:

Secure authentication as well as data encryption are supported on the communication channel between Flume and Kafka. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.consumer.security.protocol to any of the following values means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Source:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

  a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  a1.sources.source1.kafka.topics = mytopic
  a1.sources.source1.kafka.consumer.group.id = flume-consumer
  a1.sources.source1.kafka.consumer.security.protocol = SSL
  # optional, the global truststore can be used alternatively
  a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
  a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Specifying the truststore is optional here; the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:

  a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:

  • Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  • Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client-side authentication is also required, then additionally the following needs to be added to the Flume agent configuration, or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate, which has to be trusted by Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.

  # optional, the global keystore can be used alternatively
  a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
  a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

If the keystore and the key use different passwords, the ssl.key.password property will provide the required additional secret for the consumer keystore:

  a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos and Kafka Source:

To use the Kafka source with a Kafka cluster secured with Kerberos, set the kafka.consumer.security.protocol property noted above. The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file’s “KafkaClient” section. The “Client” section describes the Zookeeper connection if needed. See the Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

  JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
  JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

  a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  a1.sources.source1.kafka.topics = mytopic
  a1.sources.source1.kafka.consumer.group.id = flume-consumer
  a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
  a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
  a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

  a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  a1.sources.source1.kafka.topics = mytopic
  a1.sources.source1.kafka.consumer.group.id = flume-consumer
  a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
  a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
  a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  # optional, the global truststore can be used alternatively
  a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
  a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Sample JAAS file. For reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.

  Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/keytabs/flume.keytab"
    principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  };

  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/keytabs/flume.keytab"
    principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  };

NetCat TCP Source

A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline-separated text. Each line of text is turned into a Flume event and sent via the connected channel.

Required properties are in bold.

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be netcat
bind | – | Host name or IP address to bind to
port | – | Port # to bind to
max-line-length | 512 | Max line length per event body (in bytes)
ack-every-event | true | Respond with an “OK” for every event received
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 6666
  a1.sources.r1.channels = c1

NetCat UDP Source

As with the original Netcat (TCP) source, this source listens on a given port, turns each line of text into an event and sends it via the connected channel. Acts like nc -u -k -l [host] [port].

Required properties are in bold.

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be netcatudp
bind | – | Host name or IP address to bind to
port | – | Port # to bind to
remoteAddressHeader | – | –
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = netcatudp
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 6666
  a1.sources.r1.channels = c1

Sequence Generator Source

A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. Retries when it can’t send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before, so that the number of unique events, after de-duplication at the destination, is expected to be equal to the specified totalEvents. Required properties are in bold.
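The retry behaviour can be illustrated with a small simulation. It is a hypothetical model, not Flume's sequence generator implementation: channelAccepts stands in for a channel put that can fail, and the sketch pessimistically assumes that a failed attempt may still have reached the destination. Because a retry reuses the same body, de-duplication recovers exactly totalEvents unique events.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical simulation of the sequence generator's retry behaviour.
final class SeqSourceSketch {
    // Returns every body that reached the destination, duplicates included.
    static List<String> run(long totalEvents, LongPredicate channelAccepts) {
        List<String> delivered = new ArrayList<>();
        long counter = 0;  // next sequence number, starts from 0
        long attempt = 0;  // attempt index passed to the fake channel
        while (counter < totalEvents) {
            String body = Long.toString(counter); // a retry reuses the same body
            // Pessimistic assumption: even a failed put may have reached the destination.
            delivered.add(body);
            if (channelAccepts.test(attempt++)) {
                counter++; // advance only after a successful put
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Every second attempt fails, so each event after the first is sent twice.
        List<String> delivered = run(5, attempt -> attempt % 2 == 0);
        System.out.println(delivered.size());                      // prints 9
        System.out.println(new LinkedHashSet<>(delivered).size()); // prints 5
    }
}
```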

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be seq
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –
batchSize | 1 | Number of events to attempt to process per request loop.
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source.

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = seq
  a1.sources.r1.channels = c1

Syslog Sources

Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline (‘\n’).
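The framing difference between the UDP and TCP sources can be sketched as follows. This is only an illustration of the rule stated above, not Flume's syslog parser: the class name is hypothetical, eventSize is not modelled, and trailing bytes without a final newline are simply left out here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the framing rule: a UDP datagram is one event,
// a TCP stream is split at each '\n'.
final class SyslogFramingSketch {
    static List<String> udpEvents(byte[] datagram) {
        // The whole datagram, newlines included, becomes a single event.
        return List.of(new String(datagram, StandardCharsets.UTF_8));
    }

    static List<String> tcpEvents(byte[] stream) {
        List<String> events = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < stream.length; i++) {
            if (stream[i] == '\n') {
                events.add(new String(stream, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        // Bytes after the last '\n' do not form a complete line; this sketch drops them.
        return events;
    }

    public static void main(String[] args) {
        byte[] data = "<13>Jan  1 00:00:00 host app: one\n<13>Jan  1 00:00:01 host app: two\n"
            .getBytes(StandardCharsets.UTF_8);
        System.out.println(tcpEvents(data).size()); // prints 2
        System.out.println(udpEvents(data).size()); // prints 1
    }
}
```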

Required properties are in bold.

Syslog TCP Source

The original, tried-and-true syslog TCP source.

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be syslogtcp
host | – | Host name or IP address to bind to
port | – | Port # to bind to
eventSize | 2500 | Maximum size of a single event line, in bytes
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case.
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case.
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude.
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, every supported cipher suite is included.

For example, a syslog TCP source for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = syslogtcp
  a1.sources.r1.port = 5140
  a1.sources.r1.host = localhost
  a1.sources.r1.channels = c1

Multiport Syslog TCP Source

This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. Provides support for RFC-3164 and many common RFC-5424 formatted messages. Also provides the capability to configure the character set used on a per-port basis.
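The per-port character set option can be pictured as a simple lookup. This is a hypothetical sketch (PortCharsetSketch is not Flume code), assuming a charset.port.<port> entry wins over charset.default, which in turn falls back to UTF-8 when unset.

```java
import java.nio.charset.Charset;
import java.util.Map;

// Hypothetical sketch of per-port charset resolution; not Flume's code.
final class PortCharsetSketch {
    static Charset charsetFor(int port, Map<String, String> config) {
        // A per-port entry takes priority over the default.
        String perPort = config.get("charset.port." + port);
        if (perPort != null) {
            return Charset.forName(perPort);
        }
        return Charset.forName(config.getOrDefault("charset.default", "UTF-8"));
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("charset.port.10002", "ISO-8859-1");
        System.out.println(charsetFor(10002, config)); // prints ISO-8859-1
        System.out.println(charsetFor(10001, config)); // prints UTF-8
    }
}
```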

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be multiport_syslogtcp
host | – | Host name or IP address to bind to.
ports | – | Space-separated list (one or more) of ports to bind to.
eventSize | 2500 | Maximum size of a single event line, in bytes.
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.
portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case.
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case.
charset.default | UTF-8 | Default character set used while parsing syslog events into strings.
charset.port.<port> | – | Character set is configurable on a per-port basis.
batchSize | 100 | Maximum number of events to attempt to process per request loop. Using the default is usually fine.
readBufferSize | 1024 | Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine.
numProcessors | (auto-detected) | Number of processors available on the system for use while processing messages. Default is to auto-detect the number of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable.
selector.type | replicating | replicating, multiplexing, or custom
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors.
interceptors.* | – | –
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude.
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, every supported cipher suite is included.

For example, a multiport syslog TCP source for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = multiport_syslogtcp
  a1.sources.r1.channels = c1
  a1.sources.r1.host = 0.0.0.0
  a1.sources.r1.ports = 10001 10002 10003
  a1.sources.r1.portHeader = port

Syslog UDP Source

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be syslogudp
host | – | Host name or IP address to bind to
port | – | Port # to bind to
keepFields | false | Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event.
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case.
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case.
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –

For example, a syslog UDP source for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = syslogudp
  a1.sources.r1.port = 5140
  a1.sources.r1.host = localhost
  a1.sources.r1.channels = c1

HTTP Source

A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable “handler” which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events handled from one HTTP request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return an HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return an HTTP 503 - Temporarily unavailable status.

All events sent in one POST request are considered to be one batch and inserted into the channel in one transaction.
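The request flow described above can be modelled in a few lines. This is a hypothetical sketch, not Flume's HTTPSource: the handler is a plain Function, the channel is a fixed-capacity list whose batch insert is all-or-nothing, and the 200 status for success is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the HTTP source request flow; names are illustrative.
final class HttpSourceFlowSketch {
    static final class ChannelFullException extends RuntimeException {}

    // Fake channel with a fixed capacity; a batch is accepted all-or-nothing.
    static final class Channel {
        final List<String> events = new ArrayList<>();
        final int capacity;

        Channel(int capacity) { this.capacity = capacity; }

        void putAllInOneTransaction(List<String> batch) {
            if (events.size() + batch.size() > capacity) {
                throw new ChannelFullException(); // nothing from this batch is kept
            }
            events.addAll(batch);
        }
    }

    // Returns the HTTP status code the source would answer with.
    static int handle(String requestBody, Function<String, List<String>> handler, Channel channel) {
        List<String> events;
        try {
            events = handler.apply(requestBody); // pluggable handler converts the request
        } catch (RuntimeException e) {
            return 400; // handler threw an exception
        }
        try {
            channel.putAllInOneTransaction(events); // one transaction per request
        } catch (ChannelFullException e) {
            return 503; // channel full: temporarily unavailable
        }
        return 200; // assumed success code
    }

    public static void main(String[] args) {
        // Toy handler: one event per line, rejects empty bodies.
        Function<String, List<String>> lines = body -> {
            if (body.isEmpty()) throw new IllegalArgumentException("empty body");
            return List.of(body.split("\n"));
        };
        Channel channel = new Channel(3);
        System.out.println(handle("a\nb", lines, channel)); // prints 200
        System.out.println(handle("", lines, channel));     // prints 400
        System.out.println(handle("c\nd", lines, channel)); // prints 503
    }
}
```

Because the batch is all-or-nothing, a client receiving 503 can resend the whole request without having partially written it to the channel.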

This source is based on Jetty 9.4 and offers the ability to set additional Jetty-specific parameters which will be passed directly to the Jetty components.

Property Name | Default | Description
type | – | The component type name, needs to be http
port | – | The port the source should bind to.
bind | 0.0.0.0 | The hostname or IP address to listen on
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class.
handler.* | – | Config parameters for the handler
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –
ssl | false | Set this to true to enable SSL. HTTP Source does not support SSLv3.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude.
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
keystore | – | Location of the keystore, including the keystore file name. If SSL is enabled but the keystore is not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password | – | Keystore password. If SSL is enabled but the keystore password is not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type | JKS | Keystore type. This can be “JKS” or “PKCS12”.
QueuedThreadPool.* | – | Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. N.B. QueuedThreadPool will only be used if at least one property of this class is set.
HttpConfiguration.* | – | Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration
SslContextFactory.* | – | Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only applicable when ssl is set to true).
ServerConnector.* | – | Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector

Deprecated Properties

Property Name | Default | Description
keystorePassword | – | Use keystore-password. Deprecated value will be overwritten with the new one.
excludeProtocols | SSLv3 | Use exclude-protocols. Deprecated value will be overwritten with the new one.
enableSSL | false | Use ssl. Deprecated value will be overwritten with the new one.

N.B. Jetty-specific settings are set using the setter methods on the objects listed above. For full details see the Javadoc for these classes (QueuedThreadPool, HttpConfiguration, SslContextFactory and ServerConnector).

When using Jetty-specific settings, the named properties above take precedence (for example excludeProtocols takes precedence over SslContextFactory.excludeProtocols). Property names after the class prefix start with a lower-case letter, as in HttpConfiguration.sendServerVersion.

An example http source for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = http
  a1.sources.r1.port = 5140
  a1.sources.r1.channels = c1
  a1.sources.r1.handler = org.example.rest.RestHandler
  a1.sources.r1.handler.nickname = random props
  a1.sources.r1.HttpConfiguration.sendServerVersion = false
  a1.sources.r1.ServerConnector.idleTimeout = 300

JSONHandler

A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, the event has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.

  [{
    "headers" : {
      "timestamp" : "434324343",
      "host" : "random_host.example.com"
    },
    "body" : "random_body"
  },
  {
    "headers" : {
      "namenode" : "namenode.example.com",
      "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
  }]

To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#toJson(Object, Type) method. The type token to pass as the 2nd argument of this method for a list of events can be created by:

  Type type = new TypeToken<List<JSONEvent>>() {}.getType();
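Where Gson is not on the classpath, the same array format can also be produced by hand. The following is a minimal stdlib-only sketch; JsonEventsSketch and its Event class are hypothetical and are not the Flume SDK's JSONEvent. It escapes strings following the JSON (RFC 8259) rules and always emits an array, even for a single event.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stdlib-only alternative to Gson for building the JSONHandler payload.
final class JsonEventsSketch {
    static final class Event {
        final Map<String, String> headers;
        final String body;

        Event(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Escapes a string as a JSON string literal.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Always emits an array, even for a single event, as the handler requires.
    static String toJson(List<Event> events) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < events.size(); i++) {
            Event e = events.get(i);
            if (i > 0) sb.append(',');
            sb.append("{\"headers\":{");
            int j = 0;
            for (Map.Entry<String, String> h : e.headers.entrySet()) {
                if (j++ > 0) sb.append(',');
                sb.append(quote(h.getKey())).append(':').append(quote(h.getValue()));
            }
            sb.append("},\"body\":").append(quote(e.body)).append('}');
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>(); // keeps header order
        headers.put("host", "random_host.example.com");
        // Send the result with Content-Type: application/json; charset=UTF-8
        System.out.println(toJson(List.of(new Event(headers, "random_body"))));
        // prints [{"headers":{"host":"random_host.example.com"},"body":"random_body"}]
    }
}
```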

BlobHandler

By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event that contains the request parameters as well as the Binary Large Object (BLOB) uploaded with this request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers the entire BLOB in RAM.

Property Name | Default | Description
handler | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request
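The memory cost of buffering can be illustrated with a bounded read. This is a hypothetical sketch in the spirit of handler.maxBlobLength, not BlobHandler's code; in particular, stopping (truncating) at the limit is an assumption of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: buffer a request body in memory, reading at most maxBlobLength bytes.
final class BlobBufferSketch {
    static byte[] readBlob(InputStream in, int maxBlobLength) {
        ByteArrayOutputStream blob = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int remaining = maxBlobLength;
        try {
            int n;
            // Never ask for more than the remaining budget.
            while (remaining > 0 && (n = in.read(buffer, 0, Math.min(buffer.length, remaining))) != -1) {
                blob.write(buffer, 0, n);
                remaining -= n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return blob.toByteArray(); // the whole (bounded) BLOB lives in RAM
    }

    public static void main(String[] args) {
        byte[] upload = new byte[10_000];
        System.out.println(readBlob(new ByteArrayInputStream(upload), 4096).length); // prints 4096
    }
}
```

With the default limit of 100000000 bytes, a single request can hold roughly 100 MB on the heap in this model, which is why very large objects are a poor fit.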

Stress Source

StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of the Event payload, with empty headers. The user can configure the total number of events to be sent as well as the maximum number of successfully delivered events.

Required properties are in bold.

Property Name | Default | Description
type | – | The component type name, needs to be org.apache.flume.source.StressSource
size | 500 | Payload size of each Event. Unit: byte
maxTotalEvents | -1 | Maximum number of Events to be sent
maxSuccessfulEvents | -1 | Maximum number of Events successfully sent
batchSize | 1 | Number of Events to be sent in one batch
maxEventsPerSecond | 0 | When set to an integer greater than zero, enforces a rate limiter onto the source.

Example for agent named a1:

  a1.sources = stresssource-1
  a1.channels = memoryChannel-1
  a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
  a1.sources.stresssource-1.size = 10240
  a1.sources.stresssource-1.maxTotalEvents = 1000000
  a1.sources.stresssource-1.channels = memoryChannel-1

Legacy Sources

The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. They accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. get converted to 1.x event header attributes. The legacy source supports both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.

Note

The reliability semantics of Flume 1.x are different from those of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is best effort, though the reliability setting of the 1.x flow will be applicable to the events once they are saved into the Flume 1.x channel by the legacy source.

Required properties are in bold.

Avro Legacy Source

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host | – | The hostname or IP address to bind to
port | – | The port # to listen on
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
  a1.sources.r1.host = 0.0.0.0
  a1.sources.r1.port = 6666
  a1.sources.r1.channels = c1

Thrift Legacy Source

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host | – | The hostname or IP address to bind to
port | – | The port # to listen on
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
  a1.sources.r1.host = 0.0.0.0
  a1.sources.r1.port = 6666
  a1.sources.r1.channels = c1

Custom Source

A custom source is your own implementation of the Source interface. A custom source’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom source is its FQCN.

Property Name | Default | Description
channels | – | –
type | – | The component type name, needs to be your FQCN
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | – | –

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = org.example.MySource
  a1.sources.r1.channels = c1

Scribe Source

Scribe is another type of ingest system. To integrate with an existing Scribe ingest system, Flume should use ScribeSource, which is based on Thrift with a compatible transfer protocol. For deployment of Scribe, please follow the guide from Facebook. Required properties are in bold.

Property Name | Default | Description
type | – | The component type name, needs to be org.apache.flume.source.scribe.ScribeSource
port | 1499 | Port that Scribe should connect to
maxReadBufferBytes | 16384000 | Thrift default FrameBuffer size
workerThreads | 5 | Number of handler threads in Thrift
selector.type | – | –
selector.* | – | –

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
  a1.sources.r1.port = 1463
  a1.sources.r1.workerThreads = 5
  a1.sources.r1.channels = c1