Kubernetes Setup

This page describes how to deploy Flink job and session clusters on Kubernetes.

Set up Kubernetes

Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

Note: If using MiniKube, please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components cannot reference themselves through a Kubernetes service.

Flink session cluster on Kubernetes

A Flink session cluster is executed as a long-running Kubernetes Deployment. You can run multiple Flink jobs on a session cluster; each job needs to be submitted to the cluster after the cluster has been deployed.

A basic Flink session cluster deployment in Kubernetes has three components:

  • a Deployment/Job which runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Using the resource definitions for a session cluster listed in the appendix, launch the cluster with the kubectl command:

  1. kubectl create -f flink-configuration-configmap.yaml
  2. kubectl create -f jobmanager-service.yaml
  3. kubectl create -f jobmanager-deployment.yaml
  4. kubectl create -f taskmanager-deployment.yaml

Note that you can define your own customized flink-conf.yaml options within flink-configuration-configmap.yaml.
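For example, the data section of that ConfigMap could carry adjusted values; the excerpt below is illustrative (the two overridden values are arbitrary choices, not recommendations), and any other standard Flink option can be set the same way:

```yaml
# flink-configuration-configmap.yaml (excerpt) with customized options.
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2    # illustrative: two slots per TaskManager
    jobmanager.heap.size: 2048m         # illustrative: larger JobManager heap
```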

You can then access the Flink UI in several ways:

  • kubectl port-forward:
    • Run kubectl port-forward service/flink-jobmanager 8081:8081 to forward the JobManager’s web UI port to local port 8081.
    • Navigate to http://localhost:8081 in your browser.
    • You can also use the forwarded port to submit jobs to the cluster:
  1. ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
  • Create a NodePort service on the rest service of the JobManager:
    • Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the JobManager. An example of jobmanager-rest-service.yaml can be found in the appendix.
    • Run kubectl get svc flink-jobmanager-rest to find the node port of this service, and navigate to http://<public-node-ip>:<node-port> in your browser.
    • Similarly to the port-forward solution, you can use the following command to submit jobs to the cluster:
  1. ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar

In order to terminate the Flink session cluster, use kubectl:

  1. kubectl delete -f jobmanager-deployment.yaml
  2. kubectl delete -f taskmanager-deployment.yaml
  3. kubectl delete -f jobmanager-service.yaml
  4. kubectl delete -f flink-configuration-configmap.yaml

Flink job cluster on Kubernetes

A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, no extra job submission is needed.

Creating the job-specific image

The Flink job cluster image needs to contain the user-code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these instructions to build the Docker image.
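Conceptually, such an image layers the job artifact on top of a Flink base image. The sketch below is illustrative only — the jar name and target path are assumptions, and the linked instructions describe the supported build procedure:

```dockerfile
# Illustrative sketch, not the official build procedure:
# start from a Flink base image and add the user-code jar.
FROM flink:latest
# Hypothetical artifact path; replace with your job's jar.
COPY target/my-flink-job.jar /opt/flink/lib/my-flink-job.jar
```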

In order to deploy a job cluster on Kubernetes, please follow these instructions.

Advanced Cluster Deployment

An early version of a Flink Helm chart is available on GitHub.

Appendix

Session cluster resource definitions

The Deployment definitions use the pre-built image flink:latest, which can be found on Docker Hub. The image is built from this GitHub repository.

flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 1
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.heap.size: 1024m
        taskmanager.heap.size: 1024m
      log4j.properties: |+
        log4j.rootLogger=INFO, file
        log4j.logger.akka=INFO
        log4j.logger.org.apache.kafka=INFO
        log4j.logger.org.apache.hadoop=INFO
        log4j.logger.org.apache.zookeeper=INFO
        log4j.appender.file=org.apache.log4j.FileAppender
        log4j.appender.file.file=${log.file}
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

jobmanager-deployment.yaml

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:latest
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
              while :;
              do
                if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*jobmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
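The container command above starts the JobManager in the background and then loops, tailing the JobManager log once it appears, which keeps the container running and its output visible. The log-discovery step can be sketched standalone (the directory and file name below are illustrative stand-ins for what jobmanager.sh writes):

```shell
# Stand-in for the log directory jobmanager.sh writes to
# (path and file name are illustrative).
mkdir -p log
touch 'log/flink--jobmanager-0-host.log'
# The same check the container command performs: locate the first
# matching JobManager log file before tailing it.
logfile=$(find log -name '*jobmanager*.log' -print -quit)
if [ -f "$logfile" ]; then
  echo "found: $logfile"
fi
```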

taskmanager-deployment.yaml

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:latest
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
              while :;
              do
                if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*taskmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6122
              name: rpc
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
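Since the TaskManagers run as an ordinary Deployment, adding processing capacity is a matter of raising replicas; with the ConfigMap above, each TaskManager contributes taskmanager.numberOfTaskSlots (here 1) task slots. A fragment sketching the change (the value 4 is illustrative):

```yaml
# taskmanager-deployment.yaml (excerpt): scale the TaskManager pool.
spec:
  replicas: 4   # was 2; total slots = replicas × taskmanager.numberOfTaskSlots
```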

jobmanager-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: ui
        port: 8081
      selector:
        app: flink
        component: jobmanager

jobmanager-rest-service.yaml. Optional service that exposes the JobManager REST port as a public Kubernetes node port.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081
        targetPort: 8081
      selector:
        app: flink
        component: jobmanager
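Once this NodePort service exists, the allocated port can also be read programmatically. Against a live cluster you would query kubectl directly; the sketch below demonstrates the same extraction on a captured sample of the service JSON (the sample and its port number are illustrative):

```shell
# Against a running cluster (not executed here):
#   kubectl get svc flink-jobmanager-rest -o jsonpath='{.spec.ports[0].nodePort}'
# The same extraction, shown on a captured sample of the service JSON:
svc='{"spec":{"ports":[{"name":"rest","port":8081,"nodePort":31234}]}}'
node_port=$(printf '%s' "$svc" | sed -n 's/.*"nodePort":\([0-9]*\).*/\1/p')
echo "node port: $node_port"
```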