User-defined Functions

The PyFlink Table API lets users transform data with Python user-defined functions (UDFs).

Currently, it supports two kinds of Python user-defined functions: general Python UDFs, which process data one row at a time, and vectorized Python UDFs, which process data one batch at a time.
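The difference between the two kinds is easiest to see in how often the function is called. The sketch below is a plain-Python stand-in, not PyFlink code: add_row and add_batch are hypothetical names, and Python lists stand in for the pandas.Series columns that a vectorized Python UDF receives.

```python
from typing import List


def add_row(i: int, j: int) -> int:
    """General style: called once per row with scalar values."""
    return i + j


def add_batch(i: List[int], j: List[int]) -> List[int]:
    """Vectorized style: called once per batch with whole columns."""
    return [a + b for a, b in zip(i, j)]


rows = [(1, 10), (2, 20), (3, 30)]

# Row at a time: one call per input row.
row_results = [add_row(i, j) for i, j in rows]

# Batch at a time: the rows are pivoted into columns and passed in one call.
col_i, col_j = (list(col) for col in zip(*rows))
batch_results = add_batch(col_i, col_j)
```

Both styles produce the same values. The batch style makes fewer Python function calls for the same amount of data.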

Bundling UDFs

To run Python UDFs (as well as Pandas UDFs) in any non-local mode, it is strongly recommended to bundle your Python UDF definitions using the config option python-files if your Python UDFs live outside the file where the main() function is defined. Otherwise, you may run into ModuleNotFoundError: No module named 'my_udf' if you define Python UDFs in a file called

Loading resources in UDFs

There are scenarios where you want to load some resources in a UDF first and then run the computation (i.e., eval) over and over again without reloading them. For example, you may want to load a large deep learning model only once and then run batch prediction against it multiple times.

Overriding the open method of UserDefinedFunction is exactly what you need.

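    from pyflink.table import DataTypes
    from pyflink.table.udf import ScalarFunction, udf

    class Predict(ScalarFunction):
        def open(self, function_context):
            # Load the model here so that eval() does not reload it on every call.
            import pickle
            with open("", "rb") as f:
                self.model = pickle.load(f)

        def eval(self, x):
            # Reuses the model loaded in open().
            return self.model.predict(x)

    predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")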
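The load-once behaviour can be checked without a Flink cluster. The following is a minimal sketch under stated assumptions: LoadOncePredict is a hypothetical plain class that only mirrors the open/eval shape of a ScalarFunction, its "model" is a trivial doubling function rather than a pickled object, and run_like_runtime is an illustrative driver, not a PyFlink API.

```python
class LoadOncePredict:
    """Hypothetical stand-in that mirrors the open()/eval() shape of a UDF."""

    def __init__(self):
        self.load_count = 0
        self.model = None

    def open(self, function_context):
        # Expensive setup belongs here; a trivial "model" keeps the sketch runnable.
        self.load_count += 1
        self.model = lambda x: 2.0 * x

    def eval(self, x):
        # Reuses whatever open() prepared.
        return self.model(x)


def run_like_runtime(function, inputs):
    """Illustrative driver: call open() once, then eval() once per input."""
    function.open(None)
    return [function.eval(x) for x in inputs]


predictor = LoadOncePredict()
predictions = run_like_runtime(predictor, [1.0, 2.0, 3.0])
```

After the run, predictor.load_count is 1 even though eval was called three times. This is the property that overriding open is meant to provide.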

Accessing job parameters

The open() method provides a FunctionContext that contains information about the context in which user-defined functions are executed, such as the metric group and the global job parameters.

The following information can be obtained by calling the corresponding methods of FunctionContext:

get_metric_group(): Metric group for this parallel subtask.
get_job_parameter(name, default_value): Global job parameter value associated with the given key.

    from pyflink.table import DataTypes, TableEnvironment
    from pyflink.table.udf import FunctionContext, ScalarFunction, udf

    class HashCode(ScalarFunction):
        def open(self, function_context: FunctionContext):
            # access the global "hashcode_factor" parameter
            # "12" would be the default value if the parameter does not exist
            self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))

        def eval(self, s: str):
            return hash(s) * self.factor

    hash_code = udf(HashCode(), result_type=DataTypes.INT())

    t_env = TableEnvironment.create(...)
    t_env.get_config().set('', 'hashcode_factor:31')
    t_env.create_temporary_system_function("hashCode", hash_code)
    t_env.sql_query("SELECT myField, hashCode(myField) FROM MyTable")
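The default-value lookup in open() can be exercised in isolation with a small test double. This is a sketch under stated assumptions: StubFunctionContext and HashCodeLogic are hypothetical names, the stub implements only get_job_parameter(name, default_value) as described above, and HashCodeLogic is a plain class that repeats the open/eval logic so the example runs without PyFlink installed.

```python
class StubFunctionContext:
    """Hypothetical test double exposing only get_job_parameter()."""

    def __init__(self, params=None):
        self._params = dict(params or {})

    def get_job_parameter(self, name, default_value):
        # Return the stored value, or the caller's default if the key is absent.
        return self._params.get(name, default_value)


class HashCodeLogic:
    """Plain-Python copy of the open()/eval() logic, for illustration only."""

    def open(self, function_context):
        self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))

    def eval(self, s):
        return hash(s) * self.factor


# No parameter set: the default "12" is used.
default_fn = HashCodeLogic()
default_fn.open(StubFunctionContext())

# Parameter set to "31": the configured value wins over the default.
configured_fn = HashCodeLogic()
configured_fn.open(StubFunctionContext({"hashcode_factor": "31"}))
```

Job parameters arrive as strings, which is why the value is passed through int() before it is used as a multiplier.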

Testing User-Defined Functions

Suppose you have defined a Python user-defined function as follows:

    add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())

To unit test it, extract the original Python function using ._func and test that function directly:

    f = add._func
    assert f(1, 2) == 3
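The same check can be written as a unittest test case. The sketch below assumes the UDF logic lives in a named function, here the hypothetical add_values, so the tests run without a Flink installation. In a job, that function would be wrapped with udf(add_values, result_type=DataTypes.BIGINT()), and ._func would return it unchanged. The run_tests helper is also illustrative.

```python
import unittest


def add_values(i, j):
    # Plain function holding the UDF logic; the udf() wrapping is omitted here.
    return i + j


class AddValuesTest(unittest.TestCase):
    def test_adds_positive_integers(self):
        self.assertEqual(add_values(1, 2), 3)

    def test_adds_negative_and_positive(self):
        self.assertEqual(add_values(-1, 1), 0)


def run_tests():
    """Run the test case programmatically and return the result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(AddValuesTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Keeping the logic in a named function rather than a lambda also gives failing tests a readable name in the report.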