Metric Reporters

Flink allows reporting metrics to external systems. For more information about Flink’s metric system, see the metric system documentation.

Reporter

Metrics can be exposed to an external system by configuring one or several reporters in conf/flink-conf.yaml. These reporters will be instantiated on each job and task manager when they are started.

  • metrics.reporter.<name>.<config>: Generic setting <config> for the reporter named <name>.
  • metrics.reporter.<name>.class: The reporter class to use for the reporter named <name>.
  • metrics.reporter.<name>.factory.class: The reporter factory class to use for the reporter named <name>.
  • metrics.reporter.<name>.interval: The reporter interval to use for the reporter named <name>.
  • metrics.reporter.<name>.scope.delimiter: The delimiter to use for the identifier for the reporter named <name> (defaults to the value of metrics.scope.delimiter).
  • metrics.reporter.<name>.scope.variables.excludes: (optional) A semicolon-separated (;) list of variables that should be ignored by tag-based reporters (e.g., Prometheus, InfluxDB).
  • metrics.reporters: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.
  • metrics.reporter.<name>.scope.variables.additional: (optional) A comma-separated map of variables and their values, where each variable is separated from its value by a colon (:). These mappings are added to the variable map by tag-based reporters (e.g., Prometheus, InfluxDB).

All reporters must have at least one of the class or factory.class properties. Which property may or should be used depends on the reporter implementation; see the individual reporter configuration sections for more information. Some reporters (referred to as Scheduled) allow specifying a reporting interval. Settings specific to each reporter are listed in the sections below.

Example reporter configuration that specifies multiple reporters:

  metrics.reporters: my_jmx_reporter,my_other_reporter
  metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
  metrics.reporter.my_jmx_reporter.port: 9020-9040
  metrics.reporter.my_jmx_reporter.scope.variables.excludes: job_id;task_attempt_num
  metrics.reporter.my_jmx_reporter.scope.variables.additional: cluster_name:my_test_cluster,tag_name:tag_value
  metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
  metrics.reporter.my_other_reporter.host: 192.168.1.1
  metrics.reporter.my_other_reporter.port: 10000
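
To make the value formats of scope.variables.excludes and scope.variables.additional concrete, here is a minimal sketch that parses the two strings from the example above and applies them to a variable map. The class ScopeVariablesSketch and its methods are hypothetical helpers written for illustration; they are not Flink classes, and the order of operations (exclude first, then add) is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: hypothetical helpers, not part of Flink.
final class ScopeVariablesSketch {

    // Splits "job_id;task_attempt_num" into the set of excluded variable names.
    static Set<String> parseExcludes(String value) {
        Set<String> excludes = new LinkedHashSet<>();
        for (String name : value.split(";")) {
            if (!name.trim().isEmpty()) {
                excludes.add(name.trim());
            }
        }
        return excludes;
    }

    // Splits "k1:v1,k2:v2" into an ordered name -> value map.
    static Map<String, String> parseAdditional(String value) {
        Map<String, String> additional = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] kv = entry.split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected name:value but got '" + entry + "'");
            }
            additional.put(kv[0].trim(), kv[1].trim());
        }
        return additional;
    }

    // Assumed order: drop the excluded variables, then add the extra mappings.
    static Map<String, String> effectiveTags(
            Map<String, String> variables, Set<String> excludes, Map<String, String> additional) {
        Map<String, String> tags = new LinkedHashMap<>(variables);
        tags.keySet().removeAll(excludes);
        tags.putAll(additional);
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("host", "localhost");
        variables.put("job_id", "abc123");
        variables.put("task_attempt_num", "0");

        System.out.println(effectiveTags(
                variables,
                parseExcludes("job_id;task_attempt_num"),
                parseAdditional("cluster_name:my_test_cluster,tag_name:tag_value")));
    }
}
```

Note the different separators: excludes uses a semicolon between names, while additional uses a comma between entries and a colon inside each entry.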

Important: The jar containing the reporter must be accessible when Flink is started. Reporters that support the factory.class property can be loaded as plugins. Otherwise the jar must be placed in the /lib folder. Reporters that are shipped with Flink (i.e., all reporters documented on this page) are available by default.

You can write your own Reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface. If the Reporter should send out reports regularly you have to implement the Scheduled interface as well. By additionally implementing a MetricReporterFactory your reporter can also be loaded as a plugin.

The following sections list the supported reporters.

JMX

The JMX reporter ships with Flink, so no additional dependency is required, but it is not activated by default.

Parameters:

  • port - (optional) the port on which JMX listens for connections. To run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager), it is advisable to use a port range like 9250-9260. When a range is specified, the actual port is shown in the relevant job or task manager log. If this option is set, Flink starts an extra JMX connector for the given port/range. Metrics are always available on the default local JMX interface.

Example configuration:

  metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
  metrics.reporter.jmx.port: 8789
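
The port option accepts either a single port or a range. The following sketch shows one plausible way a range such as 9250-9260 can be resolved to an actual port: try each port in order and keep the first one that can be bound. PortRangeSketch is a hypothetical helper, and the first-free-port strategy is an assumption for illustration, not a description of Flink's internal code.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Illustrative only: a hypothetical helper, not part of Flink.
final class PortRangeSketch {

    // Parses "8789" or "9250-9260" into {start, end} (both inclusive).
    static int[] parseRange(String value) {
        String[] parts = value.trim().split("-", 2);
        int start = Integer.parseInt(parts[0].trim());
        int end = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : start;
        if (start > end) {
            throw new IllegalArgumentException("Invalid port range: " + value);
        }
        return new int[] {start, end};
    }

    // Returns the first port in the range that can currently be bound, or -1 if none can.
    static int firstFreePort(String value) {
        int[] range = parseRange(value);
        for (int port = range[0]; port <= range[1]; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return port; // bound successfully; the probe socket is closed on return
            } catch (IOException e) {
                // Port is in use (or not bindable); try the next one.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("Selected port: " + firstFreePort("9250-9260"));
    }
}
```

With a range, two processes on the same host end up on different ports, which is why the selected port has to be looked up in the log.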

Metrics exposed through JMX are identified by a domain and a list of key-properties, which together form the object name.

The domain always begins with org.apache.flink followed by a generalized metric identifier. In contrast to the usual identifier it is not affected by scope-formats, does not contain any variables and is constant across jobs. An example for such a domain would be org.apache.flink.job.task.numBytesOut.

The key-property list contains the values for all variables, regardless of configured scope formats, that are associated with a given metric. An example for such a list would be host=localhost,job_name=MyJob,task_name=MyTask.

The domain thus identifies a metric class, while the key-property list identifies one (or multiple) instances of that metric.
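
The relationship between domain, key-property list, and object name can be reproduced with the standard javax.management.ObjectName class. The sketch below rebuilds the example values from this section; JmxObjectNameSketch is a hypothetical class name, and the code only illustrates the naming structure, not how the reporter registers its MBeans.

```java
import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Illustrative only: shows how a domain and key properties form a JMX object name.
final class JmxObjectNameSketch {

    // Wraps the checked exception so callers can use this in simple expressions.
    static ObjectName objectName(String domain, Hashtable<String, String> keyProperties) {
        try {
            return new ObjectName(domain, keyProperties);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Builds the object name from the example domain and key-property list above.
    static ObjectName example() {
        Hashtable<String, String> variables = new Hashtable<>();
        variables.put("host", "localhost");
        variables.put("job_name", "MyJob");
        variables.put("task_name", "MyTask");
        return objectName("org.apache.flink.job.task.numBytesOut", variables);
    }

    public static void main(String[] args) {
        ObjectName name = example();
        System.out.println(name.getDomain());                // the metric class
        System.out.println(name.getKeyProperty("job_name")); // one variable of this instance
        System.out.println(name.getCanonicalName());         // domain:key=value,... with sorted keys
    }
}
```

Two tasks reporting the same metric share the domain and differ only in their key properties, which is what makes per-domain queries across jobs possible.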

Graphite

Parameters:

  • host - the Graphite server host
  • port - the Graphite server port
  • protocol - protocol to use (TCP/UDP)

Example configuration:

  metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
  metrics.reporter.grph.host: localhost
  metrics.reporter.grph.port: 2003
  metrics.reporter.grph.protocol: TCP
  metrics.reporter.grph.interval: 60 SECONDS

InfluxDB

In order to use this reporter you must copy /opt/flink-metrics-influxdb-1.15.0.jar into the plugins/influxdb folder of your Flink distribution.

Parameters:

Key | Default | Type | Description
connectTimeout | 10000 | Integer | (optional) the InfluxDB connect timeout for metrics
consistency | ONE | Enum | (optional) the InfluxDB consistency level for metrics. Possible values: "ALL", "ANY", "ONE", "QUORUM"
db | (none) | String | the InfluxDB database to store metrics
host | (none) | String | the InfluxDB server host
password | (none) | String | (optional) InfluxDB username’s password used for authentication
port | 8086 | Integer | the InfluxDB server port
retentionPolicy | (none) | String | (optional) the InfluxDB retention policy for metrics
scheme | http | Enum | the InfluxDB scheme. Possible values: "http", "https"
username | (none) | String | (optional) InfluxDB username used for authentication
writeTimeout | 10000 | Integer | (optional) the InfluxDB write timeout for metrics

Example configuration:

  metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
  metrics.reporter.influxdb.scheme: http
  metrics.reporter.influxdb.host: localhost
  metrics.reporter.influxdb.port: 8086
  metrics.reporter.influxdb.db: flink
  metrics.reporter.influxdb.username: flink-metrics
  metrics.reporter.influxdb.password: qwerty
  metrics.reporter.influxdb.retentionPolicy: one_hour
  metrics.reporter.influxdb.consistency: ANY
  metrics.reporter.influxdb.connectTimeout: 60000
  metrics.reporter.influxdb.writeTimeout: 60000
  metrics.reporter.influxdb.interval: 60 SECONDS

With this configuration, the reporter sends metrics over HTTP to the InfluxDB server using the specified retention policy (or the default policy configured on the server if none is specified). All Flink metrics variables (see List of all Variables) are exported as InfluxDB tags.

Prometheus

Parameters:

  • port - (optional) the port the Prometheus exporter listens on, defaults to 9249. In order to be able to run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager) it is advisable to use a port range like 9250-9260.
  • filterLabelValueCharacters - (optional) Specifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] will be removed, otherwise no characters will be removed. Before disabling this option please ensure that your label values meet the Prometheus requirements.
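
As a rough illustration of what filterLabelValueCharacters does, the sketch below applies the character class quoted above to a label value. LabelValueFilterSketch is a hypothetical class, and the code takes the description literally by deleting non-matching characters; treat that as an assumption, since the actual reporter may handle such characters differently.

```java
import java.util.regex.Pattern;

// Illustrative only: a hypothetical helper, not the reporter's actual implementation.
final class LabelValueFilterSketch {

    // Everything outside [a-zA-Z0-9:_] is considered not allowed.
    private static final Pattern NOT_ALLOWED = Pattern.compile("[^a-zA-Z0-9:_]");

    // Removes the characters that are not allowed when filtering is enabled.
    static String filter(String labelValue, boolean filterLabelValueCharacters) {
        if (!filterLabelValueCharacters) {
            return labelValue;
        }
        return NOT_ALLOWED.matcher(labelValue).replaceAll("");
    }

    public static void main(String[] args) {
        System.out.println(filter("My Job #1 (prod)", true));  // prints "MyJob1prod"
        System.out.println(filter("My Job #1 (prod)", false)); // prints the value unchanged
    }
}
```

Filtering can make two distinct values collide (for example "job-a" and "joba"), which is worth checking before relying on a label to tell jobs apart.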

Example configuration:

  metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink metric types are mapped to Prometheus metric types as follows:

Flink | Prometheus | Note
Counter | Gauge | Prometheus counters cannot be decremented.
Gauge | Gauge | Only numbers and booleans are supported.
Histogram | Summary | Quantiles .5, .75, .95, .98, .99 and .999
Meter | Gauge | The gauge exports the meter’s rate.

All Flink metrics variables (see List of all Variables) are exported to Prometheus as labels.

PrometheusPushGateway

Parameters:

Key | Default | Type | Description
deleteOnShutdown | true | Boolean | Specifies whether to delete metrics from the PushGateway on shutdown. Flink will try its best to delete the metrics, but this is not guaranteed.
filterLabelValueCharacters | true | Boolean | Specifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] will be removed, otherwise no characters will be removed. Before disabling this option please ensure that your label values meet the Prometheus requirements.
groupingKey | (none) | String | Specifies the grouping key, which is the group and global labels of all metrics. The label name and value are separated by ‘=’, and labels are separated by ‘;’, e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the Prometheus requirements.
hostUrl | (none) | String | The PushGateway server host URL including scheme, host name, and port.
jobName | (none) | String | The job name under which metrics will be pushed.
randomJobNameSuffix | true | Boolean | Specifies whether a random suffix should be appended to the job name.

Example configuration:

  metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  metrics.reporter.promgateway.hostUrl: http://localhost:9091
  metrics.reporter.promgateway.jobName: myJob
  metrics.reporter.promgateway.randomJobNameSuffix: true
  metrics.reporter.promgateway.deleteOnShutdown: false
  metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
  metrics.reporter.promgateway.interval: 60 SECONDS
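
The groupingKey format (name and value separated by '=', labels separated by ';') can be illustrated with a small parser. GroupingKeySketch is a hypothetical helper, and rejecting malformed pairs with an exception is a choice made for this sketch; it is not a statement about how the reporter treats invalid input.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a hypothetical helper, not part of Flink.
final class GroupingKeySketch {

    // Parses "k1=v1;k2=v2" into an ordered label name -> label value map.
    static Map<String, String> parse(String groupingKey) {
        Map<String, String> labels = new LinkedHashMap<>();
        if (groupingKey == null || groupingKey.trim().isEmpty()) {
            return labels;
        }
        for (String pair : groupingKey.split(";")) {
            int idx = pair.indexOf('=');
            // Both a name and a value are required in this sketch.
            if (idx <= 0 || idx == pair.length() - 1) {
                throw new IllegalArgumentException("Expected name=value but got '" + pair + "'");
            }
            labels.put(pair.substring(0, idx).trim(), pair.substring(idx + 1).trim());
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(parse("k1=v1;k2=v2"));
    }
}
```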

The PrometheusPushGatewayReporter pushes metrics to a Pushgateway, which can be scraped by Prometheus.

Please see the Prometheus documentation for use-cases.

StatsD

Parameters:

  • host - the StatsD server host
  • port - the StatsD server port

Example configuration:

  metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
  metrics.reporter.stsd.host: localhost
  metrics.reporter.stsd.port: 8125
  metrics.reporter.stsd.interval: 60 SECONDS

Datadog

Note: Any variables in Flink metrics, such as <host>, <job_name>, <tm_id>, <subtask_index>, <task_name>, and <operator_name>, are sent to Datadog as tags. Tags look like host:localhost and job_name:myjobname.

Note: Histograms are exposed as a series of gauges following the naming convention of Datadog histograms (<metric_name>.<aggregation>). The min aggregation is reported by default, whereas sum is not available. In contrast to Datadog-provided histograms, the reported aggregations are not computed for a specific reporting interval.
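
The variable-to-tag mapping described in the first note can be sketched as follows. DatadogTagsSketch is a hypothetical helper, and the assumption that variable names carry angle brackets which are stripped before the tag is built is made for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a hypothetical helper, not part of Flink.
final class DatadogTagsSketch {

    // Turns {"<host>": "localhost"} into ["host:localhost"].
    static List<String> toTags(Map<String, String> variables) {
        List<String> tags = new ArrayList<>();
        for (Map.Entry<String, String> variable : variables.entrySet()) {
            String name = variable.getKey();
            // Strip the surrounding angle brackets, if present.
            if (name.length() > 2 && name.startsWith("<") && name.endsWith(">")) {
                name = name.substring(1, name.length() - 1);
            }
            tags.add(name + ":" + variable.getValue());
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("<host>", "localhost");
        variables.put("<job_name>", "myjobname");
        System.out.println(toTags(variables));
    }
}
```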

Parameters:

  • apikey - the Datadog API key
  • tags - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags must be separated by commas only.
  • proxyHost - (optional) The proxy host to use when sending to Datadog.
  • proxyPort - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
  • dataCenter - (optional) The data center (EU/US) to connect to, defaults to US.
  • maxMetricsPerRequest - (optional) The maximum number of metrics to include in each request, defaults to 2000.

Example configuration:

  metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
  metrics.reporter.dghttp.apikey: xxx
  metrics.reporter.dghttp.tags: myflinkapp,prod
  metrics.reporter.dghttp.proxyHost: my.web.proxy.com
  metrics.reporter.dghttp.proxyPort: 8080
  metrics.reporter.dghttp.dataCenter: US
  metrics.reporter.dghttp.maxMetricsPerRequest: 2000
  metrics.reporter.dghttp.interval: 60 SECONDS
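
To show what a per-request limit such as maxMetricsPerRequest implies, the sketch below splits a list of metrics into batches of at most that size. MetricBatchingSketch is a hypothetical helper; how the reporter actually builds and sends its requests is not covered here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: a hypothetical helper, not part of Flink.
final class MetricBatchingSketch {

    // Splits the input into consecutive batches of at most maxMetricsPerRequest elements.
    static <T> List<List<T>> partition(List<T> metrics, int maxMetricsPerRequest) {
        if (maxMetricsPerRequest <= 0) {
            throw new IllegalArgumentException("maxMetricsPerRequest must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < metrics.size(); from += maxMetricsPerRequest) {
            int to = Math.min(from + maxMetricsPerRequest, metrics.size());
            batches.add(new ArrayList<>(metrics.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> metrics = Collections.nCopies(4500, "metric");
        for (List<String> batch : partition(metrics, 2000)) {
            System.out.println("request with " + batch.size() + " metrics");
        }
    }
}
```

With the default of 2000, a report of 4500 metrics would be spread over three requests under this scheme.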

Slf4j

Example configuration:

  metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
  metrics.reporter.slf4j.interval: 60 SECONDS