Command-Line Interface

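Flink provides a Command-Line Interface (CLI) to run programs that are packaged as JAR files and to control their execution. The CLI is part of any Flink setup and is available in both local single-node setups and distributed setups. By default it is located at <flink-home>/bin/flink and connects to the running Flink master (JobManager) that was started from the same installation directory.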

A prerequisite for using the CLI is that the Flink master (JobManager) has been started (via <flink-home>/bin/start-cluster.sh) or that a YARN environment is available.

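The command line can be used to submit jobs for execution, cancel or stop running jobs, show information about a job, list running and scheduled jobs, trigger and dispose of savepoints, and modify running jobs.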

Examples

  • Run the example program with no arguments:
    ./bin/flink run ./examples/batch/WordCount.jar
  • Run the example program with arguments for the input and result files:
    ./bin/flink run ./examples/batch/WordCount.jar \
      --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program with parallelism 16 and arguments for the input and result files:
    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
      --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program with Flink log output disabled:
    ./bin/flink run -q ./examples/batch/WordCount.jar
  • Run the example program in detached mode:
    ./bin/flink run -d ./examples/batch/WordCount.jar
  • Run the example program on a specific JobManager:
    ./bin/flink run -m myJMHost:8081 \
      ./examples/batch/WordCount.jar \
      --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program with a specific class as the entry point:
    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
      ./examples/batch/WordCount.jar \
      --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program on a per-job YARN cluster with 2 TaskManagers:
    ./bin/flink run -m yarn-cluster -yn 2 \
      ./examples/batch/WordCount.jar \
      --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
  • Display the optimized execution plan of the WordCount example program as JSON:
    ./bin/flink info ./examples/batch/WordCount.jar \
      --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • List scheduled and running jobs (including their JobIDs):
    ./bin/flink list
  • List scheduled jobs (including their JobIDs):
    ./bin/flink list -s
  • List running jobs (including their JobIDs):
    ./bin/flink list -r
  • List all existing jobs (including their JobIDs):
    ./bin/flink list -a
  • List running Flink jobs inside a Flink YARN session:
    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
  • Cancel a job:
    ./bin/flink cancel <jobID>
  • Cancel a job with a savepoint:
    ./bin/flink cancel -s [targetDirectory] <jobID>
  • Stop a job (streaming jobs only):
    ./bin/flink stop <jobID>
  • Modify a running job (streaming jobs only):
    ./bin/flink modify <jobID> -p <newParallelism>
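The list commands above print each job together with its JobID, and scripts often need just those IDs. The following is a minimal sketch that assumes each job line contains its JobID as a 32-character lowercase hexadecimal string, for example "<date> : <JobID> : <job name> (RUNNING)"; the exact output layout is an assumption and may vary between Flink versions, and extract_job_ids is a hypothetical name.

```bash
#!/usr/bin/env bash
# Hypothetical helper: read `flink list` output on stdin and print one
# JobID per line. Assumes JobIDs appear as 32 lowercase hex characters.
extract_job_ids() {
  grep -oE '[0-9a-f]{32}'
}

# Example (needs a running cluster, so it is not executed here):
#   ./bin/flink list -r | extract_job_ids
```

Matching on the ID pattern instead of column positions keeps the sketch independent of how dates and job names are printed, at the cost of also matching any 32-character hex run that happens to appear in a job name.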

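Note: The difference between cancelling and stopping a (streaming) job is as follows:

On a cancel call, the operators in a job immediately receive a cancel() method call so that they cancel as soon as possible. If operators do not stop after the cancel call, Flink starts interrupting the thread periodically until it stops.

A stop call is a more graceful way to end a running streaming job. Stop is only available for jobs that use sources implementing the StoppableFunction interface. When the user requests to stop a job, all sources receive a stop() method call. The job keeps running until all sources have shut down properly, which allows it to finish processing all in-flight data.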
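Because stop only works when the sources implement StoppableFunction, a script may try a graceful stop first and fall back to cancel. A minimal sketch, assuming the CLI exits with a non-zero status when the stop request is rejected; FLINK_BIN (defaulting to ./bin/flink) and stop_or_cancel are hypothetical names.

```bash
#!/usr/bin/env bash
# Path to the Flink CLI (hypothetical variable; override as needed).
FLINK_BIN="${FLINK_BIN:-./bin/flink}"

# Try a graceful stop first. If the CLI reports failure (assumed: non-zero
# exit status, e.g. because a source does not implement StoppableFunction),
# fall back to cancelling the job.
stop_or_cancel() {
  local job_id="$1"
  if "$FLINK_BIN" stop "$job_id"; then
    echo "stopped $job_id"
  else
    echo "stop failed for $job_id, cancelling instead" >&2
    "$FLINK_BIN" cancel "$job_id"
  fi
}

# Example: stop_or_cancel <jobID>
```

The fallback gives up the guarantee described above: a cancelled job does not wait to finish processing its in-flight data.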

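Savepoints

Savepoints are controlled via the command-line client:

Trigger a savepoint

    ./bin/flink savepoint <jobId> [savepointDirectory]

This triggers a savepoint for the job with ID jobId and returns the path of the created savepoint. You need this path to restore and dispose of the savepoint.

Furthermore, you can optionally specify a target file system directory in which to store the savepoint. The directory needs to be accessible by the JobManager.

If you do not specify a target directory, you need to have configured a default directory (state.savepoints.dir). Otherwise, triggering the savepoint will fail.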
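Since later commands need the returned savepoint path, a script can capture it from the CLI output. A minimal sketch, assuming the CLI reports the result on a line of the form "Savepoint completed. Path: <path>" (check the output of your Flink version); savepoint_path_from_output is a hypothetical name.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the savepoint path from `flink savepoint` output
# read on stdin. Assumes a line "Savepoint completed. Path: <path>".
savepoint_path_from_output() {
  sed -n 's/^Savepoint completed\. Path: //p'
}

# Example (needs a running job, so it is not executed here):
#   path="$(./bin/flink savepoint <jobId> | savepoint_path_from_output)"
```

If the captured path is empty, treat the savepoint as failed rather than passing an empty value on to run -s.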

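Trigger a savepoint with YARN

    ./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

This triggers a savepoint for the job with ID jobId and YARN application ID yarnAppId, and returns the path of the created savepoint.

Everything else is the same as described in the "Trigger a savepoint" section above.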

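Cancel with a savepoint

You can atomically trigger a savepoint and cancel a job.

    ./bin/flink cancel -s [savepointDirectory] <jobID>

If no savepoint directory is specified, you need to have configured a default savepoint directory for the Flink installation (see Savepoints).

The job is only cancelled if the savepoint succeeds.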

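Restore a savepoint

    ./bin/flink run -s <savepointPath> ...

The run command has a savepoint flag (-s) to submit a job that restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.

By default, Flink tries to match all savepoint state to the job being submitted. If you want to allow skipping savepoint state that cannot be restored with the new job, set the allowNonRestoredState flag (-n). You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.

    ./bin/flink run -s <savepointPath> -n ...

This is useful if your program dropped an operator that was part of the savepoint.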
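Resuming with or without -n can be wrapped in a small function so that the decision to drop unmatched state is explicit at each call site. A minimal sketch using only the -s and -n flags described above; FLINK_BIN and resume_from_savepoint are hypothetical names.

```bash
#!/usr/bin/env bash
FLINK_BIN="${FLINK_BIN:-./bin/flink}"   # hypothetical override for the CLI path

# resume_from_savepoint <savepointPath> <yes|no> <jar-file> [program args...]
# The second argument decides whether -n (--allowNonRestoredState) is passed.
resume_from_savepoint() {
  local savepoint="$1" allow_non_restored="$2"
  shift 2
  if [ "$allow_non_restored" = yes ]; then
    "$FLINK_BIN" run -s "$savepoint" -n "$@"
  else
    "$FLINK_BIN" run -s "$savepoint" "$@"
  fi
}

# Example:
#   resume_from_savepoint hdfs:///flink/savepoint-1537 no ./examples/batch/WordCount.jar
```

Defaulting callers to "no" keeps the safe behavior: a restore that silently skips state only happens when someone asks for it.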

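Dispose a savepoint

    ./bin/flink savepoint -d <savepointPath>

Disposes the savepoint at the given path. The savepoint path is returned by the savepoint trigger command.

If you use custom state instances (for example, custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered, so that the savepoint is disposed with the user code class loader:

    ./bin/flink savepoint -d <savepointPath> -j <jarFile>

Otherwise, you will run into a ClassNotFoundException.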
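Because -j is only needed when custom state instances are involved, a disposal helper can take the JAR as an optional argument. A minimal sketch; FLINK_BIN and dispose_savepoint are hypothetical names.

```bash
#!/usr/bin/env bash
FLINK_BIN="${FLINK_BIN:-./bin/flink}"   # hypothetical override for the CLI path

# dispose_savepoint <savepointPath> [jarFile]
# Passes -j only when a JAR is given, so that custom state classes can be
# loaded with the user code class loader during disposal.
dispose_savepoint() {
  if [ -n "${2:-}" ]; then
    "$FLINK_BIN" savepoint -d "$1" -j "$2"
  else
    "$FLINK_BIN" savepoint -d "$1"
  fi
}

# Example:
#   dispose_savepoint <savepointPath> <jarFile>
```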

Usage

The command-line syntax is as follows:

    ./flink <ACTION> [OPTIONS] [ARGUMENTS]

    The following actions are available:

    Action "run" compiles and runs a program.

      Syntax: run [OPTIONS] <jar-file> <arguments>
      "run" action options:
         -c,--class <classname>                Class with the program entry point
                                               ("main" method or "getPlan()" method.
                                               Only needed if the JAR file does not
                                               specify the class in its manifest.
         -C,--classpath <url>                  Adds a URL to each user code
                                               classloader on all nodes in the
                                               cluster. The paths must specify a
                                               protocol (e.g. file://) and be
                                               accessible on all nodes (e.g. by means
                                               of a NFS share). You can use this
                                               option multiple times for specifying
                                               more than one URL. The protocol must
                                               be supported by the {@link
                                               java.net.URLClassLoader}.
         -d,--detached                         If present, runs the job in detached
                                               mode
         -n,--allowNonRestoredState            Allow to skip savepoint state that
                                               cannot be restored. You need to allow
                                               this if you removed an operator from
                                               your program that was part of the
                                               program when the savepoint was
                                               triggered.
         -p,--parallelism <parallelism>        The parallelism with which to run the
                                               program. Optional flag to override the
                                               default value specified in the
                                               configuration.
         -q,--sysoutLogging                    If present, suppress logging output to
                                               standard out.
         -s,--fromSavepoint <savepointPath>    Path to a savepoint to restore the job
                                               from (for example
                                               hdfs:///flink/savepoint-1537).

      Options for yarn-cluster mode:
         -d,--detached                         If present, runs the job in detached
                                               mode
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -yD <property=value>                  use value for given property
         -yd,--yarndetached                    If present, runs the job in detached
                                               mode (deprecated; use non-YARN
                                               specific option instead)
         -yh,--yarnhelp                        Help for the Yarn session CLI.
         -yid,--yarnapplicationId <arg>        Attach to running YARN session
         -yj,--yarnjar <arg>                   Path to Flink jar file
         -yjm,--yarnjobManagerMemory <arg>     Memory for JobManager Container
                                               with optional unit (default: MB)
         -yn,--yarncontainer <arg>             Number of YARN container to allocate
                                               (=Number of Task Managers)
         -ynm,--yarnname <arg>                 Set a custom name for the application
                                               on YARN
         -yq,--yarnquery                       Display available YARN resources
                                               (memory, cores)
         -yqu,--yarnqueue <arg>                Specify YARN queue.
         -ys,--yarnslots <arg>                 Number of slots per TaskManager
         -yst,--yarnstreaming                  Start Flink in streaming mode
         -yt,--yarnship <arg>                  Ship files in the specified directory
                                               (t for transfer)
         -ytm,--yarntaskManagerMemory <arg>    Memory per TaskManager Container
                                               with optional unit (default: MB)
         -yz,--yarnzookeeperNamespace <arg>    Namespace to create the Zookeeper
                                               sub-paths for high availability mode
         -ynl,--yarnnodeLabel <arg>            Specify YARN node label for
                                               the YARN application
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

      Options for default mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

    Action "info" shows the optimized execution plan of the program (JSON).

      Syntax: info [OPTIONS] <jar-file> <arguments>
      "info" action options:
         -c,--class <classname>                Class with the program entry point
                                               ("main" method or "getPlan()" method.
                                               Only needed if the JAR file does not
                                               specify the class in its manifest.
         -p,--parallelism <parallelism>        The parallelism with which to run the
                                               program. Optional flag to override the
                                               default value specified in the
                                               configuration.

    Action "list" lists running and scheduled programs.

      Syntax: list [OPTIONS]
      "list" action options:
         -r,--running                          Show only running programs and their JobIDs
         -s,--scheduled                        Show only scheduled programs and their JobIDs

      Options for yarn-cluster mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -yid,--yarnapplicationId <arg>        Attach to running YARN session
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

      Options for default mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

    Action "stop" stops a running program (streaming jobs only).

      Syntax: stop [OPTIONS] <Job ID>
      "stop" action options:

      Options for yarn-cluster mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -yid,--yarnapplicationId <arg>        Attach to running YARN session
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

      Options for default mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

    Action "cancel" cancels a running program.

      Syntax: cancel [OPTIONS] <Job ID>
      "cancel" action options:
         -s,--withSavepoint <targetDirectory>  Trigger savepoint and cancel job.
                                               The target directory is optional. If
                                               no directory is specified, the
                                               configured default directory
                                               (state.savepoints.dir) is used.

      Options for yarn-cluster mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -yid,--yarnapplicationId <arg>        Attach to running YARN session
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

      Options for default mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

    Action "savepoint" triggers savepoints for a running job or disposes existing ones.

      Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
      "savepoint" action options:
         -d,--dispose <arg>                    Path of savepoint to dispose.
         -j,--jarfile <jarfile>                Flink program JAR file.

      Options for yarn-cluster mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -yid,--yarnapplicationId <arg>        Attach to running YARN session
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

      Options for default mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

    Action "modify" modifies a running job (e.g. change of parallelism).

      Syntax: modify <Job ID> [OPTIONS]
      "modify" action options:
         -h,--help                             Show the help message for the CLI
                                               Frontend or the action.
         -p,--parallelism <newParallelism>     New parallelism for the specified job.
         -v,--verbose                          This option is deprecated.

      Options for yarn-cluster mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -yid,--yarnapplicationId <arg>        Attach to running YARN session
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode

      Options for default mode:
         -m,--jobmanager <arg>                 Address of the JobManager (master) to
                                               which to connect. Use this flag to
                                               connect to a different JobManager than
                                               the one specified in the
                                               configuration.
         -z,--zookeeperNamespace <arg>         Namespace to create the Zookeeper
                                               sub-paths for high availability mode
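Several of the actions above take a <Job ID> argument. Flink prints JobIDs as 32 hexadecimal characters, so a script can reject obviously malformed IDs before invoking the CLI. A minimal sketch; is_job_id is a hypothetical helper, and a well-formed ID can still refer to a job that does not exist.

```bash
#!/usr/bin/env bash
# Hypothetical helper: succeed only if $1 looks like a Flink JobID,
# i.e. exactly 32 hexadecimal characters.
is_job_id() {
  case "$1" in
    "" | *[!0-9a-fA-F]*) return 1 ;;
  esac
  [ "${#1}" -eq 32 ]
}

# Example:
#   is_job_id "$JOB_ID" && ./bin/flink cancel "$JOB_ID"
```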