Kubernetes 安装

入门

入门 指南描述了如何在 Kubernetes 上部署 Flink Session 集群

介绍

本文描述了如何使用 Flink standalone 部署模式在 Kubernetes 上部署 standalone 模式的 Flink 集群。通常我们建议新用户使用 native Kubernetes 部署模式在 Kubernetes上部署 Flink。

准备

本指南假设存在一个 Kubernets 的运行环境。你可以通过运行 kubectl get nodes 命令来确保 Kubernetes 环境运行正常,该命令展示所有连接到 Kubernets 集群的 node 节点信息。

如果你想在本地运行 Kubernetes,建议使用 MiniKube

如果使用 MiniKube,请确保在部署 Flink 集群之前先执行 minikube ssh 'sudo ip link set docker0 promisc on',否则 Flink 组件不能自动地将自己映射到 Kubernetes Service 中。

Flink session 集群 是以一种长期运行的 Kubernetes Deployment 形式执行的。你可以在一个 session 集群 上运行多个 Flink 作业。当然,只有 session 集群部署好以后才可以在上面提交 Flink 作业。

在 Kubernetes 上部署一个基本的 Flink session 集群 时,一般包括下面三个组件:

  • 运行 JobManagerDeployment
  • 运行 TaskManagersDeployment
  • 暴露 JobManager 上 REST 和 UI 端口的 Service

使用通用集群资源定义中提供的文件内容来创建以下文件,并使用 kubectl 命令来创建相应的组件:

  1. # Configuration 和 service 的定义
  2. $ kubectl create -f flink-configuration-configmap.yaml
  3. $ kubectl create -f jobmanager-service.yaml
  4. # 为集群创建 deployment
  5. $ kubectl create -f jobmanager-session-deployment.yaml
  6. $ kubectl create -f taskmanager-session-deployment.yaml

接下来,我们设置端口转发以访问 Flink UI 页面并提交作业:

  1. 运行 kubectl port-forward ${flink-jobmanager-pod} 8081:8081 将 jobmanager 的 web ui 端口映射到本地 8081。
  2. 在浏览器中导航到 http://localhost:8081 页面。
  3. 此外,也可以使用如下命令向集群提交作业:
  1. $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

可以使用以下命令停止运行 flink 集群:

  1. $ kubectl delete -f jobmanager-service.yaml
  2. $ kubectl delete -f flink-configuration-configmap.yaml
  3. $ kubectl delete -f taskmanager-session-deployment.yaml
  4. $ kubectl delete -f jobmanager-session-deployment.yaml

部署模式

Application 集群模式

Flink Application 集群 是运行单个 Application 的专用集群,部署集群时要保证该 Application 可用。

在 Kubernetes 上部署一个基本的 Flink Application 集群 时,一般包括下面三个组件:

  • 一个运行 JobManagerApplication
  • 运行若干个 TaskManager 的 Deployment;
  • 暴露 JobManager 上 REST 和 UI 端口的 Service;

检查 Application 集群资源定义 并做出相应的调整:

jobmanager-job.yaml 中的 args 属性必须指定用户作业的主类。也可以参考如何设置 JobManager 参数来了解如何将额外的 args 传递给 jobmanager-job.yaml 配置中指定的 Flink 镜像。

job artifacts 参数必须可以从 资源定义示例 中的 job-artifacts-volume 处获取。假如是在 minikube 集群中创建这些组件,那么定义示例中的 job-artifacts-volume 可以挂载为主机的本地目录。如果不使用 minikube 集群,那么可以使用 Kubernetes 集群中任何其它可用类型的 volume 来提供 job artifacts。此外,还可以构建一个已经包含 job artifacts 参数的自定义镜像

在创建通用集群组件后,指定 Application 集群资源定义文件,执行 kubectl 命令来启动 Flink Application 集群:

  1. $ kubectl create -f jobmanager-job.yaml
  2. $ kubectl create -f taskmanager-job-deployment.yaml

要停止单个 application 集群,可以使用 kubectl 命令来删除相应组件以及 通用集群资源对应的组件 :

  1. $ kubectl delete -f taskmanager-job-deployment.yaml
  2. $ kubectl delete -f jobmanager-job.yaml

Per-Job 集群模式

在 Kubernetes 上部署 Standalone 集群时不支持 Per-Job 集群模式。

Session 集群模式

本文档开始部分的入门指南中描述了 Session 集群模式的部署。

Kubernetes 上运行 Standalone 集群指南

Configuration

所有配置项都展示在配置页面上。在 config map 配置文件 flink-configuration-configmap.yaml 中,可以将配置添加在 flink-conf.yaml 部分。

接下来可以访问 Flink UI 页面并通过不同的方式提交作业:

  • kubectl proxy:

    1. 在终端运行 kubectl proxy 命令。
    2. 在浏览器中导航到 http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy
  • kubectl port-forward:

    1. 运行 kubectl port-forward ${flink-jobmanager-pod} 8081:8081 将 jobmanager 的 web ui 端口映射到本地的 8081。
    2. 在浏览器中导航到 http://localhost:8081
    3. 此外,也可以使用如下命令向集群提交作业:
    1. $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
  • 基于 jobmanager 的 rest 服务上创建 NodePort service:

    1. 运行 kubectl create -f jobmanager-rest-service.yaml 来基于 jobmanager 创建 NodePort service。jobmanager-rest-service.yaml 的示例文件可以在 附录 中找到。
    2. 运行 kubectl get svc flink-jobmanager-rest 来查询 server 的 node-port,然后再浏览器导航到 http://<public-node-ip>:<node-port>
    3. 如果使用 minikube 集群,可以执行 minikube ip 命令来查看 public ip。
    4. port-forward 方案类似,也可以使用如下命令向集群提交作业。
    1. $ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar

调试和访问日志

通过查看 Flink 的日志文件,可以很轻松地发现许多常见错误。如果你有权访问 Flink 的 Web 用户界面,那么可以在页面上访问 JobManager 和 TaskManager 日志。

如果启动 Flink 出现问题,也可以使用 Kubernetes 工具集访问日志。使用 kubectl get pods 命令查看所有运行的 pods 资源。针对上面的快速入门示例,你可以看到三个 pod:

  1. $ kubectl get pods
  2. NAME READY STATUS RESTARTS AGE
  3. flink-jobmanager-589967dcfc-m49xv 1/1 Running 3 3m32s
  4. flink-taskmanager-64847444ff-7rdl4 1/1 Running 3 3m28s
  5. flink-taskmanager-64847444ff-nnd6m 1/1 Running 3 3m28s

现在你可以通过运行 kubectl logs flink-jobmanager-589967dcfc-m49xv 来访问日志。

高可用的 Standalone Kubernetes

对于在 Kubernetes 上实现HA,可以参考当前的 Kubernets 高可用服务

Kubernetes 高可用 Services

Session 模式和 Application 模式集群都支持使用 Kubernetes 高可用服务。需要在 flink-configuration-configmap.yaml 中添加如下 Flink 配置项。

Note 配置了 HA 存储目录相对应的文件系统必须在运行时可用。请参阅自定义Flink 镜像启用文件系统插件获取更多相关信息。

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: flink-config
  5. labels:
  6. app: flink
  7. data:
  8. flink-conf.yaml: |+
  9. ...
  10. kubernetes.cluster-id: <cluster-id>
  11. high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  12. high-availability.storageDir: hdfs:///flink/recovery
  13. restart-strategy: fixed-delay
  14. restart-strategy.fixed-delay.attempts: 10
  15. ...

此外,你必须使用具有创建、编辑、删除 ConfigMap 权限的 service 账号启动 JobManager 和 TaskManager pod。请查看如何为 pod 配置 service 账号获取更多信息。

当启用了高可用,Flink 会使用自己的 HA 服务进行服务发现。因此,JobManager Pod 会使用 IP 地址而不是 Kubernetes 的 service 名称来作为 jobmanager.rpc.address 的配置项启动。完整配置请参考附录

Standby JobManagers

通常,只启动一个 JobManager pod 就足够了,因为一旦 pod 崩溃,Kubernetes 就会重新启动它。如果要实现更快的恢复,需要将 jobmanager-session-deployment-ha.yaml 中的 replicas 配置 或 jobmanager-application-ha.yaml 中的 parallelism 配置设定为大于 1 的整型值来启动 Standby JobManagers。

启用 Queryable State

如果你为 TaskManager 创建了 NodePort service,那么你就可以访问 TaskManager 的 Queryable State 服务:

  1. 运行 kubectl create -f taskmanager-query-state-service.yaml 来为 taskmanager pod 创建 NodePort service。taskmanager-query-state-service.yaml 的示例文件可以从附录中找到。
  2. 运行 kubectl get svc flink-taskmanager-query-state 来查询 service 对应 node-port 的端口号。然后你就可以创建 QueryableStateClient(<public-node-ip>, <node-port> 来提交状态查询。

在 Reactive 模式下使用 Standalone Kubernetes

Reactive Mode 允许在 Application 集群 始终根据可用资源调整作业并行度的模式下运行 Flink。与 Kubernetes 结合使用,TaskManager 部署的副本数决定了可用资源。增加副本数将扩大作业规模,而减少副本数将会触发缩减作业规模。通过使用 Horizontal Pod Autoscaler 也可以自动实现该功能。

要在 Kubernetes 上使用 Reactive Mode,请按照使用 Application 集群部署作业 完成相同的步骤。但是要使用 flink-reactive-mode-configuration-configmap.yaml 配置文件来代替 flink-configuration-configmap.yaml。该文件包含了针对 Flink 的 scheduler-mode: reactive 配置。

一旦你部署了 Application 集群,就可以通过修改 flink-taskmanager 的部署副本数量来扩大或缩小作业的并行度。

附录

通用集群资源定义

flink-configuration-configmap.yaml

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: flink-config
  5. labels:
  6. app: flink
  7. data:
  8. flink-conf.yaml: |+
  9. jobmanager.rpc.address: flink-jobmanager
  10. taskmanager.numberOfTaskSlots: 2
  11. blob.server.port: 6124
  12. jobmanager.rpc.port: 6123
  13. taskmanager.rpc.port: 6122
  14. queryable-state.proxy.ports: 6125
  15. jobmanager.memory.process.size: 1600m
  16. taskmanager.memory.process.size: 1728m
  17. parallelism.default: 2
  18. log4j-console.properties: |+
  19. # 如下配置会同时影响用户代码和 Flink 的日志行为
  20. rootLogger.level = INFO
  21. rootLogger.appenderRef.console.ref = ConsoleAppender
  22. rootLogger.appenderRef.rolling.ref = RollingFileAppender
  23. # 如果你只想改变 Flink 的日志行为则可以取消如下的注释部分
  24. #logger.flink.name = org.apache.flink
  25. #logger.flink.level = INFO
  26. # 下面几行将公共 libraries 或 connectors 的日志级别保持在 INFO 级别。
  27. # root logger 的配置不会覆盖此处配置。
  28. # 你必须手动修改这里的日志级别。
  29. logger.akka.name = akka
  30. logger.akka.level = INFO
  31. logger.kafka.name= org.apache.kafka
  32. logger.kafka.level = INFO
  33. logger.hadoop.name = org.apache.hadoop
  34. logger.hadoop.level = INFO
  35. logger.zookeeper.name = org.apache.zookeeper
  36. logger.zookeeper.level = INFO
  37. # 将所有 info 级别的日志输出到 console
  38. appender.console.name = ConsoleAppender
  39. appender.console.type = CONSOLE
  40. appender.console.layout.type = PatternLayout
  41. appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
  42. # 将所有 info 级别的日志输出到指定的 rolling file
  43. appender.rolling.name = RollingFileAppender
  44. appender.rolling.type = RollingFile
  45. appender.rolling.append = false
  46. appender.rolling.fileName = ${sys:log.file}
  47. appender.rolling.filePattern = ${sys:log.file}.%i
  48. appender.rolling.layout.type = PatternLayout
  49. appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
  50. appender.rolling.policies.type = Policies
  51. appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
  52. appender.rolling.policies.size.size=100MB
  53. appender.rolling.strategy.type = DefaultRolloverStrategy
  54. appender.rolling.strategy.max = 10
  55. # 关闭 Netty channel handler 中不相关的(错误)警告
  56. logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
  57. logger.netty.level = OFF

flink-reactive-mode-configuration-configmap.yaml

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: flink-config
  5. labels:
  6. app: flink
  7. data:
  8. flink-conf.yaml: |+
  9. jobmanager.rpc.address: flink-jobmanager
  10. taskmanager.numberOfTaskSlots: 2
  11. blob.server.port: 6124
  12. jobmanager.rpc.port: 6123
  13. taskmanager.rpc.port: 6122
  14. queryable-state.proxy.ports: 6125
  15. jobmanager.memory.process.size: 1600m
  16. taskmanager.memory.process.size: 1728m
  17. parallelism.default: 2
  18. scheduler-mode: reactive
  19. execution.checkpointing.interval: 10s
  20. log4j-console.properties: |+
  21. # 如下配置会同时影响用户代码和 Flink 的日志行为
  22. rootLogger.level = INFO
  23. rootLogger.appenderRef.console.ref = ConsoleAppender
  24. rootLogger.appenderRef.rolling.ref = RollingFileAppender
  25. # 如果你只想改变 Flink 的日志行为则可以取消如下的注释部分
  26. #logger.flink.name = org.apache.flink
  27. #logger.flink.level = INFO
  28. # 下面几行将公共 libraries 或 connectors 的日志级别保持在 INFO 级别。
  29. # root logger 的配置不会覆盖此处配置。
  30. # 你必须手动修改这里的日志级别。
  31. logger.akka.name = akka
  32. logger.akka.level = INFO
  33. logger.kafka.name= org.apache.kafka
  34. logger.kafka.level = INFO
  35. logger.hadoop.name = org.apache.hadoop
  36. logger.hadoop.level = INFO
  37. logger.zookeeper.name = org.apache.zookeeper
  38. logger.zookeeper.level = INFO
  39. # 将所有 info 级别的日志输出到 console
  40. appender.console.name = ConsoleAppender
  41. appender.console.type = CONSOLE
  42. appender.console.layout.type = PatternLayout
  43. appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
  44. # 将所有 info 级别的日志输出到指定的 rolling file
  45. appender.rolling.name = RollingFileAppender
  46. appender.rolling.type = RollingFile
  47. appender.rolling.append = false
  48. appender.rolling.fileName = ${sys:log.file}
  49. appender.rolling.filePattern = ${sys:log.file}.%i
  50. appender.rolling.layout.type = PatternLayout
  51. appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
  52. appender.rolling.policies.type = Policies
  53. appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
  54. appender.rolling.policies.size.size=100MB
  55. appender.rolling.strategy.type = DefaultRolloverStrategy
  56. appender.rolling.strategy.max = 10
  57. # 关闭 Netty channel handler 中不相关的(错误)警告
  58. logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
  59. logger.netty.level = OFF

jobmanager-service.yaml 。可选的 service,仅在非 HA 模式下需要。

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. name: flink-jobmanager
  5. spec:
  6. type: ClusterIP
  7. ports:
  8. - name: rpc
  9. port: 6123
  10. - name: blob-server
  11. port: 6124
  12. - name: webui
  13. port: 8081
  14. selector:
  15. app: flink
  16. component: jobmanager

jobmanager-rest-service.yaml。可选的 service,该 service 将 jobmanager 的 rest 端口暴露为公共 Kubernetes node 的节点端口。

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. name: flink-jobmanager-rest
  5. spec:
  6. type: NodePort
  7. ports:
  8. - name: rest
  9. port: 8081
  10. targetPort: 8081
  11. nodePort: 30081
  12. selector:
  13. app: flink
  14. component: jobmanager

taskmanager-query-state-service.yaml。可选的 service,该 service 将 TaskManager 的端口暴露为公共 Kubernetes node 的节点端口,通过该端口来访问 queryable state 服务。

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. name: flink-taskmanager-query-state
  5. spec:
  6. type: NodePort
  7. ports:
  8. - name: query-state
  9. port: 6125
  10. targetPort: 6125
  11. nodePort: 30025
  12. selector:
  13. app: flink
  14. component: taskmanager

Session 集群资源定义

jobmanager-session-deployment-non-ha.yaml

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: flink-jobmanager
  5. spec:
  6. replicas: 1
  7. selector:
  8. matchLabels:
  9. app: flink
  10. component: jobmanager
  11. template:
  12. metadata:
  13. labels:
  14. app: flink
  15. component: jobmanager
  16. spec:
  17. containers:
  18. - name: jobmanager
  19. image: apache/flink:1.14.4-scala_2.11
  20. args: ["jobmanager"]
  21. ports:
  22. - containerPort: 6123
  23. name: rpc
  24. - containerPort: 6124
  25. name: blob-server
  26. - containerPort: 8081
  27. name: webui
  28. livenessProbe:
  29. tcpSocket:
  30. port: 6123
  31. initialDelaySeconds: 30
  32. periodSeconds: 60
  33. volumeMounts:
  34. - name: flink-config-volume
  35. mountPath: /opt/flink/conf
  36. securityContext:
  37. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
  38. volumes:
  39. - name: flink-config-volume
  40. configMap:
  41. name: flink-config
  42. items:
  43. - key: flink-conf.yaml
  44. path: flink-conf.yaml
  45. - key: log4j-console.properties
  46. path: log4j-console.properties

jobmanager-session-deployment-ha.yaml

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: flink-jobmanager
  5. spec:
  6. replicas: 1 # 通过设置大于 1 的整型值来开启 Standby JobManager
  7. selector:
  8. matchLabels:
  9. app: flink
  10. component: jobmanager
  11. template:
  12. metadata:
  13. labels:
  14. app: flink
  15. component: jobmanager
  16. spec:
  17. containers:
  18. - name: jobmanager
  19. image: apache/flink:1.14.4-scala_2.11
  20. env:
  21. - name: POD_IP
  22. valueFrom:
  23. fieldRef:
  24. apiVersion: v1
  25. fieldPath: status.podIP
  26. # 下面的 args 参数会使用 POD_IP 对应的值覆盖 config map 中 jobmanager.rpc.address 的属性值。
  27. args: ["jobmanager", "$(POD_IP)"]
  28. ports:
  29. - containerPort: 6123
  30. name: rpc
  31. - containerPort: 6124
  32. name: blob-server
  33. - containerPort: 8081
  34. name: webui
  35. livenessProbe:
  36. tcpSocket:
  37. port: 6123
  38. initialDelaySeconds: 30
  39. periodSeconds: 60
  40. volumeMounts:
  41. - name: flink-config-volume
  42. mountPath: /opt/flink/conf
  43. securityContext:
  44. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
  45. serviceAccountName: flink-service-account # 拥有创建、编辑、删除 ConfigMap 权限的 Service 账号
  46. volumes:
  47. - name: flink-config-volume
  48. configMap:
  49. name: flink-config
  50. items:
  51. - key: flink-conf.yaml
  52. path: flink-conf.yaml
  53. - key: log4j-console.properties
  54. path: log4j-console.properties

taskmanager-session-deployment.yaml

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: flink-taskmanager
  5. spec:
  6. replicas: 2
  7. selector:
  8. matchLabels:
  9. app: flink
  10. component: taskmanager
  11. template:
  12. metadata:
  13. labels:
  14. app: flink
  15. component: taskmanager
  16. spec:
  17. containers:
  18. - name: taskmanager
  19. image: apache/flink:1.14.4-scala_2.11
  20. args: ["taskmanager"]
  21. ports:
  22. - containerPort: 6122
  23. name: rpc
  24. - containerPort: 6125
  25. name: query-state
  26. livenessProbe:
  27. tcpSocket:
  28. port: 6122
  29. initialDelaySeconds: 30
  30. periodSeconds: 60
  31. volumeMounts:
  32. - name: flink-config-volume
  33. mountPath: /opt/flink/conf/
  34. securityContext:
  35. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
  36. volumes:
  37. - name: flink-config-volume
  38. configMap:
  39. name: flink-config
  40. items:
  41. - key: flink-conf.yaml
  42. path: flink-conf.yaml
  43. - key: log4j-console.properties
  44. path: log4j-console.properties

Application 集群资源定义

jobmanager-application-non-ha.yaml

  1. apiVersion: batch/v1
  2. kind: Job
  3. metadata:
  4. name: flink-jobmanager
  5. spec:
  6. template:
  7. metadata:
  8. labels:
  9. app: flink
  10. component: jobmanager
  11. spec:
  12. restartPolicy: OnFailure
  13. containers:
  14. - name: jobmanager
  15. image: apache/flink:1.14.4-scala_2.11
  16. env:
  17. args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
  18. ports:
  19. - containerPort: 6123
  20. name: rpc
  21. - containerPort: 6124
  22. name: blob-server
  23. - containerPort: 8081
  24. name: webui
  25. livenessProbe:
  26. tcpSocket:
  27. port: 6123
  28. initialDelaySeconds: 30
  29. periodSeconds: 60
  30. volumeMounts:
  31. - name: flink-config-volume
  32. mountPath: /opt/flink/conf
  33. - name: job-artifacts-volume
  34. mountPath: /opt/flink/usrlib
  35. securityContext:
  36. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
  37. volumes:
  38. - name: flink-config-volume
  39. configMap:
  40. name: flink-config
  41. items:
  42. - key: flink-conf.yaml
  43. path: flink-conf.yaml
  44. - key: log4j-console.properties
  45. path: log4j-console.properties
  46. - name: job-artifacts-volume
  47. hostPath:
  48. path: /host/path/to/job/artifacts

jobmanager-application-ha.yaml

  1. apiVersion: batch/v1
  2. kind: Job
  3. metadata:
  4. name: flink-jobmanager
  5. spec:
  6. parallelism: 1 # 通过设置大于 1 的整型值来开启 Standby JobManager
  7. template:
  8. metadata:
  9. labels:
  10. app: flink
  11. component: jobmanager
  12. spec:
  13. restartPolicy: OnFailure
  14. containers:
  15. - name: jobmanager
  16. image: apache/flink:1.14.4-scala_2.11
  17. env:
  18. - name: POD_IP
  19. valueFrom:
  20. fieldRef:
  21. apiVersion: v1
  22. fieldPath: status.podIP
  23. # 下面的 args 参数会使用 POD_IP 对应的值覆盖 config map 中 jobmanager.rpc.address 的属性值。
  24. args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选参数项: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
  25. ports:
  26. - containerPort: 6123
  27. name: rpc
  28. - containerPort: 6124
  29. name: blob-server
  30. - containerPort: 8081
  31. name: webui
  32. livenessProbe:
  33. tcpSocket:
  34. port: 6123
  35. initialDelaySeconds: 30
  36. periodSeconds: 60
  37. volumeMounts:
  38. - name: flink-config-volume
  39. mountPath: /opt/flink/conf
  40. - name: job-artifacts-volume
  41. mountPath: /opt/flink/usrlib
  42. securityContext:
  43. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
  44. serviceAccountName: flink-service-account # 拥有创建、编辑、删除 ConfigMap 权限的 Service 账号
  45. volumes:
  46. - name: flink-config-volume
  47. configMap:
  48. name: flink-config
  49. items:
  50. - key: flink-conf.yaml
  51. path: flink-conf.yaml
  52. - key: log4j-console.properties
  53. path: log4j-console.properties
  54. - name: job-artifacts-volume
  55. hostPath:
  56. path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: flink-taskmanager
  5. spec:
  6. replicas: 2
  7. selector:
  8. matchLabels:
  9. app: flink
  10. component: taskmanager
  11. template:
  12. metadata:
  13. labels:
  14. app: flink
  15. component: taskmanager
  16. spec:
  17. containers:
  18. - name: taskmanager
  19. image: apache/flink:1.14.4-scala_2.11
  20. env:
  21. args: ["taskmanager"]
  22. ports:
  23. - containerPort: 6122
  24. name: rpc
  25. - containerPort: 6125
  26. name: query-state
  27. livenessProbe:
  28. tcpSocket:
  29. port: 6122
  30. initialDelaySeconds: 30
  31. periodSeconds: 60
  32. volumeMounts:
  33. - name: flink-config-volume
  34. mountPath: /opt/flink/conf/
  35. - name: job-artifacts-volume
  36. mountPath: /opt/flink/usrlib
  37. securityContext:
  38. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
  39. volumes:
  40. - name: flink-config-volume
  41. configMap:
  42. name: flink-config
  43. items:
  44. - key: flink-conf.yaml
  45. path: flink-conf.yaml
  46. - key: log4j-console.properties
  47. path: log4j-console.properties
  48. - name: job-artifacts-volume
  49. hostPath:
  50. path: /host/path/to/job/artifacts