Kubernetes Setup

Getting Started

This Getting Started guide describes how to deploy a Session cluster on Kubernetes.

Introduction

This page describes deploying a standalone Flink cluster on top of Kubernetes, using Flink’s standalone deployment. We generally recommend that new users deploy Flink on Kubernetes using the native Kubernetes deployment.

Preparation

This guide expects a Kubernetes environment to be present. You can ensure that your Kubernetes setup is working by running a command like kubectl get nodes, which lists all connected Kubelets.

If you want to run Kubernetes locally, we recommend using MiniKube.

If using MiniKube, please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise, Flink components are not able to reference themselves through a Kubernetes service.

Starting a Kubernetes Cluster (Session Mode)

A Flink Session cluster is executed as a long-running Kubernetes Deployment. You can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

A Flink Session cluster deployment in Kubernetes has at least three components:

  • a Deployment which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Using the file contents provided in the common resource definitions, create the following files, and create the respective components with the kubectl command:

# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment-non-ha.yaml
$ kubectl create -f taskmanager-session-deployment.yaml

Next, we set up a port forward to access the Flink UI and submit jobs:

  1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager’s web ui port to local 8081.
  2. Navigate to http://localhost:8081 in your browser.
  3. Moreover, you can use the following command to submit jobs to the cluster:

     $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

You can tear down the cluster using the following commands:

$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment-non-ha.yaml


Deployment Modes

Application Mode

For high-level intuition behind the application mode, please refer to the deployment mode overview.

A Flink Application cluster is a dedicated cluster that runs a single application, which needs to be available at deployment time.

A basic Flink Application cluster deployment in Kubernetes has three components:

  • an Application which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Check the Application cluster specific resource definitions and adjust them accordingly:

The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.
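For illustration only, a hedged sketch of what that line could look like, assuming a hypothetical entry class com.example.MyFlinkJob and a hypothetical --input job argument (both are placeholders, not values from this guide):

# Sketch only: the class name and the job argument are placeholders.
args: ["standalone-job", "--job-classname", "com.example.MyFlinkJob", "--input", "/opt/flink/usrlib/input.txt"]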

The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host, assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume available in your Kubernetes cluster to supply the job artifacts. Alternatively, you can build a custom image that already contains the artifacts.
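As one possible alternative to the hostPath volume used in the examples, the sketch below supplies the job artifacts from a PersistentVolumeClaim. The claim name flink-job-artifacts is an assumption; such a claim would have to exist in your cluster before you create the deployments:

# Sketch: replace the hostPath job-artifacts-volume with a PersistentVolumeClaim.
# The claim name "flink-job-artifacts" is an assumption for illustration.
volumes:
- name: job-artifacts-volume
  persistentVolumeClaim:
    claimName: flink-job-artifacts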

After creating the common cluster components, use the Application cluster specific resource definitions to launch the cluster with the kubectl command:

$ kubectl create -f jobmanager-job.yaml
$ kubectl create -f taskmanager-job-deployment.yaml

To terminate the single application cluster, delete these components along with the common ones using the kubectl command:

$ kubectl delete -f taskmanager-job-deployment.yaml
$ kubectl delete -f jobmanager-job.yaml

Session Mode

For high-level intuition behind the session mode, please refer to the deployment mode overview.

Deployment of a Session cluster is explained in the Getting Started guide at the top of this page.


Configuration

All configuration options are listed on the configuration page. Configuration options can be added to the flink-conf.yaml section of the flink-configuration-configmap.yaml config map.
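For example, to raise the number of task slots per TaskManager, you could extend the flink-conf.yaml section of the config map as in the following sketch (the surrounding keys come from the appendix definitions; the value 4 is only an illustration):

data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 4   # example override; align with your TaskManager resources
    ...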

You can then access the Flink UI and submit jobs in several ways:

  • kubectl proxy:

    1. Run kubectl proxy in a terminal.
    2. Navigate to http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy in your browser.
  • kubectl port-forward:

    1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager’s web ui port to local 8081.
    2. Navigate to http://localhost:8081 in your browser.
    3. Moreover, you can use the following command to submit jobs to the cluster:

       $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
  • Create a NodePort service on the rest service of jobmanager:

    1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the jobmanager. The example jobmanager-rest-service.yaml can be found in the appendix.
    2. Run kubectl get svc flink-jobmanager-rest to find the node port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
    3. If you use minikube, you can get its public ip by running minikube ip.
    4. Similarly to the port-forward solution, you can use the following command to submit jobs to the cluster:

       $ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar

Debugging and Log Access

Many common errors are easy to detect by checking Flink’s log files. If you have access to Flink’s web user interface, you can access the JobManager and TaskManager logs from there.

If there are problems starting Flink, you can also use Kubernetes utilities to access the logs. Use kubectl get pods to see all running pods. For the quickstart example from above, you should see three pods:

$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-589967dcfc-m49xv    1/1     Running   3          3m32s
flink-taskmanager-64847444ff-7rdl4   1/1     Running   3          3m28s
flink-taskmanager-64847444ff-nnd6m   1/1     Running   3          3m28s

You can now access the logs by running kubectl logs flink-jobmanager-589967dcfc-m49xv.
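If you prefer not to look up the generated pod name, the following commands are equivalent sketches that rely on the Deployment name and labels used in the resource definitions of this page:

# Follow the JobManager logs via the Deployment name
$ kubectl logs -f deployment/flink-jobmanager
# Or select the pods by the labels defined in the resource definitions
$ kubectl logs -l app=flink,component=jobmanager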

High-Availability with Standalone Kubernetes

For high availability on Kubernetes, you can use the existing high availability services.

Kubernetes High-Availability Services

Session Mode and Application Mode clusters support using the Kubernetes high availability service. You need to add the following Flink config options to flink-configuration-configmap.yaml.

Note: The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to custom Flink image and enable plugins for more information.

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    ...
    kubernetes.cluster-id: <cluster-id>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    ...

Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, and delete ConfigMaps. See how to configure service accounts for pods for more information.
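A minimal sketch of such a setup, assuming the default namespace and the service account name flink-service-account used in the appendix; the built-in edit role is broader than strictly required, so you may prefer a narrower Role that is limited to ConfigMaps:

$ kubectl create serviceaccount flink-service-account
$ kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=default:flink-service-account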

When High-Availability is enabled, Flink will use its own HA services for service discovery. Therefore, JobManager pods should be started with their IP address instead of a Kubernetes service name as their jobmanager.rpc.address. Refer to the appendix for the full configuration.

Standby JobManagers

Usually, it is enough to only start a single JobManager pod, because Kubernetes will restart it once the pod crashes. If you want to achieve faster recovery, configure the replicas in jobmanager-session-deployment-ha.yaml or parallelism in jobmanager-application-ha.yaml to a value greater than 1 to start standby JobManagers.
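For a Session cluster that is already running, you could alternatively scale the existing Deployment directly, as in this sketch (the Deployment name matches the appendix examples):

$ kubectl scale deployment/flink-jobmanager --replicas=2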

Enabling Queryable State

You can access the queryable state of the TaskManagers if you create a NodePort service for it:

  1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service for the taskmanager pods. The example taskmanager-query-state-service.yaml can be found in the appendix.
  2. Run kubectl get svc flink-taskmanager-query-state to get the <node-port> of this service (see the sketch below). Then you can create the QueryableStateClient(<public-node-ip>, <node-port>) to submit state queries.
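A small sketch for step 2, reading the assigned node port directly with a JSONPath expression instead of scanning the kubectl get svc output:

$ kubectl get svc flink-taskmanager-query-state -o jsonpath='{.spec.ports[0].nodePort}'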

Using Standalone Kubernetes with Reactive Mode

Reactive Mode allows running Flink in a mode where the Application Cluster always adjusts the job parallelism to the available resources. In combination with Kubernetes, the replica count of the TaskManager deployment determines the available resources. Increasing the replica count will scale up the job, while reducing it will trigger a scale-down. This can also be done automatically by using a Horizontal Pod Autoscaler.

To use Reactive Mode on Kubernetes, follow the same steps as for deploying a job using an Application Cluster. But instead of flink-configuration-configmap.yaml use this config map: flink-reactive-mode-configuration-configmap.yaml. It contains the scheduler-mode: reactive setting for Flink.

Once you have deployed the Application Cluster, you can scale your job up or down by changing the replica count in the flink-taskmanager deployment.
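For example (a sketch; the Deployment name matches the appendix resource definitions, and the autoscaling command assumes a metrics server plus CPU requests on the TaskManager pods):

# Scale the job up to 4 TaskManagers
$ kubectl scale deployment/flink-taskmanager --replicas=4

# Or let a Horizontal Pod Autoscaler adjust the replica count based on CPU usage
$ kubectl autoscale deployment flink-taskmanager --min=1 --max=10 --cpu-percent=80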

Enabling Local Recovery Across Pod Restarts

In order to speed up recoveries in case of pod failures, you can leverage Flink’s working directory feature together with local recovery. If the working directory is configured to reside on a persistent volume that gets remounted to a restarted TaskManager pod, then Flink is able to recover state locally. With the StatefulSet, Kubernetes gives you the exact tool you need to map a pod to a persistent volume.

Deploying TaskManagers as a StatefulSet allows you to configure a volume claim template that is used to mount persistent volumes to the TaskManagers. Additionally, you need to configure a deterministic taskmanager.resource-id. A suitable value is the pod name, which you can expose using environment variables. For an example StatefulSet configuration, take a look at the appendix.


Appendix

Common cluster resource definitions

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

flink-reactive-mode-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    scheduler-mode: reactive
    execution.checkpointing.interval: 10s
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml. Optional service, which is only necessary for non-HA mode.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml. Optional service that exposes the JobManager’s REST port as a public Kubernetes node port.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

taskmanager-query-state-service.yaml. Optional service that exposes the TaskManager port used to access queryable state as a public Kubernetes node port.

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

Session cluster resource definitions

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.15.0-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

jobmanager-session-deployment-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.15.0-scala_2.12
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.15.0-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

Application cluster resource definitions

jobmanager-application-non-ha.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: apache/flink:1.15.0-scala_2.12
        env:
        args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /host/path/to/job/artifacts

jobmanager-application-ha.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: apache/flink:1.15.0-scala_2.12
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.15.0-scala_2.12
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /host/path/to/job/artifacts

Local Recovery Enabled TaskManager StatefulSet

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    state.backend.local-recovery: true
    process.taskmanager.working-dir: /pv
---
apiVersion: v1
kind: Service
metadata:
  name: taskmanager-hl
spec:
  clusterIP: None
  selector:
    app: flink
    component: taskmanager
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-taskmanager
spec:
  serviceName: taskmanager-hl
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      securityContext:
        runAsUser: 9999
        fsGroup: 9999
      containers:
      - name: taskmanager
        image: apache/flink:1.15.0-scala_2.12
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        args: ["taskmanager", "-Dtaskmanager.resource-id=$(POD_NAME)"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 6121
          name: metrics
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: pv
          mountPath: /pv
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
  volumeClaimTemplates:
  - metadata:
      name: pv
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 50Gi
