Queries

SELECT statements and VALUES statements are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SELECT statement (or the VALUES statement) as a Table. A Table can be used in subsequent SQL and Table API queries, converted into a DataStream, or written to a TableSink. SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.

To access a table in a SQL query, the table must be registered in the TableEnvironment. A table can be registered from a TableSource, a Table, a CREATE TABLE statement, or a DataStream. Alternatively, users can register catalogs in a TableEnvironment to specify the location of the data sources.

For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. So, Table objects can be directly inlined into SQL queries as shown in the examples below.
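The register-on-stringify behaviour described above can be pictured with a small toy model. This is only a sketch in plain Python: ToyTableEnvironment, ToyTable, and the UnnamedTable_N naming scheme are hypothetical stand-ins invented for illustration, not Flink classes, and the sketch assumes that a table keeps the same name once it has been registered.

```python
import itertools


class ToyTableEnvironment:
    """Hypothetical stand-in for a TableEnvironment: it only maps names to tables."""

    def __init__(self):
        self.registry = {}
        self._counter = itertools.count()

    def unique_name(self):
        # The naming scheme is made up for this sketch.
        return "UnnamedTable_%d" % next(self._counter)


class ToyTable:
    """Hypothetical stand-in for a Table that registers itself when stringified."""

    def __init__(self, env):
        self.env = env
        self._name = None

    def __str__(self):
        # Register on first use, then keep returning the same name (assumed behaviour).
        if self._name is None:
            self._name = self.env.unique_name()
            self.env.registry[self._name] = self
        return self._name


env = ToyTableEnvironment()
table = ToyTable(env)

# String concatenation calls __str__, which registers the table as a side effect.
query = "SELECT SUM(amount) FROM " + str(table) + " WHERE product LIKE '%Rubber%'"
print(query)
```

The point of the sketch is that building the query string is what makes the table visible to the SQL parser; no separate registration call is needed.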

Note: Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.

Specifying a Query

The following examples show how to specify SQL queries on registered and inlined tables.

Java

    // needed for the $("...") expressions below
    import static org.apache.flink.table.api.Expressions.$;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ingest a DataStream from an external source
    DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

    // SQL query with an inlined (unregistered) table
    Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
    Table result = tableEnv.sqlQuery(
        "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

    // SQL query with a registered table
    // register the DataStream as view "Orders"
    tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
    // run a SQL query on the Table and retrieve the result as a new Table
    Table result2 = tableEnv.sqlQuery(
        "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

    // create and register a TableSink
    final Schema schema = new Schema()
        .field("product", DataTypes.STRING())
        .field("amount", DataTypes.INT());

    tableEnv.connect(new FileSystem().path("/path/to/file"))
        .withFormat(...)
        .withSchema(schema)
        .createTemporaryTable("RubberOrders");

    // run an INSERT SQL on the Table and emit the result to the TableSink
    tableEnv.executeSql(
        "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

Scala

    // needed for the DataStream API, the $"..." expressions, and the toTable conversion
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // read a DataStream from an external source
    val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

    // SQL query with an inlined (unregistered) table
    val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
    val result = tableEnv.sqlQuery(
      s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

    // SQL query with a registered table
    // register the DataStream under the name "Orders"
    tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
    // run a SQL query on the Table and retrieve the result as a new Table
    val result2 = tableEnv.sqlQuery(
      "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

    // create and register a TableSink
    val schema = new Schema()
      .field("product", DataTypes.STRING())
      .field("amount", DataTypes.INT())

    tableEnv.connect(new FileSystem().path("/path/to/file"))
      .withFormat(...)
      .withSchema(schema)
      .createTemporaryTable("RubberOrders")

    // run an INSERT SQL on the Table and emit the result to the TableSink
    tableEnv.executeSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Python

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, StreamTableEnvironment
    from pyflink.table.descriptors import Csv, FileSystem, Schema

    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)

    # SQL query with an inlined (unregistered) table
    # elements data type: BIGINT, STRING, BIGINT
    table = table_env.from_elements(..., ['user', 'product', 'amount'])
    result = table_env \
        .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)

    # register the Table as view "Orders" so that the INSERT statement below can reference it
    table_env.create_temporary_view("Orders", table)

    # create and register a TableSink
    table_env.connect(FileSystem().path("/path/to/file")) \
        .with_format(Csv()
                     .field_delimiter(',')
                     .derive_schema()) \
        .with_schema(Schema()
                     .field("product", DataTypes.STRING())
                     .field("amount", DataTypes.BIGINT())) \
        .create_temporary_table("RubberOrders")

    # run an INSERT SQL on the Table and emit the result to the TableSink
    table_env \
        .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
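The Python example writes the LIKE pattern as '%%Rubber%%' in one query and as '%Rubber%' in the other. The difference comes from Python string formatting, not from SQL: only a string that goes through the % operator needs its literal percent signs doubled. A minimal sketch in plain Python, where table_name is a hypothetical placeholder for the name that an inlined table would produce:

```python
# Hypothetical name; in the example above it comes from formatting the Table object.
table_name = "UnnamedTable_0"

# %-formatting: a literal percent sign must be written as %%.
percent_style = "SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table_name

# f-strings and str.format(): percent signs need no escaping.
fstring_style = f"SELECT SUM(amount) FROM {table_name} WHERE product LIKE '%Rubber%'"
format_style = "SELECT SUM(amount) FROM {} WHERE product LIKE '%Rubber%'".format(table_name)

# All three variants produce the same SQL text.
print(percent_style)
```

A query string that is passed to the API without any formatting step, such as the INSERT statement, keeps the single percent signs.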

Execute a Query

A SELECT statement or a VALUES statement can be executed with the TableEnvironment.executeSql() method to collect its content on the local client. The method returns the result of the SELECT statement (or the VALUES statement) as a TableResult. Similarly, a Table object can be executed with the Table.execute() method to collect the content of the query on the local client.

The TableResult.collect() method returns a closeable row iterator. The select job does not finish until all result data has been collected, so close the iterator explicitly with the CloseableIterator#close() method to end the job and avoid a resource leak. The result can also be printed to the client console with the TableResult.print() method. The result data in a TableResult can be accessed only once, so collect() and print() must not both be called on the same TableResult.

TableResult.collect() and TableResult.print() have slightly different behaviors under different checkpointing settings (to enable checkpointing for a streaming job, see checkpointing config).

  • For batch jobs or streaming jobs without checkpointing, TableResult.collect() and TableResult.print() provide neither an exactly-once nor an at-least-once guarantee. Query results are immediately accessible by the clients once they’re produced, but exceptions will be thrown when the job fails and restarts.
  • For streaming jobs with exactly-once checkpointing, TableResult.collect() and TableResult.print() guarantee an end-to-end exactly-once record delivery. A result will be accessible by clients only after its corresponding checkpoint completes.
  • For streaming jobs with at-least-once checkpointing, TableResult.collect() and TableResult.print() guarantee an end-to-end at-least-once record delivery. Query results are immediately accessible by the clients once they’re produced, but it is possible for the same result to be delivered multiple times.

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // `settings` is an EnvironmentSettings instance created elsewhere
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

    // execute SELECT statement
    TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
    // use try-with-resources statement to make sure the iterator will be closed automatically
    try (CloseableIterator<Row> it = tableResult1.collect()) {
        while (it.hasNext()) {
            Row row = it.next();
            // handle row
        }
    }

    // execute Table
    TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
    tableResult2.print();

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // `settings` is an EnvironmentSettings instance created elsewhere
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // enable checkpointing
    tableEnv.getConfig.getConfiguration.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

    // execute SELECT statement
    val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
    val it = tableResult1.collect()
    try {
      while (it.hasNext) {
        val row = it.next
        // handle row
      }
    } finally {
      it.close() // close the iterator to avoid resource leak
    }

    // execute Table
    val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
    tableResult2.print()

Python

    env = StreamExecutionEnvironment.get_execution_environment()
    # `settings` is an EnvironmentSettings instance created elsewhere
    table_env = StreamTableEnvironment.create(env, settings)

    # enable checkpointing
    table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
    table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

    table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

    # execute SELECT statement
    table_result1 = table_env.execute_sql("SELECT * FROM Orders")
    table_result1.print()

    # execute Table
    table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
    table_result2.print()
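The Python example above only prints its results. The collect-then-close pattern shown for Java and Scala could be sketched in Python roughly as follows. This is a hedged sketch, not an official recipe: collect_rows is a hypothetical helper, and it assumes only that the object returned by collect() can be iterated and has a close() method. FakeIterator and FakeTableResult are stand-ins so that the snippet runs without a Flink cluster.

```python
from contextlib import closing
from itertools import islice


def collect_rows(table_result, limit=None):
    """Read up to `limit` rows from a TableResult-like object and always close the iterator."""
    # closing() calls close() on exit, even when reading stops early or raises.
    with closing(table_result.collect()) as iterator:
        return list(islice(iterator, limit))


class FakeIterator:
    """Hypothetical stand-in for a closeable row iterator."""

    def __init__(self, rows):
        self._rows = iter(rows)
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._rows)

    def close(self):
        self.closed = True


class FakeTableResult:
    """Hypothetical stand-in for a TableResult."""

    def __init__(self, rows):
        self.iterator = FakeIterator(rows)

    def collect(self):
        return self.iterator


fake = FakeTableResult([(1, "Rubber duck", 3), (2, "Rubber band", 5), (3, "Pen", 1)])
first_two = collect_rows(fake, limit=2)
print(first_two)
```

Closing the iterator after reading only part of the result matters most for unbounded streaming queries, where the job would otherwise keep running.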

Syntax

Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.

The following BNF grammar describes the superset of SQL features supported in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are supported only for batch or only for streaming queries.

Grammar

    query:
        values
      | {
          select
        | selectWithoutFrom
        | query UNION [ ALL ] query
        | query EXCEPT query
        | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

    orderItem:
        expression [ ASC | DESC ]

    select:
        SELECT [ ALL | DISTINCT ]
        { * | projectItem [, projectItem ]* }
        FROM tableExpression
        [ WHERE booleanExpression ]
        [ GROUP BY { groupItem [, groupItem ]* } ]
        [ HAVING booleanExpression ]
        [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

    selectWithoutFrom:
        SELECT [ ALL | DISTINCT ]
        { * | projectItem [, projectItem ]* }

    projectItem:
        expression [ [ AS ] columnAlias ]
      | tableAlias . *

    tableExpression:
        tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

    joinCondition:
        ON booleanExpression
      | USING '(' column [, column ]* ')'

    tableReference:
        tablePrimary
        [ matchRecognize ]
        [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

    tablePrimary:
        [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | UNNEST '(' expression ')'

    tablePath:
        [ [ catalogName . ] schemaName . ] tableName

    systemTimePeriod:
        FOR SYSTEM_TIME AS OF dateTimeExpression

    dynamicTableOptions:
        /*+ OPTIONS(key=val [, key=val]*) */

    key:
        stringLiteral

    val:
        stringLiteral

    values:
        VALUES expression [, expression ]*

    groupItem:
        expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'

    windowRef:
        windowName
      | windowSpec

    windowSpec:
        [ windowName ]
        '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
            RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
        ')'

    matchRecognize:
        MATCH_RECOGNIZE '('
        [ PARTITION BY expression [, expression ]* ]
        [ ORDER BY orderItem [, orderItem ]* ]
        [ MEASURES measureColumn [, measureColumn ]* ]
        [ ONE ROW PER MATCH ]
        [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
        ]
        PATTERN '(' pattern ')'
        [ WITHIN intervalLiteral ]
        DEFINE variable AS condition [, variable AS condition ]*
        ')'

    measureColumn:
        expression AS alias

    pattern:
        patternTerm [ '|' patternTerm ]*

    patternTerm:
        patternFactor [ patternFactor ]*

    patternFactor:
        variable [ patternQuantifier ]

    patternQuantifier:
        '*'
      | '*?'
      | '+'
      | '+?'
      | '?'
      | '??'
      | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
      | '{' repeat '}'

Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) similar to Java:

  • The case of identifiers is preserved whether or not they are quoted.
  • Identifiers are then matched case-sensitively.
  • Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. “SELECT a AS `my field` FROM t”).
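When queries are assembled as strings, the back-tick rule above can be applied with a small helper. The following is a minimal sketch in plain Python: quote_identifier and qualified_name are hypothetical helpers, not part of any Flink API, and doubling an embedded back-tick to escape it is an assumption about the parser that should be verified before relying on it.

```python
def quote_identifier(name):
    """Wrap an identifier in back-ticks; an embedded back-tick is doubled (assumed rule)."""
    return "`" + name.replace("`", "``") + "`"


def qualified_name(*parts):
    """Join quoted path components, e.g. catalog, schema, and table name."""
    return ".".join(quote_identifier(part) for part in parts)


# Reproduces the example from the list above.
query = "SELECT a AS {} FROM {}".format(quote_identifier("my field"), quote_identifier("t"))
print(query)

# Matching is case-sensitive, so these two names refer to different tables.
print(qualified_name("my_catalog", "my_db", "Orders"))
print(qualified_name("my_catalog", "my_db", "orders"))
```

Quoting every identifier is harmless because case is preserved either way; it mainly protects names that contain spaces or collide with reserved words such as user.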

String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). To include a single quote in a literal, double it (e.g., SELECT 'It''s me.'). Unicode characters are supported in string literals. If explicit Unicode code points are required, use the following syntax:

  • Use the backslash (\) as escaping character (default): SELECT U&'\263A'
  • Use a custom escaping character: SELECT U&'#263A' UESCAPE '#'
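The quoting rules above can be sketched as two small helpers in plain Python. Both function names are hypothetical. The sketch covers only four-digit code points as in the examples above, and it assumes that a doubled escape character stands for the escape character itself.

```python
def sql_string_literal(text):
    """Quote text as a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def sql_unicode_literal(text, escape="\\"):
    """Write non-ASCII characters as 4-digit hex code point escapes in a U&'...' literal."""
    parts = []
    for ch in text:
        if ch == "'":
            parts.append("''")
        elif ch == escape:
            # Assumption: a doubled escape character is read as the character itself.
            parts.append(escape * 2)
        elif ord(ch) < 128:
            parts.append(ch)
        elif ord(ch) <= 0xFFFF:
            parts.append("%s%04X" % (escape, ord(ch)))
        else:
            raise ValueError("code points above U+FFFF are outside this sketch")
    literal = "U&'" + "".join(parts) + "'"
    if escape != "\\":
        # A non-default escape character has to be declared with UESCAPE.
        literal += " UESCAPE '%s'" % escape
    return literal


print("SELECT " + sql_string_literal("It's me."))
print("SELECT " + sql_unicode_literal("\u263A"))
print("SELECT " + sql_unicode_literal("\u263A", escape="#"))
```

Helpers like these are only a convenience for building literals by hand; they are not a substitute for validating untrusted input.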

Operations