Conversions between PyFlink Table and Pandas DataFrame

The PyFlink Table API supports conversions in both directions between PyFlink Tables and Pandas DataFrames.

Pandas DataFrames can be converted into a PyFlink Table. Internally, PyFlink serializes the Pandas DataFrame on the client using the Arrow columnar format. During execution, the serialized data is deserialized and processed by an Arrow source. The Arrow source can also be used in streaming jobs and is integrated with checkpointing to provide exactly-once guarantees.

The following example shows how to create a PyFlink Table from a Pandas DataFrame:

  from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
  import pandas as pd
  import numpy as np
  # Create a TableEnvironment (batch mode here; streaming mode works as well)
  t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
  # Create a Pandas DataFrame
  pdf = pd.DataFrame(np.random.rand(1000, 2))
  # Create a PyFlink Table from a Pandas DataFrame
  table = t_env.from_pandas(pdf)
  # Create a PyFlink Table from a Pandas DataFrame with the specified column names
  table = t_env.from_pandas(pdf, ['f0', 'f1'])
  # Create a PyFlink Table from a Pandas DataFrame with the specified column types
  table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
  # Create a PyFlink Table from a Pandas DataFrame with the specified row type
  table = t_env.from_pandas(pdf,
                            DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                           DataTypes.FIELD("f1", DataTypes.DOUBLE())]))

PyFlink Tables can also be converted into a Pandas DataFrame. The resulting rows are serialized on the client as multiple Arrow batches in the Arrow columnar format; the maximum number of rows per Arrow batch is configured via the option python.fn-execution.arrow.batch.size. The serialized data is then converted into a Pandas DataFrame. Because the entire contents of the table are collected on the client, make sure the results fit in the client's memory before calling Table.to_pandas. You can limit the number of rows collected on the client side with Table.limit.

The following example shows how to convert a PyFlink Table to a Pandas DataFrame:

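  from pyflink.table import EnvironmentSettings, TableEnvironment
  from pyflink.table.expressions import col
  import pandas as pd
  import numpy as np
  # Create a TableEnvironment
  t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
  # Create a PyFlink Table
  pdf = pd.DataFrame(np.random.rand(1000, 2))
  table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
  # Convert the PyFlink Table to a Pandas DataFrame, collecting at most 100 rows
  pdf = table.limit(100).to_pandas()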
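One way to pick a value for Table.limit is to estimate how many rows fit in a memory budget from the in-memory size of a small sample. The helper max_rows_for_budget below is hypothetical and not part of PyFlink or pandas; it relies only on pandas' DataFrame.memory_usage. The estimate ignores Arrow buffers and temporary copies made during conversion, so in practice you would leave a safety margin.

```python
import numpy as np
import pandas as pd


def max_rows_for_budget(sample: pd.DataFrame, budget_bytes: int) -> int:
    """Hypothetical helper: estimate how many rows fit in budget_bytes,
    based on the average in-memory size of the rows in `sample`."""
    if len(sample) == 0:
        raise ValueError("sample must contain at least one row")
    # Bytes used by all columns (index excluded), including object contents.
    total_bytes = int(sample.memory_usage(deep=True, index=False).sum())
    bytes_per_row = total_bytes / len(sample)
    return int(budget_bytes // bytes_per_row)


# Two float64 columns take 16 bytes per row, so a 1 MiB budget fits 65536 rows.
sample = pd.DataFrame(np.random.rand(100, 2), columns=["a", "b"])
n = max_rows_for_budget(sample, 1024 * 1024)
# The estimate could then be used as: table.limit(n).to_pandas()
```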