Conversions between PyFlink Table and Pandas DataFrame

PyFlink supports conversion in both directions between a PyFlink Table and a Pandas DataFrame.

Convert Pandas DataFrame to PyFlink Table

PyFlink supports creating a PyFlink Table from a Pandas DataFrame. Internally, it serializes the Pandas DataFrame on the client side using the Arrow columnar format. During execution, an Arrow source processes and deserializes the serialized data. The Arrow source can also be used in streaming jobs, where it handles checkpointing properly and provides exactly-once guarantees.

The following example shows how to create a PyFlink Table from a Pandas DataFrame:

    import numpy as np
    import pandas as pd
    from pyflink.table import DataTypes

    # t_env is assumed to be an existing TableEnvironment.

    # Create a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(1000, 2))

    # Create a PyFlink Table from a Pandas DataFrame
    table = t_env.from_pandas(pdf)

    # Create a PyFlink Table from a Pandas DataFrame with the specified column names
    table = t_env.from_pandas(pdf, ['f0', 'f1'])

    # Create a PyFlink Table from a Pandas DataFrame with the specified column types
    table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

    # Create a PyFlink Table from a Pandas DataFrame with the specified row type
    table = t_env.from_pandas(pdf,
                              DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                             DataTypes.FIELD("f1", DataTypes.DOUBLE())]))
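
The checkpointing behaviour mentioned above can be pictured with a toy model. The sketch below is not Flink's Arrow source. `ToyBatchSource`, `snapshot_state`, and the tuple-based batches are hypothetical names and stand-ins, chosen only to illustrate the general idea that a source over a fixed set of serialized batches can recover by remembering the position of the next batch to emit.

```python
from typing import Iterator, List, Tuple

# Stand-in for one serialized Arrow batch (assumption: real batches are Arrow data).
Batch = List[Tuple[float, float]]


class ToyBatchSource:
    """Hypothetical replayable source; a teaching model, not Flink's implementation."""

    def __init__(self, batches: List[Batch], next_index: int = 0) -> None:
        self.batches = batches
        self.next_index = next_index  # position of the next batch to emit

    def emit(self) -> Iterator[Batch]:
        # Emit the remaining batches, advancing the position after each one.
        while self.next_index < len(self.batches):
            batch = self.batches[self.next_index]
            self.next_index += 1
            yield batch

    def snapshot_state(self) -> int:
        # The data is fixed, so a checkpoint only needs to record the position.
        return self.next_index


batches = [[(0.1, 0.2)], [(0.3, 0.4)], [(0.5, 0.6)]]

source = ToyBatchSource(batches)
stream = source.emit()
emitted = [next(stream)]              # batch 0 is emitted
checkpoint = source.snapshot_state()  # checkpoint taken after batch 0
next(stream)                          # batch 1 is emitted, then the job "fails"

# Recovery: state is rolled back to the checkpoint, so work done after it is
# discarded and the source resumes from the recorded position.
restored = ToyBatchSource(batches, next_index=checkpoint)
emitted_after_restore = emitted + list(restored.emit())
```

In this model every batch is reflected exactly once in the recovered result, because the output produced after the last checkpoint is discarded together with the failed attempt and then replayed from the stored position.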

Convert PyFlink Table to Pandas DataFrame

PyFlink also supports converting a PyFlink Table to a Pandas DataFrame. Internally, it materializes the results of the table and serializes them on the client side into multiple batches in the Arrow columnar format. The maximum Arrow batch size is determined by the config option python.fn-execution.arrow.batch.size. The serialized data is then converted to a Pandas DataFrame. Because the results are collected on the client, they need to fit in the client's memory.

The following example shows how to convert a PyFlink Table to a Pandas DataFrame:

    import numpy as np
    import pandas as pd
    from pyflink.table.expressions import col

    # Create a PyFlink Table (t_env is assumed to be an existing TableEnvironment)
    pdf = pd.DataFrame(np.random.rand(1000, 2))
    table = t_env.from_pandas(pdf, ["a", "b"]).filter(col("a") > 0.5)

    # Convert the PyFlink Table to a Pandas DataFrame
    pdf = table.to_pandas()
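
To make the batching described above more concrete, here is a minimal pure-Python sketch of splitting materialized rows into column-oriented batches with a bounded size. It only illustrates the arithmetic and does not use Arrow or PyFlink. The function `to_columnar_batches` and its `max_batch_size` parameter are hypothetical, with `max_batch_size` playing the role that python.fn-execution.arrow.batch.size plays in the description.

```python
from typing import Dict, List, Sequence, Tuple


def to_columnar_batches(
    rows: Sequence[Tuple[float, ...]],
    column_names: Sequence[str],
    max_batch_size: int,
) -> List[Dict[str, List[float]]]:
    """Split rows into column-oriented batches of at most max_batch_size rows."""
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")
    batches = []
    for start in range(0, len(rows), max_batch_size):
        chunk = rows[start:start + max_batch_size]
        # Transpose the chunk: one list of values per column.
        batches.append(
            {name: [row[i] for row in chunk] for i, name in enumerate(column_names)}
        )
    return batches


# Five result rows with columns "a" and "b", batched two rows at a time.
rows = [(0.6, 1.0), (0.7, 2.0), (0.8, 3.0), (0.9, 4.0), (0.95, 5.0)]
batches = to_columnar_batches(rows, ["a", "b"], max_batch_size=2)
batch_sizes = [len(batch["a"]) for batch in batches]
print(batch_sizes)  # [2, 2, 1]
```

With five rows and a maximum batch size of two, the rows end up in three batches, and only the last one is smaller than the limit. A larger limit means fewer, bigger batches, which is the trade-off the config option controls.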