Monitoring

Monitoring in Flume is still a work in progress, and changes can happen often. Several Flume components report metrics to the JMX platform MBean server. These metrics can be queried using JConsole.

Available Component Metrics

The following tables show which metrics are available for each component. Each component maintains only a set of the metrics, indicated by an ‘x’; the unmaintained ones show the default value, that is 0. These tables tell you where you can expect meaningful data. The metric names should be descriptive enough; for more information, consult the source code of the components.

Sources 1

Avro | Exec | HTTP | JMS | Kafka | MultiportSyslogTCP | Scribe
AppendAcceptedCountx
AppendBatchAcceptedCountx xx
AppendBatchReceivedCountx xx
AppendReceivedCountx
ChannelWriteFailx xxxxx
EventAcceptedCountxxxxxxx
EventReadFail xxxxx
EventReceivedCountxxxxxxx
GenericProcessingFail x x
KafkaCommitTimer x
KafkaEmptyCount x
KafkaEventGetTimer x
OpenConnectionCountx

Sources 2

SequenceGenerator | SpoolDirectory | SyslogTcp | SyslogUDP | Taildir | Thrift
AppendAcceptedCount x
AppendBatchAcceptedCountxx xx
AppendBatchReceivedCount x xx
AppendReceivedCount x
ChannelWriteFailxxxxxx
EventAcceptedCountxxxxxx
EventReadFail xxxx
EventReceivedCount xxxxx
GenericProcessingFail x x
KafkaCommitTimer
KafkaEmptyCount
KafkaEventGetTimer
OpenConnectionCount

Sinks 1

Avro/Thrift | AsyncHBase | ElasticSearch | HBase | HBase2
BatchCompleteCountxxxxx
BatchEmptyCountxxxxx
BatchUnderflowCountxxxxx
ChannelReadFailx x
ConnectionClosedCountxxxxx
ConnectionCreatedCountxxxxx
ConnectionFailedCountxxxxx
EventDrainAttemptCountxxxxx
EventDrainSuccessCountxxxxx
EventWriteFailx x
KafkaEventSendTimer
RollbackCount

Sinks 2

HDFSEvent | Hive | Http | Kafka | Morphline | RollingFile
BatchCompleteCountxx x
BatchEmptyCountxx xx
BatchUnderflowCountxx xx
ChannelReadFailxxxxxx
ConnectionClosedCountxx x
ConnectionCreatedCountxx x
ConnectionFailedCountxx x
EventDrainAttemptCountxxx xx
EventDrainSuccessCountxxxxxx
EventWriteFailxxxxxx
KafkaEventSendTimer x
RollbackCount x

Channels

File | Kafka | Memory | PseudoTxnMemory | SpillableMemory
ChannelCapacityx x x
ChannelSizex xxx
CheckpointBackupWriteErrorCountx
CheckpointWriteErrorCountx
EventPutAttemptCountxxxxx
EventPutErrorCountx
EventPutSuccessCountxxxxx
EventTakeAttemptCountxxxxx
EventTakeErrorCountx
EventTakeSuccessCountxxxxx
KafkaCommitTimer x
KafkaEventGetTimer x
KafkaEventSendTimer x
Openx
RollbackCounter x
Unhealthyx

JMX Reporting

JMX reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable using flume-env.sh, for example:

  export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

NOTE: The sample above disables security. To enable security, refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
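As an alternative to JConsole, the metrics exposed this way can also be read programmatically. The following is a minimal sketch, not part of Flume: it assumes the agent was started with the unauthenticated settings shown above on port 5445, and it assumes that Flume's MBeans live in domains beginning with org.apache.flume. The class name FlumeJmxDump and the query pattern are illustrative assumptions.

```java
import java.net.MalformedURLException;
import java.util.Set;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper class; the name and the query pattern are assumptions.
public class FlumeJmxDump {

    // Builds the standard RMI-based JMX service URL for a host and port.
    static JMXServiceURL serviceUrl(String host, int port) {
        try {
            return new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad JMX address " + host + ":" + port, e);
        }
    }

    // Prints every readable attribute of every MBean matching the pattern
    // and returns the number of MBeans that matched.
    static int dump(MBeanServerConnection conn, String pattern) throws Exception {
        Set<ObjectName> names = conn.queryNames(new ObjectName(pattern), null);
        for (ObjectName name : names) {
            System.out.println(name);
            for (MBeanAttributeInfo attr : conn.getMBeanInfo(name).getAttributes()) {
                if (attr.isReadable()) {
                    System.out.println("  " + attr.getName() + " = "
                        + conn.getAttribute(name, attr.getName()));
                }
            }
        }
        return names.size();
    }

    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5445;
        // The connector is closed automatically when the block exits.
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            // Assumed pattern: all MBeans in domains starting with org.apache.flume.
            dump(connector.getMBeanServerConnection(), "org.apache.flume.*:*");
        }
    }
}
```

If authentication or SSL is enabled, the connection needs credentials and trust settings passed to JMXConnectorFactory.connect in an environment map; that is left out of this sketch.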

Ganglia Reporting

Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support. The Flume agent has to be started by passing in the following parameters as system properties prefixed by flume.monitoring., and these can be specified in flume-env.sh:

Property Name | Default | Description
type | - | The component type name, has to be ganglia
hosts | - | Comma-separated list of hostname:port of Ganglia servers
pollFrequency | 60 | Time, in seconds, between consecutive reports to the Ganglia server
isGanglia3 | false | Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format

We can start Flume with Ganglia support as follows:

  $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON Reporting

Flume can also report metrics in a JSON format. To enable reporting in JSON format, Flume hosts a Web server on a configurable port. Flume reports metrics in the following JSON format:

  {
    "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
    "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
  }

Here is an example:

  {
    "CHANNEL.fileChannel": {
      "EventPutSuccessCount": "468085",
      "Type": "CHANNEL",
      "StopTime": "0",
      "EventPutAttemptCount": "468086",
      "ChannelSize": "233428",
      "StartTime": "1344882233070",
      "EventTakeSuccessCount": "458200",
      "ChannelCapacity": "600000",
      "EventTakeAttemptCount": "458288"
    },
    "CHANNEL.memChannel": {
      "EventPutSuccessCount": "22948908",
      "Type": "CHANNEL",
      "StopTime": "0",
      "EventPutAttemptCount": "22948908",
      "ChannelSize": "5",
      "StartTime": "1344882209413",
      "EventTakeSuccessCount": "22948900",
      "ChannelCapacity": "100",
      "EventTakeAttemptCount": "22948908"
    }
  }

Property Name | Default | Description
type | - | The component type name, has to be http
port | 41414 | The port to start the server on.

We can start Flume with JSON Reporting support as follows:

  $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

Metrics will then be available at the http://<hostname>:<port>/metrics web page. Custom components can report metrics as mentioned in the Ganglia section above.
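A monitoring script can consume this endpoint directly. The sketch below is one possible client, not something shipped with Flume: it fetches the JSON document and derives a channel fill level from the ChannelSize and ChannelCapacity metrics shown in the example above. The class name FlumeMetricsClient, the default URL, and the component name CHANNEL.fileChannel are assumptions for illustration. To stay free of external libraries it extracts values with regular expressions, which only works because the format is a flat two-level object with string values; a real JSON parser is the more robust choice.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client; requires Java 11+ for java.net.http.
public class FlumeMetricsClient {

    // Fetches the raw JSON document from the metrics endpoint.
    static String fetch(String url) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    // Extracts one non-negative integer metric of one component.
    // Returns -1 if the component or the metric is absent.
    static long metric(String json, String component, String metric) {
        Matcher block = Pattern
            .compile(Pattern.quote("\"" + component + "\"") + "\\s*:\\s*\\{([^}]*)\\}")
            .matcher(json);
        if (!block.find()) {
            return -1;
        }
        Matcher value = Pattern
            .compile(Pattern.quote("\"" + metric + "\"") + "\\s*:\\s*\"(\\d+)\"")
            .matcher(block.group(1));
        return value.find() ? Long.parseLong(value.group(1)) : -1;
    }

    // Channel fill level in percent, or -1 if either metric is missing.
    static double fillPercent(String json, String channel) {
        long size = metric(json, channel, "ChannelSize");
        long capacity = metric(json, channel, "ChannelCapacity");
        return (size < 0 || capacity <= 0) ? -1 : 100.0 * size / capacity;
    }

    public static void main(String[] args) throws Exception {
        // Assumed default: the port used in the startup command above.
        String url = args.length > 0 ? args[0] : "http://localhost:34545/metrics";
        String json = fetch(url);
        System.out.println(fillPercent(json, "CHANNEL.fileChannel"));
    }
}
```

A fill level that keeps climbing toward 100 suggests the sinks are not draining the channel as fast as the sources fill it.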

Custom Reporting

It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the interface org.apache.flume.instrumentation.MonitorService. Such a class can be used the same way the GangliaServer is used for reporting. It can poll the platform MBean server to read the MBeans for metrics. For example, an HTTP monitoring service called HTTPReporting can be used as follows:

  $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332

Property Name | Default | Description
type | - | The component type name, has to be FQCN
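The polling step mentioned above can be sketched with the standard JMX API alone. This is a hedged illustration of the kind of loop a custom reporter might run once started, not Flume's implementation: the class name MBeanPoller, the query pattern org.apache.flume.*:* and the 60-second interval are assumptions, and the wiring into MonitorService is omitted so that the example compiles without Flume on the classpath.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical poller; a real reporter would forward the snapshot
// to the target monitoring system instead of printing it.
public class MBeanPoller {

    // Reads all readable attributes of the MBeans matching the pattern and
    // returns them as objectName -> (attributeName -> value as string).
    static Map<String, Map<String, String>> snapshot(MBeanServer server, String pattern) {
        Map<String, Map<String, String>> result = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                Map<String, String> attrs = new TreeMap<>();
                for (MBeanAttributeInfo info : server.getMBeanInfo(name).getAttributes()) {
                    if (!info.isReadable()) {
                        continue;
                    }
                    try {
                        attrs.put(info.getName(),
                            String.valueOf(server.getAttribute(name, info.getName())));
                    } catch (Exception e) {
                        // Skip attributes that cannot be read; keep the rest.
                    }
                }
                result.put(name.getCanonicalName(), attrs);
            }
        } catch (Exception e) {
            throw new IllegalStateException("MBean query failed for " + pattern, e);
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        while (true) {
            // Assumed pattern and interval; adjust to the components of interest.
            System.out.println(snapshot(server, "org.apache.flume.*:*"));
            Thread.sleep(60_000L);
        }
    }
}
```

Run standalone, this prints an empty map on each poll because no Flume MBeans are registered in its own JVM; the polling code has to run inside the agent process to see the component metrics.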

Reporting metrics from custom components

Any custom Flume components should inherit from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes. See the code below. The MonitoredCounterGroup expects a list of attributes whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.

  public class SinkCounter extends MonitoredCounterGroup implements
      SinkCounterMBean {

    // Keys of the counters maintained by this group
    private static final String COUNTER_CONNECTION_CREATED =
        "sink.connection.creation.count";

    private static final String COUNTER_CONNECTION_CLOSED =
        "sink.connection.closed.count";

    private static final String COUNTER_CONNECTION_FAILED =
        "sink.connection.failed.count";

    private static final String COUNTER_BATCH_EMPTY =
        "sink.batch.empty";

    private static final String COUNTER_BATCH_UNDERFLOW =
        "sink.batch.underflow";

    private static final String COUNTER_BATCH_COMPLETE =
        "sink.batch.complete";

    private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
        "sink.event.drain.attempt";

    private static final String COUNTER_EVENT_DRAIN_SUCCESS =
        "sink.event.drain.sucess";

    // Attributes whose metrics are exposed by this class
    private static final String[] ATTRIBUTES = {
      COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
      COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
      COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
      COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
    };

    public SinkCounter(String name) {
      super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
    }

    // Getter declared in SinkCounterMBean; this is what the metric reporters read
    @Override
    public long getConnectionCreatedCount() {
      return get(COUNTER_CONNECTION_CREATED);
    }

    // Called by the sink itself to update the counter
    public long incrementConnectionCreatedCount() {
      return increment(COUNTER_CONNECTION_CREATED);
    }

  }