Legacy Planner

Table planners are responsible for translating relational operators into an executable, optimized Flink job. Flink supports two different planner implementations: the modern planner (sometimes referred to as Blink) and the legacy planner. For production use cases, we recommend the modern planner, which is the default.

The legacy planner is in maintenance mode and no longer under active development. The primary reason to continue using the legacy planner is DataSet interop.

If you are not using the legacy planner for DataSet interop, the community strongly encourages you to consider the modern table planner. Both batch and stream processing pipelines can be expressed in the unified TableEnvironment.
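
For comparison, a minimal sketch of configuring the modern planner in Flink 1.13; the builder calls mirror the legacy example shown later on this page, and useBlinkPlanner() is shown only for clarity since it is the default.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Modern (Blink) planner in the unified TableEnvironment
    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()   // or .inBatchMode()
        .build();

    TableEnvironment tEnv = TableEnvironment.create(settings);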

Attention The legacy planner is deprecated and will be dropped in Flink 1.14.

This page describes how to use the legacy planner and where its semantics differ from the modern planner.

Setup

Dependencies

When deploying to a cluster, the legacy planner is bundled in Flink's distribution by default. If you want to run Table API & SQL programs locally within your IDE, you must add the following modules to your application.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.11</artifactId>
      <version>1.13.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.13.0</version>
      <scope>provided</scope>
    </dependency>
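
The _2.11 suffix denotes the Scala version of the dependency; if your project builds against Scala 2.12, use flink-table-planner_2.12 and flink-streaming-scala_2.12 instead.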

Configuring the TableEnvironment

When creating a TableEnvironment, the legacy planner is configured via EnvironmentSettings.

Java

    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useOldPlanner()
        .inStreamingMode()
        // or in batch mode
        //.inBatchMode()
        .build();

    TableEnvironment tEnv = TableEnvironment.create(settings);

Scala

    val settings = EnvironmentSettings
        .newInstance()
        .useOldPlanner()
        .inStreamingMode()
        // or in batch mode
        //.inBatchMode()
        .build()

    val tEnv = TableEnvironment.create(settings)

Additionally, a BatchTableEnvironment or StreamTableEnvironment may be used for DataSet and DataStream interop, respectively.

Java

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

Scala

    val env = ExecutionEnvironment.getExecutionEnvironment()
    val tEnv = BatchTableEnvironment.create(env)

Python

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, TableConfig

    b_env = ExecutionEnvironment.get_execution_environment()
    table_config = TableConfig()
    t_env = BatchTableEnvironment.create(b_env, table_config)

Integration with DataSet

The primary use case for the legacy planner is interoperation with the DataSet API. To translate DataSets to and from tables, applications must use the BatchTableEnvironment.

Create a View from a DataSet

A DataSet can be registered in a BatchTableEnvironment as a View. The schema of the resulting view depends on the data type of the registered collection.

Note: Views created from a DataSet can be registered as temporary views only.

Java

    BatchTableEnvironment tEnv = ...;
    DataSet<Tuple2<Long, String>> dataset = ...;

    tEnv.createTemporaryView("my-table", dataset, $("myLong"), $("myString"));

Scala

    val tEnv: BatchTableEnvironment = ???
    val dataset: DataSet[(Long, String)] = ???

    tEnv.createTemporaryView("my-table", dataset, $"myLong", $"myString")
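
Once registered, the view can be queried like any other table. A minimal sketch in Java, continuing the Java snippet above (the backticks are needed because the registered name contains a hyphen):

    // Query the temporary view registered above.
    Table result = tEnv.sqlQuery("SELECT myString FROM `my-table` WHERE myLong > 10");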

Create a Table from a DataSet

A DataSet can be directly converted to a Table in a BatchTableEnvironment. The schema of the resulting table depends on the data type of the registered collection.

Java

    BatchTableEnvironment tEnv = ...;
    DataSet<Tuple2<Long, String>> dataset = ...;

    Table myTable = tEnv.fromDataSet(dataset, $("myLong"), $("myString"));

Scala

    val tEnv: BatchTableEnvironment = ???
    val dataset: DataSet[(Long, String)] = ???

    val table = tEnv.fromDataSet(dataset, $"myLong", $"myString")

Convert a Table to a DataSet

A Table can be converted to a DataSet. In this way, custom DataSet programs can be run on the result of a Table API or SQL query.

When converting from a Table, users must specify the data type of the results. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options.

  • Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
  • POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
  • Case Class: fields are mapped by position, no support for null values, type-safe access.
  • Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
  • Atomic Type: Table must have a single field, no support for null values, type-safe access.

Java

    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

    Table table = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("age", DataTypes.INT())),
        row("john", 35),
        row("sarah", 32));

    // Convert the Table into a DataSet of Row by specifying a class
    DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

    // Convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
    DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);

Scala

    val tableEnv = BatchTableEnvironment.create(env)

    val table = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("age", DataTypes.INT())),
        row("john", 35),
        row("sarah", 32))

    // Convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // Convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

Attention Once the Table is converted to a DataSet, we must use the ExecutionEnvironment.execute method to execute the DataSet program.
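
As a rough sketch of name-based mapping and the final execute() call, in Java and continuing the Java snippet above (the Person class and the output path are hypothetical, not part of the Table API):

    // Hypothetical POJO; fields are mapped by name and must match the table's column names.
    public static class Person {
        public String name;
        public Integer age;
    }

    // Type-safe access with null support, as described in the list above.
    DataSet<Person> dsPerson = tableEnv.toDataSet(table, Person.class);

    // After the conversion, the program must be submitted via ExecutionEnvironment.execute().
    dsPerson.writeAsText("/tmp/people");
    env.execute();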

Data Types

The legacy planner, introduced before Flink 1.9, primarily supports type information. It has only limited support for data types. It is possible to declare data types that can be translated into type information such that the legacy planner understands them.

The following table summarizes the differences between data types and type information. Most simple types, as well as the row type, remain the same. Time types, array types, and the decimal type need special attention. Other hints than the ones mentioned are not supported.

For the Type Information column the table omits the prefix org.apache.flink.table.api.Types.

For the Data Type Representation column the table omits the prefix org.apache.flink.table.api.DataTypes.

| Type Information | Java Expression String | Data Type Representation | Remarks for Data Type |
| --- | --- | --- | --- |
| STRING() | STRING | STRING() | |
| BOOLEAN() | BOOLEAN | BOOLEAN() | |
| BYTE() | BYTE | TINYINT() | |
| SHORT() | SHORT | SMALLINT() | |
| INT() | INT | INT() | |
| LONG() | LONG | BIGINT() | |
| FLOAT() | FLOAT | FLOAT() | |
| DOUBLE() | DOUBLE | DOUBLE() | |
| ROW(…) | ROW<…> | ROW(…) | |
| BIG_DEC() | DECIMAL | [DECIMAL()] | Not a 1:1 mapping as precision and scale are ignored and Java's variable precision and scale are used. |
| SQL_DATE() | SQL_DATE | DATE().bridgedTo(java.sql.Date.class) | |
| SQL_TIME() | SQL_TIME | TIME(0).bridgedTo(java.sql.Time.class) | |
| SQL_TIMESTAMP() | SQL_TIMESTAMP | TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) | |
| INTERVAL_MONTHS() | INTERVAL_MONTHS | INTERVAL(MONTH()).bridgedTo(Integer.class) | |
| INTERVAL_MILLIS() | INTERVAL_MILLIS | INTERVAL(DataTypes.SECOND(3)).bridgedTo(Long.class) | |
| PRIMITIVE_ARRAY(…) | PRIMITIVE_ARRAY<…> | ARRAY(DATATYPE.notNull().bridgedTo(PRIMITIVE.class)) | Applies to all JVM primitive types except for byte. |
| PRIMITIVE_ARRAY(BYTE()) | PRIMITIVE_ARRAY<BYTE> | BYTES() | |
| OBJECT_ARRAY(…) | OBJECT_ARRAY<…> | ARRAY(DATATYPE.bridgedTo(OBJECT.class)) | |
| MULTISET(…) | | MULTISET(…) | |
| MAP(…, …) | MAP<…,…> | MAP(…) | |
| other generic types | | RAW(…) | |
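
As an illustration of the bridging rules in the table, a minimal sketch of data type declarations that the legacy planner can translate into type information; the variable names are made up and the declarations are not tied to a particular table.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    // Bridged to the conversion classes expected by the legacy planner,
    // matching the SQL_TIMESTAMP and SQL_DATE rows above.
    DataType eventTime = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
    DataType eventDate = DataTypes.DATE().bridgedTo(java.sql.Date.class);

    // A primitive long array, as in the PRIMITIVE_ARRAY row above.
    DataType measurements = DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class));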

Attention If there is a problem with the new type system, users can fall back to type information defined in org.apache.flink.table.api.Types at any time.
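
A minimal sketch of such a fallback, building row type information directly with the legacy Types helpers instead of DataTypes; the field names are made up.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.types.Row;

    // Row type information expressed with the legacy Types class.
    TypeInformation<Row> rowType = Types.ROW(
        new String[] {"name", "age"},
        new TypeInformation<?>[] {Types.STRING(), Types.INT()});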

Unsupported Features

The following features are not supported by the legacy planner.

Unsupported Built-In Functions

The following built-in functions are not supported by the legacy planner.

  • PI
  • ASCII(string)
  • CHR(integer)
  • DECODE(binary, string)
  • ENCODE(string1, string2)
  • INSTR(string1, string2)
  • LEFT(string, integer)
  • RIGHT(string, integer)
  • LOCATE(string1, string2[, integer])
  • PARSE_URL(string1, string2[, string3])
  • REGEXP(string1, string2)
  • REVERSE(string)
  • SPLIT_INDEX(string1, string2, integer1)
  • STR_TO_MAP(string1[, string2, string3]])
  • SUBSTR(string[, integer1[, integer2]])
  • CONVERT_TZ(string1, string2, string3)
  • FROM_UNIXTIME(numeric[, string])
  • UNIX_TIMESTAMP()
  • UNIX_TIMESTAMP(string1[, string2])
  • TO_DATE(string1[, string2])
  • TO_TIMESTAMP(string1[, string2])
  • NOW()
  • IF(condition, true_value, false_value)
  • IS_ALPHA(string)
  • IS_DECIMAL(string)
  • IS_DIGIT(string)
  • VARIANCE([ ALL | DISTINCT ] expression)
  • RANK()
  • DENSE_RANK()
  • ROW_NUMBER()
  • LEAD(expression [, offset] [, default] )
  • LAG(expression [, offset] [, default])
  • FIRST_VALUE(expression)
  • LAST_VALUE(expression)
  • LISTAGG(expression [, separator])

DATE_FORMAT(timestamp, string) is available in the legacy planner but has serious bugs and should not be used. Please implement a custom UDF instead or use EXTRACT as a workaround.
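
A rough sketch of such a UDF-based workaround, assuming a hypothetical scalar function named FormatTimestamp that delegates to java.time; the function, the registration name, and the table and column names in the usage comment are assumptions, not part of the legacy planner.

    import java.sql.Timestamp;
    import java.time.format.DateTimeFormatter;

    import org.apache.flink.table.functions.ScalarFunction;

    // Hypothetical replacement for DATE_FORMAT(timestamp, string).
    public class FormatTimestamp extends ScalarFunction {
        public String eval(Timestamp ts, String pattern) {
            if (ts == null || pattern == null) {
                return null;
            }
            // Note: java.time patterns differ from the MySQL-style patterns of DATE_FORMAT.
            return ts.toLocalDateTime().format(DateTimeFormatter.ofPattern(pattern));
        }
    }

    // Registration and usage with the legacy planner (names are made up):
    // tEnv.registerFunction("FormatTimestamp", new FormatTimestamp());
    // tEnv.sqlQuery("SELECT FormatTimestamp(ts, 'yyyy-MM-dd HH:mm:ss') FROM orders");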