调试

本页介绍如何在PyFlink进行调试

打印日志信息

客户端日志

你可以通过 print 或者标准的 Python logging 模块,在 PyFlink 作业中,Python UDF 之外的地方打印上下文和调试信息。 在提交作业时,日志信息会打印在客户端的日志文件中。

  1. from pyflink.table import EnvironmentSettings, TableEnvironment
  2. # 创建 TableEnvironment
  3. env_settings = EnvironmentSettings.in_streaming_mode()
  4. table_env = TableEnvironment.create(env_settings)
  5. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
  6. # 使用 logging 模块
  7. import logging
  8. logging.warning(table.get_schema())
  9. # 使用 print 函数
  10. print(table.get_schema())

注意: 客户端缺省的日志级别是 WARNING,因此,只有日志级别在 WARNING 及以上的日志信息才会打印在客户端的日志文件中。

服务器端日志

你可以通过 print 或者标准的 Python logging 模块,在 Python UDF 中打印上下文和调试信息。 在作业运行的过程中,日志信息会打印在 TaskManager 的日志文件中。

  1. from pyflink.table import DataTypes
  2. from pyflink.table.udf import udf
  3. import logging
  4. @udf(result_type=DataTypes.BIGINT())
  5. def add(i, j):
  6. # 使用 logging 模块
  7. logging.info("debug")
  8. # 使用 print 函数
  9. print('debug')
  10. return i + j

注意: 服务器端缺省的日志级别是 INFO,因此,只有日志级别在 INFO 及以上的日志信息才会打印在 TaskManager 的日志文件中。

查看日志

如果设置了环境变量FLINK_HOME,日志将会放置在FLINK_HOME指向目录的log目录之下。否则,日志将会放在安装的Pyflink模块的 log目录下。你可以通过执行下面的命令来查找PyFlink模块的log目录的路径:

  1. $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"

调试Python UDFs

本地调试

你可以直接在 PyCharm 等 IDE 调试你的 Python 函数。

远程调试

你可以利用PyCharm提供的pydevd_pycharm工具进行Python UDF的调试

  1. 在PyCharm里创建一个Python Remote Debug

    run -> Python Remote Debug -> + -> 选择一个port (e.g. 6789)

  2. 安装pydevd-pycharm工具

    1. $ pip install pydevd-pycharm
  3. 在你的Python UDF里面添加如下的代码

    1. import pydevd_pycharm
    2. pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
  4. 启动刚刚创建的Python Remote Dubug Server

  5. 运行你的Python代码

Profiling Python UDFs

你可以打开profile来分析性能瓶颈

  1. t_env.get_config().set("python.profile.enabled", "true")

你可以在日志里面查看profile的结果