度量

Flink公开了一个度量系统,允许收集和公开指标到外部系统。

注册指标

您可以通过调用从扩展RichFunction任何用户函数访问度量标准系统getRuntimeContext().getMetricGroup()此方法返回一个MetricGroup对象,您可以在该对象上创建和注册新指标。

度量类型

Flink支持CountersGaugesHistogramsMeters

计数器

A Counter用于计算某些东西。可以使用inc()/inc(long n)来Reduce当前值dec()/dec(long n)您可以创建并注册Counter调用counter(String name)MetricGroup

  1. public class MyMapper extends RichMapFunction<String, String> {
  2. private transient Counter counter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .counter("myCounter");
  8. }
  9. @Override
  10. public String map(String value) throws Exception {
  11. this.counter.inc();
  12. return value;
  13. }
  14. }
  1. class MyMapper extends RichMapFunction[String,String] {
  2. @transient private var counter: Counter = _
  3. override def open(parameters: Configuration): Unit = {
  4. counter = getRuntimeContext()
  5. .getMetricGroup()
  6. .counter("myCounter")
  7. }
  8. override def map(value: String): String = {
  9. counter.inc()
  10. value
  11. }
  12. }

或者,您也可以使用自己的Counter实现:

  1. public class MyMapper extends RichMapFunction<String, String> {
  2. private transient Counter counter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .counter("myCustomCounter", new CustomCounter());
  8. }
  9. @Override
  10. public String map(String value) throws Exception {
  11. this.counter.inc();
  12. return value;
  13. }
  14. }
  1. class MyMapper extends RichMapFunction[String,String] {
  2. @transient private var counter: Counter = _
  3. override def open(parameters: Configuration): Unit = {
  4. counter = getRuntimeContext()
  5. .getMetricGroup()
  6. .counter("myCustomCounter", new CustomCounter())
  7. }
  8. override def map(value: String): String = {
  9. counter.inc()
  10. value
  11. }
  12. }

测量

A根据需要Gauge提供任何类型的值。为了使用a,Gauge您必须首先创建一个实现该org.apache.flink.metrics.Gauge接口的类返回值的类型没有限制。你可以通过调用注册一个计gauge(String name, Gauge gauge)MetricGroup

  1. public class MyMapper extends RichMapFunction<String, String> {
  2. private transient int valueToExpose = 0;
  3. @Override
  4. public void open(Configuration config) {
  5. getRuntimeContext()
  6. .getMetricGroup()
  7. .gauge("MyGauge", new Gauge<Integer>() {
  8. @Override
  9. public Integer getValue() {
  10. return valueToExpose;
  11. }
  12. });
  13. }
  14. @Override
  15. public String map(String value) throws Exception {
  16. valueToExpose++;
  17. return value;
  18. }
  19. }
  1. new class MyMapper extends RichMapFunction[String,String] {
  2. @transient private var valueToExpose = 0
  3. override def open(parameters: Configuration): Unit = {
  4. getRuntimeContext()
  5. .getMetricGroup()
  6. .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  7. }
  8. override def map(value: String): String = {
  9. valueToExpose += 1
  10. value
  11. }
  12. }

请注意,报告会将公开的对象转换为a String,这意味着需要进行有意义的toString()实现。

直方图

A Histogram衡量长值的分布。你可以通过调用注册一个histogram(String name, Histogram histogram)上一个MetricGroup

  1. public class MyMapper extends RichMapFunction<Long, Long> {
  2. private transient Histogram histogram;
  3. @Override
  4. public void open(Configuration config) {
  5. this.histogram = getRuntimeContext()
  6. .getMetricGroup()
  7. .histogram("myHistogram", new MyHistogram());
  8. }
  9. @Override
  10. public Long map(Long value) throws Exception {
  11. this.histogram.update(value);
  12. return value;
  13. }
  14. }
  1. class MyMapper extends RichMapFunction[Long,Long] {
  2. @transient private var histogram: Histogram = _
  3. override def open(parameters: Configuration): Unit = {
  4. histogram = getRuntimeContext()
  5. .getMetricGroup()
  6. .histogram("myHistogram", new MyHistogram())
  7. }
  8. override def map(value: Long): Long = {
  9. histogram.update(value)
  10. value
  11. }
  12. }

Flink没有提供默认实现Histogram,但提供了一个允许使用Codahale / DropWizard直方图Wrapper要使用此打包,请在以下内容中添加以下依赖项pom.xml

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-metrics-dropwizard</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

然后你可以像这样注册一个Codahale / DropWizard直方图:

  1. public class MyMapper extends RichMapFunction<Long, Long> {
  2. private transient Histogram histogram;
  3. @Override
  4. public void open(Configuration config) {
  5. com.codahale.metrics.Histogram dropwizardHistogram =
  6. new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
  7. this.histogram = getRuntimeContext()
  8. .getMetricGroup()
  9. .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
  10. }
  11. @Override
  12. public Long map(Long value) throws Exception {
  13. this.histogram.update(value);
  14. return value;
  15. }
  16. }
  1. class MyMapper extends RichMapFunction[Long, Long] {
  2. @transient private var histogram: Histogram = _
  3. override def open(config: Configuration): Unit = {
  4. com.codahale.metrics.Histogram dropwizardHistogram =
  5. new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
  6. histogram = getRuntimeContext()
  7. .getMetricGroup()
  8. .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  9. }
  10. override def map(value: Long): Long = {
  11. histogram.update(value)
  12. value
  13. }
  14. }

仪表

A Meter衡量平均吞吐量。可以使用该markEvent()方法注册事件的发生可以使用markEvent(long n)方法注册同时发生多个事件你可以通过调用注册一个仪表meter(String name, Meter meter)MetricGroup

  1. public class MyMapper extends RichMapFunction<Long, Long> {
  2. private transient Meter meter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.meter = getRuntimeContext()
  6. .getMetricGroup()
  7. .meter("myMeter", new MyMeter());
  8. }
  9. @Override
  10. public Long map(Long value) throws Exception {
  11. this.meter.markEvent();
  12. return value;
  13. }
  14. }
  1. class MyMapper extends RichMapFunction[Long,Long] {
  2. @transient private var meter: Meter = _
  3. override def open(config: Configuration): Unit = {
  4. meter = getRuntimeContext()
  5. .getMetricGroup()
  6. .meter("myMeter", new MyMeter())
  7. }
  8. override def map(value: Long): Long = {
  9. meter.markEvent()
  10. value
  11. }
  12. }

Flink提供了一个允许使用Codahale / DropWizard表打包器要使用此打包,请在以下内容中添加以下依赖项pom.xml

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-metrics-dropwizard</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

然后你可以像这样注册一个Codahale / DropWizard仪表:

  1. public class MyMapper extends RichMapFunction<Long, Long> {
  2. private transient Meter meter;
  3. @Override
  4. public void open(Configuration config) {
  5. com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
  6. this.meter = getRuntimeContext()
  7. .getMetricGroup()
  8. .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
  9. }
  10. @Override
  11. public Long map(Long value) throws Exception {
  12. this.meter.markEvent();
  13. return value;
  14. }
  15. }
  1. class MyMapper extends RichMapFunction[Long,Long] {
  2. @transient private var meter: Meter = _
  3. override def open(config: Configuration): Unit = {
  4. com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
  5. meter = getRuntimeContext()
  6. .getMetricGroup()
  7. .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  8. }
  9. override def map(value: Long): Long = {
  10. meter.markEvent()
  11. value
  12. }
  13. }

范围

为每个度量标准分配一个标识符和一组键值对,在该键值对下将报告度量标准。

标识符基于3个组件:注册度量标准时的用户定义名称,可选的用户定义范围和系统提供的范围。例如,如果A.B是系统范围,C.D用户范围和E名称,则度量标识符将是A.B.C.D.E

您可以.通过设置metrics.scope.delimiterKeys来配置要用于标识符的分隔符(默认值:) conf/flink-conf.yaml

用户范围

你可以通过调用定义用户范围MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)Metric#addGroup(String key, String value)这些方法影响什么MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents返回。

  1. counter = getRuntimeContext()
  2. .getMetricGroup()
  3. .addGroup("MyMetrics")
  4. .counter("myCounter");
  5. counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .addGroup("MyMetricsKey", "MyMetricsValue")
  8. .counter("myCounter");
  1. counter = getRuntimeContext()
  2. .getMetricGroup()
  3. .addGroup("MyMetrics")
  4. .counter("myCounter")
  5. counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .addGroup("MyMetricsKey", "MyMetricsValue")
  8. .counter("myCounter")

系统范围

系统范围包含有关度量标准的上下文信息,例如,它在哪个任务中注册或该任务属于哪个作业。

可以通过设置以下键来配置应包含哪些上下文信息conf/flink-conf.yaml这些键中的每一个都期望一个格式字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),它们将在运行时被替换。

  • metrics.scope.jm
    • 默认值: .jobmanager
    • 应用于作用域JobManager的所有指标。
  • metrics.scope.jm.job
    • 默认值: .jobmanager。
    • 应用于作用于JobManager和作业的所有度量标准。
  • metrics.scope.tm
    • 默认值: .taskmanager。
    • 应用于作用于TaskManager的所有度量标准。
  • metrics.scope.tm.job
    • 默认值: .taskmanager。
    • 应用于作用于TaskManager和作业的所有度量标准。
  • metrics.scope.task
    • 默认值: .taskmanager。
    • 应用于作用于任务的所有指标。
  • metrics.scope.operator
    • 默认值: .taskmanager。
    • 应用于作用于算子的所有指标。变量的数量或顺序没有限制。变量区分大小写。

算子指标的默认范围将产生类似于的标识符 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric

如果您还想包含任务名称但省略TaskManager信息,则可以指定以下格式:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

这可以创建标识符localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric

请注意,对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用通过包含ID(例如<job_id>)或通过为作业和 算子分配唯一名称来提供一定程度的唯一性的格式字符串。

所有变量列表

  • JobManager:
  • TaskManager:
  • 作业:,<作业名称>
  • 任务:
  • 算子:要点:对于Batch API,<operator_id>始终等于<task_id>。

用户变量

您可以通过调用来定义用户变量MetricGroup#addGroup(String key, String value)这种方法会影响什么MetricGroup#getMetricIdentifierMetricGroup#getScopeComponentsMetricGroup#getAllVariables()返回。

重要提示:用户变量不能用于范围格式。

  1. counter = getRuntimeContext()
  2. .getMetricGroup()
  3. .addGroup("MyMetricsKey", "MyMetricsValue")
  4. .counter("myCounter");
  1. counter = getRuntimeContext()
  2. .getMetricGroup()
  3. .addGroup("MyMetricsKey", "MyMetricsValue")
  4. .counter("myCounter")

报告

通过配置一个或多个报告,可以将度量标准暴露给外部系统conf/flink-conf.yaml这些报告将在每个工作和TaskManager启动时进行实例化。

  • metrics.reporter.<name>.<config><config>报告的通用设置命名<name>
  • metrics.reporter.<name>.class:报告类用于为报告命名<name>
  • metrics.reporter.<name>.interval:报告间隔用于报告的名字<name>
  • metrics.reporter.<name>.scope.delimiter:用于名称的报告者的标识符(默认值使用metrics.scope.delimiter)的分隔符<name>
  • metrics.reporters:(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。所有报告必须至少拥有该class财产,其中一些允许指定报告interval下面,我们将列出针对每位报告的更多设置。

示例报表配置,指定多个报告:

  1. metrics.reporters: my_jmx_reporter,my_other_reporter
  2. metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
  3. metrics.reporter.my_jmx_reporter.port: 9020-9040
  4. metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
  5. metrics.reporter.my_other_reporter.host: 192.168.1.1
  6. metrics.reporter.my_other_reporter.port: 10000

重要说明:启动Flink时,通过将其放在/ lib文件夹中,可以访问包含报告者的jar。

您可以Reporter通过实现org.apache.flink.metrics.reporter.MetricReporter接口编写自己如果Reporter应定期发送报告,您还必须实现该Scheduled接口。

以下部分列出了受支持的报告。

您不必包含其他依赖项,因为默认情况下JMX报告器可用但未激活。

参数:

  • port - (可选)JMX侦听连接的端口。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围9250-9260。指定范围时,实际端口将显示在相关作业或TaskManager日志中。如果设置此设置,Flink将为给定的端口/范围启动额外的JMX连接器。度量标准始终在默认的本地JMX界面上可用。配置示例:
  1. metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
  2. metrics.reporter.jmx.port: 8789

通过JMX公开的度量标准由域和键属性列表标识,这些键属性一起形成对象名称。

域始终以org.apache.flink广义度量标识符开头与通常的标识符相反,它不受作用域格式的影响,不包含任何变量,并且在作业中保持不变。这种域的一个例子是org.apache.flink.job.task.numBytesOut

键属性列表包含与给定度量关联的所有变量的值,无论配置的范围格式如何。这样一个列表的一个例子是host=localhost,job_name=MyJob,task_name=MyTask

因此,域标识度量标准类,而关键属性列表标识该度量标准的一个(或多个)实例。

要使用此报告,您必须复制/opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • host-下配置的的gmond主机地址udp_recv_channel.bindgmond.conf
  • port-下配置的端口的gmond udp_recv_channel.portgmond.conf
  • tmax - 应保存旧度量标准的软限制
  • dmax - 应保存旧指标多长时间的硬限制
  • ttl - 传输的UDP数据包的生存时间
  • addressingMode - 要使用的UDP寻址模式(UNICAST / MULTICAST)配置示例:
  1. metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
  2. metrics.reporter.gang.host: localhost
  3. metrics.reporter.gang.port: 8649
  4. metrics.reporter.gang.tmax: 60
  5. metrics.reporter.gang.dmax: 0
  6. metrics.reporter.gang.ttl: 1
  7. metrics.reporter.gang.addressingMode: MULTICAST

要使用此报告,您必须复制/opt/flink-metrics-graphite-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • host - Graphite服务器主机
  • port - Graphite服务器端口
  • protocol - 使用协议(TCP / UDP)配置示例:
  1. metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
  2. metrics.reporter.grph.host: localhost
  3. metrics.reporter.grph.port: 2003
  4. metrics.reporter.grph.protocol: TCP

要使用此报告,您必须复制/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • port- (可选)Prometheus导出器侦听的端口,默认为9249。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围9250-9260配置示例:
  1. metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink度量标准类型映射到Prometheus度量标准类型,如下所示:

Flink Prometheus 注意
计数器测量 Prometheus 计数器不能Reduce。
测量测量仅支持数字和布尔值。
直方图概要分位数.5,.75,.95,.98,.99和.999
仪表测量仪表输出仪表的速率。

所有Flink度量变量(请参阅所有变量列表)都将作为标签导出到Prometheus。

要使用此报告,您必须复制/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

默认描述
##### deleteOnShutdowntrue指定是否在关闭时从PushGateway中删除指标。
##### Host(none)PushGateway服务器主机。
##### jobName(none)将推送指标的作业名称
##### port-1PushGateway服务器端口。
##### randomJobNameSuffixtrue指定是否应将随机后缀附加到作业名称。

配置示例:

  1. metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  2. metrics.reporter.promgateway.host: localhost
  3. metrics.reporter.promgateway.port: 9091
  4. metrics.reporter.promgateway.jobName: myJob
  5. metrics.reporter.promgateway.randomJobNameSuffix: true
  6. metrics.reporter.promgateway.deleteOnShutdown: false

PrometheusPushGatewayReporter将指标推送到Pushgateway,可由Prometheus 抓取

有关用例,请参阅Prometheus文档

要使用此报告,您必须复制/opt/flink-metrics-statsd-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • host - StatsD服务器主机
  • port - StatsD服务器端口配置示例:
  1. metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
  2. metrics.reporter.stsd.host: localhost
  3. metrics.reporter.stsd.port: 8125

要使用此报告,您必须复制/opt/flink-metrics-datadog-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

注意Flink指标,如任何变量<host><job_name><tm_id><subtask_index><task_name>,和<operator_name>,将被发送到Datadog的标签。标签看起来像host:localhostjob_name:myjobname

参数:

  • apikey - Datadog APIKeys
  • tags - (可选)发送到Datadog时将应用于度量标准的全局标记。标签应仅以逗号分隔配置示例:
  1. metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
  2. metrics.reporter.dghttp.apikey: xxx
  3. metrics.reporter.dghttp.tags: myflinkapp,prod

要使用此报告,您必须复制/opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

配置示例:

  1. metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
  2. metrics.reporter.slf4j.interval: 60 SECONDS

系统指标

默认情况下,Flink会收集几个指标,这些指标可以提供有关当前状态的深入见解。本节是所有这些指标的参考。

下表通常包含5列:

  • “范围”列描述了用于生成系统范围的范围格式。例如,如果单元格包含“Operator”,则使用“metrics.scope.operator”的范围格式。如果单元格包含多个值(以斜杠分隔),则会针对不同的实体多次报告度量标准,例如作业和TaskManager。

  • (可选)“Infix”列描述了哪个中缀附加到系统范围。

  • “度量标准”列列出了为给定范围和中缀注册的所有度量标准的名称。

  • “描述”列提供有关给定度量正在测量的信息。

  • “类型”列描述了用于测量的度量类型。

请注意,中缀/指标名称列中的所有点仍受“metrics.delimiter”设置的约束。

因此,为了推断度量标识符:

  • 根据“范围”列获取范围格式
  • 如果存在,则将值附加到“中缀”列中,并考虑“metrics.delimiter”设置
  • 附加指标名称。

中央处理器

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.CPU加载JVM最近的CPU使用情况。测量
时间JVM使用的CPU时间。测量

内存

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.MemoryHeap.Used当前使用的堆内存量(以字节为单位)。测量
Heap.Committed保证可供JVM使用的堆内存量(以字节为单位)。测量
Heap.Max可用于内存管理的最大堆内存量(以字节为单位)。测量
NonHeap.Used当前使用的非堆内存量(以字节为单位)。测量
NonHeap.Committed保证JVM可用的非堆内存量(以字节为单位)。测量
NonHeap.Max可用于内存管理的最大非堆内存量(以字节为单位)。测量
Direct.Count直接缓冲池中的缓冲区数。测量
Direct.MemoryUsedJVM用于直接缓冲池的内存量(以字节为单位)。测量
Direct.TotalCapacity直接缓冲池中所有缓冲区的总容量(以字节为单位)。测量
Mapped.Count映射缓冲池中的缓冲区数。测量
Mapped.MemoryUsedJVM用于映射缓冲池的内存量(以字节为单位)。测量
Mapped.TotalCapacity映射缓冲池中的缓冲区数(以字节为单位)。测量

线程

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.Threads计数活动线程总数。测量

垃圾收集

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.GarbageCollector<GarbageCollector> .Count之间已发生的集合总数。测量
<GarbageCollector>。时间执行垃圾收集所花费的总时间。测量

类加载器

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.ClassLoaderClassesLoaded自JVM启动以来加载的类总数。测量
ClassesUnloaded自JVM启动以来卸载的类总数。测量

网络

范围中缀度量描述类型
TaskManagerStatus.NetworkAvailableMemorySegments未使用的内存段数。测量
TotalMemorySegments分配的内存段数。测量
TaskbuffersinputQueueLength排队的输入缓冲区数。测量
outputQueueLength排队输出缓冲区的数量。测量
inPoolUsage估计输入缓冲区的使用情况。测量
outPoolUsage估计输出缓冲区的使用情况。测量
Network.<Input|Output>.<gate>(only available if taskmanager.net.detailed-metrics config option is set)totalQueueLen所有输入/输出通道中排队缓冲区的总数。测量
minQueueLen所有输入/输出通道中的最小排队缓冲区数。测量
maxQueueLen所有输入/输出通道中的最大排队缓冲区数。测量
avgQueueLen所有输入/输出通道中的平均缓冲区数。测量

集群

范围度量描述类型
JobManagernumRegisteredTaskManagers注册任务管理员的数量。测量
numRunningJobs正在运行的作业数量。测量
taskSlotsAvailable可用任务槽的数量。测量
taskSlotsTotal任务槽的总数。测量

可用性

范围度量描述类型
Job (only available on JobManager)restartingTime重新启动作业所花费的时间,或当前重新启动的持续时间(以毫秒为单位)。测量
uptime作业运行的时间不间断。对于已完成的作业,返回-1(以毫秒为单位)。测量
downtime对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回0,对于已完成的作业返回-1(以毫秒为单位)。测量
fullRestarts自提交此作业以来完全重新启动的总次数。测量

检查点

范围度量描述类型
Job (only available on JobManager)lastCheckpointDuration完成最后一个检查点所花费的时间(以毫秒为单位)。测量
lastCheckpointSize最后一个检查点的总大小(以字节为单位)。测量
lastCheckpointExternalPath存储最后一个外部检查点的路径。测量
lastCheckpointRestoreTimestamp在协调器上恢复最后一个检查点时的时间戳(以毫秒为单位)。测量
lastCheckpointAlignmentBuffered在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(以字节为单位)。测量
numberOfInProgressCheckpoints进行中检查点的数量。测量
numberOfCompletedCheckpoints成功完成检查点的数量。测量
numberOfFailedCheckpoints失败检查点的数量。测量
totalNumberOfCheckpoints总检查点的数量(正在进行,已完成,失败)。测量
TaskcheckpointAlignmentTime最后一次屏障对齐完成所花费的时间(以纳秒为单位),或当前对齐到目前为止所用的时间(以纳秒为单位)。测量

IO

范围度量描述类型
Job (only available on TaskManager)<SOURCE_ID> <source_subtask_index> <operator_id> <operator_subtask_index> .latency从给定源子任务到算子子任务的延迟分布(以毫秒为单位)。直方图
任务numBytesInLocal此任务从本地源读取的总字节数。计数器
numBytesInLocalPerSecond此任务每秒从本地源读取的字节数。仪表
numBytesInRemote此任务从远程源读取的总字节数。计数器
numBytesInRemotePerSecond此任务每秒从远程源读取的字节数。仪表
numBuffersInLocal此任务从本地源读取的网络缓冲区总数。计数器
numBuffersInLocalPerSecond此任务每秒从本地源读取的网络缓冲区数。仪表
numBuffersInRemote此任务从远程源读取的网络缓冲区总数。计数器
numBuffersInRemotePerSecond此任务每秒从远程源读取的网络缓冲区数。仪表
numBytesOut此任务已发出的总字节数。计数器
numBytesOutPerSecond此任务每秒发出的字节数。仪表
numBuffersOut此任务已发出的网络缓冲区总数。计数器
numBuffersOutPerSecond此任务每秒发出的网络缓冲区数。仪表
任务/算子numRecordsIn此 算子/任务已收到的记录总数。计数器
numRecordsInPerSecond此 算子/任务每秒接收的记录数。仪表
numRecordsOut此 算子/任务已发出的记录总数。计数器
numRecordsOutPerSecond此 算子/任务每秒发送的记录数。仪表
numLateRecordsDropped此算子/任务因迟到而丢失的记录数。计数器
currentInputWatermark此 算子/任务收到的​​最后一个水印(以毫秒为单位)。注意:对于具有2个输入的算子/任务,这是最后收到的水印的最小值。测量
算子currentInput1Watermark此 算子在其第一个输入(毫秒)中收到的最后一个水印。注意:仅适用于具有2个输入的算子。测量
currentInput2Watermark此 算子在其第二个输入中接收的最后一个水印(以毫秒为单位)。注意:仅适用于具有2个输入的算子。测量
currentOutputWatermark此 算子发出的最后一个水印(以毫秒为单位)。测量
numSplitsProcessed此数据源已处理的InputSplits总数(如果 算子是数据源)。测量

连接器

Kafka连接器

范围度量用户变量描述类型
算子commitsSucceededN / A如果启用了偏移提交并且启用了检查点,则成功向Kafka提交的偏移提交总数。计数器
算子commitsFailedN / A如果启用了偏移提交并且启用了检查点,则Kafka的偏移提交失败总数。请注意,将偏移量提交回Kafka只是暴露消费者进度的一种方法,因此提交失败不会影响Flink的检查点分区偏移的完整性。计数器
算子committedOffsetsTopic,分区对于每个分区,最后成功提交到Kafka的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。测量
算子currentOffsetsTopic,分区消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。测量

Kinesis连接器

范围度量用户变量描述类型
算子millisBehindLateststream,shardId对于每个Kinesis分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。测量
算子sleepTimeMillisstream,shardId消费者在从Kinesis获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。测量
算子maxNumberOfRecordsPerFetchstream,shardId消费者在单个getRecords调用Kinesis时请求的最大记录数。如果ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS设置为true,则自适应地计算此值以最大化Kinesis的2 Mbps读取限制。测量
算子numberOfAggregatedRecordsPerFetchstream,shardId消费者在单个getRecords调用Kinesis时获取的聚合Kinesis记录数。测量
算子numberOfDeggregatedRecordsPerFetchstream,shardId消费者在单个getRecords调用Kinesis时获取的分解Kinesis记录的数量。测量
算子averageRecordSizeBytesstream,shardIdKinesis记录的平均大小(以字节为单位),由消费者在单个getRecords调用中获取。测量
算子runLoopTimeNanosstream,shardId消费者在运行循环中花费的实际时间(以纳秒为单位)。测量
算子loopFrequencyHzstream,shardId一秒钟内调用getRecords的次数。测量
算子bytesRequestedPerFetchstream,shardId在一次调用getRecords中请求的字节数(2 Mbps / loopFrequencyHz)。测量

系统资源

默认情况下禁用系统资源报告。metrics.system-resource启用下面列出的指标将是可利用的作业-与TaskManager。系统资源度量标准会定期更新,并显示已配置间隔(metrics.system-resource-probing-interval)的平均值

系统资源报告要求在类路径上存在可选的依赖项(例如,放在Flink的lib目录中):

  • com.github.oshi:oshi-core:3.4.0 (根据EPL 1.0许可证授权)包括它的传递依赖:

  • net.java.dev.jna:jna-platform:jar:4.2.2

  • net.java.dev.jna:jna:jar:4.2.2这方面的失败将被报告为启动期间NoClassDefFoundError记录的警告消息SystemResourcesMetricsInitializer

系统CPU

范围中缀度量描述
Job-/TaskManagerSystem.CPU用法机器上CPU使用率的总体百分比。
机器上CPU空闲使用率的百分比。
SYS计算机上系统CPU使用率的百分比。
用户计算机上用户CPU使用率的百分比。
IOWAIT计算机上IOWait CPU使用率的百分比。
IRQ机器上Irq CPU使用率的百分比。
软中断计算机上SoftIrq CPU使用率的百分比。
尼斯在机器上使用Nice Idle的百分比。
Load1min平均CPU负载超过1分钟
Load5min平均CPU负载超过5分钟
Load15min平均CPU负载超过15分钟
UsageCPU *每个处理器的CPU使用率百分比

系统内存

范围中缀度量描述
Job-/TaskManagerSystem.Memory可得到可用内存字节数
总内存(字节)
System.Swap用过的使用的交换字节
总交换字节数

系统网络

范围中缀度量描述
Job-/TaskManagerSystem.Network.INTERFACE_NAMEReceiveRate平均接收速率,以每秒字节数为单位
SendRate平均发送速率,以字节/秒为单位

延迟跟踪

Flink允许跟踪通过系统传输的记录的延迟。默认情况下禁用此函数。为了使延迟跟踪你必须设置latencyTrackingInterval在无论是正数Flink配置ExecutionConfig

latencyTrackingInterval,源将定期发出一个特殊的记录,称为LatencyMarker标记包含从源发出记录时的时间戳。延迟标记不能超过常规用户记录,因此如果记录在算子面前排队,则会增加标记跟踪的延迟。

请注意,延迟标记不会考虑用户记录在算子中绕过它们的时间。特别是标记不考虑记录在窗口缓冲区中花费的时间。只有当算子无法接受新记录,因此他们排队时,使用标记测量的延迟才会反映出来。

所有中间 算子都会保存n每个源的最后一个延迟列表,以计算延迟分布。接收器算子保存每个源的列表,以及每个并行源实例,以允许检测由各个机器引起的延迟问题。

目前,Flink假定群集中所有计算机的时钟都是同步的。我们建议设置自动时钟同步服务(如NTP)以避免错误的延迟结果。

警告启用延迟指标可能会显着影响群集的性能。强烈建议仅将它们用于调试目的。

REST API集成

可以通过Monitoring REST API查询度量标准

下面是可用端点列表,带有示例JSON响应。所有端点都是样本表单http://hostname:8081/jobmanager/metrics,下面我们仅列出URL 路径部分。

尖括号中的值是变量,例如http://hostname:8081/jobs/<jobid>/metrics,必须请求变量http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics

请求特定实体的指标:

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>请求在相应类型的所有实体之间聚合的指标:

  • /taskmanagers/metrics

  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics请求在相应类型的所有实体的子集上聚合的度量标准:

  • /taskmanagers/metrics?taskmanagers=A,B,C

  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3请求可用指标列表:

GET /jobmanager/metrics

  1. [
  2. {
  3. "id": "metric1"
  4. },
  5. {
  6. "id": "metric2"
  7. }
  8. ]

请求特定(未聚合)指标的值:

GET taskmanagers/ABCDE/metrics?get=metric1,metric2

  1. [
  2. {
  3. "id": "metric1",
  4. "value": "34"
  5. },
  6. {
  7. "id": "metric2",
  8. "value": "2"
  9. }
  10. ]

请求特定指标的汇总值:

GET /taskmanagers/metrics?get=metric1,metric2

  1. [
  2. {
  3. "id": "metric1",
  4. "min": 1,
  5. "max": 34,
  6. "avg": 15,
  7. "sum": 45
  8. },
  9. {
  10. "id": "metric2",
  11. "min": 2,
  12. "max": 14,
  13. "avg": 7,
  14. "sum": 16
  15. }
  16. ]

请求特定指标的特定聚合值:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

  1. [
  2. {
  3. "id": "metric1",
  4. "min": 1,
  5. "max": 34,
  6. },
  7. {
  8. "id": "metric2",
  9. "min": 2,
  10. "max": 14,
  11. }
  12. ]

仪表板集成

为每个任务或算子收集的度量标准也可以在仪表板中显示。在作业的主页面上,选择Metrics选项卡。选择顶部图表中的一个任务后,您可以使用Add Metric下拉菜单选择要显示的指标

  • 任务指标列为<subtask_index>.<metric_name>
  • 算子指标列为<subtask_index>.<operator_name>.<metric_name>每个度量将可视化为单独的图形,x轴表示时间,y轴表示测量值。所有图表每10秒自动更新一次,并在导航到另一页时继续更新。

可视化指标的数量没有限制; 但是只能显示数字指标。