Kubernetes Setup

This page describes how to deploy a Flink Job and Session cluster on Kubernetes.

Info: This page describes deploying a standalone Flink cluster on top of Kubernetes. You can find more information on native Kubernetes deployments here.

Setup Kubernetes

Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

Note: If using MiniKube, please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components will not be able to reference themselves through a Kubernetes service.

Flink Docker image

Before deploying the Flink Kubernetes components, please read the Flink Docker image documentation: the available tags, how to customize the Flink Docker image, and how to enable plugins, so that you can use the image in the Kubernetes definition files.

Deploy Flink cluster on Kubernetes

Using the common resource definitions, launch the common cluster components with the kubectl command:

  kubectl create -f flink-configuration-configmap.yaml
  kubectl create -f jobmanager-service.yaml

Note that you can define your own customized options for flink-conf.yaml within flink-configuration-configmap.yaml.
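For example, additional Flink options can simply be appended to the flink-conf.yaml entry of the ConfigMap; a minimal sketch (the state backend and checkpoint directory values are illustrative, not part of the shipped definition):

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-config
    labels:
      app: flink
  data:
    flink-conf.yaml: |+
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 2
      # any other flink-conf.yaml option can be added here, e.g.:
      state.backend: filesystem
      state.checkpoints.dir: file:///checkpoints/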

Then launch the specific components depending on whether you want to deploy a Session or Job cluster.

You can then access the Flink UI via different ways:

  • kubectl port-forward:
    1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager's web ui port to local 8081.
    2. Navigate to http://localhost:8081 in your browser.
    3. Moreover, you could use the following command to submit jobs to the cluster:
       ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
  • Create a NodePort service on the rest service of jobmanager:
    1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on jobmanager. The example of jobmanager-rest-service.yaml can be found in the appendix.
    2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
    3. If you use minikube, you can get its public ip by running minikube ip.
    4. Similarly to the port-forward solution, you could also use the following command to submit jobs to the cluster:
       ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
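For example, on minikube the NodePort steps above can be combined into a short shell sketch (the jsonpath query is one way of reading the assigned node port; the service and jar names are the ones used on this page):

  # Public node IP (minikube) and the NodePort assigned to the jobmanager REST service
  NODE_IP=$(minikube ip)
  NODE_PORT=$(kubectl get svc flink-jobmanager-rest -o jsonpath='{.spec.ports[0].nodePort}')

  # Submit the bundled WordCount example against the exposed REST endpoint
  ./bin/flink run -m ${NODE_IP}:${NODE_PORT} ./examples/streaming/WordCount.jar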

You can also access the queryable state of TaskManager if you create a NodePort service for it:

  1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service on taskmanager. The example of taskmanager-query-state-service.yaml can be found in appendix.
  2. Run kubectl get svc flink-taskmanager-query-state to know the node-port of this service. Then you can create the QueryableStateClient(<public-node-ip>, <node-port>) to submit the state queries.
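The node port for queryable state can be looked up the same way once the service exists; a minimal shell sketch, assuming the taskmanager-query-state-service.yaml from the appendix has been created:

  # NodePort of the queryable state proxy; together with the node IP these are
  # the arguments to pass to the QueryableStateClient in your client program.
  NODE_IP=$(minikube ip)
  QS_PORT=$(kubectl get svc flink-taskmanager-query-state -o jsonpath='{.spec.ports[0].nodePort}')
  echo "Queryable state reachable at ${NODE_IP}:${QS_PORT}"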

In order to terminate the Flink cluster, delete the specific Session or Job cluster components and use kubectl to terminate the common components:

  kubectl delete -f jobmanager-service.yaml
  kubectl delete -f flink-configuration-configmap.yaml

  # if created then also the rest service
  kubectl delete -f jobmanager-rest-service.yaml

  # if created then also the queryable state service
  kubectl delete -f taskmanager-query-state-service.yaml

Deploy Session Cluster

A Flink Session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

A Flink Session cluster deployment in Kubernetes has at least three components:

  • a Deployment which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

After creating the common cluster components, use the Session specific resource definitions to launch the Session cluster with the kubectl command:

  kubectl create -f jobmanager-session-deployment.yaml
  kubectl create -f taskmanager-session-deployment.yaml

To terminate the Session cluster, these components can be deleted along with the common ones with the kubectl command:

  kubectl delete -f taskmanager-session-deployment.yaml
  kubectl delete -f jobmanager-session-deployment.yaml

Deploy Job Cluster

A Flink Job cluster is a dedicated cluster which runs a single job. You can find more details here.

A basic Flink Job cluster deployment in Kubernetes has three components:

  • a Job which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Check the Job cluster specific resource definitions and adjust them accordingly.

The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.
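For illustration, for a hypothetical job whose entry class is org.example.StreamingJob and that should resume from a savepoint, the args line in jobmanager-job.yaml could look like this (the class name, savepoint path, and trailing job argument are placeholders, not shipped defaults):

  args: ["standalone-job", "--job-classname", "org.example.StreamingJob", "--fromSavepoint", "/path/to/savepoint", "--input", "/opt/flink/usrlib/input.txt"]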

The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume, available in your Kubernetes cluster, to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts instead.
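For instance, on a cluster with persistent volumes the hostPath entry of the job-artifacts-volume could be replaced by a PersistentVolumeClaim; a minimal sketch, assuming a claim named flink-job-artifacts that already contains the job jars (the claim name is hypothetical):

  volumes:
  - name: job-artifacts-volume
    persistentVolumeClaim:
      claimName: flink-job-artifacts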

After creating the common cluster components, use the Job cluster specific resource definitions to launch the cluster with the kubectl command:

  kubectl create -f jobmanager-job.yaml
  kubectl create -f taskmanager-job-deployment.yaml

To terminate the single job cluster, these components can be deleted along with the common ones with the kubectl command:

  kubectl delete -f taskmanager-job-deployment.yaml
  kubectl delete -f jobmanager-job.yaml

Appendix

Common cluster resource definitions

flink-configuration-configmap.yaml

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-config
    labels:
      app: flink
  data:
    flink-conf.yaml: |+
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 2
      blob.server.port: 6124
      jobmanager.rpc.port: 6123
      taskmanager.rpc.port: 6122
      queryable-state.proxy.ports: 6125
      jobmanager.memory.process.size: 1600m
      taskmanager.memory.process.size: 1728m
      parallelism.default: 2
    log4j-console.properties: |+
      # This affects logging for both user code and Flink
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender

      # Uncomment this if you want to _only_ change Flink's logging
      #logger.flink.name = org.apache.flink
      #logger.flink.level = INFO

      # The following lines keep the log level of common libraries/connectors on
      # log level INFO. The root logger does not override this. You have to manually
      # change the log levels here.
      logger.akka.name = akka
      logger.akka.level = INFO
      logger.kafka.name= org.apache.kafka
      logger.kafka.level = INFO
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO

      # Log all infos to the console
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

      # Log all infos in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size=100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10

      # Suppress the irrelevant (wrong) warnings from the Netty channel handler
      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF

jobmanager-service.yaml

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager
  spec:
    type: ClusterIP
    ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
    selector:
      app: flink
      component: jobmanager

jobmanager-rest-service.yaml. Optional service that exposes the jobmanager REST port as a public Kubernetes node's port.

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager-rest
  spec:
    type: NodePort
    ports:
    - name: rest
      port: 8081
      targetPort: 8081
      nodePort: 30081
    selector:
      app: flink
      component: jobmanager

taskmanager-query-state-service.yaml. Optional service that exposes the TaskManager port used for queryable state as a public Kubernetes node's port.

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-taskmanager-query-state
  spec:
    type: NodePort
    ports:
    - name: query-state
      port: 6125
      targetPort: 6125
      nodePort: 30025
    selector:
      app: flink
      component: taskmanager

Session cluster resource definitions

jobmanager-session-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-jobmanager
  spec:
    replicas: 1
    selector:
      matchLabels:
        app: flink
        component: jobmanager
    template:
      metadata:
        labels:
          app: flink
          component: jobmanager
      spec:
        containers:
        - name: jobmanager
          image: flink:1.11.0-scala_2.11
          args: ["jobmanager"]
          ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob-server
          - containerPort: 8081
            name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties

taskmanager-session-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
  spec:
    replicas: 2
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
      spec:
        containers:
        - name: taskmanager
          image: flink:1.11.0-scala_2.11
          args: ["taskmanager"]
          ports:
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/flink/conf/
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties

Job cluster resource definitions

jobmanager-job.yaml

  apiVersion: batch/v1
  kind: Job
  metadata:
    name: flink-jobmanager
  spec:
    template:
      metadata:
        labels:
          app: flink
          component: jobmanager
      spec:
        restartPolicy: OnFailure
        containers:
        - name: jobmanager
          image: flink:1.11.0-scala_2.11
          env:
          args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob-server
          - containerPort: 8081
            name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/flink/conf
          - name: job-artifacts-volume
            mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
  spec:
    replicas: 2
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
      spec:
        containers:
        - name: taskmanager
          image: flink:1.11.0-scala_2.11
          env:
          args: ["taskmanager"]
          ports:
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/flink/conf/
          - name: job-artifacts-volume
            mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /host/path/to/job/artifacts
