Flume Sinks

HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following are the escape sequences supported:

Alias | Description
%{host} | Substitute value of event header named "host". Arbitrary header names are supported.
%t | Unix time in milliseconds
%a | locale's short weekday name (Mon, Tue, ...)
%A | locale's full weekday name (Monday, Tuesday, ...)
%b | locale's short month name (Jan, Feb, ...)
%B | locale's long month name (January, February, ...)
%c | locale's date and time (Thu Mar 3 23:05:25 2005)
%d | day of month (01)
%e | day of month without padding (1)
%D | date; same as %m/%d/%y
%H | hour (00..23)
%I | hour (01..12)
%j | day of year (001..366)
%k | hour ( 0..23)
%m | month (01..12)
%n | month without padding (1..12)
%M | minute (00..59)
%p | locale's equivalent of am or pm
%s | seconds since 1970-01-01 00:00:00 UTC
%S | second (00..59)
%y | last two digits of year (00..99)
%Y | year (2010)
%z | +hhmm numeric timezone (for example, -0400)
%[localhost] | Substitute the hostname of the host where the agent is running
%[IP] | Substitute the IP address of the host where the agent is running
%[FQDN] | Substitute the canonical hostname of the host where the agent is running

Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the hostname, which may fail in some networking environments.

The file in use will have the name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory.

Required properties are in bold.

Note

For all of the time related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
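For example, a minimal sketch that attaches such an interceptor to a source (assuming a source named r1 on the same agent; the alias shown is the standard timestamp interceptor):

  1. a1.sources = r1
  2. a1.sources.r1.interceptors = i1
  3. a1.sources.r1.interceptors.i1.type = timestamp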

Name | Default | Description
channel | – |
type | – | The component type name, needs to be hdfs
hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix | – | Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix | – | Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix | .tmp | Suffix that is used for temporal files that flume actively writes into
hdfs.emptyInUseSuffix | false | If false an hdfs.inUseSuffix is used while writing the output. After closing the output hdfs.inUseSuffix is removed from the output file name. If true the hdfs.inUseSuffix parameter is ignored and an empty string is used instead.
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount | 10 | Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS
hdfs.codeC | – | Compression codec, one of the following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires setting hdfs.codeC to an available codec.
hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas | – | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat | Writable | Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS
hdfs.proxyUser | – |
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit | second | The unit of the round down value - second, minute or hour.
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.* | – |

Deprecated Properties

Name | Default | Description
hdfs.callTimeout | 30000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = hdfs
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
  6. a1.sinks.k1.hdfs.filePrefix = events-
  7. a1.sinks.k1.hdfs.round = true
  8. a1.sinks.k1.hdfs.roundValue = 10
  9. a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
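As noted in the hdfs.fileType row above, compressed output needs both a CompressedStream file type and a codec set via hdfs.codeC. A minimal sketch extending the example above (gzip is one of the supported codecs):

  1. a1.sinks.k1.hdfs.fileType = CompressedStream
  2. a1.sinks.k1.hdfs.codeC = gzip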

Hive Sink

This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events are committed to Hive, they become immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to corresponding columns in the Hive table.

Name | Default | Description
channel | – |
type | – | The component type name, needs to be hive
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083)
hive.database | – | Hive database name
hive.table | – | Hive table name
hive.partition | – | Comma separated list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country: string, time: string) then 'Asia,India,2014-02-26-01-21' will indicate continent=Asia,country=India,time=2014-02-26-01-21
hive.txnsPerBatchAsk | 100 | Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.
heartBeatInterval | 240 | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.
autoCreatePartitions | true | Flume will automatically create the necessary Hive partitions to stream to
batchSize | 15000 | Max number of events written to Hive in a single Hive transaction
maxOpenConnections | 500 | Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout | 10000 | (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer | – | Serializer is responsible for parsing out fields from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
roundUnit | minute | The unit of the round down value - second, minute or hour.
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone | Local Time | Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

The following serializers are provided for the Hive sink:

JSON: Handles UTF8 encoded Json (strict syntax) events and requires no configuration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.

DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.

Name | Default | Description
serializer.delimiter | , | (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like "\t"
serializer.fieldnames | – | The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of hive table columns names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. 'time,,ip,message' indicates the 1st, 3rd and 4th fields in input map to time, ip and message columns in the hive table.
serializer.serdeSeparator | Ctrl-A | (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like '\t'. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character
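For JSON input, the serializer takes no sub-properties because object names are matched directly to Hive column names; a minimal sketch of the only line that changes relative to a DELIMITED setup:

  1. a1.sinks.k1.serializer = JSON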

The following are the escape sequences supported:

Alias | Description
%{host} | Substitute value of event header named "host". Arbitrary header names are supported.
%t | Unix time in milliseconds
%a | locale's short weekday name (Mon, Tue, ...)
%A | locale's full weekday name (Monday, Tuesday, ...)
%b | locale's short month name (Jan, Feb, ...)
%B | locale's long month name (January, February, ...)
%c | locale's date and time (Thu Mar 3 23:05:25 2005)
%d | day of month (01)
%D | date; same as %m/%d/%y
%H | hour (00..23)
%I | hour (01..12)
%j | day of year (001..366)
%k | hour ( 0..23)
%m | month (01..12)
%M | minute (00..59)
%p | locale's equivalent of am or pm
%s | seconds since 1970-01-01 00:00:00 UTC
%S | second (00..59)
%y | last two digits of year (00..99)
%Y | year (2010)
%z | +hhmm numeric timezone (for example, -0400)

Note

For all of the time related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

Example Hive table:

  1. create table weblogs ( id int , msg string )
  2. partitioned by (continent string, country string, time string)
  3. clustered by (id) into 5 buckets
  4. stored as orc;

Example for agent named a1:

  1. a1.channels = c1
  2. a1.channels.c1.type = memory
  3. a1.sinks = k1
  4. a1.sinks.k1.type = hive
  5. a1.sinks.k1.channel = c1
  6. a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
  7. a1.sinks.k1.hive.database = logsdb
  8. a1.sinks.k1.hive.table = weblogs
  9. a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
  10. a1.sinks.k1.useLocalTimeStamp = false
  11. a1.sinks.k1.round = true
  12. a1.sinks.k1.roundValue = 10
  13. a1.sinks.k1.roundUnit = minute
  14. a1.sinks.k1.serializer = DELIMITED
  15. a1.sinks.k1.serializer.delimiter = "\t"
  16. a1.sinks.k1.serializer.serdeSeparator = '\t'
  17. a1.sinks.k1.serializer.fieldnames =id,,msg

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp header set to 11:54:34 AM, June 12, 2012 and 'country' header set to 'india' will evaluate to the partition (continent='asia', country='india', time='2012-06-12-11-50'). The serializer is configured to accept tab separated input containing three fields and to skip the second field.

Logger Sink

Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception which doesn't require the extra configuration explained in the Logging raw data section.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be logger
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = logger
  4. a1.sinks.k1.channel = c1

Avro Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be avro.
hostname | – | The hostname or IP address to connect to.
port | – | The port # to connect to.
batch-size | 100 | Number of events to batch together for send.
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request.
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first.
reset-connection-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
compression-type | none | This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSource
compression-level | 6 | The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression
ssl | false | Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
trust-all-certs | false | If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore is not specified either, then the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password | – | The password for the truststore. If not specified, then the global keystore password will be used (if defined).
truststore-type | JKS | The type of the Java truststore. This can be "JKS" or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers | 2 * the number of available processors in the machine | The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = avro
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hostname = 10.10.10.10
  6. a1.sinks.k1.port = 4545
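If the next-hop Avro source is SSL-enabled, the SSL properties from the table above can be added to the same sink. A sketch with placeholder truststore path and password:

  1. a1.sinks.k1.ssl = true
  2. a1.sinks.k1.truststore = /path/to/truststore.jks
  3. a1.sinks.k1.truststore-password = <password to access the truststore>
  4. a1.sinks.k1.truststore-type = JKS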

Thrift Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size.

Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be thrift.
hostname | – | The hostname or IP address to connect to.
port | – | The port # to connect to.
batch-size | 100 | Number of events to batch together for send.
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request.
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first.
connection-reset-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
ssl | false | Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type"
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore is not specified either, then the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password | – | The password for the truststore. If not specified, then the global keystore password will be used (if defined).
truststore-type | JKS | The type of the Java truststore. This can be "JKS" or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source.
client-principal | – | The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
client-keytab | – | The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
server-principal | – | The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect.

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = thrift
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hostname = 10.10.10.10
  6. a1.sinks.k1.port = 4545
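To talk to a Thrift source running in secure mode, the kerberos properties from the table above would be added as well. A sketch with placeholder principals and keytab path:

  1. a1.sinks.k1.kerberos = true
  2. a1.sinks.k1.client-principal = flume/client.example.org@EXAMPLE.ORG
  3. a1.sinks.k1.client-keytab = /path/to/flume.keytab
  4. a1.sinks.k1.server-principal = thrift/server.example.org@EXAMPLE.ORG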

IRC Sink

The IRC sink takes messages from the attached channel and relays those to configured IRC destinations. Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be irc
hostname | – | The hostname or IP address to connect to
port | 6667 | The port number of remote host to connect
nick | – | Nick name
user | – | User name
password | – | User password
chan | – | channel
name | – |
splitlines | – | (boolean)
splitchars | \n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: "\n")

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = irc
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hostname = irc.yourdomain.com
  6. a1.sinks.k1.nick = flume
  7. a1.sinks.k1.chan = #flume

File Roll Sink

Stores events on the local filesystem. Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be file_roll.
sink.directory | – | The directory where files will be stored
sink.pathManager | DEFAULT | The PathManager implementation to use.
sink.pathManager.extension | – | The file extension if the default PathManager is used.
sink.pathManager.prefix | – | A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
sink.batchSize | 100 |

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = file_roll
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.sink.directory = /var/log/flume

Null Sink

Discards all events it receives from the channel. Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be null.
batchSize | 100 |

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = null
  4. a1.sinks.k1.channel = c1

HBaseSinks

HBaseSink

This sink writes data to HBase. The Hbase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction.

The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent's classpath must have authentication set to kerberos (For details on how to do this, please refer to HBase documentation).

For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in Hbase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into different columns.

The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.

Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be hbase
table | – | The name of the table in Hbase to write to.
columnFamily | – | The column family in Hbase to write to.
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize | 100 | Number of events to be written per txn.
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = "iCol", payload column = "pCol".
serializer.* | – | Properties to be passed to the serializer.
kerberosPrincipal | – | Kerberos user principal for accessing secure HBase
kerberosKeytab | – | Kerberos keytab for accessing secure HBase

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = hbase
  4. a1.sinks.k1.table = foo_table
  5. a1.sinks.k1.columnFamily = bar_cf
  6. a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
  7. a1.sinks.k1.channel = c1
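The RegexHbaseEventSerializer is configured through serializer.* properties; as an illustration only (the regex and colNames property names below are assumptions about that serializer, not properties of the sink itself), a regex and the target column names might be supplied like this:

  1. a1.sinks.k1.serializer.regex = ([^ ]*) (.*)
  2. a1.sinks.k1.serializer.colNames = id,msg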

HBase2Sink

HBase2Sink is the equivalent of HBaseSink for HBase version 2. The provided functionality and the configuration parameters are the same as in case of HBaseSink (except the hbase2 tag in the sink type and the package/class names).

The type is the FQCN: org.apache.flume.sink.hbase2.HBase2Sink.

Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be hbase2
table | – | The name of the table in HBase to write to.
columnFamily | – | The column family in HBase to write to.
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize | 100 | Number of events to be written per txn.
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer | org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer | Default increment column = "iCol", payload column = "pCol".
serializer.* | – | Properties to be passed to the serializer.
kerberosPrincipal | – | Kerberos user principal for accessing secure HBase
kerberosKeytab | – | Kerberos keytab for accessing secure HBase

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = hbase2
  4. a1.sinks.k1.table = foo_table
  5. a1.sinks.k1.columnFamily = bar_cf
  6. a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
  7. a1.sinks.k1.channel = c1

AsyncHBaseSink

This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction. AsyncHBaseSink can only be used with HBase 1.x. The async client library used by AsyncHBaseSink is not available for HBase 2.

The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.

Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be asynchbase
table | – | The name of the table in Hbase to write to.
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
columnFamily | – | The column family in Hbase to write to.
batchSize | 100 | Number of events to be written per txn.
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
timeout | 60000 | The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer |
serializer.* | – | Properties to be passed to the serializer.
async.* | – | Properties to be passed to the asyncHbase library. These properties have precedence over the old zookeeperQuorum and znodeParent values. You can find the list of the available properties at the documentation page of AsyncHBase.

Note that this sink takes the Zookeeper Quorum and parent znode information in the configuration. Zookeeper Quorum and parent node configuration may be specified in the flume configuration file. Alternatively, these configuration values are taken from the first hbase-site.xml file in the classpath.

If these are not provided in the configuration, then the sink will read this information from the first hbase-site.xml file in the classpath.

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = asynchbase
  4. a1.sinks.k1.table = foo_table
  5. a1.sinks.k1.columnFamily = bar_cf
  6. a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
  7. a1.sinks.k1.channel = c1

MorphlineSolrSink

This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.

This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to Search applications.

The ETL functionality is customizable using a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.

Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume Interceptor. Morphlines can be embedded into Hadoop components such as Flume.

Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional data formats can be added as morphline plugins. Any kind of data format can be indexed and any Solr documents for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed.

Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava’s ArrayListMultimap, which is a ListMultimap). Note that a field can have multiple values and any two records need not use common field names.

This sink fills the body of the Flume event into the _attachment_body field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.

Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability. Indexing load can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).

The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.

The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink

Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
batchSize | 1000 | The maximum number of events to take per flume transaction.
batchDurationMillis | 1000 | The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first.
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler
isProductionMode | false | This flag should be enabled for mission critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions.
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries.
isIgnoringRecoverableExceptions | false | This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever.

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
  6. # a1.sinks.k1.morphlineId = morphline1
  7. # a1.sinks.k1.batchSize = 1000
  8. # a1.sinks.k1.batchDurationMillis = 1000

ElasticSearchSink

This sink writes data to an elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them - just as if logstash wrote them.

The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.

Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd where <indexName> is the indexName parameter. The sink will start writing to a new index at midnight UTC.

Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of the more powerful ElasticSearchIndexRequestBuilderFactory.

The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink

Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames | – | Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
indexName | flume | The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header
indexType | logs | The type to index the document to, defaults to 'log'. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header
clusterName | elasticsearch | Name of the ElasticSearch cluster to connect to
batchSize | 100 | Number of events to be written per txn.
ttl | – | TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
serializer.* | – | Properties to be passed to the serializer.

Note

Header substitution is a handy way to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event. Caution should be used in using this feature as the event submitter now has control of the indexName and indexType. Furthermore, if the elasticsearch REST client is used then the event submitter has control of the URL path used.
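For instance, assuming each event carries a hypothetical log_type header, the index name could be derived per event:

  1. a1.sinks.k1.indexName = flume-%{log_type}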

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = elasticsearch
  4. a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
  5. a1.sinks.k1.indexName = foo_index
  6. a1.sinks.k1.indexType = bar_type
  7. a1.sinks.k1.clusterName = foobar_cluster
  8. a1.sinks.k1.batchSize = 500
  9. a1.sinks.k1.ttl = 5d
  10. a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
  11. a1.sinks.k1.channel = c1

Kite Dataset Sink

Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines the target Dataset by loading a dataset by URI.

The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source's Avro deserializer using deserializer.schemaType = LITERAL.

Note 1: The flume.avro.schema.hash header is not supported.

Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible.

Property Name | Default | Description
channel | – |
type | – | Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri | – | URI of the dataset to open
kite.repo.uri | – | URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace | – | Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.dataset.name | – | Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize | 100 | Number of records to process in each batch
kite.rollInterval | 30 | Maximum wait time (seconds) before data files are released
kite.flushable.commitOnBatch | true | If true, the Flume transaction will be committed and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it's possible for temp files with committed data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders.
kite.syncable.syncOnBatch | true | Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing guarantees that data will be written on stable storage on the remote system while flushing only guarantees that data has left Flume's client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false.
kite.entityParser | avro | Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface.
kite.failurePolicy | retry | Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.
kite.error.dataset.uri | – | URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save.
auth.kerberosPrincipal | – | Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab | – | Kerberos keytab location (local FS) for the principal
auth.proxyUser | – | The effective user for HDFS actions, if different from the kerberos principal
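A minimal sketch for an agent named a1 (the dataset URI below is purely illustrative and must point at a real Kite dataset):

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode:8020/datasets/default/events
  6. a1.sinks.k1.kite.batchSize = 100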

Kafka Sink

This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objectives is to integrate Flume with Kafka so that pull based processing systems can process the data coming through various Flume sources.

This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1, which was the highest available version at the time of the release.

Required properties are marked in bold font.

Property Name | Default | Description
type | – | Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers | – | List of brokers Kafka-Sink will connect to, to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a "topic" field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named "header". (If using the substitution, it is recommended to set "auto.create.topics.enable" property of Kafka broker to true.)
flumeBatchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks | 1 | How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
useFlumeEventFormat | false | By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader | – | When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
allowTopicOverride | true | When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader | topic | When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer.
Other Kafka Producer Properties | – | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

Note

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

The Kafka sink also provides defaults for the key.serializer (org.apache.kafka.common.serialization.StringSerializer) and value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.

Deprecated Properties

Property Name | Default | Description
brokerList | – | Use kafka.bootstrap.servers
topic | default-flume-topic | Use kafka.topic
batchSize | 100 | Use kafka.flumeBatchSize
requiredAcks | 1 | Use kafka.producer.acks

An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer are passed to the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. Also it is possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

  1. a1.sinks.k1.channel = c1
  2. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  3. a1.sinks.k1.kafka.topic = mytopic
  4. a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
  5. a1.sinks.k1.kafka.flumeBatchSize = 20
  6. a1.sinks.k1.kafka.producer.acks = 1
  7. a1.sinks.k1.kafka.producer.linger.ms = 1
  8. a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer.security.protocol to any of the following values means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Sink:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

  1. a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  2. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sinks.sink1.kafka.topic = mytopic
  4. a1.sinks.sink1.kafka.producer.security.protocol = SSL
  5. # optional, the global truststore can be used alternatively
  6. a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  7. a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Specifying the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties:

  1. a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  • Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  • Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required then additionally the following needs to be added to the Flume agent configuration, or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.
  1. # optional, the global keystore can be used alternatively
  2. a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
  3. a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

If keystore and key use different password protection then the ssl.key.password property will provide the required additional secret for producer keystore:

  1. a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink:

To use Kafka sink with a Kafka cluster secured with Kerberos, set the producer.security.protocol property noted above for producer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

  1. JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
  2. JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

  1. a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  2. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sinks.sink1.kafka.topic = mytopic
  4. a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
  5. a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
  6. a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

  1. a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  2. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sinks.sink1.kafka.topic = mytopic
  4. a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
  5. a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
  6. a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
  7. # optional, the global truststore can be used alternatively
  8. a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  9. a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Unlike the Kafka Source or Kafka Channel a "Client" section is not required, unless it is needed by other connecting components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="/path/to/keytabs/flume.keytab"
  6. principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  7. };

HTTP Sink

This sink takes events from the channel and sends them to a remote service using an HTTP POST request. The event content is sent as the POST body.

Error handling behaviour of this sink depends on the HTTP response returned by the target server. The sink backoff/ready status is configurable, as is the transaction commit/rollback result and whether the event contributes to the successful event drain count.

Any malformed HTTP response returned by the server where the status code is not readable will result in a backoff signal and the event is not consumed from the channel.

Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be http.
endpoint | – | The fully qualified URL endpoint to POST to
connectTimeout | 5000 | The socket connection timeout in milliseconds
requestTimeout | 5000 | The maximum request processing time in milliseconds
contentTypeHeader | text/plain | The HTTP Content-Type header
acceptHeader | text/plain | The HTTP Accept header value
defaultBackoff | true | Whether to backoff by default on receiving all HTTP status codes
defaultRollback | true | Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics | false | Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE | – | Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE | – | Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE | – | Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

Note that the most specific HTTP status code match is used for the backoff, rollback and incrementMetrics configuration options. If there are configuration values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200 value, and all other HTTP codes in the 201-299 range will use the 2XX value.

Any empty or null events are consumed without any request being made to theHTTP endpoint.

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = http
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.endpoint = http://localhost:8080/someuri
  6. a1.sinks.k1.connectTimeout = 2000
  7. a1.sinks.k1.requestTimeout = 2000
  8. a1.sinks.k1.acceptHeader = application/json
  9. a1.sinks.k1.contentTypeHeader = application/json
  10. a1.sinks.k1.defaultBackoff = true
  11. a1.sinks.k1.defaultRollback = true
  12. a1.sinks.k1.defaultIncrementMetrics = false
  13. a1.sinks.k1.backoff.4XX = false
  14. a1.sinks.k1.rollback.4XX = false
  15. a1.sinks.k1.incrementMetrics.4XX = true
  16. a1.sinks.k1.backoff.200 = false
  17. a1.sinks.k1.rollback.200 = false
  18. a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

A custom sink is your own implementation of the Sink interface. A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN. Required properties are in bold.

Property Name | Default | Description
channel | – |
type | – | The component type name, needs to be your FQCN

Example for agent named a1:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = org.example.MySink
  4. a1.sinks.k1.channel = c1