SQL

This is a complete list of Data Definition Language (DDL) and Data Manipulation Language (DML) constructs supported in Flink.

Query

SQL queries are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SQL query as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataSet or DataStream, or be written to a TableSink. SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.

To access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, Table, CREATE TABLE statement, DataStream, or DataSet. Alternatively, users can register external catalogs in a TableEnvironment to specify the location of the data sources.

For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. Hence, Table objects can be directly inlined into SQL queries (by string concatenation) as shown in the examples below.

Note: Flink’s SQL support is not yet feature complete. Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.

Specifying a Query

The following examples show how to specify SQL queries on registered and inlined tables.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// SQL query with an inlined (unregistered) table
Table table = tableEnv.fromDataStream(ds, "user, product, amount");
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// SQL query with a registered table
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// SQL update with a registered table
// create and register a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"product", "amount"};
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
```
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// SQL update with a registered table
// create and register a TableSink
val csvSink: CsvTableSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("product", "amount")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
```
```python
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# SQL query with an inlined (unregistered) table
# elements data type: BIGINT, STRING, BIGINT
table = table_env.from_elements(..., ['user', 'product', 'amount'])
result = table_env \
    .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)

# SQL update with a registered table
# register the table under the name "Orders"
table_env.register_table("Orders", table)
# create and register a TableSink
field_names = ["product", "amount"]
field_types = [DataTypes.STRING(), DataTypes.BIGINT()]
csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
table_env.register_table_sink("RubberOrders", csv_sink)
# run a SQL update query on the Table and emit the result to the TableSink
table_env \
    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
```

Supported Syntax

Flink parses SQL using Apache Calcite, which supports standard ANSI SQL. DDL statements are not supported by Flink.

The following BNF grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples of the supported features and indicates which features are only supported for batch or streaming queries.

```text
insert:
  INSERT INTO tableReference
  query

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'
```

Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) similar to Java:

  • The case of identifiers is preserved whether or not they are quoted.
  • Identifiers are then matched case-sensitively.
  • Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g., SELECT a AS `my field` FROM t).

String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). To escape a single quote, duplicate it (e.g., SELECT 'It''s me.'). Unicode characters are supported in string literals. If explicit Unicode code points are required, use the following syntax:

  • Use the backslash (\) as the escape character (default): SELECT U&'\263A'
  • Use a custom escape character: SELECT U&'#263A' UESCAPE '#'
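The following Python sketch illustrates the two quoting rules above by decoding a literal. It strips the enclosing quotes, collapses doubled single quotes, and turns an escape character followed by four hexadecimal digits into the corresponding code point. The function names are hypothetical, and this is not how Flink or Calcite implement their parsers. Treating a doubled escape character as the escape character itself is an assumption borrowed from the SQL standard, not something stated above.

```python
import string


def unquote_literal(literal: str) -> str:
    """Strip the enclosing single quotes and collapse doubled quotes ('' -> ')."""
    if len(literal) < 2 or literal[0] != "'" or literal[-1] != "'":
        raise ValueError("string literal must be enclosed in single quotes")
    return literal[1:-1].replace("''", "'")


def decode_unicode_escapes(body: str, escape: str = "\\") -> str:
    """Decode the body of a U&'...' literal.

    <escape>XXXX (four hex digits) becomes the code point U+XXXX. A doubled
    escape character stands for the escape character itself (an assumption
    borrowed from the SQL standard).
    """
    out = []
    i = 0
    while i < len(body):
        ch = body[i]
        if ch != escape:
            out.append(ch)
            i += 1
        elif body[i + 1:i + 2] == escape:
            out.append(escape)
            i += 2
        else:
            digits = body[i + 1:i + 5]
            if len(digits) != 4 or not all(c in string.hexdigits for c in digits):
                raise ValueError(f"invalid Unicode escape at position {i}")
            out.append(chr(int(digits, 16)))
            i += 5
    return "".join(out)


# SELECT 'It''s me.'
print(unquote_literal("'It''s me.'"))  # It's me.
# SELECT U&'\263A' and SELECT U&'#263A' UESCAPE '#' both denote U+263A
print(hex(ord(decode_unicode_escapes(unquote_literal("'\\263A'")))))  # 0x263a
print(hex(ord(decode_unicode_escapes(unquote_literal("'#263A'"), escape="#"))))  # 0x263a
```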

Operations

Show and Use

Show (Batch, Streaming)

Show all catalogs:
```sql
SHOW CATALOGS;
```
Show all databases in the current catalog:
```sql
SHOW DATABASES;
```
Show all tables in the current database in the current catalog:
```sql
SHOW TABLES;
```

Use (Batch, Streaming)

Set the current catalog for the session:
```sql
USE CATALOG mycatalog;
```
Set the current database of the current catalog for the session:
```sql
USE mydatabase;
```

Scan, Projection, and Filter

Scan / Select / As (Batch, Streaming)
```sql
SELECT * FROM Orders

SELECT a, c AS d FROM Orders
```

Where / Filter (Batch, Streaming)
```sql
SELECT * FROM Orders WHERE b = 'red'

SELECT * FROM Orders WHERE a % 2 = 0
```

User-defined Scalar Functions (Scalar UDF) (Batch, Streaming)
UDFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register scalar UDFs.
```sql
SELECT PRETTY_PRINT(user) FROM Orders
```

Aggregations

GroupBy Aggregation (Batch, Streaming, Result Updating)
Note: GroupBy on a streaming table produces an updating result. See the Dynamic Tables Streaming Concepts page for details.
```sql
SELECT a, SUM(b) AS d
FROM Orders
GROUP BY a
```

GroupBy Window Aggregation (Batch, Streaming)
Use a group window to compute a single result row per group. See the Group Windows section for more details.
```sql
SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
```

Over Window Aggregation (Streaming)
Note: All aggregates must be defined over the same window, i.e., the same partitioning, sorting, and range. Currently, only windows with a range from PRECEDING (UNBOUNDED and bounded) to CURRENT ROW are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.
```sql
SELECT COUNT(amount) OVER (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
```

Distinct (Batch, Streaming, Result Updating)
```sql
SELECT DISTINCT users FROM Orders
```
Note: For streaming queries, the state required to compute the query result might grow indefinitely depending on the number of distinct fields. Provide a query configuration with a valid retention interval to prevent excessive state size. See Query Configuration for details.

Grouping Sets, Rollup, Cube (Batch)
```sql
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))
```

Having (Batch, Streaming)
```sql
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
```

User-defined Aggregate Functions (UDAGG) (Batch, Streaming)
UDAGGs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDAGGs.
```sql
SELECT MyAggregate(amount)
FROM Orders
GROUP BY users
```
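On a stream, the GroupBy and Over Window aggregations above differ in how their results evolve. This plain-Python simulation illustrates the difference; it is a conceptual sketch, not Flink code. The function names are hypothetical, the `'+'`/`'-'` markers only loosely resemble a retract stream rather than Flink's actual changelog encoding, and rows are assumed to arrive already ordered by processing time.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Iterable, List, Optional, Tuple


def group_by_sum_changelog(rows: Iterable[Tuple[str, int]]) -> List[Tuple[str, str, int]]:
    """Simulate a continuously updated SELECT key, SUM(value) ... GROUP BY key.

    '+' adds a result row; '-' retracts a result row emitted earlier.
    """
    sums: Dict[str, int] = {}
    changelog: List[Tuple[str, str, int]] = []
    for key, value in rows:
        if key in sums:
            changelog.append(("-", key, sums[key]))  # retract the outdated sum
            sums[key] += value
        else:
            sums[key] = value
        changelog.append(("+", key, sums[key]))  # emit the current sum
    return changelog


def rows_over_count(rows: Iterable[Tuple[str, Optional[int]]], preceding: int = 2) -> List[int]:
    """Simulate COUNT(amount) OVER (PARTITION BY user ORDER BY proctime
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW) for rows in proctime order."""
    frames: Dict[str, Deque[Optional[int]]] = defaultdict(lambda: deque(maxlen=preceding + 1))
    counts: List[int] = []
    for user, amount in rows:
        frame = frames[user]
        frame.append(amount)  # the bounded deque drops rows that leave the frame
        counts.append(sum(1 for a in frame if a is not None))  # COUNT skips NULLs
    return counts


orders = [("alice", 3), ("bob", 5), ("alice", 4)]
print(group_by_sum_changelog(orders))
# [('+', 'alice', 3), ('+', 'bob', 5), ('-', 'alice', 3), ('+', 'alice', 7)]
print(rows_over_count([("alice", 1), ("alice", 2), ("bob", 1), ("alice", 3), ("alice", None)]))
# [1, 2, 1, 3, 2]
```

Each new row for an existing key forces the GroupBy simulation to retract its earlier answer. The over-window simulation, by contrast, emits exactly one value per input row and never revisits it.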

Joins

Inner Equi-join (Batch, Streaming)
Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.
Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause the query to fail.
```sql
SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id
```
Note: For streaming queries, the state required to compute the query result might grow indefinitely depending on the number of distinct input rows. Provide a query configuration with a valid retention interval to prevent excessive state size. See Query Configuration for details.

Outer Equi-join (Batch, Streaming, Result Updating)
Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.
Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause the query to fail.
```sql
SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
```
Note: For streaming queries, the state required to compute the query result might grow indefinitely depending on the number of distinct input rows. Provide a query configuration with a valid retention interval to prevent excessive state size. See Query Configuration for details.

Time-windowed Join (Batch, Streaming)
Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.
A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.
For example, the following predicates are valid window join conditions:
  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
```sql
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
```
The example above joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.

Expanding arrays into a relation (Batch, Streaming)
Unnesting WITH ORDINALITY is not supported yet.
```sql
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
```

Join with Table Function (Batch, Streaming)
Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. User-defined table functions (UDTFs) must be registered beforehand. See the UDF documentation for details on how to specify and register UDTFs.
Inner Join: A row of the left (outer) table is dropped if its table function call returns an empty result.
```sql
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
```
Left Outer Join: If a table function call returns an empty result, the corresponding outer row is preserved and the result is padded with null values.
```sql
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
```
Note: Currently, only the literal TRUE is supported as the predicate for a left outer join against a lateral table.

Join with Temporal Table Function (Streaming)
Temporal tables are tables that track changes over time.
A temporal table function provides access to the state of a temporal table at a specific point in time. The syntax to join a table with a temporal table function is the same as for a Join with Table Function.
Note: Currently, only inner joins with temporal tables are supported.
Assuming Rates is a temporal table function, the join can be expressed in SQL as follows:
```sql
SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
```
For more information, see the more detailed temporal tables concept description.

Join with Temporal Table (Batch, Streaming)
Temporal tables are tables that track changes over time. A temporal table provides access to the versions of a temporal table at a specific point in time.
Only inner and left joins with processing-time temporal tables are supported.
The following example assumes that LatestRates is a temporal table that is materialized with the latest rate.
```sql
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
```
For more information, see the more detailed Temporal Tables concept description. Only supported in the Blink planner.
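Three of these joins can be sketched at the row level in plain Python:

- the time-windowed predicate from the Orders/Shipments example;
- the inner versus left outer behavior of a join with a table function;
- the "version valid at a point in time" lookup behind temporal joins.

All function and column names are hypothetical, and the loops are deliberately naive. Nothing here reflects how Flink actually executes joins; for example, the sketch ignores state retention and watermarks.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Row = Tuple


def window_join(orders: List[Tuple[int, datetime]],
                shipments: List[Tuple[str, int, datetime]],
                bound: timedelta = timedelta(hours=4)) -> List[Tuple[int, str]]:
    """Nested-loop evaluation of
    o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - bound AND s.shiptime."""
    return [
        (order_id, shipment_id)
        for order_id, order_time in orders
        for shipment_id, ship_order_id, ship_time in shipments
        if order_id == ship_order_id and ship_time - bound <= order_time <= ship_time
    ]


def lateral_join(rows: Iterable[Row], udtf: Callable[[Row], Iterable],
                 left_outer: bool = False) -> List[Row]:
    """Join each row with every value its table function call produces.

    The inner join drops rows whose call produces nothing; the left outer
    variant keeps them and pads the function column with None (NULL)."""
    joined: List[Row] = []
    for row in rows:
        produced = list(udtf(row))
        if produced:
            joined.extend(row + (value,) for value in produced)
        elif left_outer:
            joined.append(row + (None,))
    return joined


def rate_as_of(history: Dict[str, List[Tuple[datetime, float]]],
               currency: str, ts: datetime) -> Optional[float]:
    """Return the rate version valid at ts, i.e. the latest change at or before ts.
    Each history list must be sorted by change time."""
    versions = history.get(currency, [])
    idx = bisect_right([changed_at for changed_at, _ in versions], ts)
    return versions[idx - 1][1] if idx else None


def split_tags(row: Row) -> List[str]:
    """Hypothetical table function: split a comma-separated tag string."""
    return [tag for tag in row[1].split(",") if tag]


t0 = datetime(2020, 1, 1, 10, 0)
print(window_join([(1, t0), (2, t0)],
                  [("s1", 1, t0 + timedelta(hours=3)), ("s2", 2, t0 + timedelta(hours=5))]))
# [(1, 's1')]
print(lateral_join([("alice", "a,b"), ("bob", "")], split_tags))
# [('alice', 'a,b', 'a'), ('alice', 'a,b', 'b')]
print(lateral_join([("alice", "a,b"), ("bob", "")], split_tags, left_outer=True)[-1])
# ('bob', '', None)
history = {"EUR": [(t0, 1.10), (t0 + timedelta(hours=2), 1.12)]}
print(rate_as_of(history, "EUR", t0 + timedelta(hours=1)))  # 1.1
```

A `None` from `rate_as_of` corresponds to an order that finds no matching rate version. Under inner-join semantics, such an order produces no output.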

Set Operations

Union (Batch)
```sql
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION
    (SELECT user FROM Orders WHERE b = 0)
)
```

UnionAll (Batch, Streaming)
```sql
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION ALL
    (SELECT user FROM Orders WHERE b = 0)
)
```

Intersect / Except (Batch)
```sql
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  INTERSECT
    (SELECT user FROM Orders WHERE b = 0)
)

SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  EXCEPT
    (SELECT user FROM Orders WHERE b = 0)
)
```

In (Batch, Streaming)
Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
```sql
SELECT user, amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)
```
Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow indefinitely depending on the number of distinct input rows. Provide a query configuration with a valid retention interval to prevent excessive state size. See Query Configuration for details.

Exists (Batch, Streaming)
Returns true if the sub-query returns at least one row. Only supported if the operation can be rewritten into a join and group operation.
```sql
SELECT user, amount
FROM Orders
WHERE EXISTS (
    SELECT product FROM NewProducts WHERE NewProducts.product = Orders.product
)
```
Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow indefinitely depending on the number of distinct input rows. Provide a query configuration with a valid retention interval to prevent excessive state size. See Query Configuration for details.
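A few lines of Python over lists of values can show two things: how the duplicate-eliminating set operations differ from UNION ALL, and why a grouping step is useful when IN is evaluated as a join. The sketch is illustrative only. The names are hypothetical, NULL handling is ignored, and results are sorted only to make the output deterministic, since SQL does not define their order.

```python
from typing import List, Sequence, Tuple


def union(a: Sequence[str], b: Sequence[str]) -> List[str]:
    """UNION: every distinct row of either input, once."""
    return sorted(set(a) | set(b))


def union_all(a: Sequence[str], b: Sequence[str]) -> List[str]:
    """UNION ALL: all rows of both inputs, duplicates included."""
    return list(a) + list(b)


def intersect(a: Sequence[str], b: Sequence[str]) -> List[str]:
    """INTERSECT: distinct rows that appear in both inputs."""
    return sorted(set(a) & set(b))


def except_(a: Sequence[str], b: Sequence[str]) -> List[str]:
    """EXCEPT: distinct rows of the first input that never appear in the second."""
    return sorted(set(a) - set(b))


def semi_join(orders: Sequence[Tuple[str, str]], new_products: Sequence[str]) -> List[Tuple[str, str]]:
    """WHERE product IN (SELECT product FROM NewProducts), as a group step plus a join step."""
    distinct_products = set(new_products)  # group: de-duplicate the sub-query result
    return [order for order in orders if order[1] in distinct_products]  # join: each order at most once


a = ["alice", "bob", "bob"]
b = ["bob", "carol"]
print(union(a, b))      # ['alice', 'bob', 'carol']
print(union_all(a, b))  # ['alice', 'bob', 'bob', 'bob', 'carol']
print(intersect(a, b))  # ['bob']
print(except_(a, b))    # ['alice']
print(semi_join([("alice", "p1"), ("bob", "p2"), ("carol", "p1")], ["p1", "p1"]))
# [('alice', 'p1'), ('carol', 'p1')]
```

Without the de-duplication step, an order whose product appears twice in NewProducts would be returned twice by a plain join, while IN must return it once.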

OrderBy & Limit

Order By (Batch, Streaming)
Note: The result of streaming queries must be primarily sorted on an ascending time attribute. Additional sorting attributes are supported.
```sql
SELECT *
FROM Orders
ORDER BY orderTime
```

Limit (Batch)
Note: The LIMIT clause requires an ORDER BY clause.
```sql
SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3
```

Top-N

Attention: Top-N is only supported in the Blink planner.

Top-N queries ask for the N smallest or largest values ordered by columns. Both smallest-value and largest-value sets are considered Top-N queries. Top-N queries are useful when only the N bottom-most or N top-most records of a batch or streaming table, subject to a condition, need to be displayed. This result set can be used for further analysis.

Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example, the top five products per category by sales in real time. Top-N queries are supported for SQL on batch and streaming tables.

The following shows the syntax of the Top-N statement:

```sql
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]
```

Parameter Specification:

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, only ROW_NUMBER is supported as the over window function. Support for RANK() and DENSE_RANK() is planned.
  • PARTITION BY col1[, col2…]: Specifies the partition columns. Each partition will have a Top-N result.
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]…]: Specifies the ordering columns. The ordering direction can differ between columns.
  • WHERE rownum <= N: The predicate rownum <= N is required for Flink to recognize the query as a Top-N query. N is the number of smallest or largest records to retain.
  • [AND conditions]: Other conditions may be added to the WHERE clause, but they can only be combined with rownum <= N using an AND conjunction.
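The parameters above map onto a simple batch computation. The rows are split by the PARTITION BY columns, and each partition is sorted by the ORDER BY column. The rows are then numbered from one, and those with a row number of at most N are kept. The Python sketch below does this for a single ordering column. It is a hypothetical helper for illustration, not Flink's implementation, and it ignores the incremental, updating behavior of the streaming case.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def top_n(rows: List[Row], partition_col: str, order_col: str, n: int,
          descending: bool = True) -> List[Row]:
    """Batch emulation of
    SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY p ORDER BY o [DESC]) AS row_num ...)
    WHERE row_num <= n."""
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[partition_col]].append(row)
    result: List[Row] = []
    for key in sorted(partitions):
        # sorted() is stable, so ties keep their input order; ROW_NUMBER() makes no such promise
        ranked = sorted(partitions[key], key=lambda r: r[order_col], reverse=descending)
        for row_num, row in enumerate(ranked[:n], start=1):
            result.append({**row, "row_num": row_num})
    return result


shop_sales = [
    {"product_id": "p1", "category": "books", "sales": 10},
    {"product_id": "p2", "category": "books", "sales": 30},
    {"product_id": "p3", "category": "books", "sales": 20},
    {"product_id": "p4", "category": "toys", "sales": 5},
]
for row in top_n(shop_sales, "category", "sales", n=2):
    print(row["category"], row["product_id"], row["row_num"])
# books p2 1
# books p3 2
# toys p4 1
```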

Attention in Streaming Mode: The Top-N query is Result Updating. Flink SQL sorts the input data stream according to the order key, so if the top N records change, the changed ones are sent downstream as retraction/update records. It is recommended to use a storage that supports updates as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.

The unique key of a Top-N query is the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream. Take the following job as an example: if product_id is the unique key of ShopSales, then the unique keys of the Top-N query are [category, rownum] and [product_id].

The following examples show how to specify Top-N SQL queries on streaming tables. They implement the example mentioned above: the top five products per category that have the maximum sales in real time.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Top-N is only supported in the Blink planner
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// ingest a DataStream from an external source
DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
// register the DataStream as table "ShopSales"
tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");

// select top-5 products per category which have the maximum sales.
Table result1 = tableEnv.sqlQuery(
  "SELECT * " +
  "FROM (" +
  "   SELECT *," +
  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" +
  "   FROM ShopSales)" +
  " WHERE row_num <= 5");
```
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Top-N is only supported in the Blink planner
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// read a DataStream from an external source
val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
// register the DataStream under the name "ShopSales"
tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales)

// select top-5 products per category which have the maximum sales.
val result1 = tableEnv.sqlQuery(
  """
    |SELECT *
    |FROM (
    |   SELECT *,
    |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
    |   FROM ShopSales)
    |WHERE row_num <= 5
  """.stripMargin)
```
No Ranking Output Optimization

As described above, the rownum field is written into the result table as one field of the unique key, which may cause a large number of records to be written to the result table. For example, when the record of product-1001 at rank 9 is updated and its rank rises to 1, all records from rank 1 to 9 are output to the result table as update messages. If the result table receives too much data, it becomes the bottleneck of the SQL job.

The optimization is to omit the rownum field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, only the changed record (product-1001) in the example above needs to be sent downstream, which can greatly reduce the I/O to the result table.
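The effect can be counted with a small Python model of one partition. The sketch compares the rows a sink would have to (re)write when product-1001 climbs from rank 9 to rank 1: once with the rownum column in the output, once without it. The other product names and all sales numbers are made up for illustration. The model only counts upserted rows, ignores deletions when a product drops out of the Top-N, and does not reflect Flink's actual message format.

```python
from typing import Dict, Set, Tuple


def top_n_rows(sales: Dict[str, int], n: int, with_rownum: bool) -> Set[Tuple]:
    """Rows of a single-partition Top-N result, with or without the rownum column."""
    ordered = sorted(sales.items(), key=lambda kv: kv[1], reverse=True)[:n]
    if with_rownum:
        return {(rank, pid, amount) for rank, (pid, amount) in enumerate(ordered, start=1)}
    return {(pid, amount) for pid, amount in ordered}


def rows_to_write(before: Dict[str, int], after: Dict[str, int], n: int,
                  with_rownum: bool) -> Set[Tuple]:
    """Result rows that are new or changed after an update (upserts only)."""
    return top_n_rows(after, n, with_rownum) - top_n_rows(before, n, with_rownum)


# ranks 1-8: p0..p7, rank 9: product-1001, rank 10: p9 (made-up data)
before = {f"p{i}": 100 - 10 * i for i in range(8)}
before.update({"product-1001": 20, "p9": 10})
after = {**before, "product-1001": 200}  # product-1001 jumps from rank 9 to rank 1

print(len(rows_to_write(before, after, n=10, with_rownum=True)))  # 9
print(rows_to_write(before, after, n=10, with_rownum=False))      # {('product-1001', 200)}
```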

The following example shows how to optimize the above Top-N example in this way:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Top-N is only supported in the Blink planner
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// ingest a DataStream from an external source
DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
// register the DataStream as table "ShopSales"
tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");

// select top-5 products per category which have the maximum sales.
Table result1 = tableEnv.sqlQuery(
  "SELECT product_id, category, product_name, sales " + // omit row_num field in the output
  "FROM (" +
  "   SELECT *," +
  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" +
  "   FROM ShopSales)" +
  " WHERE row_num <= 5");
```
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Top-N is only supported in the Blink planner
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// read a DataStream from an external source
val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
// register the DataStream under the name "ShopSales"
tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales)

// select top-5 products per category which have the maximum sales.
val result1 = tableEnv.sqlQuery(
  """
    |SELECT product_id, category, product_name, sales  -- omit row_num field in the output
    |FROM (
    |   SELECT *,
    |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
    |   FROM ShopSales)
    |WHERE row_num <= 5
  """.stripMargin)
```

Attention in Streaming Mode: To output the above query to external storage with a correct result, the external storage must have the same unique key as the Top-N query. In the example query above, if product_id is the unique key of the query, then the external table should also have product_id as its unique key.

Deduplication

Attention: Deduplication is only supported in the Blink planner.

Deduplication removes rows that are duplicates over a set of columns, keeping only the first or the last one. In some cases, upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink after a failover. Duplicate records affect the correctness of downstream analytical jobs (e.g., SUM, COUNT), so deduplication is needed before further analysis.

Flink uses ROW_NUMBER() to remove duplicates in the same way as a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and the rows are ordered by processing time or event time.

The following shows the syntax of the Deduplication statement:

```sql
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1
```

Parameter Specification:

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
  • PARTITION BY col1[, col2…]: Specifies the partition columns, i.e., the deduplication key.
  • ORDER BY time_attr [asc|desc]: Specifies the ordering column, which must be a time attribute. Currently, only the proctime attribute is supported; rowtime attributes will be supported in the future. Ordering by ASC keeps the first row; ordering by DESC keeps the last row.
  • WHERE rownum = 1: The predicate rownum = 1 is required for Flink to recognize the query as deduplication.
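When the ordering is by processing time, the query reduces to keeping either the first or the latest row seen for each key. The following Python sketch models that behavior for rows arriving in processing-time order. The function and field names are hypothetical, and this is not Flink's operator. In this model, a "keep last" result for a key can change whenever a new duplicate arrives, while a "keep first" result is fixed by the first arrival.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def deduplicate(rows: Iterable[Row], key: str, keep: str = "first") -> List[Row]:
    """Emulate ROW_NUMBER() OVER (PARTITION BY key ORDER BY proctime ASC|DESC) ... WHERE rownum = 1.

    Rows are assumed to arrive in processing-time order. keep="first" mirrors ASC,
    keep="last" mirrors DESC."""
    if keep not in ("first", "last"):
        raise ValueError("keep must be 'first' or 'last'")
    kept: Dict[Any, Row] = {}
    for row in rows:
        if keep == "first":
            kept.setdefault(row[key], row)  # later duplicates are ignored
        else:
            kept[row[key]] = row  # each new duplicate replaces the previous row
    return list(kept.values())


orders = [
    {"order_id": "o1", "product": "apple"},
    {"order_id": "o2", "product": "pear"},
    {"order_id": "o1", "product": "apple (replayed)"},
]
print([r["product"] for r in deduplicate(orders, "order_id")])               # ['apple', 'pear']
print([r["product"] for r in deduplicate(orders, "order_id", keep="last")])  # ['apple (replayed)', 'pear']
```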

The following examples show how to specify SQL queries with Deduplication on streaming tables.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Deduplication is only supported in the Blink planner
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// ingest a DataStream from an external source
DataStream<Tuple4<String, String, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime");

// remove duplicate rows on order_id and keep the first occurrence row,
// because there shouldn't be two orders with the same order_id.
Table result1 = tableEnv.sqlQuery(
  "SELECT order_id, user, product, number " +
  "FROM (" +
  "   SELECT *," +
  "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num" +
  "   FROM Orders)" +
  " WHERE row_num = 1");
```
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Deduplication is only supported in the Blink planner
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// read a DataStream from an external source
val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 'proctime.proctime)

// remove duplicate rows on order_id and keep the first occurrence row,
// because there shouldn't be two orders with the same order_id.
val result1 = tableEnv.sqlQuery(
  """
    |SELECT order_id, user, product, number
    |FROM (
    |   SELECT *,
    |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num
    |   FROM Orders)
    |WHERE row_num = 1
  """.stripMargin)
```

Insert

Insert Into (Batch, Streaming)
Output tables must be registered in the TableEnvironment (see Register a TableSink). Moreover, the schema of the registered table must match the schema of the query.
```sql
INSERT INTO OutputTable
SELECT users, tag
FROM Orders
```

Group Windows

Group windows are defined in the GROUP BY clause of a SQL query. Just like queries with regular GROUP BY clauses, queries with a GROUP BY clause that includes a group window function compute a single result row per group. The following group window functions are supported for SQL on batch and streaming tables.

  • TUMBLE(time_attr, interval): Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5-minute intervals. Tumbling windows can be defined on event time (stream + batch) or processing time (stream).
  • HOP(time_attr, interval, interval): Defines a hopping time window (called a sliding window in the Table API). A hopping time window has a fixed duration (second interval parameter) and hops by a specified hop interval (first interval parameter). If the hop interval is smaller than the window size, hopping windows overlap, so rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and a 5-minute hop interval assigns each row to 3 different windows of 15 minutes size, which are evaluated every 5 minutes. Hopping windows can be defined on event time (stream + batch) or processing time (stream).
  • SESSION(time_attr, interval): Defines a session time window. Session time windows do not have a fixed duration; their bounds are defined by a time interval of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example, a session window with a 30-minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event time (stream + batch) or processing time (stream).
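The window assignment rules above can be checked with integer timestamps in Python. In this sketch, timestamps are plain integers in minutes, and tumbling and hopping windows are assumed to be aligned to timestamp 0. A session is assumed to end `gap` minutes after its last row. The function names are hypothetical, and boundary details, such as a row arriving exactly `gap` minutes after the previous one, may differ from Flink's behavior.

```python
from typing import Iterable, List, Tuple

Window = Tuple[int, int]  # [start, end) in minutes since timestamp 0


def tumble_window(ts: int, size: int) -> Window:
    """The single tumbling window of length `size` that contains ts."""
    start = ts - ts % size
    return start, start + size


def hop_windows(ts: int, slide: int, size: int) -> List[Window]:
    """All hopping windows of length `size`, starting every `slide`, that contain ts."""
    last_start = ts - ts % slide
    # a window starting at s contains ts iff s <= ts < s + size
    return sorted((s, s + size) for s in range(last_start, ts - size, -slide))


def session_windows(timestamps: Iterable[int], gap: int) -> List[Window]:
    """Group timestamps into sessions that close after `gap` minutes of inactivity."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            sessions[-1][1] = ts + gap       # inside the gap: extend the current session
        else:
            sessions.append([ts, ts + gap])  # inactivity exceeded the gap: new session
    return [(start, end) for start, end in sessions]


print(tumble_window(17, 5))                 # (15, 20)
print(hop_windows(17, 5, 15))               # [(5, 20), (10, 25), (15, 30)]
print(session_windows([1, 3, 10, 50], 30))  # [(1, 40), (50, 80)]
```

The hopping example reproduces the ratio from the HOP description: with a 15-minute size and a 5-minute hop, each row lands in 15 / 5 = 3 windows.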
Time Attributes

For SQL queries on streaming tables, the time_attr argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the documentation of time attributes to learn how to define time attributes.

For SQL on batch tables, the time_attr argument of the group window function must be an attribute of type TIMESTAMP.

Selecting Group Window Start and End Timestamps

The start and end timestamps of group windows as well as time attributes can be selected with the following auxiliary functions:

TUMBLE_START(time_attr, interval), HOP_START(time_attr, interval, interval), SESSION_START(time_attr, interval)
Returns the timestamp of the inclusive lower bound of the corresponding tumbling, hopping, or session window.

TUMBLE_END(time_attr, interval), HOP_END(time_attr, interval, interval), SESSION_END(time_attr, interval)
Returns the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window. Note: The exclusive upper bound timestamp cannot be used as a rowtime attribute in subsequent time-based operations, such as time-windowed joins and group window or over window aggregations.

TUMBLE_ROWTIME(time_attr, interval), HOP_ROWTIME(time_attr, interval, interval), SESSION_ROWTIME(time_attr, interval)
Returns the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window. The resulting attribute is a rowtime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations.

TUMBLE_PROCTIME(time_attr, interval), HOP_PROCTIME(time_attr, interval, interval), SESSION_PROCTIME(time_attr, interval)
Returns a proctime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations.
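As a rough illustration of how the start, end, and rowtime values relate for one tumbling window, the following Python sketch uses plain datetime arithmetic (not Flink code). It assumes windows aligned to the Unix epoch, naive datetimes interpreted as UTC, and millisecond precision as in TIMESTAMP(3), so that the inclusive upper bound is one millisecond before the exclusive end. The helper name tumble_bounds is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)        # assumption: windows aligned to the Unix epoch (UTC)
ONE_MS = timedelta(milliseconds=1)  # smallest step of a TIMESTAMP(3) value


def tumble_bounds(ts: datetime, size: timedelta) -> Tuple[datetime, datetime, datetime]:
    """Return (start, end, rowtime) of the tumbling window of length `size` containing `ts`."""
    start = ts - (ts - EPOCH) % size  # inclusive lower bound (cf. TUMBLE_START)
    end = start + size                # exclusive upper bound (cf. TUMBLE_END)
    rowtime = end - ONE_MS            # inclusive upper bound (cf. TUMBLE_ROWTIME)
    return start, end, rowtime


start, end, rowtime = tumble_bounds(datetime(2019, 8, 1, 13, 45), timedelta(hours=1))
print(start, end, rowtime)
# 2019-08-01 13:00:00 2019-08-01 14:00:00 2019-08-01 13:59:59.999000
```

Of the three values, only rowtime lies inside the half-open interval [start, end), i.e. it still belongs to the window it summarizes.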

Note: Auxiliary functions must be called with exactly the same arguments as the group window function in the GROUP BY clause.

The following examples show how to specify SQL queries with group windows on streaming tables.

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // ingest a DataStream from an external source
  // (event-time queries on rowtime require timestamps and watermarks to be assigned to ds)
  DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
  // register the DataStream as table "Orders", appending a processing-time
  // attribute (proctime) and an event-time attribute (rowtime)
  tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");

  // compute SUM(amount) per day (in event-time)
  Table result1 = tableEnv.sqlQuery(
    "SELECT `user`, " +
    "  TUMBLE_START(rowtime, INTERVAL '1' DAY) AS wStart, " +
    "  SUM(amount) FROM Orders " +
    "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), `user`");

  // compute SUM(amount) per day (in processing-time)
  Table result2 = tableEnv.sqlQuery(
    "SELECT `user`, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), `user`");

  // compute every hour the SUM(amount) of the last 24 hours in event-time
  Table result3 = tableEnv.sqlQuery(
    "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");

  // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
  Table result4 = tableEnv.sqlQuery(
    "SELECT `user`, " +
    "  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
    "  SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS sEnd, " +
    "  SUM(amount) " +
    "FROM Orders " +
    "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), `user`");

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = StreamTableEnvironment.create(env)

  // read a DataStream from an external source
  // (event-time queries on rowtime require timestamps and watermarks to be assigned to ds)
  val ds: DataStream[(Long, String, Int)] = env.addSource(...)
  // register the DataStream under the name "Orders"
  tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)

  // compute SUM(amount) per day (in event-time)
  val result1 = tableEnv.sqlQuery(
    """
      |SELECT
      |  `user`,
      |  TUMBLE_START(rowtime, INTERVAL '1' DAY) AS wStart,
      |  SUM(amount)
      |FROM Orders
      |GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), `user`
    """.stripMargin)

  // compute SUM(amount) per day (in processing-time)
  val result2 = tableEnv.sqlQuery(
    "SELECT `user`, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), `user`")

  // compute every hour the SUM(amount) of the last 24 hours in event-time
  val result3 = tableEnv.sqlQuery(
    "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

  // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
  val result4 = tableEnv.sqlQuery(
    """
      |SELECT
      |  `user`,
      |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
      |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
      |  SUM(amount)
      |FROM Orders
      |GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), `user`
    """.stripMargin)
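To see what result1 computes, the Python sketch below evaluates the same aggregation, SUM(amount) per user and per one-day tumbling window, over a small in-memory list. The sample rows are made up for illustration, day windows are assumed to start at midnight of naive (UTC) timestamps, and the function name daily_sum_per_user is hypothetical. Unlike the streaming query, which emits results incrementally as windows close, this batch sketch returns all groups at once.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Tuple

# Hypothetical sample rows shaped like Orders: (user, product, amount, rowtime)
Order = Tuple[int, str, int, datetime]


def daily_sum_per_user(rows: List[Order]) -> Dict[Tuple[int, datetime], int]:
    """Batch equivalent of result1: SUM(amount) grouped by (user, 1-day tumbling window)."""
    sums: Dict[Tuple[int, datetime], int] = defaultdict(int)
    for user, _product, amount, rowtime in rows:
        w_start = datetime(rowtime.year, rowtime.month, rowtime.day)  # like TUMBLE_START
        sums[(user, w_start)] += amount
    return dict(sums)


orders = [
    (1, "Rubber", 3, datetime(2019, 8, 1, 9, 0)),
    (1, "Ball", 2, datetime(2019, 8, 1, 23, 59)),
    (1, "Rubber", 5, datetime(2019, 8, 2, 0, 0)),
    (2, "Ball", 4, datetime(2019, 8, 1, 12, 0)),
]
for (user, w_start), total in sorted(daily_sum_per_user(orders).items()):
    print(user, w_start.date(), total)
# 1 2019-08-01 5
# 1 2019-08-02 5
# 2 2019-08-01 4
```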

Pattern Recognition

MATCH_RECOGNIZE (Streaming)
Searches for a given pattern in a streaming table according to the MATCH_RECOGNIZE ISO standard. This makes it possible to express complex event processing (CEP) logic in SQL queries. For a more detailed description, see the dedicated page for detecting patterns in tables.

  SELECT T.aid, T.bid, T.cid
  FROM MyTable
  MATCH_RECOGNIZE (
    PARTITION BY userid
    ORDER BY proctime
    MEASURES
      A.id AS aid,
      B.id AS bid,
      C.id AS cid
    PATTERN (A B C)
    DEFINE
      A AS name = 'a',
      B AS name = 'b',
      C AS name = 'c'
  ) AS T
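The Python sketch below spells out what the example pattern matches: within each userid partition, ordered by proctime, three consecutive rows whose name is 'a', 'b', and 'c', reporting A.id, B.id, and C.id as the measures. It is a hand-written scan for this one fixed pattern, not a general MATCH_RECOGNIZE engine; the row layout (id, userid, proctime, name) and the function name match_abc are assumptions. Because PATTERN (A B C) concatenates the variables without quantifiers, the rows must be consecutive within a partition, so an unrelated row in between breaks the match.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Assumed row layout for MyTable: (id, userid, proctime, name)
Row = Tuple[int, int, int, str]


def match_abc(rows: List[Row]) -> List[Tuple[int, int, int]]:
    """Return (aid, bid, cid) for every A B C match per userid, ordered by proctime."""
    partitions: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[1]].append(row)  # PARTITION BY userid

    matches = []
    for userid in sorted(partitions):
        part = sorted(partitions[userid], key=lambda r: r[2])  # ORDER BY proctime
        for a, b, c in zip(part, part[1:], part[2:]):  # consecutive triples only
            if (a[3], b[3], c[3]) == ("a", "b", "c"):  # DEFINE A, B, C
                matches.append((a[0], b[0], c[0]))  # MEASURES A.id, B.id, C.id
    return matches


rows = [
    (1, 10, 1, "a"), (2, 10, 2, "b"), (3, 20, 3, "a"), (4, 10, 4, "c"),
    (5, 20, 5, "x"), (6, 20, 6, "b"), (7, 20, 7, "c"),
]
print(match_abc(rows))  # [(1, 2, 4)]: user 20's "x" row breaks its sequence
```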

DDL

DDLs are specified with the sqlUpdate() method of the TableEnvironment. The method returns nothing when a table is created successfully. A table can be registered in the catalog with a CREATE TABLE statement and then referenced in SQL queries passed to the sqlQuery() method of the TableEnvironment.

Note: Flink’s DDL support is not yet feature complete. Statements that include unsupported SQL features cause a TableException. The supported features of SQL DDL on batch and streaming tables are listed in the following sections.

Specifying a DDL

The following examples show how to specify SQL DDL statements.

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // SQL query with a registered table
  // register a table named "Orders"
  tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
  // run a SQL query on the Table and retrieve the result as a new Table
  Table result = tableEnv.sqlQuery(
    "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

  // SQL update with a registered table
  // register a TableSink
  tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
  // run a SQL update query on the Table and emit the result to the TableSink
  tableEnv.sqlUpdate(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = StreamTableEnvironment.create(env)

  // SQL query with a registered table
  // register a table named "Orders"
  tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)")
  // run a SQL query on the Table and retrieve the result as a new Table
  val result = tableEnv.sqlQuery(
    "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

  // SQL update with a registered table
  // register a TableSink
  tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)")
  // run a SQL update query on the Table and emit the result to the TableSink
  tableEnv.sqlUpdate(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

  env = StreamExecutionEnvironment.get_execution_environment()
  table_env = StreamTableEnvironment.create(env)

  # SQL update with a registered table
  # (assumes a source table named "Orders" has already been registered)
  # register a TableSink
  table_env.sql_update("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)")
  # run a SQL update query on the Table and emit the result to the TableSink
  table_env.sql_update(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Create Table

  CREATE TABLE [catalog_name.][db_name.]table_name
    [(col_name1 col_type1 [COMMENT col_comment1], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    WITH (key1=val1, key2=val2, ...)

Create a table with the given table properties. If a table with the same name already exists in the database, an exception is thrown.

PARTITIONED BY

Partition the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.
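As an illustration of the one-directory-per-partition idea, the Python sketch below groups rows by the values of the partition columns and derives one relative directory path per distinct combination. The col=value path segments follow the Hive-style convention and are an assumption made purely for illustration; the actual directory layout is determined by the filesystem sink. The function name partition_dirs is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Sequence

Row = Dict[str, object]


def partition_dirs(rows: List[Row], partition_cols: Sequence[str]) -> Dict[str, List[Row]]:
    """Map each distinct combination of partition values to the rows it holds."""
    dirs: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        # One path segment per partition column, in PARTITIONED BY order
        # (the col=value format is an assumption, not a documented layout).
        path = "/".join(f"{col}={row[col]}" for col in partition_cols)
        dirs[path].append(row)
    return dict(dirs)


rows = [
    {"dt": "2019-08-01", "hr": "10", "product": "Rubber"},
    {"dt": "2019-08-01", "hr": "11", "product": "Ball"},
    {"dt": "2019-08-01", "hr": "10", "product": "Ball"},
]
for path, part in partition_dirs(rows, ["dt", "hr"]).items():
    print(path, len(part))
# dt=2019-08-01/hr=10 2
# dt=2019-08-01/hr=11 1
```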

WITH OPTIONS

Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector.

The key and value of each key1=val1 expression must both be string literals. See Connect to External Systems for details on all the table properties supported by the different connectors.
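Because every key and value in the WITH clause is a string literal, it can be convenient to generate the clause from a dictionary when DDL strings are built programmatically, for example before passing them to table_env.sql_update in the Python API. The helpers below are a hypothetical sketch: they single-quote each key and value and escape embedded single quotes by doubling them, as in standard SQL string literals. The 'connector.path' key is taken from the Scala example above; which keys a connector actually requires is documented in Connect to External Systems.

```python
from typing import Mapping


def sql_string_literal(s: str) -> str:
    """Quote s as a SQL string literal, doubling any embedded single quotes."""
    return "'" + s.replace("'", "''") + "'"


def with_clause(props: Mapping[str, str]) -> str:
    """Render table properties as WITH ('key1'='val1', 'key2'='val2', ...)."""
    pairs = ", ".join(
        f"{sql_string_literal(k)}={sql_string_literal(v)}" for k, v in props.items()
    )
    return f"WITH ({pairs})"


# Other properties a real connector needs are omitted here.
ddl = "CREATE TABLE RubberOrders (product VARCHAR, amount INT) " + with_clause(
    {"connector.path": "/path/to/file"}
)
print(ddl)
# CREATE TABLE RubberOrders (product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file')
```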

Note: The table name can take one of three forms:
- catalog_name.db_name.table_name: the table is registered in the catalog named “catalog_name” and the database named “db_name”.
- db_name.table_name: the table is registered in the current catalog of the table environment and the database named “db_name”.
- table_name: the table is registered in the current catalog and the current database of the table environment.
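The resolution rule in the note above can be sketched in a few lines of Python. This is an illustration, not Flink's implementation: it assumes a plain dot-separated name without backtick-quoted parts (so identifiers that themselves contain dots are not handled), and the current catalog and database are passed in explicitly. The function name resolve_table_name is hypothetical, and the catalog and database names in the example are only sample arguments.

```python
from typing import Tuple


def resolve_table_name(name: str, current_catalog: str, current_db: str) -> Tuple[str, str, str]:
    """Expand a 1-, 2-, or 3-part table name to (catalog, database, table)."""
    parts = name.split(".")
    if len(parts) == 3:  # catalog_name.db_name.table_name
        return parts[0], parts[1], parts[2]
    if len(parts) == 2:  # db_name.table_name: use the current catalog
        return current_catalog, parts[0], parts[1]
    if len(parts) == 1:  # table_name: use the current catalog and database
        return current_catalog, current_db, parts[0]
    raise ValueError(f"expected at most three name parts, got {name!r}")


for n in ["cat1.db1.Orders", "db1.Orders", "Orders"]:
    print(resolve_table_name(n, "default_catalog", "default_database"))
# ('cat1', 'db1', 'Orders')
# ('default_catalog', 'db1', 'Orders')
# ('default_catalog', 'default_database', 'Orders')
```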

Note: A table registered with a CREATE TABLE statement can be used both as a table source and as a table sink. Whether it acts as a source or a sink is not determined until it is referenced in a DML statement.

Drop Table

  DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

Drop a table with the given table name. If the table to drop does not exist, an exception is thrown.

IF EXISTS

If the table does not exist, nothing happens.
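The existence checks described for CREATE TABLE and DROP TABLE [IF EXISTS] can be modeled with a toy in-memory catalog. The class and exception names below are hypothetical and are not Flink's catalog API; the sketch only mirrors the documented behavior: creating a table whose name is taken fails, dropping a missing table fails, and IF EXISTS turns the latter into a no-op.

```python
from typing import Dict


class TableExistsError(Exception):
    """Raised when creating a table whose name is already taken."""


class TableMissingError(Exception):
    """Raised when dropping a table that does not exist."""


class ToyCatalog:
    """Hypothetical in-memory stand-in for a catalog; not Flink's API."""

    def __init__(self) -> None:
        self._tables: Dict[str, Dict[str, str]] = {}

    def create_table(self, name: str, properties: Dict[str, str]) -> None:
        # CREATE TABLE: fail if a table with the same name already exists.
        if name in self._tables:
            raise TableExistsError(name)
        self._tables[name] = dict(properties)

    def drop_table(self, name: str, if_exists: bool = False) -> None:
        # DROP TABLE [IF EXISTS]: a missing table is an error unless if_exists is set.
        if name not in self._tables:
            if if_exists:
                return  # nothing happens
            raise TableMissingError(name)
        del self._tables[name]

    def has_table(self, name: str) -> bool:
        return name in self._tables


catalog = ToyCatalog()
catalog.create_table("Orders", {"connector.path": "/path/to/file"})
catalog.drop_table("Orders")
catalog.drop_table("Orders", if_exists=True)  # no error: the table is already gone
```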

Data Types

The SQL runtime is built on top of Flink’s DataSet and DataStream APIs. Internally, it also uses Flink’s TypeInformation to define data types. Fully supported types are listed in org.apache.flink.table.api.Types. The following table summarizes the relation between SQL Types, Table API types, and the resulting Java class.

Table API               SQL                         Java type
Types.STRING            VARCHAR                     java.lang.String
Types.BOOLEAN           BOOLEAN                     java.lang.Boolean
Types.BYTE              TINYINT                     java.lang.Byte
Types.SHORT             SMALLINT                    java.lang.Short
Types.INT               INTEGER, INT                java.lang.Integer
Types.LONG              BIGINT                      java.lang.Long
Types.FLOAT             REAL, FLOAT                 java.lang.Float
Types.DOUBLE            DOUBLE                      java.lang.Double
Types.DECIMAL           DECIMAL                     java.math.BigDecimal
Types.SQL_DATE          DATE                        java.sql.Date
Types.SQL_TIME          TIME                        java.sql.Time
Types.SQL_TIMESTAMP     TIMESTAMP(3)                java.sql.Timestamp
Types.INTERVAL_MONTHS   INTERVAL YEAR TO MONTH      java.lang.Integer
Types.INTERVAL_MILLIS   INTERVAL DAY TO SECOND(3)   java.lang.Long
Types.PRIMITIVE_ARRAY   ARRAY                       e.g. int[]
Types.OBJECT_ARRAY      ARRAY                       e.g. java.lang.Byte[]
Types.MAP               MAP                         java.util.HashMap
Types.MULTISET          MULTISET                    e.g. java.util.HashMap<String, Integer> for a multiset of String
Types.ROW               ROW                         org.apache.flink.types.Row

Generic types and (nested) composite types (e.g., POJOs, tuples, rows, Scala case classes) can be fields of a row as well.

Fields of composite types with arbitrary nesting can be accessed with value access functions.

Generic types are treated as a black box and can be passed on or processed by user-defined functions.

For DDL statements, the full set of data types described on the Data Types page is supported.

Note: Some data types are not supported in SQL queries (in CAST expressions or literals), e.g., STRING, BYTES, TIME(p) WITHOUT TIME ZONE, TIME(p) WITH LOCAL TIME ZONE, TIMESTAMP(p) WITHOUT TIME ZONE, TIMESTAMP(p) WITH LOCAL TIME ZONE, ARRAY, MULTISET, ROW.

Reserved Keywords

Although not every SQL feature is implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround it with backticks (e.g. `value`, `count`).

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND,
FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, 
SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
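When field names are generated or come from external schemas, a small helper can add the backticks automatically. The Python sketch below quotes an identifier only if its upper-case form appears in a reserved-word set; for brevity the set holds just a few entries copied from the list above, so a real helper would need the full list. It assumes identifiers contain no backticks themselves, and the names RESERVED, quote_ident, and select_list are hypothetical.

```python
from typing import FrozenSet, List

# A few entries from the reserved keyword list; a complete helper needs all of them.
RESERVED: FrozenSet[str] = frozenset({"COUNT", "TIMESTAMP", "USER", "VALUE", "YEAR"})


def quote_ident(name: str) -> str:
    """Wrap a field name in backticks if it collides with a reserved keyword."""
    # Keywords are matched case-insensitively; embedded backticks are not handled.
    return f"`{name}`" if name.upper() in RESERVED else name


def select_list(fields: List[str]) -> str:
    """Join field names into a SELECT list, quoting reserved ones."""
    return ", ".join(quote_ident(f) for f in fields)


print("SELECT " + select_list(["user", "product", "value"]) + " FROM Orders")
# SELECT `user`, product, `value` FROM Orders
```

Quoting every identifier unconditionally would be a simpler alternative that avoids maintaining the keyword list, at the cost of noisier generated SQL.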