Kubernetes Setup
Getting Started
This Getting Started guide describes how to deploy a Session cluster on Kubernetes.
Introduction
This page describes deploying a standalone Flink cluster on top of Kubernetes, using Flink’s standalone deployment. We generally recommend new users to deploy Flink on Kubernetes using native Kubernetes deployments.
Apache Flink also provides a Kubernetes operator for managing Flink clusters on Kubernetes. It supports both standalone and native deployment mode and greatly simplifies deployment, configuration and the life cycle management of Flink resources on Kubernetes.
For more information, please refer to the Flink Kubernetes Operator documentation
Preparation
This guide expects a Kubernetes environment to be present. You can ensure that your Kubernetes setup is working by running a command like kubectl get nodes, which lists all connected Kubelets.
If you want to run Kubernetes locally, we recommend using MiniKube.
If using MiniKube please make sure to execute
minikube ssh 'sudo ip link set docker0 promisc on'before deploying a Flink cluster. Otherwise Flink components are not able to reference themselves through a Kubernetes service.
Starting a Kubernetes Cluster (Session Mode)
A Flink Session cluster is executed as a long-running Kubernetes Deployment. You can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:
- a Deployment which runs a JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager’s REST and UI ports
Using the file contents provided in the the common resource definitions, create the following files, and create the respective components with the kubectl command:
# Configuration and service definition$ kubectl create -f flink-configuration-configmap.yaml$ kubectl create -f jobmanager-service.yaml# Create the deployments for the cluster$ kubectl create -f jobmanager-session-deployment-non-ha.yaml$ kubectl create -f taskmanager-session-deployment.yaml
Next, we set up a port forward to access the Flink UI and submit jobs:
- Run
kubectl port-forward ${flink-jobmanager-pod} 8081:8081to forward your jobmanager’s web ui port to local 8081. - Navigate to http://localhost:8081 in your browser.
- Moreover, you could use the following command below to submit jobs to the cluster:
$ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
You can tear down the cluster using the following commands:
$ kubectl delete -f jobmanager-service.yaml$ kubectl delete -f flink-configuration-configmap.yaml$ kubectl delete -f taskmanager-session-deployment.yaml$ kubectl delete -f jobmanager-session-deployment-non-ha.yaml
Deployment Modes
Application Mode
For high-level intuition behind the application mode, please refer to the deployment mode overview.
A Flink Application cluster is a dedicated cluster which runs a single application, which needs to be available at deployment time.
A basic Flink Application cluster deployment in Kubernetes has three components:
- an Application which runs a JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager’s REST and UI ports
Check the Application cluster specific resource definitions and adjust them accordingly:
The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.
The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume, available in your Kubernetes cluster, to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts instead.
After creating the common cluster components, use the Application cluster specific resource definitions to launch the cluster with the kubectl command:
$ kubectl create -f jobmanager-job.yaml$ kubectl create -f taskmanager-job-deployment.yaml
To terminate the single application cluster, these components can be deleted along with the common ones with the kubectl command:
$ kubectl delete -f taskmanager-job-deployment.yaml$ kubectl delete -f jobmanager-job.yaml
Session Mode
For high-level intuition behind the session mode, please refer to the deployment mode overview.
Deployment of a Session cluster is explained in the Getting Started guide at the top of this page.
Flink on Standalone Kubernetes Reference
Configuration
All configuration options are listed on the configuration page. Configuration options can be added to the flink-conf.yaml section of the flink-configuration-configmap.yaml config map.
Accessing Flink in Kubernetes
You can then access the Flink UI and submit jobs via different ways:
kubectl proxy:- Run
kubectl proxyin a terminal. - Navigate to http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy in your browser.
- Run
kubectl port-forward:- Run
kubectl port-forward ${flink-jobmanager-pod} 8081:8081to forward your jobmanager’s web ui port to local 8081. - Navigate to http://localhost:8081 in your browser.
- Moreover, you can use the following command below to submit jobs to the cluster:
$ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
- Run
Create a
NodePortservice on the rest service of jobmanager:- Run
kubectl create -f jobmanager-rest-service.yamlto create theNodePortservice on jobmanager. The example ofjobmanager-rest-service.yamlcan be found in appendix. - Run
kubectl get svc flink-jobmanager-restto know thenode-portof this service and navigate to http://<public-node-ip>:<node-port> in your browser. - If you use minikube, you can get its public ip by running
minikube ip. - Similarly to the
port-forwardsolution, you can also use the following command below to submit jobs to the cluster:
$ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar
- Run
Debugging and Log Access
Many common errors are easy to detect by checking Flink’s log files. If you have access to Flink’s web user interface, you can access the JobManager and TaskManager logs from there.
If there are problems starting Flink, you can also use Kubernetes utilities to access the logs. Use kubectl get pods to see all running pods. For the quickstart example from above, you should see three pods:
$ kubectl get podsNAME READY STATUS RESTARTS AGEflink-jobmanager-589967dcfc-m49xv 1/1 Running 3 3m32sflink-taskmanager-64847444ff-7rdl4 1/1 Running 3 3m28sflink-taskmanager-64847444ff-nnd6m 1/1 Running 3 3m28s
You can now access the logs by running kubectl logs flink-jobmanager-589967dcfc-m49xv
High-Availability with Standalone Kubernetes
For high availability on Kubernetes, you can use the existing high availability services.
Kubernetes High-Availability Services
Session Mode and Application Mode clusters support using the Kubernetes high availability service. You need to add the following Flink config options to flink-configuration-configmap.yaml.
Note The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to custom Flink image and enable plugins for more information.
apiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+...kubernetes.cluster-id: <cluster-id>high-availability.type: kuberneteshigh-availability.storageDir: hdfs:///flink/recoveryrestart-strategy.type: fixed-delayrestart-strategy.fixed-delay.attempts: 10...
Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps. See how to configure service accounts for pods for more information.
When High-Availability is enabled, Flink will use its own HA-services for service discovery. Therefore, JobManager pods should be started with their IP address instead of a Kubernetes service as its jobmanager.rpc.address. Refer to the appendix for full configuration.
Standby JobManagers
Usually, it is enough to only start a single JobManager pod, because Kubernetes will restart it once the pod crashes. If you want to achieve faster recovery, configure the replicas in jobmanager-session-deployment-ha.yaml or parallelism in jobmanager-application-ha.yaml to a value greater than 1 to start standby JobManagers.
Using Standalone Kubernetes with Reactive Mode
Reactive Mode allows to run Flink in a mode, where the Application Cluster is always adjusting the job parallelism to the available resources. In combination with Kubernetes, the replica count of the TaskManager deployment determines the available resources. Increasing the replica count will scale up the job, reducing it will trigger a scale down. This can also be done automatically by using a Horizontal Pod Autoscaler.
To use Reactive Mode on Kubernetes, follow the same steps as for deploying a job using an Application Cluster. But instead of flink-configuration-configmap.yaml use this config map: flink-reactive-mode-configuration-configmap.yaml. It contains the scheduler-mode: reactive setting for Flink.
Once you have deployed the Application Cluster, you can scale your job up or down by changing the replica count in the flink-taskmanager deployment.
Enabling Local Recovery Across Pod Restarts
In order to speed up recoveries in case of pod failures, you can leverage Flink’s working directory feature together with local recovery. If the working directory is configured to reside on a persistent volume that gets remounted to a restarted TaskManager pod, then Flink is able to recover state locally. With the StatefulSet, Kubernetes gives you the exact tool you need to map a pod to a persistent volume.
Deploying TaskManagers as a StatefulSet, allows you to configure a volume claim template that is used to mount persistent volumes to the TaskManagers. Additionally, you need to configure a deterministic taskmanager.resource-id. A suitable value is the pod name, that you expose using environment variables. For an example StatefulSet configuration take a look at the appendix.
Appendix
Common cluster resource definitions
flink-configuration-configmap.yaml
apiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.pekko.name = org.apache.pekkologger.pekko.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
flink-reactive-mode-configuration-configmap.yaml
apiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2scheduler-mode: reactiveexecution.checkpointing.interval: 10slog4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.pekko.name = org.apache.pekkologger.pekko.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.
apiVersion: v1kind: Servicemetadata:name: flink-jobmanagerspec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager
jobmanager-rest-service.yaml. Optional service, that exposes the jobmanager rest port as public Kubernetes node’s port.
apiVersion: v1kind: Servicemetadata:name: flink-jobmanager-restspec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager
Session cluster resource definitions
jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1kind: Deploymentmetadata:name: flink-jobmanagerspec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.18.1-scala_2.12args: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
jobmanager-session-deployment-ha.yaml
apiVersion: apps/v1kind: Deploymentmetadata:name: flink-jobmanagerspec:replicas: 1 # Set the value to greater than 1 to start standby JobManagersselector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.18.1-scala_2.12env:- name: POD_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.podIP# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.args: ["jobmanager", "$(POD_IP)"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryserviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMapsvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1kind: Deploymentmetadata:name: flink-taskmanagerspec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:1.18.1-scala_2.12args: ["taskmanager"]ports:- containerPort: 6122name: rpclivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/securityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
Application cluster resource definitions
jobmanager-application-non-ha.yaml
apiVersion: batch/v1kind: Jobmetadata:name: flink-jobmanagerspec:template:metadata:labels:app: flinkcomponent: jobmanagerspec:restartPolicy: OnFailurecontainers:- name: jobmanagerimage: apache/flink:1.18.1-scala_2.12env:args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf- name: job-artifacts-volumemountPath: /opt/flink/usrlibsecurityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /host/path/to/job/artifacts
jobmanager-application-ha.yaml
apiVersion: batch/v1kind: Jobmetadata:name: flink-jobmanagerspec:parallelism: 1 # Set the value to greater than 1 to start standby JobManagerstemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:restartPolicy: OnFailurecontainers:- name: jobmanagerimage: apache/flink:1.18.1-scala_2.12env:- name: POD_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.podIP# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf- name: job-artifacts-volumemountPath: /opt/flink/usrlibsecurityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryserviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMapsvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /host/path/to/job/artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1kind: Deploymentmetadata:name: flink-taskmanagerspec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:1.18.1-scala_2.12env:args: ["taskmanager"]ports:- containerPort: 6122name: rpclivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/- name: job-artifacts-volumemountPath: /opt/flink/usrlibsecurityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /host/path/to/job/artifacts
Local Recovery Enabled TaskManager StatefulSet
apiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122state.backend.local-recovery: trueprocess.taskmanager.working-dir: /pv---apiVersion: v1kind: Servicemetadata:name: taskmanager-hlspec:clusterIP: Noneselector:app: flinkcomponent: taskmanager---apiVersion: apps/v1kind: StatefulSetmetadata:name: flink-taskmanagerspec:serviceName: taskmanager-hlreplicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:securityContext:runAsUser: 9999fsGroup: 9999containers:- name: taskmanagerimage: apache/flink:1.18.1-scala_2.12env:- name: POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.nameargs: ["taskmanager", "-Dtaskmanager.resource-id=$(POD_NAME)"]ports:- containerPort: 6122name: rpc- containerPort: 6121name: metricslivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/- name: pvmountPath: /pvvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.propertiesvolumeClaimTemplates:- metadata:name: pvspec:accessModes: [ "ReadWriteOnce" ]resources:requests:storage: 50Gi
