Queries

SELECT and VALUES statements are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SELECT statement (or the VALUES statement) as a Table. A Table can be used in subsequent SQL and Table API queries, converted into a DataStream, or written to a TableSink. SQL and Table API queries can be mixed seamlessly; they are optimized holistically and translated into a single program.

To access a table in a SQL query, the table must be registered in the TableEnvironment. A table can be registered from a TableSource, a Table, a CREATE TABLE statement, or a DataStream. Alternatively, users can register catalogs in a TableEnvironment to specify the location of the data sources.

For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. So, Table objects can be directly inlined into SQL queries as shown in the examples below.

Note: Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.

Specifying a Query

The following examples show how to specify SQL queries on registered and inlined tables.

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ingest a DataStream from an external source
    DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

    // SQL query with an inlined (unregistered) table
    Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
    Table result = tableEnv.sqlQuery(
        "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

    // SQL query with a registered table
    // register the DataStream as view "Orders"
    tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
    // run a SQL query on the Table and retrieve the result as a new Table
    Table result2 = tableEnv.sqlQuery(
        "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

    // create and register a TableSink
    final Schema schema = Schema.newBuilder()
        .column("product", DataTypes.STRING())
        .column("amount", DataTypes.INT())
        .build();
    final TableDescriptor sinkDescriptor = TableDescriptor.forConnector("filesystem")
        .schema(schema)
        .format(FormatDescriptor.forFormat("csv")
            .option("field-delimiter", ",")
            .build())
        .build();
    tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor);

    // run an INSERT SQL on the Table and emit the result to the TableSink
    tableEnv.executeSql(
        "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // read a DataStream from an external source
    val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

    // SQL query with an inlined (unregistered) table
    val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
    val result = tableEnv.sqlQuery(
      s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

    // SQL query with a registered table
    // register the DataStream under the name "Orders"
    tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
    // run a SQL query on the Table and retrieve the result as a new Table
    val result2 = tableEnv.sqlQuery(
      "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

    // create and register a TableSink
    val schema = Schema.newBuilder()
      .column("product", DataTypes.STRING())
      .column("amount", DataTypes.INT())
      .build()
    val sinkDescriptor = TableDescriptor.forConnector("filesystem")
      .schema(schema)
      .format(FormatDescriptor.forFormat("csv")
        .option("field-delimiter", ",")
        .build())
      .build()
    tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor)

    // run an INSERT SQL on the Table and emit the result to the TableSink
    tableEnv.executeSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Python

    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)

    # SQL query with an inlined (unregistered) table
    # elements data type: BIGINT, STRING, BIGINT
    table = table_env.from_elements(..., ['user', 'product', 'amount'])
    result = table_env \
        .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)

    # register the Table as view "Orders" so that the INSERT below can reference it
    table_env.create_temporary_view("Orders", table)

    # create and register a TableSink
    schema = Schema.new_builder() \
        .column("product", DataTypes.STRING()) \
        .column("amount", DataTypes.INT()) \
        .build()
    sink_descriptor = TableDescriptor.for_connector("filesystem") \
        .schema(schema) \
        .format(FormatDescriptor.for_format("csv")
                .option("field-delimiter", ",")
                .build()) \
        .build()
    table_env.create_temporary_table("RubberOrders", sink_descriptor)

    # run an INSERT SQL on the Table and emit the result to the TableSink
    table_env \
        .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Execute a Query

A SELECT statement or a VALUES statement can be executed with the TableEnvironment.executeSql() method, which collects the result on the local client. The method returns the result of the SELECT statement (or the VALUES statement) as a TableResult. Similarly, a Table object can be executed with the Table.execute() method to collect the content of the query on the local client.

TableResult.collect() returns a closeable row iterator. The SELECT job does not finish until all result data has been collected, so close the iterator with CloseableIterator#close() once it is no longer needed to avoid a resource leak. Alternatively, TableResult.print() prints the result to the client console. The result data in a TableResult can be accessed only once, so collect() and print() must not both be called on the same TableResult.

TableResult.collect() and TableResult.print() have slightly different behaviors under different checkpointing settings (to enable checkpointing for a streaming job, see checkpointing config).

  • For batch jobs or streaming jobs without checkpointing, TableResult.collect() and TableResult.print() provide neither an exactly-once nor an at-least-once guarantee. Query results are accessible by the clients as soon as they are produced, but an exception is thrown when the job fails and restarts.
  • For streaming jobs with exactly-once checkpointing, TableResult.collect() and TableResult.print() guarantee an end-to-end exactly-once record delivery. A result will be accessible by clients only after its corresponding checkpoint completes.
  • For streaming jobs with at-least-once checkpointing, TableResult.collect() and TableResult.print() guarantee an end-to-end at-least-once record delivery. Query results are immediately accessible by the clients once they’re produced, but it is possible for the same result to be delivered multiple times.

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

    // execute SELECT statement
    TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
    // use try-with-resources statement to make sure the iterator will be closed automatically
    try (CloseableIterator<Row> it = tableResult1.collect()) {
        while (it.hasNext()) {
            Row row = it.next();
            // handle row
        }
    }

    // execute Table
    TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
    tableResult2.print();

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // enable checkpointing
    tableEnv.getConfig.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

    // execute SELECT statement
    val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
    val it = tableResult1.collect()
    try while (it.hasNext) {
      val row = it.next
      // handle row
    }
    finally it.close() // close the iterator to avoid resource leak

    // execute Table
    val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
    tableResult2.print()

Python

    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env, settings)

    # enable checkpointing
    table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
    table_env.get_config().set("execution.checkpointing.interval", "10s")

    table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

    # execute SELECT statement
    table_result1 = table_env.execute_sql("SELECT * FROM Orders")
    table_result1.print()

    # execute Table
    table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
    table_result2.print()

Syntax

Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.

The following BNF grammar describes the superset of SQL features supported in batch and streaming queries. The Operations section shows examples of the supported features and indicates which features are supported only for batch or only for streaming queries.

Grammar

    query:
        values
      | WITH withItem [ , withItem ]* query
      | {
            select
          | selectWithoutFrom
          | query UNION [ ALL ] query
          | query EXCEPT query
          | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

    withItem:
        name
        [ '(' column [, column ]* ')' ]
        AS '(' query ')'

    orderItem:
        expression [ ASC | DESC ]

    select:
        SELECT [ ALL | DISTINCT ]
        { * | projectItem [, projectItem ]* }
        FROM tableExpression
        [ WHERE booleanExpression ]
        [ GROUP BY { groupItem [, groupItem ]* } ]
        [ HAVING booleanExpression ]
        [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

    selectWithoutFrom:
        SELECT [ ALL | DISTINCT ]
        { * | projectItem [, projectItem ]* }

    projectItem:
        expression [ [ AS ] columnAlias ]
      | tableAlias . *

    tableExpression:
        tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

    joinCondition:
        ON booleanExpression
      | USING '(' column [, column ]* ')'

    tableReference:
        tablePrimary
        [ matchRecognize ]
        [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

    tablePrimary:
        [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | [ LATERAL ] '(' query ')'
      | UNNEST '(' expression ')'

    tablePath:
        [ [ catalogName . ] databaseName . ] tableName

    systemTimePeriod:
        FOR SYSTEM_TIME AS OF dateTimeExpression

    dynamicTableOptions:
        /*+ OPTIONS(key=val [, key=val]*) */

    key:
        stringLiteral

    val:
        stringLiteral

    values:
        VALUES expression [, expression ]*

    groupItem:
        expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'

    windowRef:
        windowName
      | windowSpec

    windowSpec:
        [ windowName ]
        '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
            RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
        ')'

    matchRecognize:
        MATCH_RECOGNIZE '('
        [ PARTITION BY expression [, expression ]* ]
        [ ORDER BY orderItem [, orderItem ]* ]
        [ MEASURES measureColumn [, measureColumn ]* ]
        [ ONE ROW PER MATCH ]
        [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
        ]
        PATTERN '(' pattern ')'
        [ WITHIN intervalLiteral ]
        DEFINE variable AS condition [, variable AS condition ]*
        ')'

    measureColumn:
        expression AS alias

    pattern:
        patternTerm [ '|' patternTerm ]*

    patternTerm:
        patternFactor [ patternFactor ]*

    patternFactor:
        variable [ patternQuantifier ]

    patternQuantifier:
        '*'
      | '*?'
      | '+'
      | '+?'
      | '?'
      | '??'
      | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
      | '{' repeat '}'
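
The dynamicTableOptions production in the grammar takes string literals for both keys and values. As a rough illustration of how a client might assemble such a hint before passing the query to sqlQuery(), here is a minimal sketch in plain Python. The helper names sql_string_literal and options_hint are made up for this example and are not part of any Flink API, and 'example.key' / 'example-value' are placeholders rather than real connector options.

```python
def sql_string_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def options_hint(options: dict) -> str:
    """Render /*+ OPTIONS(key=val [, key=val]*) */ from a dict of strings.

    Hypothetical helper: it only assembles text and does not check that the
    keys are options the target connector understands.
    """
    if not options:
        raise ValueError("the grammar requires at least one key=val pair")
    pairs = ", ".join(
        sql_string_literal(key) + "=" + sql_string_literal(val)
        for key, val in options.items()
    )
    return "/*+ OPTIONS(" + pairs + ") */"


# 'example.key' and 'example-value' are placeholders, not real connector options.
hint = options_hint({"example.key": "example-value"})
print("SELECT * FROM Orders " + hint)
# prints: SELECT * FROM Orders /*+ OPTIONS('example.key'='example-value') */
```

The hint is placed directly after the table path, which is where the tablePrimary production allows it.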

Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) similar to Java:

  • The case of identifiers is preserved whether or not they are quoted.
  • Identifiers are then matched case-sensitively.
  • Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. SELECT a AS `my field` FROM t).
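
When table or column names come from variables, the back-tick rule above can be applied programmatically. The following is a minimal sketch in plain Python; quote_identifier and quote_path are hypothetical helpers written for this example, not Flink functions, and the sketch deliberately rejects names that themselves contain a back-tick rather than trying to escape them.

```python
def quote_identifier(name: str) -> str:
    """Wrap one identifier in back-ticks so it may contain spaces and the like."""
    if not name:
        raise ValueError("identifier must not be empty")
    if "`" in name:
        # Simplification of this sketch: embedded back-ticks are rejected
        # instead of being escaped.
        raise ValueError("back-ticks inside identifiers are not handled here")
    return "`" + name + "`"


def quote_path(*parts: str) -> str:
    """Join catalog, database, and table names into a quoted tablePath."""
    return ".".join(quote_identifier(part) for part in parts)


query = "SELECT a AS {} FROM {}".format(
    quote_identifier("my field"), quote_path("my_db", "t")
)
print(query)
# prints: SELECT a AS `my field` FROM `my_db`.`t`
```

Because identifiers are matched case-sensitively, the helpers leave the case of each name untouched.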

String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). Duplicate a single quote for escaping (e.g., SELECT 'It''s me').

    Flink SQL> SELECT 'Hello World', 'It''s me';
    +-------------+---------+
    |      EXPR$0 |  EXPR$1 |
    +-------------+---------+
    | Hello World | It's me |
    +-------------+---------+
    1 row in set
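
The quote-doubling rule is easy to get wrong when a query is built by string concatenation, as in the inlined-table examples earlier. A minimal sketch in plain Python follows; sql_string_literal is a hypothetical helper for this example, not a Flink function, and it covers only the single-quote rule described above.

```python
def sql_string_literal(value: str) -> str:
    """Enclose a value in single quotes, doubling any embedded single quote."""
    return "'" + value.replace("'", "''") + "'"


values = ["Hello World", "It's me"]
statement = "SELECT " + ", ".join(sql_string_literal(v) for v in values)
print(statement)
# prints: SELECT 'Hello World', 'It''s me'
```

The printed statement is the same query that the SQL client session above runs.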

Unicode characters are supported in string literals. If explicit Unicode code points are required, use the following syntax:

  • Use the backslash (\) as escaping character (default): SELECT U&'\263A'
  • Use a custom escaping character: SELECT U&'#263A' UESCAPE '#'
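
To show how the two forms relate, the sketch below converts a Python string into a U&'...' literal, optionally with a custom escape character. It is plain Python and only an illustration: unicode_literal is a hypothetical helper, it handles only code points that fit in four hexadecimal digits, and it writes the escape character itself as a code-point escape, which is one possible choice among several.

```python
import string


def unicode_literal(text: str, escape: str = "\\") -> str:
    """Render text as a U&'...' literal with non-ASCII characters escaped."""
    if len(escape) != 1 or escape in string.hexdigits or escape in "'\"+ ":
        raise ValueError("unsuitable escape character: %r" % escape)
    parts = []
    for ch in text:
        code_point = ord(ch)
        if code_point > 0xFFFF:
            # Simplification of this sketch: only four-digit escapes are produced.
            raise ValueError("code points above U+FFFF are not handled here")
        if ch == "'":
            parts.append("''")  # usual single-quote doubling
        elif ch == escape or not (0x20 <= code_point <= 0x7E):
            parts.append("%s%04X" % (escape, code_point))
        else:
            parts.append(ch)
    literal = "U&'" + "".join(parts) + "'"
    if escape != "\\":
        literal += " UESCAPE '" + escape + "'"
    return literal


print("SELECT " + unicode_literal("\u263a"))       # prints: SELECT U&'\263A'
print("SELECT " + unicode_literal("\u263a", "#"))  # prints: SELECT U&'#263A' UESCAPE '#'
```

A custom escape character is mainly useful when the literal also has to pass through another layer, such as a host-language string, that treats the backslash specially.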

Operations
