连接器

本篇描述了如何在 PyFlink 中使用连接器,并着重介绍了在 Python 程序中使用 Flink 连接器时需要注意的细节。

Note 想要了解常见的连接器信息和通用配置,请查阅相关的 Java/Scala 文档

下载连接器(connector)和格式(format)jar 包

由于 Flink 是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的, 要在 PyFlink 作业中使用,首先需要将其指定为作业的 依赖

  1. table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

如何使用连接器

在 PyFlink Table API 中,DDL 是定义 source 和 sink 比较推荐的方式,这可以通过 TableEnvironment 中的 execute_sql() 方法来完成,然后就可以在作业中使用这张表了。

  1. source_ddl = """
  2. CREATE TABLE source_table(
  3. a VARCHAR,
  4. b INT
  5. ) WITH (
  6. 'connector' =' = 'kafka',
  7. 'topic' = 'source_topic',
  8. 'properties.bootstrap.servers' = 'kafka:9092',
  9. 'properties.group.id' = 'test_3',
  10. 'scan.startup.mode' = 'latest-offset',
  11. 'format' = 'json'
  12. )
  13. """
  14. sink_ddl = """
  15. CREATE TABLE sink_table(
  16. a VARCHAR
  17. ) WITH (
  18. 'connector' = 'kafka',
  19. 'topic' = 'sink_topic',
  20. 'properties.bootstrap.servers' = 'kafka:9092',
  21. 'format' = 'json'
  22. )
  23. """
  24. t_env.execute_sql(source_ddl)
  25. t_env.execute_sql(sink_ddl)
  26. t_env.sql_query("SELECT a FROM source_table") \
  27. .execute_insert("sink_table").wait()

下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。

  1. from pyflink.table import TableEnvironment, EnvironmentSettings
  2. def log_processing():
  3. env_settings = EnvironmentSettings.in_streaming_mode()
  4. t_env = TableEnvironment.create(env_settings)
  5. # specify connector and format jars
  6. t_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
  7. source_ddl = """
  8. CREATE TABLE source_table(
  9. a VARCHAR,
  10. b INT
  11. ) WITH (
  12. 'connector' = 'kafka',
  13. 'topic' = 'source_topic',
  14. 'properties.bootstrap.servers' = 'kafka:9092',
  15. 'properties.group.id' = 'test_3',
  16. 'scan.startup.mode' = 'latest-offset',
  17. 'format' = 'json'
  18. )
  19. """
  20. sink_ddl = """
  21. CREATE TABLE sink_table(
  22. a VARCHAR
  23. ) WITH (
  24. 'connector' = 'kafka',
  25. 'topic' = 'sink_topic',
  26. 'properties.bootstrap.servers' = 'kafka:9092',
  27. 'format' = 'json'
  28. )
  29. """
  30. t_env.execute_sql(source_ddl)
  31. t_env.execute_sql(sink_ddl)
  32. t_env.sql_query("SELECT a FROM source_table") \
  33. .execute_insert("sink_table").wait()
  34. if __name__ == '__main__':
  35. log_processing()

内置的 Sources 和 Sinks

有些 source 和 sink 被内置在 Flink 中,可以直接使用。这些内置的 source 包括将 Pandas DataFrame 作为数据源, 或者将一个元素集合作为数据源。内置的 sink 包括将数据转换为 Pandas DataFrame 等。

和 Pandas 之间互转

PyFlink 表支持与 Pandas DataFrame 之间互相转换。

  1. from pyflink.table.expressions import col
  2. import pandas as pd
  3. import numpy as np
  4. # 创建一个 PyFlink 表
  5. pdf = pd.DataFrame(np.random.rand(1000, 2))
  6. table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
  7. # 将 PyFlink 表转换成 Pandas DataFrame
  8. pdf = table.to_pandas()

from_elements()

from_elements() 用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。

  1. from pyflink.table import DataTypes
  2. table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
  3. # 使用第二个参数指定自定义字段名
  4. table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
  5. # 使用第二个参数指定自定义表结构
  6. table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
  7. DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
  8. DataTypes.FIELD("b", DataTypes.STRING())]))

以上查询返回的表如下:

  1. +----+-------+
  2. | a | b |
  3. +====+=======+
  4. | 1 | Hi |
  5. +----+-------+
  6. | 2 | Hello |
  7. +----+-------+

用户自定义的 source 和 sink

在某些情况下,你可能想要自定义 source 或 sink。目前,source 和 sink 必须使用 Java/Scala 实现,你可以定义一个 TableFactory , 然后通过 DDL 在 PyFlink 作业中来使用它们。更多详情,可查阅 Java/Scala 文档