Setup

Setting up an agent

Flume agent configuration is stored in a local configuration file. This is a text file that follows the Java properties file format. Configurations for one or more agents can be specified in the same configuration file. The configuration file includes properties of each source, sink and channel in an agent and how they are wired together to form data flows.

Configuring individual components

Each component (source, sink or channel) in the flow has a name, type, and set of properties that are specific to the type and instantiation. For example, an Avro source needs a hostname (or IP address) and a port number to receive data from. A memory channel can have a maximum queue size (“capacity”), and an HDFS sink needs to know the file system URI, the path in which to create files, the frequency of file rotation (“hdfs.rollInterval”), etc. All such attributes of a component need to be set in the properties file of the hosting Flume agent.
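
As a sketch, this is how such properties might look in an agent's file. The component types (avro, memory, hdfs) and property keys are Flume's own; the agent name agent_foo, the component names, and all values are made up for illustration:

  # hypothetical components for an agent named agent_foo
  agent_foo.sources.avro-src.type = avro
  agent_foo.sources.avro-src.bind = 10.1.1.100
  agent_foo.sources.avro-src.port = 41414
  agent_foo.channels.mem-ch.type = memory
  agent_foo.channels.mem-ch.capacity = 10000
  agent_foo.sinks.hdfs-sink.type = hdfs
  agent_foo.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/events
  agent_foo.sinks.hdfs-sink.hdfs.rollInterval = 30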

Wiring the pieces together

The agent needs to know what individual components to load and how they are connected in order to constitute the flow. This is done by listing the names of each of the sources, sinks and channels in the agent, and then specifying the connecting channel for each sink and source. For example, an agent flows events from an Avro source called avroWeb to an HDFS sink called hdfs-cluster1 via a file channel called file-channel. The configuration file will contain the names of these components, with file-channel as a shared channel for both the avroWeb source and the hdfs-cluster1 sink.
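
Using the names from this example, the wiring section of the properties file might look like the following sketch (agent_foo is an assumed agent name; the channels/channel keys are the ones Flume actually uses):

  # list the components on the agent (agent name assumed for illustration)
  agent_foo.sources = avroWeb
  agent_foo.sinks = hdfs-cluster1
  agent_foo.channels = file-channel

  # connect the source and sink through the shared channel
  agent_foo.sources.avroWeb.channels = file-channel
  agent_foo.sinks.hdfs-cluster1.channel = file-channel

Note that a source lists its channels (plural, since a source can fan out to several), while a sink names a single channel.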

Starting an agent

An agent is started using a shell script called flume-ng which is located in the bin directory of the Flume distribution. You need to specify the agent name, the config directory, and the config file on the command line:

  $ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

Now the agent will start running the sources and sinks configured in the given properties file.

A simple example

Here, we give an example configuration file, describing a single-node Flume deployment. This configuration lets a user generate events and subsequently logs them to the console.

  # example.conf: A single-node Flume configuration

  # Name the components on this agent
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1

  # Describe/configure the source
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 44444

  # Describe the sink
  a1.sinks.k1.type = logger

  # Use a channel which buffers events in memory
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100

  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest.

Given this configuration file, we can start Flume as follows:

  $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Note that in a full deployment we would typically include one more option: --conf=<conf-dir>. The <conf-dir> directory would include a shell script flume-env.sh and potentially a log4j properties file. In this example, we pass a Java option to force Flume to log to the console and we go without a custom environment script.
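
If you do supply a custom environment script, a minimal flume-env.sh might only export JVM options. The JAVA_OPTS variable name follows the stock template shipped with Flume, while the heap sizes below are illustrative values only:

  # conf/flume-env.sh (sketch)
  # JVM options picked up by the flume-ng launcher; sizes are illustrative only
  export JAVA_OPTS="-Xms100m -Xmx512m"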

From a separate terminal, we can then telnet port 44444 and send Flume an event:

  $ telnet localhost 44444
  Trying 127.0.0.1...
  Connected to localhost.localdomain (127.0.0.1).
  Escape character is '^]'.
  Hello world! <ENTER>
  OK

The original Flume terminal will output the event in a log message.

  12/06/19 15:32:19 INFO source.NetcatSource: Source starting
  12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
  12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }

Congratulations - you’ve successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail.

Using environment variables in configuration files

Flume has the ability to substitute environment variables in the configuration. For example:

  a1.sources = r1
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = ${NC_PORT}
  a1.sources.r1.channels = c1

NB: it currently works for values only, not for keys (i.e. only on the “right side” of the = sign in the config lines).

This can be enabled via Java system properties on agent invocation by setting propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties.

For example:

  $ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Note that the above is just an example; environment variables can be configured in other ways, including being set in conf/flume-env.sh.

Logging raw data

Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in many production environments because this may result in leaking sensitive data or security related configurations, such as secret keys, to Flume log files. By default, Flume will not log such information. On the other hand, if the data pipeline is broken, Flume will attempt to provide clues for debugging the problem.

One way to debug problems with event pipelines is to set up an additional Memory Channel connected to a Logger Sink, which will output all event data to the Flume logs. In some situations, however, this approach is insufficient.
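
Such a debugging tap might look like the following sketch, which assumes an agent a1 whose source r1 already feeds a channel c1 consumed by a sink k1; the extra channel and sink names are made up. Because a source's default channel selector is replicating, listing both channels on the source sends every event to both:

  # add a second channel and a logger sink alongside the existing c1/k1 pair
  a1.channels = c1 debug-ch
  a1.sinks = k1 debug-sink
  a1.channels.debug-ch.type = memory
  a1.sinks.debug-sink.type = logger
  a1.sinks.debug-sink.channel = debug-ch

  # the source now replicates events into both channels
  a1.sources.r1.channels = c1 debug-ch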

In order to enable logging of event- and configuration-related data, some Java system properties must be set in addition to log4j properties.

To enable configuration-related logging, set the Java system property -Dorg.apache.flume.log.printconfig=true. This can either be passed on the command line or by setting this in the JAVA_OPTS variable in flume-env.sh.

To enable data logging, set the Java system property -Dorg.apache.flume.log.rawdata=true in the same way described above. For most components, the log4j logging level must also be set to DEBUG or TRACE to make event-specific logging appear in the Flume logs.

Here is an example of enabling both configuration logging and raw data logging while also setting the Log4j loglevel to DEBUG for console output:

  $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

Zookeeper based Configuration

Flume supports agent configurations via Zookeeper. This is an experimental feature. The configuration file needs to be uploaded to Zookeeper, under a configurable prefix. The configuration file is stored in the Zookeeper node data. The following is how the Zookeeper node tree would look for agents a1 and a2:

  - /flume
  |- /a1 [Agent config file]
  |- /a2 [Agent config file]

Once the configuration file is uploaded, start the agent with the following options:


  $ bin/flume-ng agent --conf conf -z zkhost:2181,zkhost1:2181 -p /flume --name a1 -Dflume.root.logger=INFO,console

  Argument Name   Default   Description
  z                         Zookeeper connection string. Comma separated list of hostname:port
  p               /flume    Base path in Zookeeper to store agent configurations

Installing third-party plugins

Flume has a fully plugin-based architecture. While Flume ships with many out-of-the-box sources, channels, sinks, serializers, and the like, many implementations exist which ship separately from Flume.

While it has always been possible to include custom Flume components by adding their jars to the FLUME_CLASSPATH variable in the flume-env.sh file, Flume now supports a special directory called plugins.d which automatically picks up plugins that are packaged in a specific format. This allows for easier management of plugin packaging issues as well as simpler debugging and troubleshooting of several classes of issues, especially library dependency conflicts.

The plugins.d directory

The plugins.d directory is located at $FLUME_HOME/plugins.d. At startup time, the flume-ng start script looks in the plugins.d directory for plugins that conform to the below format and includes them in the proper paths when starting up Java.

Directory layout for plugins

Each plugin (subdirectory) within plugins.d can have up to three sub-directories:

  • lib - the plugin’s jar(s)
  • libext - the plugin’s dependency jar(s)
  • native - any required native libraries, such as .so files

Example of two plugins within the plugins.d directory:

  plugins.d/
  plugins.d/custom-source-1/
  plugins.d/custom-source-1/lib/my-source.jar
  plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
  plugins.d/custom-source-2/
  plugins.d/custom-source-2/lib/custom.jar
  plugins.d/custom-source-2/native/gettext.so

Data ingestion

Flume supports a number of mechanisms to ingest data from external sources.

RPC

An Avro client included in the Flume distribution can send a given file to a Flume Avro source using the Avro RPC mechanism:

  $ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

The above command will send the contents of /usr/logs/log.10 to the Flume source listening on that port.

Executing commands

There’s an exec source that executes a given command and consumes the output. It consumes a single ‘line’ of output at a time, i.e. text followed by a carriage return (‘\r’) or line feed (‘\n’) or both together.
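
A common sketch of an exec source tails a log file; the command and file path below are illustrative, while the type and command keys are the exec source's actual properties:

  # hypothetical exec source tailing an application log
  a1.sources = r1
  a1.sources.r1.type = exec
  a1.sources.r1.command = tail -F /var/log/app.log
  a1.sources.r1.channels = c1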

Network streams

Flume supports the following mechanisms to read data from popular log stream types, such as:

  • Avro
  • Thrift
  • Syslog
  • Netcat

Setting multi-agent flow

Two agents communicating over Avro RPC

In order to flow the data across multiple agents or hops, the sink of the previous agent and the source of the current hop need to be avro type, with the sink pointing to the hostname (or IP address) and port of the source.
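
A minimal sketch of such a hop, assuming two agents named foo and bar and a made-up collector host and port (the avro type and its hostname/port/bind properties are Flume's own, while the component and channel names here are illustrative):

  # on agent foo: an avro sink forwarding to the next hop
  foo.sinks.avro-forward.type = avro
  foo.sinks.avro-forward.hostname = collector-host.example.com
  foo.sinks.avro-forward.port = 4545
  foo.sinks.avro-forward.channel = file-channel

  # on agent bar: an avro source receiving from the previous hop
  bar.sources.avro-in.type = avro
  bar.sources.avro-in.bind = 0.0.0.0
  bar.sources.avro-in.port = 4545
  bar.sources.avro-in.channels = mem-channel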

Consolidation

A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen agents that write to the HDFS cluster.

A fan-in flow using Avro RPC to consolidate events in one place

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of a single agent (again, you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

Multiplexing the flow

Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.

A fan-out flow using a (multiplexing) channel selector

The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file.
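
Using Flume's multiplexing channel selector, the txnType mapping from this example might be sketched as follows (the agent, source, and channel names are assumed for illustration; the selector.* keys are the selector's real properties):

  # hypothetical multiplexing selector on agent foo's source
  foo.sources.r1.selector.type = multiplexing
  foo.sources.r1.selector.header = txnType
  foo.sources.r1.selector.mapping.customer = channel1 channel3
  foo.sources.r1.selector.mapping.vendor = channel2
  foo.sources.r1.selector.default = channel3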