Python REPL

Flink附带了一个集成的交互式Python Shell。 它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。 本地安装Flink,请看本地安装页面。 您也可以从源码安装Flink,请看从源码构建 Flink页面。

注意 Python Shell会调用“python”命令。关于Python执行环境的要求,请参考Python Table API环境安装

你可以通过PyPi安装PyFlink,然后使用Python Shell:

  1. # 安装 PyFlink
  2. $ python -m pip install apache-flink
  3. # 执行脚本
  4. $ pyflink-shell.sh local

关于如何在一个Cluster集群上运行Python shell,可以参考启动章节介绍。

使用

当前Python shell支持Table API的功能。 在启动之后,Table Environment的相关内容将会被自动加载。 可以通过变量”bt_env”来使用BatchTableEnvironment,通过变量”st_env”来使用StreamTableEnvironment。

Table API

下面是一个通过Python Shell 运行的简单示例:

stream

  1. >>> import tempfile
  2. >>> import os
  3. >>> import shutil
  4. >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
  5. >>> if os.path.exists(sink_path):
  6. ... if os.path.isfile(sink_path):
  7. ... os.remove(sink_path)
  8. ... else:
  9. ... shutil.rmtree(sink_path)
  10. >>> s_env.set_parallelism(1)
  11. >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
  12. >>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
  13. ... .schema(Schema.new_builder()
  14. ... .column("a", DataTypes.BIGINT())
  15. ... .column("b", DataTypes.STRING())
  16. ... .column("c", DataTypes.STRING())
  17. ... .build())
  18. ... .option("path", path)
  19. ... .format(FormatDescriptor.for_format("csv")
  20. ... .option("field-delimiter", ",")
  21. ... .build())
  22. ... .build())
  23. >>> t.select(col('a') + 1, col('b'), col('c'))\
  24. ... .execute_insert("stream_sink").wait()
  25. >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
  26. >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
  27. ... print(f.read())

batch

  1. >>> import tempfile
  2. >>> import os
  3. >>> import shutil
  4. >>> sink_path = tempfile.gettempdir() + '/batch.csv'
  5. >>> if os.path.exists(sink_path):
  6. ... if os.path.isfile(sink_path):
  7. ... os.remove(sink_path)
  8. ... else:
  9. ... shutil.rmtree(sink_path)
  10. >>> b_env.set_parallelism(1)
  11. >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
  12. >>> st_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
  13. ... .schema(Schema.new_builder()
  14. ... .column("a", DataTypes.BIGINT())
  15. ... .column("b", DataTypes.STRING())
  16. ... .column("c", DataTypes.STRING())
  17. ... .build())
  18. ... .option("path", path)
  19. ... .format(FormatDescriptor.for_format("csv")
  20. ... .option("field-delimiter", ",")
  21. ... .build())
  22. ... .build())
  23. >>> t.select(col('a') + 1, col('b'), col('c'))\
  24. ... .execute_insert("batch_sink").wait()
  25. >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
  26. >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
  27. ... print(f.read())

启动

查看Python Shell提供的可选参数,可以使用:

  1. pyflink-shell.sh --help

Local

Python Shell运行在local模式下,只需要执行:

  1. pyflink-shell.sh local

Remote

Python Shell运行在一个指定的JobManager上,通过关键字remote和对应的JobManager 的地址和端口号来进行指定:

  1. pyflink-shell.sh remote <hostname> <portnumber>

Yarn Python Shell cluster

Python Shell可以运行在YARN集群之上。Python shell在Yarn上部署一个新的Flink集群,并进行连接。除了指定container数量,你也 可以指定JobManager的内存,YARN应用的名字等参数。 例如,在一个部署了两个TaskManager的Yarn集群上运行Python Shell:

  1. pyflink-shell.sh yarn -n 2

关于所有可选的参数,可以查看本页面底部的完整说明。

Yarn Session

如果你已经通过Flink Yarn Session部署了一个Flink集群,能够通过以下的命令连接到这个集群:

  1. pyflink-shell.sh yarn

完整的参考

  1. Flink Python Shell
  2. 使用: pyflink-shell.sh [local|remote|yarn] [options] <args>...
  3. 命令: local [选项]
  4. 启动一个部署在localFlink Python shell
  5. 使用:
  6. -h,--help 查看所有可选的参数
  7. 命令: remote [选项] <host> <port>
  8. 启动一个部署在remote集群的Flink Python shell
  9. <host>
  10. JobManager的主机名
  11. <port>
  12. JobManager的端口号
  13. 使用:
  14. -h,--help 查看所有可选的参数
  15. 命令: yarn [选项]
  16. 启动一个部署在Yarn集群的Flink Python Shell
  17. 使用:
  18. -h,--help 查看所有可选的参数
  19. -jm,--jobManagerMemory <arg> 具有可选单元的JobManager
  20. container的内存(默认值:MB)
  21. -n,--container <arg> 需要分配的YARN container
  22. 数量 (=TaskManager的数量)
  23. -nm,--name <arg> 自定义YARN Application的名字
  24. -qu,--queue <arg> 指定YARNqueue
  25. -s,--slots <arg> 每个TaskManagerslots的数量
  26. -tm,--taskManagerMemory <arg> 具有可选单元的每个TaskManager
  27. container的内存(默认值:MB
  28. -h | --help
  29. 打印输出使用文档