Python REPL

Flink comes with an integrated interactive Python Shell. It can be used in a local setup as well as in a cluster setup. See the local setup page for more information about how to set up a local Flink. You can also build a local setup from source.

Note The Python Shell will run the command “python”. Please run the following command to confirm that the command “python” in the current environment points to Python 3.5+:

  $ python --version
  # the version printed here must be 3.5+
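
If “python” resolves to Python 2 on your machine, one common fix is to run the shell from a Python 3 virtual environment (a minimal sketch; the environment name is arbitrary):

  $ python3 -m venv pyflink-env
  $ source pyflink-env/bin/activate
  $ python --version  # should now report 3.5+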

Note Using Python UDFs in the Python Shell requires apache-beam 2.15.0. Run the following command to confirm that this requirement is met before running the Shell in local mode:

  $ python -m pip install apache-beam==2.15.0

To use the shell with an integrated Flink cluster, just execute:

  bin/pyflink-shell.sh local

in the root directory of your Flink binary distribution. To run the Shell on a cluster, please see the Setup section below.

Usage

The shell currently supports only the Table API. The Table Environments are automatically prebound after startup. Use “bt_env” and “st_env” to access the BatchTableEnvironment and StreamTableEnvironment respectively (the examples below also use the prebound execution environments “b_env” and “s_env”).
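
For instance, a quick way to verify that the prebound environments work is to build a small table and inspect its schema (a minimal sketch; the data and field names are arbitrary):

  >>> # uses the prebound st_env; from_elements infers column types from the data
  >>> t = st_env.from_elements([(1, 'hello')], ['id', 'msg'])
  >>> t.print_schema()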

Table API

The example below is a simple program in the Python shell, shown first as a streaming job and then as the equivalent batch job:

  >>> import tempfile
  >>> import os
  >>> import shutil
  >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
  >>> if os.path.exists(sink_path):
  ...     if os.path.isfile(sink_path):
  ...         os.remove(sink_path)
  ...     else:
  ...         shutil.rmtree(sink_path)
  >>> s_env.set_parallelism(1)
  >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
  >>> st_env.connect(FileSystem().path(sink_path))\
  ...     .with_format(OldCsv()
  ...                  .field_delimiter(',')
  ...                  .field("a", DataTypes.BIGINT())
  ...                  .field("b", DataTypes.STRING())
  ...                  .field("c", DataTypes.STRING()))\
  ...     .with_schema(Schema()
  ...                  .field("a", DataTypes.BIGINT())
  ...                  .field("b", DataTypes.STRING())
  ...                  .field("c", DataTypes.STRING()))\
  ...     .register_table_sink("stream_sink")
  >>> t.select("a + 1, b, c")\
  ...     .insert_into("stream_sink")
  >>> st_env.execute("stream_job")
  >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
  >>> with open(sink_path, 'r') as f:
  ...     print(f.read())
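
Assuming the job completed successfully, the printed file contents should look like the following (the select statement adds 1 to each value of column a):

  2,hi,hello
  3,hi,hello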

The equivalent batch program uses the prebound b_env and bt_env:

  >>> import tempfile
  >>> import os
  >>> import shutil
  >>> sink_path = tempfile.gettempdir() + '/batch.csv'
  >>> if os.path.exists(sink_path):
  ...     if os.path.isfile(sink_path):
  ...         os.remove(sink_path)
  ...     else:
  ...         shutil.rmtree(sink_path)
  >>> b_env.set_parallelism(1)
  >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
  >>> bt_env.connect(FileSystem().path(sink_path))\
  ...     .with_format(OldCsv()
  ...                  .field_delimiter(',')
  ...                  .field("a", DataTypes.BIGINT())
  ...                  .field("b", DataTypes.STRING())
  ...                  .field("c", DataTypes.STRING()))\
  ...     .with_schema(Schema()
  ...                  .field("a", DataTypes.BIGINT())
  ...                  .field("b", DataTypes.STRING())
  ...                  .field("c", DataTypes.STRING()))\
  ...     .register_table_sink("batch_sink")
  >>> t.select("a + 1, b, c")\
  ...     .insert_into("batch_sink")
  >>> bt_env.execute("batch_job")
  >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
  >>> with open(sink_path, 'r') as f:
  ...     print(f.read())
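
Note that the examples above use FileSystem, OldCsv, Schema and DataTypes without importing them; the shell pre-imports these common Table API classes at startup, so only standard library modules such as tempfile need explicit imports.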

Setup

To get an overview of what options the Python Shell provides, please use

  bin/pyflink-shell.sh --help

Local

To use the shell with an integrated Flink cluster, just execute:

  bin/pyflink-shell.sh local

Remote

To use it with a running cluster, please start the Python shell with the keyword remote and supply the host and port of the JobManager with:

  bin/pyflink-shell.sh remote <hostname> <portnumber>
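
For example, if your JobManager were reachable at host flink-master on port 6123 (hypothetical values; substitute your own), the call would be:

  bin/pyflink-shell.sh remote flink-master 6123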

Yarn Python Shell cluster

The shell can deploy a Flink cluster to YARN, which is used exclusively by the shell. The shell deploys a new Flink cluster on YARN and connects to the cluster. You can also specify options for the YARN cluster such as memory for the JobManager, name of the YARN application, etc.

For example, to start a Yarn cluster for the Python Shell with two TaskManagers, use the following:

  bin/pyflink-shell.sh yarn -n 2

For all other options, see the full reference at the bottom.

Yarn Session

If you have previously deployed a Flink cluster using the Flink Yarn Session, the Python shell can connect to it with the following command:

  bin/pyflink-shell.sh yarn

Full Reference

  Flink Python Shell
  Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

  Command: local [options]
  Starts Flink Python shell with a local Flink cluster
    usage:
      -h,--help   Show the help message with descriptions of all options.

  Command: remote [options] <host> <port>
  Starts Flink Python shell connecting to a remote cluster
    <host>
          Remote host name as string
    <port>
          Remote port as integer
    usage:
      -h,--help   Show the help message with descriptions of all options.

  Command: yarn [options]
  Starts Flink Python shell connecting to a yarn cluster
    usage:
      -h,--help                      Show the help message with descriptions of
                                     all options.
      -jm,--jobManagerMemory <arg>   Memory for JobManager Container with
                                     optional unit (default: MB)
      -nm,--name <arg>               Set a custom name for the application on
                                     YARN
      -qu,--queue <arg>              Specify YARN queue.
      -s,--slots <arg>               Number of slots per TaskManager
      -tm,--taskManagerMemory <arg>  Memory per TaskManager Container with
                                     optional unit (default: MB)

  -h | --help
    Prints this usage text