监控

Flume的监控系统的完善仍在进行中,变化可能会比较频繁,有几个Flume组件会向JMX平台MBean服务器报告运行指标。 可以使用Jconsole查询这些指标数据。

JMX Reporting

MX监控可以通过在flume-env.sh脚本中修改JAVA_OPTS环境变量中的JMX参数来开启,比如这样:

  1. export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

警告

注意:上面的JVM启动参数例子里面没有开启安全验证,如果要开启请参考:http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html

Ganglia Reporting

Flume也可以向Ganglia 3或Ganglia 3.1报告运行指标数据。想要开启这个功能,必须在Agent启动时候指定。Flume Agent在启动时候必须制定下面这些参数并在参数前面加上前缀「flume.monitoring.」来配置,也可以在flume-env.sh中设定这些参数。


属性

默认值

解释

type



组件类型,这个是: ganglia

hosts



hostname:port 格式的 Ganglia 服务列表,多个用逗号分隔

pollFrequency

60

向Ganglia服务器报告数据的时间间隔(秒)

isGanglia3

false

设置为true后Ganglia的版本兼容为Ganglia3,默认情况下Flume发送的数据是Ganglia3.1格式的

我们可以在启动时这样开启Ganglia支持:

  1. $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

提示

看上面这个启动脚本,其中 -Dflume.monitoring.type=ganglia 以及后面的参数都是按照上面描述的规则配置的,就是「固定的前缀+参数=参数值」的形式。

JSON Reporting

Flume也支持以JSON格式报告运行指标。为了对外提供这些报告数据,Flume会在某个端口(可自定义)上运行一个web服务来提供这些数据,以下面这种格式:

  1. {
  2. "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
  3. "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
  4. }

下面是一个具体的报告例子:

  1. {
  2. "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
  3. "Type":"CHANNEL",
  4. "StopTime":"0",
  5. "EventPutAttemptCount":"468086",
  6. "ChannelSize":"233428",
  7. "StartTime":"1344882233070",
  8. "EventTakeSuccessCount":"458200",
  9. "ChannelCapacity":"600000",
  10. "EventTakeAttemptCount":"458288"},
  11. "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
  12. "Type":"CHANNEL",
  13. "StopTime":"0",
  14. "EventPutAttemptCount":"22948908",
  15. "ChannelSize":"5",
  16. "StartTime":"1344882209413",
  17. "EventTakeSuccessCount":"22948900",
  18. "ChannelCapacity":"100",
  19. "EventTakeAttemptCount":"22948908"}
  20. }

属性

默认值

解释

type



组件类型,这个是: http

port

41414

查看json报告的端口

启用JSON报告的启动脚本示例:

  1. $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

启动后可以通过这个地址 http://<hostname>:<port>/metrics 来查看报告,自定义组件可以报告上面Ganglia部分中提到的指标数据。

Custom Reporting

可以通过编写自己的执行报告服务向其他系统报告运行指标。 报告类必须实现org.apache.flume.instrumentation.MonitorService 接口。自定义的报告类与GangliaServer的报告用法相同。 他们可以轮询请求mbean服务器获取mbeans的运行指标。 例如,假设一个命名为为HTTPReporting的HTTP监视服务,启动脚本如下所示:

  1. $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332

属性

默认值

解释

type



自定义报告组件的全限定类名

Reporting metrics from custom components

自定义Flume监控组件必须应继承自 org.apache.flume.instrumentation.MonitoredCounterGroup 类。 然后,该类应为其公开的每个度量指标提供getter方法。 请参阅下面的代码。 MonitoredCounterGroup 需要一个此类要提供的监控属性列表。 目前仅支持将监控指标值设置为long型。

  1. public class SinkCounter extends MonitoredCounterGroup implements
  2. SinkCounterMBean {
  3.  
  4. private static final String COUNTER_CONNECTION_CREATED =
  5. "sink.connection.creation.count";
  6.  
  7. private static final String COUNTER_CONNECTION_CLOSED =
  8. "sink.connection.closed.count";
  9.  
  10. private static final String COUNTER_CONNECTION_FAILED =
  11. "sink.connection.failed.count";
  12.  
  13. private static final String COUNTER_BATCH_EMPTY =
  14. "sink.batch.empty";
  15.  
  16. private static final String COUNTER_BATCH_UNDERFLOW =
  17. "sink.batch.underflow";
  18.  
  19. private static final String COUNTER_BATCH_COMPLETE =
  20. "sink.batch.complete";
  21.  
  22. private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
  23. "sink.event.drain.attempt";
  24.  
  25. private static final String COUNTER_EVENT_DRAIN_SUCCESS =
  26. "sink.event.drain.sucess";
  27.  
  28. private static final String[] ATTRIBUTES = {
  29. COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
  30. COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
  31. COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
  32. COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  33. };
  34.  
  35.  
  36. public SinkCounter(String name) {
  37. super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  38. }
  39.  
  40. @Override
  41. public long getConnectionCreatedCount() {
  42. return get(COUNTER_CONNECTION_CREATED);
  43. }
  44.  
  45. public long incrementConnectionCreatedCount() {
  46. return increment(COUNTER_CONNECTION_CREATED);
  47. }
  48.  
  49. }