Conversions between PyFlink Table and Pandas DataFrame

The PyFlink Table API supports conversion between PyFlink Tables and Pandas DataFrames in both directions.

Convert Pandas DataFrame to PyFlink Table

A Pandas DataFrame can be converted into a PyFlink Table. Internally, PyFlink serializes the Pandas DataFrame on the client using the Arrow columnar format. During execution, an Arrow source deserializes and processes the serialized data. The Arrow source can also be used in streaming jobs and is integrated with checkpointing to provide exactly-once guarantees.

The following example shows how to create a PyFlink Table from a Pandas DataFrame:

    import numpy as np
    import pandas as pd
    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment

    # Create a TableEnvironment (skip this if t_env already exists)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Create a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(1000, 2))

    # Create a PyFlink Table from a Pandas DataFrame
    table = t_env.from_pandas(pdf)

    # Create a PyFlink Table from a Pandas DataFrame with the specified column names
    table = t_env.from_pandas(pdf, ['f0', 'f1'])

    # Create a PyFlink Table from a Pandas DataFrame with the specified column types
    table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

    # Create a PyFlink Table from a Pandas DataFrame with the specified row type
    table = t_env.from_pandas(pdf,
                              DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                             DataTypes.FIELD("f1", DataTypes.DOUBLE())]))

Convert PyFlink Table to Pandas DataFrame

A PyFlink Table can also be converted into a Pandas DataFrame. The resulting rows are serialized on the client as multiple Arrow batches in the Arrow columnar format. The maximum Arrow batch size is configured via the option python.fn-execution.arrow.batch.size. The serialized data is then converted to a Pandas DataFrame. Because the contents of the table are collected on the client, make sure that the results fit in client memory before calling this method. You can limit the number of rows collected to the client side with Table.limit.

The following example shows how to convert a PyFlink Table to a Pandas DataFrame:

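    import numpy as np
    import pandas as pd
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    # Create a TableEnvironment (skip this if t_env already exists)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Create a PyFlink Table
    pdf = pd.DataFrame(np.random.rand(1000, 2))
    table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)

    # Convert the PyFlink Table to a Pandas DataFrame, collecting at most 100 rows
    pdf = table.limit(100).to_pandas()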
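The Arrow batch size option mentioned above can be set on the table configuration before calling to_pandas. The helper below is a minimal sketch: the function name set_arrow_batch_size is hypothetical, and it assumes a PyFlink version (1.15 or later) in which TableConfig exposes set(key, value) with string values.

```python
ARROW_BATCH_SIZE_KEY = "python.fn-execution.arrow.batch.size"


def set_arrow_batch_size(t_env, max_rows):
    """Set the maximum number of rows per Arrow batch (hypothetical helper).

    t_env is expected to be a PyFlink TableEnvironment, or any object
    whose get_config() returns something with a set(key, value) method.
    """
    if max_rows <= 0:
        raise ValueError("max_rows must be a positive integer")
    # Assumption: TableConfig.set(key, value) is available (PyFlink 1.15+).
    t_env.get_config().set(ARROW_BATCH_SIZE_KEY, str(max_rows))
```

A typical call would be set_arrow_batch_size(t_env, 5000) followed by table.limit(100).to_pandas(). The batch size only bounds how many rows go into each Arrow batch; the full result is still collected on the client, so Table.limit remains the way to bound memory use.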