Operators

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

DataStream Transformations

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping, filtering, reducing). Please see operators for an overview of the available transformations in Python DataStream API.

Functions

Transformations accept user-defined functions as input to define the functionality of the transformations. The following section describes different ways of defining Python user-defined functions in Python DataStream API.

Implementing Function Interfaces

Different Function interfaces are provided for different transformations in the Python DataStream API. For example, MapFunction is provided for the map transformation, FilterFunction is provided for the filter transformation, etc. Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for instance:

```python
# Implementing MapFunction
class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
```

Lambda Function

As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:

```python
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
```

Note: ConnectedStream.map() and ConnectedStream.flat_map() do not support lambda functions; they require a CoMapFunction and a CoFlatMapFunction, respectively.

Python Function

Users can also use a plain Python function to define the functionality of the transformation:

```python
def my_map_func(value):
    return value + 1

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
```

Output Type

Users can specify the output type information of a transformation explicitly in the Python DataStream API. If it is not specified, the output type will be Types.PICKLED_BYTE_ARRAY by default, and the result data will be serialized with the pickle serializer. For more details about the pickle serializer, please refer to Pickle Serialization.

Generally, the output type needs to be specified in the following scenarios.

Convert DataStream into Table

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def data_stream_api_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)

    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))

    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp

    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: (i[0] + j[0], i[1]))

    t_env.execute_sql("""
            CREATE TABLE my_sink (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'print'
            )
        """)

    table = t_env.from_data_stream(ds)
    table_result = table.execute_insert("my_sink")

    # 1) Wait for the job to finish. This is needed for local execution; otherwise
    #    the script may exit before the job completes, causing the minicluster to
    #    shut down prematurely.
    # 2) Remove this call when the job is submitted in detached mode to a remote
    #    cluster, e.g. YARN/Standalone/K8s.
    table_result.wait()


if __name__ == '__main__':
    data_stream_api_demo()
```

In the above example, the output type must be specified for the flat_map operation; it is then implicitly used as the output type of the reduce operation as well. The reason is that t_env.from_data_stream(ds) requires the output type of ds to be a composite type.

Write DataStream to Sink

```python
from pyflink.common.typeinfo import Types

def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
  .sink_to(...)
```

Generally, the output type needs to be specified for the map operation in the above example if the sink only accepts a specific kind of data, e.g. Row.

Operator Chaining

By default, multiple non-shuffle Python functions are chained together to avoid serialization and deserialization overhead and improve performance. There are also cases where you may want to disable chaining, e.g. when a flat_map function produces a large number of elements for each input element and you want to process its output with a different parallelism.

Operator chaining can be disabled via the configuration option python.operator-chaining.enabled.
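A minimal sketch (assuming a Flink version, e.g. 1.15+, where the configuration option python.operator-chaining.enabled is supported):

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Disable chaining of Python functions for this job
config = Configuration()
config.set_string("python.operator-chaining.enabled", "false")
env = StreamExecutionEnvironment.get_execution_environment(config)
```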

Bundling Python Functions

To run Python functions in any non-local mode, it is strongly recommended to bundle your Python function definitions using the config option python-files if they live outside the file where the main() function is defined. Otherwise, you may run into ModuleNotFoundError: No module named 'my_function' if you define Python functions in a file called my_function.py.
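Files can also be attached programmatically. A minimal sketch using add_python_file (the file name my_function.py mirrors the error message above):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship my_function.py with the job so that `import my_function` works in UDFs
env.add_python_file("my_function.py")
```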

Loading resources in Python Functions

There are scenarios where you want to load some resources in a Python function first and then run computations over and over again without having to reload them. For example, you may want to load a large deep learning model only once and then run batch predictions against it multiple times.

Overriding the open method inherited from the base class Function is exactly what you need.

```python
import pickle

from pyflink.datastream import MapFunction, RuntimeContext

class Predict(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        # Load the model once, when the function is initialized
        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def map(self, x):
        return self.model.predict(x)
```