向量化自定义函数

向量化 Python 用户自定义函数,是在执行时,通过在 JVM 和 Python VM 之间以 Arrow 列存格式批量传输数据,来执行的函数。 向量化 Python 用户自定义函数的性能通常比非向量化 Python 用户自定义函数要高得多, 因为向量化 Python 用户自定义函数可以大大减少序列化/反序列化的开销和调用开销。 此外,用户可以利用流行的 Python 库(例如 Pandas,Numpy 等)来实现向量化 Python 用户自定义函数的逻辑。 这些 Python 库通常经过高度优化,并提供了高性能的数据结构和功能。 向量化用户自定义函数的定义,与非向量化用户自定义函数具有相似的方式, 用户只需要在调用 udf 或者 udaf 装饰器时添加一个额外的参数 func_type="pandas",将其标记为一个向量化用户自定义函数即可。

注意: 要执行 Python 向量化自定义函数,客户端和集群端都需要安装 Python 3.6 以上版本(3.6、3.7 或 3.8),并安装 PyFlink。

向量化标量函数

向量化 Python 标量函数以 pandas.Series 类型的参数作为输入,并返回与输入长度相同的 pandas.Series。 在内部实现中,Flink 会将输入数据拆分为多个批次,并将每一批次的输入数据转换为 Pandas.Series 类型, 然后为每一批输入数据调用用户自定义的向量化 Python 标量函数。请参阅配置选项 python.fn-execution.arrow.batch.size, 以获取有关如何配置批次大小的更多详细信息。

向量化 Python 标量函数可以在任何可以使用非向量化 Python 标量函数的地方使用。

以下示例显示了如何定义自己的向量化 Python 标量函数,该函数计算两列的总和,并在查询中使用它:

  1. from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
  2. from pyflink.table.expressions import col
  3. from pyflink.table.udf import udf
  4. @udf(result_type=DataTypes.BIGINT(), func_type="pandas")
  5. def add(i, j):
  6. return i + j
  7. settings = EnvironmentSettings.in_batch_mode()
  8. table_env = TableEnvironment.create(settings)
  9. # use the vectorized Python scalar function in Python Table API
  10. my_table.select(add(col("bigint"), col("bigint")))
  11. # 在SQL API中使用Python向量化标量函数
  12. table_env.create_temporary_function("add", add)
  13. table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

向量化聚合函数

向量化 Python 聚合函数以一个或多个 pandas.Series 类型的参数作为输入,并返回一个标量值作为输出。

注意 现在返回类型还不支持 RowTypeMapType

向量化 Python 聚合函数能够用在 GroupBy Aggregation(Batch),GroupBy Window Aggregation(Batch and Stream) 和 Over Window Aggregation(Batch and Stream bounded over window)。关于聚合的更多使用细节,你可以参考 相关文档.

注意 向量化聚合函数不支持部分聚合,而且一个组或者窗口内的所有数据, 在执行的过程中,会被同时加载到内存,所以需要确保所配置的内存大小足够容纳这些数据。

以下示例显示了如何定一个自己的向量化聚合函数,该函数计算一列的平均值,并在 GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation 使用它:

  1. from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
  2. from pyflink.table.expressions import col, lit
  3. from pyflink.table.udf import udaf
  4. from pyflink.table.window import Tumble
  5. @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
  6. def mean_udaf(v):
  7. return v.mean()
  8. settings = EnvironmentSettings.in_batch_mode()
  9. table_env = TableEnvironment.create(settings)
  10. my_table = ... # type: Table, table schema: [a: String, b: BigInt, c: BigInt]
  11. # 在 GroupBy Aggregation 中使用向量化聚合函数
  12. my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b')))
  13. # 在 GroupBy Window Aggregation 中使用向量化聚合函数
  14. tumble_window = Tumble.over(lit(1).hours) \
  15. .on(col("rowtime")) \
  16. .alias("w")
  17. my_table.window(tumble_window) \
  18. .group_by(col("w")) \
  19. .select(col('w').start, col('w').end, mean_udaf(col('b')))
  20. # 在 Over Window Aggregation 中使用向量化聚合函数
  21. table_env.create_temporary_function("mean_udaf", mean_udaf)
  22. table_env.sql_query("""
  23. SELECT a,
  24. mean_udaf(b)
  25. over (PARTITION BY a ORDER BY rowtime
  26. ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)
  27. FROM MyTable""")

除了直接定义一个 Python 函数之外,还支持多种方式来定义向量化 Python 聚合函数。 以下示例显示了多种定义向量化 Python 聚合函数的方式。该函数需要两个类型为 bigint 的参数作为输入参数,并返回它们的最大值的和作为结果。

  1. from pyflink.table import DataTypes
  2. from pyflink.table.udf import AggregateFunction, udaf
  3. # 方式一:扩展基类 `AggregateFunction`
  4. class MaxAdd(AggregateFunction):
  5. def open(self, function_context):
  6. mg = function_context.get_metric_group()
  7. self.counter = mg.add_group("key", "value").counter("my_counter")
  8. self.counter_sum = 0
  9. def get_value(self, accumulator):
  10. # counter
  11. self.counter.inc(10)
  12. self.counter_sum += 10
  13. return accumulator[0]
  14. def create_accumulator(self):
  15. return []
  16. def accumulate(self, accumulator, *args):
  17. result = 0
  18. for arg in args:
  19. result += arg.max()
  20. accumulator.append(result)
  21. max_add = udaf(MaxAdd(), result_type=DataTypes.BIGINT(), func_type="pandas")
  22. # 方式二:普通 Python 函数
  23. @udaf(result_type=DataTypes.BIGINT(), func_type="pandas")
  24. def max_add(i, j):
  25. return i.max() + j.max()
  26. # 方式三:lambda 函数
  27. max_add = udaf(lambda i, j: i.max() + j.max(), result_type=DataTypes.BIGINT(), func_type="pandas")
  28. # 方式四:callable 函数
  29. class CallableMaxAdd(object):
  30. def __call__(self, i, j):
  31. return i.max() + j.max()
  32. max_add = udaf(CallableMaxAdd(), result_type=DataTypes.BIGINT(), func_type="pandas")
  33. # 方式五:partial 函数
  34. def partial_max_add(i, j, k):
  35. return i.max() + j.max() + k
  36. max_add = udaf(functools.partial(partial_max_add, k=1), result_type=DataTypes.BIGINT(), func_type="pandas")