向量化自定义函数

向量化 Python 用户自定义函数,是在执行时,通过在 JVM 和 Python VM 之间以 Arrow 列存格式批量传输数据,来执行的函数。 向量化 Python 用户自定义函数的性能通常比非向量化 Python 用户自定义函数要高得多, 因为向量化 Python 用户自定义函数可以大大减少序列化/反序列化的开销和调用开销。 此外,用户可以利用流行的 Python 库(例如 Pandas,Numpy 等)来实现向量化 Python 用户自定义函数的逻辑。 这些 Python 库通常经过高度优化,并提供了高性能的数据结构和功能。 向量化用户自定义函数的定义,与非向量化用户自定义函数具有相似的方式, 用户只需要在调用 udf 或者 udaf 装饰器时添加一个额外的参数 func_type="pandas",将其标记为一个向量化用户自定义函数即可。

注意: 要执行 Python 向量化自定义函数,客户端和集群端都需要安装 Python 3.6 以上版本(3.6、3.7 或 3.8),并安装 PyFlink。

向量化标量函数

向量化 Python 标量函数以 pandas.Series 类型的参数作为输入,并返回与输入长度相同的 pandas.Series。 在内部实现中,Flink 会将输入数据拆分为多个批次,并将每一批次的输入数据转换为 Pandas.Series 类型, 然后为每一批输入数据调用用户自定义的向量化 Python 标量函数。请参阅配置选项 python.fn-execution.arrow.batch.size, 以获取有关如何配置批次大小的更多详细信息。

向量化 Python 标量函数可以在任何可以使用非向量化 Python 标量函数的地方使用。

以下示例显示了如何定义自己的向量化 Python 标量函数,该函数计算两列的总和,并在查询中使用它:

  1. @udf(result_type=DataTypes.BIGINT(), func_type="pandas")
  2. def add(i, j):
  3. return i + j
  4. settings = EnvironmentSettings.in_batch_mode()
  5. table_env = TableEnvironment.create(settings)
  6. # use the vectorized Python scalar function in Python Table API
  7. my_table.select(add(my_table.bigint, my_table.bigint))
  8. # 在SQL API中使用Python向量化标量函数
  9. table_env.create_temporary_function("add", add)
  10. table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

向量化聚合函数

向量化 Python 聚合函数以一个或多个 pandas.Series 类型的参数作为输入,并返回一个标量值作为输出。

注意 现在返回类型还不支持 RowTypeMapType

向量化 Python 聚合函数能够用在 GroupBy Aggregation(Batch),GroupBy Window Aggregation(Batch and Stream) 和 Over Window Aggregation(Batch and Stream bounded over window)。关于聚合的更多使用细节,你可以参考 相关文档.

注意 向量化聚合函数不支持部分聚合,而且一个组或者窗口内的所有数据, 在执行的过程中,会被同时加载到内存,所以需要确保所配置的内存大小足够容纳这些数据。

以下示例显示了如何定一个自己的向量化聚合函数,该函数计算一列的平均值,并在 GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation 使用它:

  1. @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
  2. def mean_udaf(v):
  3. return v.mean()
  4. settings = EnvironmentSettings.in_batch_mode()
  5. table_env = TableEnvironment.create(settings)
  6. my_table = ... # type: Table, table schema: [a: String, b: BigInt, c: BigInt]
  7. # 在 GroupBy Aggregation 中使用向量化聚合函数
  8. my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b)))
  9. # 在 GroupBy Window Aggregation 中使用向量化聚合函数
  10. tumble_window = Tumble.over(expr.lit(1).hours) \
  11. .on(expr.col("rowtime")) \
  12. .alias("w")
  13. my_table.window(tumble_window) \
  14. .group_by("w") \
  15. .select("w.start, w.end, mean_udaf(b)")
  16. # 在 Over Window Aggregation 中使用向量化聚合函数
  17. table_env.create_temporary_function("mean_udaf", mean_udaf)
  18. table_env.sql_query("""
  19. SELECT a,
  20. mean_udaf(b)
  21. over (PARTITION BY a ORDER BY rowtime
  22. ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)
  23. FROM MyTable""")

除了直接定义一个 Python 函数之外,还支持多种方式来定义向量化 Python 聚合函数。 以下示例显示了多种定义向量化 Python 聚合函数的方式。该函数需要两个类型为 bigint 的参数作为输入参数,并返回它们的最大值的和作为结果。

  1. # 方式一:扩展基类 `AggregateFunction`
  2. class MaxAdd(AggregateFunction):
  3. def open(self, function_context):
  4. mg = function_context.get_metric_group()
  5. self.counter = mg.add_group("key", "value").counter("my_counter")
  6. self.counter_sum = 0
  7. def get_value(self, accumulator):
  8. # counter
  9. self.counter.inc(10)
  10. self.counter_sum += 10
  11. return accumulator[0]
  12. def create_accumulator(self):
  13. return []
  14. def accumulate(self, accumulator, *args):
  15. result = 0
  16. for arg in args:
  17. result += arg.max()
  18. accumulator.append(result)
  19. max_add = udaf(MaxAdd(), result_type=DataTypes.BIGINT(), func_type="pandas")
  20. # 方式二:普通 Python 函数
  21. @udaf(result_type=DataTypes.BIGINT(), func_type="pandas")
  22. def max_add(i, j):
  23. return i.max() + j.max()
  24. # 方式三:lambda 函数
  25. max_add = udaf(lambda i, j: i.max() + j.max(), result_type=DataTypes.BIGINT(), func_type="pandas")
  26. # 方式四:callable 函数
  27. class CallableMaxAdd(object):
  28. def __call__(self, i, j):
  29. return i.max() + j.max()
  30. max_add = udaf(CallableMaxAdd(), result_type=DataTypes.BIGINT(), func_type="pandas")
  31. # 方式五:partial 函数
  32. def partial_max_add(i, j, k):
  33. return i.max() + j.max() + k
  34. max_add = udaf(functools.partial(partial_max_add, k=1), result_type=DataTypes.BIGINT(), func_type="pandas")