Flume Interceptors

Flume has the capability to modify or drop events in flight. This is done with the help of interceptors. Interceptors are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer of the interceptor.

Flume supports chaining of interceptors. Interceptors are specified as a whitespace-separated list of names in the source configuration, and the type of each named interceptor is set to its builder class name (or an alias). The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. To drop an event, an interceptor simply leaves it out of the list it returns; to drop all events, it returns an empty list. Interceptors are named components; here is an example of how they are created through configuration:

  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  a1.sources.r1.interceptors = i1 i2
  a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
  a1.sources.r1.interceptors.i1.preserveExisting = false
  a1.sources.r1.interceptors.i1.hostHeader = hostname
  a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
  a1.sinks.k1.filePrefix = FlumeData.%{hostname}.%Y-%m-%d
  a1.sinks.k1.channel = c1

Note that the interceptor builder class is given as the value of the type parameter. The interceptors are themselves configurable and can be passed configuration values just like any other configurable component. In the above example, events are passed to the HostInterceptor first, and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. For the TimestampInterceptor, you can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, you can also use the HostInterceptor so that, for example, each collector's host name can be included in the file prefix.
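The chaining rules above can be modelled in a few lines of plain Java. This is a minimal sketch under stated assumptions, not Flume's API: SketchEvent, BatchInterceptor, SimpleChain and ChainDemo are hypothetical names, whereas real interceptors implement org.apache.flume.interceptor.Interceptor and are created through their builders. One interceptor modifies events in place; the other drops events by leaving them out of the list it returns.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Flume event: a header map plus a byte[] body.
class SketchEvent {
    final Map<String, String> headers = new HashMap<>();
    final byte[] body;

    SketchEvent(String body) {
        this.body = body.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical batch interceptor: returns the events to pass on, possibly modified.
interface BatchInterceptor {
    List<SketchEvent> intercept(List<SketchEvent> events);
}

// Applies interceptors in configuration order; each one sees only what the previous one returned.
class SimpleChain {
    private final List<BatchInterceptor> interceptors;

    SimpleChain(List<BatchInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    List<SketchEvent> intercept(List<SketchEvent> events) {
        List<SketchEvent> current = events;
        for (BatchInterceptor interceptor : interceptors) {
            current = interceptor.intercept(current);
        }
        return current;
    }
}

class ChainDemo {
    // Modifies events in place by adding a header.
    static final BatchInterceptor TAG_HOST = events -> {
        for (SketchEvent e : events) {
            e.headers.put("hostname", "collector-1");
        }
        return events;
    };

    // Drops events with an empty body simply by leaving them out of the returned list.
    static final BatchInterceptor DROP_EMPTY = events -> {
        List<SketchEvent> kept = new ArrayList<>();
        for (SketchEvent e : events) {
            if (e.body.length > 0) {
                kept.add(e);
            }
        }
        return kept;
    };

    static List<SketchEvent> run(List<SketchEvent> input) {
        return new SimpleChain(List.of(TAG_HOST, DROP_EMPTY)).intercept(input);
    }
}
```

Running ChainDemo.run on three events, one of them with an empty body, returns the two non-empty events, each carrying the hostname header; if every event is dropped, the chain returns an empty list.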

Timestamp Interceptor

This interceptor inserts into the event headers the time, in milliseconds, at which it processes the event. It inserts a header with the key timestamp (or the key specified by the headerName property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if one is already present in the event headers.

Property Name      Default     Description
type                           The component type name, has to be timestamp or the FQCN
headerName         timestamp   The name of the header in which to place the generated timestamp.
preserveExisting   false       If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.channels = c1
  a1.sources.r1.type = seq
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = timestamp
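The header logic described above can be sketched as follows, assuming the event headers are a plain Map<String, String>. TimestampSketch is a hypothetical class, not Flume's TimestampInterceptor, and the clock is injected only so that the preserveExisting behaviour can be tested deterministically.

```java
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of the timestamp interceptor's header logic.
class TimestampSketch {
    private final String headerName;        // mirrors headerName (default "timestamp")
    private final boolean preserveExisting; // mirrors preserveExisting (default false)
    private final LongSupplier clock;       // injectable clock so the sketch can be tested

    TimestampSketch(String headerName, boolean preserveExisting, LongSupplier clock) {
        this.headerName = headerName;
        this.preserveExisting = preserveExisting;
        this.clock = clock;
    }

    // Defaults from the property table, using the wall clock in milliseconds.
    static TimestampSketch withDefaults() {
        return new TimestampSketch("timestamp", false, System::currentTimeMillis);
    }

    Map<String, String> intercept(Map<String, String> headers) {
        if (preserveExisting && headers.containsKey(headerName)) {
            return headers; // keep the value set upstream
        }
        headers.put(headerName, Long.toString(clock.getAsLong()));
        return headers;
    }
}
```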

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with the key host (or a configured key) whose value is the hostname or IP address of the host, depending on the useIP setting.

Property Name      Default   Description
type                         The component type name, has to be host
preserveExisting   false     If the host header already exists, should it be preserved - true or false
useIP              true      Use the IP Address if true, else use hostname.
hostHeader         host      The header key to be used.

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = host
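A comparable sketch for the host header, again over a plain Map<String, String> of headers. HostSketch and resolveLocalHost are hypothetical, not Flume's HostInterceptor; using InetAddress.getHostAddress() when useIP is true and getCanonicalHostName() otherwise is an assumption about how the address and name are obtained. The host is resolved once and passed in, which also keeps the sketch testable without network lookups.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

// Hypothetical sketch of the host interceptor's header logic.
class HostSketch {
    private final String hostHeader;        // mirrors hostHeader (default "host")
    private final boolean preserveExisting; // mirrors preserveExisting (default false)
    private final String host;              // resolved once, up front

    HostSketch(String hostHeader, boolean preserveExisting, String host) {
        this.hostHeader = hostHeader;
        this.preserveExisting = preserveExisting;
        this.host = host;
    }

    // Looks up the local host; useIP mirrors the useIP property (default true).
    static String resolveLocalHost(boolean useIP) throws UnknownHostException {
        InetAddress addr = InetAddress.getLocalHost();
        return useIP ? addr.getHostAddress() : addr.getCanonicalHostName();
    }

    Map<String, String> intercept(Map<String, String> headers) {
        if (preserveExisting && headers.containsKey(hostHeader)) {
            return headers; // keep the value set upstream
        }
        headers.put(hostHeader, host);
        return headers;
    }
}
```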

Static Interceptor

The static interceptor allows the user to append a header with a static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead, the user can chain multiple static interceptors, each defining one static header.

Property Name      Default   Description
type                         The component type name, has to be static
preserveExisting   true      If the configured header already exists, should it be preserved - true or false
key                key       Name of the header that should be created
value              value     Static value of the header that should be created

Example for agent named a1:

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.channels = c1
  a1.sources.r1.type = seq
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = static
  a1.sources.r1.interceptors.i1.key = datacenter
  a1.sources.r1.interceptors.i1.value = NEW_YORK
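Since each static interceptor sets a single header, several static headers come from chaining several instances. The hypothetical StaticSketch class below models this over a plain header map (it is not Flume's StaticInterceptor); with preserveExisting at its default of true, a header already set upstream is left unchanged. The second header in exampleChain, rack = R12, is invented for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the static interceptor: one fixed key/value per instance.
class StaticSketch {
    private final String key;               // mirrors key
    private final String value;             // mirrors value
    private final boolean preserveExisting; // mirrors preserveExisting (default true)

    StaticSketch(String key, String value, boolean preserveExisting) {
        this.key = key;
        this.value = value;
        this.preserveExisting = preserveExisting;
    }

    Map<String, String> intercept(Map<String, String> headers) {
        if (preserveExisting && headers.containsKey(key)) {
            return headers; // an upstream value wins
        }
        headers.put(key, value);
        return headers;
    }

    // Two chained instances, one header each; the rack header is a made-up example.
    static List<StaticSketch> exampleChain() {
        return List.of(
                new StaticSketch("datacenter", "NEW_YORK", true),
                new StaticSketch("rack", "R12", true));
    }

    // Applies the instances in order, as an interceptor chain would.
    static Map<String, String> applyAll(List<StaticSketch> chain, Map<String, String> headers) {
        for (StaticSketch s : chain) {
            s.intercept(headers);
        }
        return headers;
    }
}
```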

Remove Header Interceptor

This interceptor manipulates Flume event headers by removing one or more headers. It can remove a statically defined header, headers whose names match a regular expression, or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.

Note that if only one header needs to be removed, specifying it by name provides performance benefits over the other two methods.

Property Name       Default   Description
type                          The component type name, has to be remove_header
withName                      Name of the header to remove
fromList                      List of headers to remove, separated with the separator specified by fromListSeparator
fromListSeparator   \s*,\s*   Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters
matching                      All headers whose names match this regular expression are removed
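The three removal criteria can be sketched over a plain Map<String, String> of headers. RemoveHeaderSketch is hypothetical, not Flume's RemoveHeaderInterceptor, and it assumes that matching must match the whole header name (Matcher.matches()); check the Flume source if partial matches matter to you. Because fromListSeparator is itself a regular expression, the sketch applies it with String.split().

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch of the remove_header criteria.
class RemoveHeaderSketch {
    private final String withName;      // single header name, or null
    private final Set<String> fromList; // names parsed from fromList, possibly empty
    private final Pattern matching;     // header-name regex, or null

    RemoveHeaderSketch(String withName, String fromList, String fromListSeparator, String matching) {
        this.withName = withName;
        this.fromList = new HashSet<>();
        if (fromList != null) {
            // The separator is a regex, e.g. \s*,\s* (a comma with optional whitespace).
            for (String name : fromList.split(fromListSeparator)) {
                if (!name.isEmpty()) {
                    this.fromList.add(name);
                }
            }
        }
        this.matching = matching == null ? null : Pattern.compile(matching);
    }

    Map<String, String> intercept(Map<String, String> headers) {
        if (withName != null) {
            headers.remove(withName); // cheapest case: one direct map removal
        }
        headers.keySet().removeAll(fromList);
        if (matching != null) {
            // Assumption: the whole header name must match the regex.
            headers.keySet().removeIf(name -> matching.matcher(name).matches());
        }
        return headers;
    }
}
```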

UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.

Consider using the UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume Source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, it is preferable to an auto-generated UUID because it enables subsequent updates and deletes of events in data stores using that well-known application-level key.

Property Name      Default   Description
type                         The component type name, has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName         id        The name of the Flume header to modify
preserveExisting   true      If the UUID header already exists, should it be preserved - true or false
prefix             ""        The prefix string constant to prepend to each generated UUID
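A minimal sketch of the three properties above, assuming a plain Map<String, String> of headers. UuidSketch is hypothetical, not the Flume class, and generating the value with java.util.UUID.randomUUID() is an assumption about how the UUID is produced. Because preserveExisting defaults to true, an ID assigned at the first source of a flow survives later hops through the same kind of interceptor.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of UUID header assignment.
class UuidSketch {
    private final String headerName;        // mirrors headerName (default "id")
    private final boolean preserveExisting; // mirrors preserveExisting (default true)
    private final String prefix;            // mirrors prefix (default "")

    UuidSketch(String headerName, boolean preserveExisting, String prefix) {
        this.headerName = headerName;
        this.preserveExisting = preserveExisting;
        this.prefix = prefix;
    }

    Map<String, String> intercept(Map<String, String> headers) {
        if (preserveExisting && headers.containsKey(headerName)) {
            return headers; // keep the ID assigned upstream
        }
        // UUID.randomUUID() yields a random 128-bit identifier in 36-character text form.
        headers.put(headerName, prefix + UUID.randomUUID());
        return headers;
    }
}
```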

Morphline Interceptor

This interceptor filters the events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another. For example, the morphline can ignore certain events, alter or insert certain event headers via regular-expression-based pattern matching, or auto-detect and set a MIME type via Apache Tika on events that are intercepted. This kind of packet sniffing can be used, for example, for content-based dynamic routing in a Flume topology. MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).

Currently, there is a restriction in that the morphline of an interceptor must not generate more than one output record for each input event. This interceptor is not intended for heavy-duty ETL processing; if you need this, consider moving ETL processing from the Flume Source to a Flume Sink, e.g. to a MorphlineSolrSink.

The type and morphlineFile properties are required.

Property Name   Default   Description
type                      The component type name, has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
morphlineFile             The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId     null      Optional name used to identify a morphline if there are multiple morphlines in a morphline config file

Sample flume.conf file:

  a1.sources.avroSrc.interceptors = morphlineinterceptor
  a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
  a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
  a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

Search and Replace Interceptor

This interceptor provides simple string-based search-and-replace functionality based on Java regular expressions. Capturing groups are also available and can be referenced in the replacement string (for example $1). This interceptor uses the same rules as the Java Matcher.replaceAll() method.

Property Name   Default   Description
type                      The component type name, has to be search_replace
searchPattern             The pattern to search for and replace.
replaceString             The replacement string.
charset         UTF-8     The charset of the event body. Assumed by default to be UTF-8.

Example configuration:

  a1.sources.avroSrc.interceptors = search-replace
  a1.sources.avroSrc.interceptors.search-replace.type = search_replace

  # Remove leading alphanumeric and underscore characters in an event body.
  a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
  a1.sources.avroSrc.interceptors.search-replace.replaceString =

Another example:

  a1.sources.avroSrc.interceptors = search-replace
  a1.sources.avroSrc.interceptors.search-replace.type = search_replace

  # Use grouping operators to reorder and munge words on a line.
  a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
  a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
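Because the interceptor follows Matcher.replaceAll() rules, both examples can be reproduced with java.util.regex directly. SearchReplaceSketch is a hypothetical helper, not Flume's implementation, and the sample bodies are made up. With the first pattern, event_42 disk full becomes " disk full" (the leading space is kept); with the second, The quick brown fox jumped over the lazy dog becomes The hungry dog ate the careless fox, since $1 captures fox and $2 captures dog.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical sketch of search-and-replace on an event body.
class SearchReplaceSketch {
    private final Pattern searchPattern;
    private final String replaceString;
    private final Charset charset; // mirrors charset (default UTF-8)

    SearchReplaceSketch(String searchPattern, String replaceString, Charset charset) {
        this.searchPattern = Pattern.compile(searchPattern);
        this.replaceString = replaceString;
        this.charset = charset;
    }

    // Decodes the body, applies Matcher.replaceAll and re-encodes with the same charset.
    byte[] intercept(byte[] body) {
        String text = new String(body, charset);
        return searchPattern.matcher(text).replaceAll(replaceString).getBytes(charset);
    }

    // Convenience wrapper for text bodies in UTF-8.
    static String apply(String pattern, String replacement, String body) {
        byte[] out = new SearchReplaceSketch(pattern, replacement, StandardCharsets.UTF_8)
                .intercept(body.getBytes(StandardCharsets.UTF_8));
        return new String(out, StandardCharsets.UTF_8);
    }
}
```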

Regex Filtering Interceptor

This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.

Property Name   Default   Description
type                      The component type name, has to be regex_filter
regex           ".*"      Regular expression for matching against events
excludeEvents   false     If true, regex determines events to exclude, otherwise regex determines events to include.
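The include/exclude decision reduces to one comparison, as in the sketch below. RegexFilterSketch is hypothetical; it decodes bodies as UTF-8 and treats a match anywhere in the text as a hit (Matcher.find()), both of which are assumptions rather than documented Flume behaviour. Under that assumption the default .* keeps every event.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of regex-based include/exclude filtering.
class RegexFilterSketch {
    private final Pattern regex;          // mirrors regex (default ".*")
    private final boolean excludeEvents;  // mirrors excludeEvents (default false)

    RegexFilterSketch(String regex, boolean excludeEvents) {
        this.regex = Pattern.compile(regex);
        this.excludeEvents = excludeEvents;
    }

    List<byte[]> intercept(List<byte[]> bodies) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] body : bodies) {
            boolean matched = regex.matcher(new String(body, StandardCharsets.UTF_8)).find();
            // Include mode keeps matches; exclude mode keeps non-matches.
            if (matched != excludeEvents) {
                kept.add(body);
            }
        }
        return kept;
    }

    // Convenience wrapper for text bodies.
    static List<String> filterText(String regex, boolean excludeEvents, List<String> bodies) {
        List<byte[]> raw = new ArrayList<>();
        for (String b : bodies) {
            raw.add(b.getBytes(StandardCharsets.UTF_8));
        }
        List<String> out = new ArrayList<>();
        for (byte[] b : new RegexFilterSketch(regex, excludeEvents).intercept(raw)) {
            out.add(new String(b, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```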

Regex Extractor Interceptor

This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.

Property Name            Default   Description
type                               The component type name, has to be regex_extractor
regex                              Regular expression for matching against events
serializers                        Space-separated list of serializers for mapping matches to header names and serializing their values (see the examples below). Flume provides built-in support for the following serializers:
                                     org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
                                     org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers.<s1>.type    default   Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
serializers.<s1>.name              The header name to which the match is mapped
serializers.*                      Serializer-specific properties

The serializers are used to map the matches to a header name and a formatted header value. By default, you only need to specify the header name, and the default org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer will be used. This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches in any way you like.

Example 1:

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used:

  a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
  a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
  a1.sources.r1.interceptors.i1.serializers.s1.name = one
  a1.sources.r1.interceptors.i1.serializers.s2.name = two
  a1.sources.r1.interceptors.i1.serializers.s3.name = three

The extracted event will contain the same body, but the following headers will have been added: one=>1, two=>2, three=>3
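Example 1 can be reproduced with a small sketch. ExtractorSketch and NamedSerializer are hypothetical stand-ins, not Flume's RegexExtractorInterceptor or its serializer interface, and pairing capture group i with the i-th serializer in the list is an assumption drawn from the example. Applied to 1:2:3.4foobar5 with pass-through serializers named one, two and three, it adds exactly the headers listed above and leaves the body untouched.

```java
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of regex extraction into headers.
class ExtractorSketch {
    // Stand-in for a configured serializer: a header name plus a value formatter.
    static final class NamedSerializer {
        final String headerName;
        final UnaryOperator<String> format;

        NamedSerializer(String headerName, UnaryOperator<String> format) {
            this.headerName = headerName;
            this.format = format;
        }

        // Behaves like a pass-through serializer: the matched text is used unchanged.
        static NamedSerializer passThrough(String headerName) {
            return new NamedSerializer(headerName, UnaryOperator.identity());
        }
    }

    private final Pattern regex;
    private final List<NamedSerializer> serializers;

    ExtractorSketch(String regex, List<NamedSerializer> serializers) {
        this.regex = Pattern.compile(regex);
        this.serializers = serializers;
    }

    // The body is only read; matched groups are added as headers.
    Map<String, String> intercept(String body, Map<String, String> headers) {
        Matcher m = regex.matcher(body);
        if (m.find()) {
            // Assumption: group i (1-based) is handled by the i-th serializer.
            int n = Math.min(m.groupCount(), serializers.size());
            for (int i = 0; i < n; i++) {
                String value = m.group(i + 1);
                if (value != null) {
                    NamedSerializer s = serializers.get(i);
                    headers.put(s.headerName, s.format.apply(value));
                }
            }
        }
        return headers;
    }
}
```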

Example 2:

If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used:

  a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
  a1.sources.r1.interceptors.i1.serializers = s1
  a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
  a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
  a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

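the extracted event will contain the same body, but the following header will have been added: timestamp=>1350611220000. This value corresponds to 2012-10-18 18:47 parsed in the UTC-7 time zone (for example, US Pacific Daylight Time); the result depends on the time zone used for parsing.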
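The following sketch ties the number in Example 2 to the time zone used for parsing. It uses java.time rather than whatever Flume uses internally, and MillisSketch is a hypothetical name. Parsing 2012-10-18 18:47 at UTC-7 gives 1350611220000, while the same text read as UTC gives 1350586020000, seven hours (25,200,000 ms) earlier.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the millis-serializer step from Example 2.
class MillisSketch {
    // Same regex as Example 2, written as a Java string literal.
    static final Pattern REGEX =
            Pattern.compile("^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)");

    // Parses the captured text with the serializer's pattern in an explicit time zone.
    static long toMillis(String value, String pattern, ZoneId zone) {
        LocalDateTime local = LocalDateTime.parse(value, DateTimeFormatter.ofPattern(pattern));
        return local.atZone(zone).toInstant().toEpochMilli();
    }

    // Returns the header value the example would produce, or empty if the regex does not match.
    static OptionalLong extract(String body, ZoneId zone) {
        Matcher m = REGEX.matcher(body);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(toMillis(m.group(1), "yyyy-MM-dd HH:mm", zone));
    }
}
```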