Metrics and Logging

Metrics

The Flink Kubernetes Operator (Operator) extends the Flink Metric System, which allows gathering and exposing metrics to centralized monitoring solutions.

Different operator metrics can be turned on or off individually through the configuration. For details, see the metrics config reference.
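As a rough illustration, individual metric groups could be switched off through the Helm chart's values.yaml as sketched below. The option names are assumptions recalled from the operator's metrics config reference and do not appear on this page, so verify them against the reference for your operator version.

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Assumed option names; confirm them in the metrics config reference.
    # Turn off metrics about the operator's JVM process.
    kubernetes.operator.jvm.metrics.enabled: false
    # Turn off metrics forwarded from the Java Operator SDK.
    kubernetes.operator.josdk.metrics.enabled: false
    # Keep the Kubernetes API server access metrics on.
    kubernetes.operator.kubernetes.client.metrics.enabled: true
```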

The Operator gathers aggregate metrics about managed resources.

| Scope | Metrics | Description | Type |
| --- | --- | --- | --- |
| Namespace | FlinkDeployment/FlinkSessionJob.Count | Number of managed resources per namespace | Gauge |
| Namespace | FlinkDeployment.ResourceUsage.Cpu/Memory | Total resources used per namespace | Gauge |
| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
| Namespace | FlinkDeployment.FlinkVersion.<FlinkVersion>.Count | Number of managed FlinkDeployment resources per <FlinkVersion> per namespace. <FlinkVersion> is retrieved via REST API from Flink JM. | Gauge |
| Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.Count | Number of managed resources currently in state <State> per namespace. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Gauge |
| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.TimeSeconds | Time spent in state <State> for a given resource. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Histogram |
| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for selected lifecycle state transitions. <Transition> can take values from: Resume, Upgrade, Suspend, Stabilization, Rollback, Submission | Histogram |

Lifecycle metrics

Based on the resource status, the operator monitors resource lifecycle states.

The number of resources in each of these states and the time spent in them are tracked by the Lifecycle.State.<State>.Count and Lifecycle.State.<State>.TimeSeconds metrics.

In addition to the simple counts, we further track a few selected state transitions:

  • Upgrade: End-to-end resource upgrade time from stable to stable
  • Resume: Time from suspended to stable
  • Suspend: Time for any suspend operation
  • Stabilization: Time from deployed to stable state
  • Rollback: Time from deployed to rolled_back state if the resource was rolled back
  • Submission: Flink resource submission time

Kubernetes Client Metrics

The Operator gathers various metrics related to Kubernetes API server access.

| Scope | Metrics | Description | Type |
| --- | --- | --- | --- |
| System | KubeClient.HttpRequest.Count | Number of HTTP requests sent to the Kubernetes API Server | Counter |
| System | KubeClient.HttpRequest.<RequestMethod>.Count | Number of HTTP requests sent to the Kubernetes API Server per request method. <RequestMethod> can take values from: GET, POST, PUT, PATCH, DELETE, etc. | Counter |
| System | KubeClient.HttpRequest.Failed.Count | Number of failed HTTP requests that have no response from the Kubernetes API Server | Counter |
| System | KubeClient.HttpResponse.Count | Number of HTTP responses received from the Kubernetes API Server | Counter |
| System | KubeClient.HttpResponse.<ResponseCode>.Count | Number of HTTP responses received from the Kubernetes API Server per response code. <ResponseCode> can take values from: 200, 404, 503, etc. | Counter |
| System | KubeClient.HttpResponse.<ResponseCode>.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per response code per second. <ResponseCode> can take values from: 200, 404, 503, etc. | Meter |
| System | KubeClient.HttpRequest.NumPerSecond | Number of HTTP requests sent to the Kubernetes API Server per second | Meter |
| System | KubeClient.HttpRequest.Failed.NumPerSecond | Number of failed HTTP requests sent to the Kubernetes API Server per second | Meter |
| System | KubeClient.HttpResponse.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per second | Meter |
| System | KubeClient.HttpResponse.TimeNanos | Latency statistics obtained from the HTTP responses received from the Kubernetes API Server | Histogram |

Kubernetes Client Metrics by HTTP Response Code

Additional metrics grouped by the HTTP response code received from the API server can be published by setting kubernetes.client.metrics.http.response.code.groups.enabled to true.

| Scope | Metrics | Description | Type |
| --- | --- | --- | --- |
| System | KubeClient.HttpResponse.1xx.Count | Number of HTTP Code 1xx responses (informational) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.2xx.Count | Number of HTTP Code 2xx responses (success) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.3xx.Count | Number of HTTP Code 3xx responses (redirection) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.4xx.Count | Number of HTTP Code 4xx responses (client error) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.5xx.Count | Number of HTTP Code 5xx responses (server error) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.1xx.NumPerSecond | Number of HTTP Code 1xx responses (informational) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.2xx.NumPerSecond | Number of HTTP Code 2xx responses (success) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.3xx.NumPerSecond | Number of HTTP Code 3xx responses (redirection) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.4xx.NumPerSecond | Number of HTTP Code 4xx responses (client error) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.5xx.NumPerSecond | Number of HTTP Code 5xx responses (server error) received from the Kubernetes API Server per response code per second. | Meter |
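
A minimal sketch of enabling the grouped response code metrics through values.yaml follows. It assumes the option carries the kubernetes.operator. prefix like the other operator metric options, whereas the text above gives the name without it; confirm the exact key in the metrics config reference.

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Assumption: the full key includes the kubernetes.operator. prefix.
    kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled: true
```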

JVM Metrics

The Operator gathers metrics about the JVM process and exposes them in the same way as core Flink System metrics. The list of metrics is not repeated in this document.

JOSDK Metrics

The Flink operator also forwards metrics created by the Java Operator SDK framework itself under the JOSDK metric name prefix. Some of these metrics are reported at the system, namespace, and resource level.

Metric Reporters

The well-known Metric Reporters are shipped in the operator image and are ready to use.

To specify metrics configuration for the operator, prefix the options with kubernetes.operator.. This prefix keeps the Flink job and operator metrics configuration separate.
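
As a sketch of this separation, the fragment below places a prefixed and an unprefixed reporter side by side, reusing only option names that appear elsewhere on this page. It assumes that unprefixed entries in the default configuration serve as defaults for Flink deployments, which is worth confirming for your operator version.

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Prefixed: configures a reporter for the operator's own metrics.
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    # Unprefixed: assumed to act as a default for Flink job metrics.
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249-9250
```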

Let’s look at a few examples.

Slf4j

The default metrics reporter in the operator is Slf4j. It does not require any external monitoring system, and it is enabled in the values.yaml file by default, mainly for demonstration purposes.

    defaultConfiguration:
      create: true
      append: true
      flink-conf.yaml: |+
        kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
        kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE

To use a more robust, production-grade monitoring solution, the configuration needs to be changed.

How to Enable Prometheus (Example)

The following example shows how to enable the Prometheus metric reporter:

    defaultConfiguration:
      create: true
      append: true
      flink-conf.yaml: |+
        kubernetes.operator.metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
        kubernetes.operator.metrics.reporter.prom.port: 9999

Some metric reporters, including Prometheus, need a port to be exposed on the container. This can be achieved by defining a value for the otherwise empty metrics.port variable, either in the values.yaml file:

    metrics:
      port: 9999

or using the option --set metrics.port=9999 on the command line.

Set up Prometheus locally

The Prometheus Operator, among other options, provides an elegant, declarative way to specify how groups of pods should be monitored using custom resources.

To install the Prometheus operator via Helm, run:

    helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
    helm install prometheus prometheus-community/kube-prometheus-stack

The Grafana dashboard can be accessed through port-forwarding:

    kubectl port-forward deployment/prometheus-grafana 3000

The credentials for the dashboard can be retrieved by:

    kubectl get secret prometheus-grafana -o jsonpath="{.data.admin-user}" | base64 --decode ; echo
    kubectl get secret prometheus-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo

To enable the operator metrics in Prometheus, create a pod-monitor.yaml file with the following content:

    apiVersion: monitoring.coreos.com/v1
    kind: PodMonitor
    metadata:
      name: flink-kubernetes-operator
      labels:
        release: prometheus
    spec:
      selector:
        matchLabels:
          app.kubernetes.io/name: flink-kubernetes-operator
      podMetricsEndpoints:
        - port: metrics

and apply it to your Kubernetes environment:

    kubectl create -f pod-monitor.yaml

Once the custom resource is created in the Kubernetes environment, the operator metrics are ready to explore at http://localhost:3000/explore.

Logging

The Operator controls the logging behaviour for Flink applications and the Operator itself using configuration files mounted externally via ConfigMaps. Configuration files with default values are shipped in the Helm chart. It is recommended to review and adjust them if needed in the values.yaml file before deploying the Operator in production environments.

To append to or override the default log configuration properties for the operator and Flink deployments, define the log4j-operator.properties and log4j-console.properties keys respectively:

    defaultConfiguration:
      create: true
      append: true
      log4j-operator.properties: |+
        # Flink Operator Logging Overrides
        # rootLogger.level = DEBUG
      log4j-console.properties: |+
        # Flink Deployment Logging Overrides
        # rootLogger.level = DEBUG

Logging in the operator is intentionally succinct and does not include contextual information such as namespace or name of the FlinkDeployment objects. We rely on the MDC provided by the operator-sdk to access this information and use it directly in the log layout.

See the Java Operator SDK docs for more detail.
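
For illustration, a layout that prints the resource namespace and name from the MDC might look like the sketch below. The MDC keys resource.namespace and resource.name and the appender name console are assumptions based on Java Operator SDK and Log4j 2 conventions, not taken from this page. The shipped default configuration may already define a similar pattern, so check it before overriding.

```yaml
defaultConfiguration:
  create: true
  append: true
  log4j-operator.properties: |+
    # %X{key} prints an MDC entry in a Log4j 2 pattern layout.
    # Assumed MDC keys: resource.namespace and resource.name.
    # Assumed appender name: console.
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d [%-5level] [%X{resource.namespace}/%X{resource.name}] %c{1} - %msg%n
```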

To learn more about accessing the job logs or changing the log level dynamically, check the corresponding section of the core documentation.

FlinkDeployment Logging Configuration

Users have the freedom to override the default log4j-console.properties settings on a per-deployment level by putting the entire log configuration into spec.logConfiguration:

    spec:
      ...
      logConfiguration:
        log4j-console.properties: |+
          rootLogger.level = DEBUG
          rootLogger.appenderRef.file.ref = LogFile
          ...

FlinkDeployment Prometheus Configuration

The following example shows how to enable the Prometheus metric reporter for the FlinkDeployment:

    spec:
      ...
      flinkConfiguration:
        metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
        metrics.reporter.prom.port: 9249-9250