Python API 教程

在该教程中,我们会从零开始,介绍如何创建一个Flink Python项目及运行Python Table API程序。

注意 PyFlink的运行需要Python 3.5及以上版本。

执行以下命令以确认当前环境下的指令“python”指向Python 3.5及以上版本:

  1. $ python --version
  2. # the version printed here must be 3.5+

创建一个Python Table API项目

首先,使用您最熟悉的IDE创建一个Python项目。之后执行命令python -m pip install apache-flink从PyPI下载安装PyFlink包。如果您想从源码安装,请参考构建PyFlink了解详细信息。

编写Flink Python Table API程序的第一步是创建BatchTableEnvironment(或者StreamTableEnvironment,如果你要创建一个流式作业)。这是Python Table API作业的入口类。

  1. exec_env = ExecutionEnvironment.get_execution_environment()
  2. exec_env.set_parallelism(1)
  3. t_config = TableConfig()
  4. t_env = BatchTableEnvironment.create(exec_env, t_config)

ExecutionEnvironment (或者StreamExecutionEnvironment,如果你要创建一个流式作业)可以用来设置执行参数,比如重启策略,缺省并发值等。

TableConfig可以用来设置缺省的catalog名字,自动生成代码时方法大小的阈值等.

接下来,我们将介绍如何创建源表和结果表。

  1. t_env.connect(FileSystem().path('/tmp/input')) \
  2. .with_format(OldCsv()
  3. .line_delimiter(' ')
  4. .field('word', DataTypes.STRING())) \
  5. .with_schema(Schema()
  6. .field('word', DataTypes.STRING())) \
  7. .create_temporary_table('mySource')
  8. t_env.connect(FileSystem().path('/tmp/output')) \
  9. .with_format(OldCsv()
  10. .field_delimiter('\t')
  11. .field('word', DataTypes.STRING())
  12. .field('count', DataTypes.BIGINT())) \
  13. .with_schema(Schema()
  14. .field('word', DataTypes.STRING())
  15. .field('count', DataTypes.BIGINT())) \
  16. .create_temporary_table('mySink')

上面的程序展示了如何创建及在ExecutionEnvironment中注册表名分别为mySourcemySink的表。其中,源表mySource有一列: word,该表代表了从输入文件/tmp/input中读取的单词;结果表mySink有两列: word和count,该表会将计算结果输出到文件/tmp/output中,字段之间使用\t作为分隔符。

接下来,我们介绍如何创建一个作业:该作业读取表mySource中的数据,进行一些变换,然后将结果写入表mySink

  1. t_env.scan('mySource') \
  2. .group_by('word') \
  3. .select('word, count(1)') \
  4. .insert_into('mySink')

最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当t_env.execute(job_name)被调用的时候,作业才会被真正提交到集群或者本地进行执行。

  1. t_env.execute("python_job")

该教程的完整代码如下:

  1. from pyflink.dataset import ExecutionEnvironment
  2. from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
  3. from pyflink.table.descriptors import Schema, OldCsv, FileSystem
  4. exec_env = ExecutionEnvironment.get_execution_environment()
  5. exec_env.set_parallelism(1)
  6. t_config = TableConfig()
  7. t_env = BatchTableEnvironment.create(exec_env, t_config)
  8. t_env.connect(FileSystem().path('/tmp/input')) \
  9. .with_format(OldCsv()
  10. .field('word', DataTypes.STRING())) \
  11. .with_schema(Schema()
  12. .field('word', DataTypes.STRING())) \
  13. .create_temporary_table('mySource')
  14. t_env.connect(FileSystem().path('/tmp/output')) \
  15. .with_format(OldCsv()
  16. .field_delimiter('\t')
  17. .field('word', DataTypes.STRING())
  18. .field('count', DataTypes.BIGINT())) \
  19. .with_schema(Schema()
  20. .field('word', DataTypes.STRING())
  21. .field('count', DataTypes.BIGINT())) \
  22. .create_temporary_table('mySink')
  23. t_env.from_path('mySource') \
  24. .group_by('word') \
  25. .select('word, count(1)') \
  26. .insert_into('mySink')
  27. t_env.execute("python_job")

首先,你需要在文件 “/tmp/input” 中准备好输入数据。你可以选择通过如下命令准备输入数据:

  1. $ echo "flink\npyflink\nflink" > /tmp/input

接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“/tmp/output”已经存在,你需要先删除文件,否则程序将无法正确运行起来):

  1. $ python WordCount.py

上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行,可以参考作业提交示例

最后,你可以通过如下命令查看你的运行结果:

  1. $ cat /tmp/output
  2. flink 2
  3. pyflink 1

上述教程介绍了如何编写并运行一个Flink Python Table API程序,如果想了解Flink Python Table API的更多信息,可以参考Flink Python Table API文档