ALTER 语句

ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。

Flink SQL 目前支持以下 ALTER 语句:

  • ALTER TABLE
  • ALTER VIEW
  • ALTER DATABASE
  • ALTER FUNCTION

执行 ALTER 语句

Java

可以使用 TableEnvironment 中的 executeSql() 方法执行 ALTER 语句。 若 ALTER 操作执行成功,executeSql() 方法返回 ‘OK’,否则会抛出异常。

以下的例子展示了如何在 TableEnvironment 中执行一个 ALTER 语句。

Scala

可以使用 TableEnvironment 中的 executeSql() 方法执行 ALTER 语句。 若 ALTER 操作执行成功,executeSql() 方法返回 ‘OK’,否则会抛出异常。

以下的例子展示了如何在 TableEnvironment 中执行一个 ALTER 语句。

Python

可以使用 TableEnvironment 中的 execute_sql() 方法执行 ALTER 语句。 若 ALTER 操作执行成功,execute_sql() 方法返回 ‘OK’,否则会抛出异常。

以下的例子展示了如何在 TableEnvironment 中执行一个 ALTER 语句。

SQL CLI

可以在 SQL CLI 中执行 ALTER 语句。

以下的例子展示了如何在 SQL CLI 中执行一个 ALTER 语句。

Java

  1. TableEnvironment tableEnv = TableEnvironment.create(...);
  2. // 注册名为 “Orders” 的表
  3. tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
  4. // 字符串数组: ["Orders"]
  5. String[] tables = tableEnv.listTables();
  6. // or tableEnv.executeSql("SHOW TABLES").print();
  7. // 新增列 `order` 并置于第一位
  8. tableEnv.executeSql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST");
  9. // 新增更多列, 以及主键和 watermark
  10. tableEnv.executeSql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)");
  11. // 修改列类型, 注释及 watermark 策略
  12. tableEnv.executeSql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)");
  13. // 删除 watermark
  14. tableEnv.executeSql("ALTER TABLE Orders DROP WATERMARK");
  15. // 删除列
  16. tableEnv.executeSql("ALTER TABLE Orders DROP (amount, ts, category)");
  17. // 重命名列
  18. tableEnv.executeSql("ALTER TABLE Orders RENAME `order` TO order_id");
  19. // "Orders" 的表名改为 "NewOrders"
  20. tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders");
  21. // 字符串数组:["NewOrders"]
  22. String[] tables = tableEnv.listTables();
  23. // or tableEnv.executeSql("SHOW TABLES").print();

Scala

  1. val tableEnv = TableEnvironment.create(...)
  2. // 注册名为 “Orders” 的表
  3. tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  4. // 新增列 `order` 并置于第一位
  5. tableEnv.executeSql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST")
  6. // 新增更多列, 以及主键和 watermark
  7. tableEnv.executeSql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)")
  8. // 修改列类型, 注释, 以及主键和 watermark
  9. tableEnv.executeSql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)")
  10. // 删除 watermark
  11. tableEnv.executeSql("ALTER TABLE Orders DROP WATERMARK")
  12. // 删除列
  13. tableEnv.executeSql("ALTER TABLE Orders DROP (amount, ts, category)")
  14. // 重命名列
  15. tableEnv.executeSql("ALTER TABLE Orders RENAME `order` TO order_id")
  16. // 字符串数组: ["Orders"]
  17. val tables = tableEnv.listTables()
  18. // or tableEnv.executeSql("SHOW TABLES").print()
  19. // rename "Orders" to "NewOrders"
  20. tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders")
  21. // 字符串数组:["NewOrders"]
  22. val tables = tableEnv.listTables()
  23. // or tableEnv.executeSql("SHOW TABLES").print()

Python

  1. table_env = TableEnvironment.create(...)
  2. # 字符串数组: ["Orders"]
  3. tables = table_env.list_tables()
  4. # or table_env.execute_sql("SHOW TABLES").print()
  5. # 新增列 `order` 并置于第一位
  6. table_env.execute_sql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST");
  7. # 新增更多列, 主键及 watermark
  8. table_env.execute_sql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)");
  9. # 修改列类型, 列注释, 主键及 watermark
  10. table_env.execute_sql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)");
  11. # 删除 watermark
  12. table_env.execute_sql("ALTER TABLE Orders DROP WATERMARK");
  13. # 删除列
  14. table_env.execute_sql("ALTER TABLE Orders DROP (amount, ts, category)");
  15. # 重命名列
  16. table_env.execute_sql("ALTER TABLE Orders RENAME `order` TO order_id");
  17. # 把 "Orders" 的表名改为 "NewOrders"
  18. table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders");
  19. # 字符串数组:["NewOrders"]
  20. tables = table_env.list_tables()
  21. # or table_env.execute_sql("SHOW TABLES").print()

SQL CLI

  1. Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
  2. [INFO] Execute statement succeed.
  3. Flink SQL> ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST;
  4. [INFO] Execute statement succeed.
  5. Flink SQL> DESCRIBE Orders;
  6. +---------+--------+------+-----+--------+-----------+------------------+
  7. | name | type | null | key | extras | watermark | comment |
  8. +---------+--------+------+-----+--------+-----------+------------------+
  9. | order | INT | TRUE | | | | order identifier |
  10. | user | BIGINT | TRUE | | | | |
  11. | product | STRING | TRUE | | | | |
  12. | amount | INT | TRUE | | | | |
  13. +---------+--------+------+-----+--------+-----------+------------------+
  14. 4 rows in set
  15. Flink SQL> ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR);
  16. [INFO] Execute statement succeed.
  17. Flink SQL> DESCRIBE Orders;
  18. +----------+------------------------+-------+------------+--------+--------------------------+------------------+
  19. | name | type | null | key | extras | watermark | comment |
  20. +----------+------------------------+-------+------------+--------+--------------------------+------------------+
  21. | order | INT | FALSE | PRI(order) | | | order identifier |
  22. | user | BIGINT | TRUE | | | | |
  23. | product | STRING | TRUE | | | | |
  24. | category | STRING | TRUE | | | | |
  25. | amount | INT | TRUE | | | | |
  26. | ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' HOUR | |
  27. +----------+------------------------+-------+------------+--------+--------------------------+------------------+
  28. 6 rows in set
  29. Flink SQL> ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts);
  30. [INFO] Execute statement succeed.
  31. Flink SQL> DESCRIBE Orders;
  32. +----------+------------------------+-------+------------+--------+-----------+---------------------+
  33. | name | type | null | key | extras | watermark | comment |
  34. +----------+------------------------+-------+------------+--------+-----------+---------------------+
  35. | order | INT | FALSE | PRI(order) | | | order identifier |
  36. | category | STRING | TRUE | | | | category identifier |
  37. | user | BIGINT | TRUE | | | | |
  38. | product | STRING | TRUE | | | | |
  39. | amount | DOUBLE | FALSE | | | | |
  40. | ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` | |
  41. +----------+------------------------+-------+------------+--------+-----------+---------------------+
  42. 6 rows in set
  43. Flink SQL> ALTER TABLE Orders DROP WATERMARK;
  44. [INFO] Execute statement succeed.
  45. Flink SQL> DESCRIBE Orders;
  46. +----------+--------------+-------+------------+--------+-----------+---------------------+
  47. | name | type | null | key | extras | watermark | comment |
  48. +----------+--------------+-------+------------+--------+-----------+---------------------+
  49. | order | INT | FALSE | PRI(order) | | | order identifier |
  50. | category | STRING | TRUE | | | | category identifier |
  51. | user | BIGINT | TRUE | | | | |
  52. | product | STRING | TRUE | | | | |
  53. | amount | DOUBLE | FALSE | | | | |
  54. | ts | TIMESTAMP(3) | TRUE | | | | |
  55. +----------+--------------+-------+------------+--------+-----------+---------------------+
  56. 6 rows in set
  57. Flink SQL> ALTER TABLE Orders DROP (amount, ts, category);
  58. [INFO] Execute statement succeed.
  59. Flink SQL> DESCRIBE Orders;
  60. +---------+--------+-------+------------+--------+-----------+------------------+
  61. | name | type | null | key | extras | watermark | comment |
  62. +---------+--------+-------+------------+--------+-----------+------------------+
  63. | order | INT | FALSE | PRI(order) | | | order identifier |
  64. | user | BIGINT | TRUE | | | | |
  65. | product | STRING | TRUE | | | | |
  66. +---------+--------+-------+------------+--------+-----------+------------------+
  67. 3 rows in set
  68. Flink SQL> ALTER TABLE Orders RENAME `order` to `order_id`;
  69. [INFO] Execute statement succeed.
  70. Flink SQL> DESCRIBE Orders;
  71. +----------+--------+-------+---------------+--------+-----------+------------------+
  72. | name | type | null | key | extras | watermark | comment |
  73. +----------+--------+-------+---------------+--------+-----------+------------------+
  74. | order_id | INT | FALSE | PRI(order_id) | | | order identifier |
  75. | user | BIGINT | TRUE | | | | |
  76. | product | STRING | TRUE | | | | |
  77. +----------+--------+-------+---------------+--------+-----------+------------------+
  78. 3 rows in set
  79. Flink SQL> SHOW TABLES;
  80. +------------+
  81. | table name |
  82. +------------+
  83. | Orders |
  84. +------------+
  85. 1 row in set
  86. Flink SQL> ALTER TABLE Orders RENAME TO NewOrders;
  87. [INFO] Execute statement succeed.
  88. Flink SQL> SHOW TABLES;
  89. +------------+
  90. | table name |
  91. +------------+
  92. | NewOrders |
  93. +------------+
  94. 1 row in set

ALTER TABLE

当前支持的 ALTER TABLE 语法如下

  1. ALTER TABLE [IF EXISTS] table_name {
  2. ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...]}
  3. | MODIFY { <schema_component> | (<schema_component> [, ...]) }
  4. | DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...]}
  5. | RENAME old_column_name TO new_column_name
  6. | RENAME TO new_table_name
  7. | SET (key1=val1, ...)
  8. | RESET (key1, ...)
  9. }
  10. <schema_component>:
  11. { <column_component> | <constraint_component> | <watermark_component> }
  12. <column_component>:
  13. column_name <column_definition> [FIRST | AFTER column_name]
  14. <constraint_component>:
  15. [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
  16. <watermark_component>:
  17. WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
  18. <column_definition>:
  19. { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]
  20. <physical_column_definition>:
  21. column_type
  22. <metadata_column_definition>:
  23. column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
  24. <computed_column_definition>:
  25. AS computed_column_expression
  26. <partition_component>:
  27. PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]

IF EXISTS

若表不存在,则不进行任何操作。

ADD

使用 ADD 语句向已有表中增加 columnsconstraintswatermark, partitions

向表新增列时可通过 FIRST or AFTER col_name 指定位置,不指定位置时默认追加在最后。

ADD 语句示例如下。

  1. -- 新增一列
  2. ALTER TABLE MyTable ADD category_id STRING COMMENT 'identifier of the category';
  3. -- 新增列,主键和 watermark
  4. ALTER TABLE MyTable ADD (
  5. log_ts STRING COMMENT 'log timestamp string' FIRST,
  6. ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
  7. PRIMARY KEY (id) NOT ENFORCED,
  8. WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
  9. );
  10. -- 新增一个分区
  11. ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1');
  12. -- 新增两个分区
  13. ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1') PARTITION (p1=1,p2='b') with ('k2'='v2');

注意 指定列为主键列时会隐式修改该列的 nullability 为 false。

MODIFY

使用 MODIFY 语句修改列的位置 、类型 、注释 、nullability,主键或 watermark。

可使用 FIRSTAFTER col_name 将已有列移动至指定位置,不指定时默认保持位置不变。

MODIFY 语句示例如下。

  1. -- modify a column type, comment and position
  2. ALTER TABLE MyTable MODIFY measurement double COMMENT 'unit is bytes per second' AFTER `id`;
  3. -- modify definition of column log_ts and ts, primary key, watermark. They must exist in table schema
  4. ALTER TABLE MyTable MODIFY (
  5. log_ts STRING COMMENT 'log timestamp string' AFTER `id`, -- reorder columns
  6. ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
  7. PRIMARY KEY (id) NOT ENFORCED,
  8. WATERMARK FOR ts AS ts -- modify watermark strategy
  9. );

注意 指定列为主键列时会隐式修改该列的 nullability 为 false。

DROP

使用 DROP 语句删除列 、主键 、 分区或 watermark。

DROP 语句示例如下。

  1. -- 删除一个列
  2. ALTER TABLE MyTable DROP measurement;
  3. -- 删除多个列
  4. ALTER TABLE MyTable DROP (col1, col2, col3);
  5. -- 删除主键
  6. ALTER TABLE MyTable DROP PRIMARY KEY;
  7. -- 删除一个分区
  8. ALTER TABLE MyTable DROP PARTITION (`id` = 1);
  9. -- 删除两个分区
  10. ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);
  11. -- 删除 watermark
  12. ALTER TABLE MyTable DROP WATERMARK;

RENAME

使用 RENAME 语句修改列名或表名。

RENAME 语句示例如下。

  1. -- rename column
  2. ALTER TABLE MyTable RENAME request_body TO payload;
  3. -- rename table
  4. ALTER TABLE MyTable RENAME TO MyTable2;

SET

为指定的表设置一个或多个属性。若个别属性已经存在于表中,则使用新值覆盖旧值。

SET 语句示例如下。

  1. -- set 'rows-per-second'
  2. ALTER TABLE DataGenSource SET ('rows-per-second' = '10');

RESET

为指定的表重置一个或多个属性。

RESET 语句示例如下。

  1. -- reset 'rows-per-second' to the default value
  2. ALTER TABLE DataGenSource RESET ('rows-per-second');

ALTER VIEW

  1. ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name

Renames a given view to a new name within the same catalog and database.

  1. ALTER VIEW [catalog_name.][db_name.]view_name AS new_query_expression

Changes the underlying query defining the given view to a new query.

ALTER DATABASE

  1. ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。

ALTER FUNCTION

  1. ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  2. [IF EXISTS] [catalog_name.][db_name.]function_name
  3. AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

修改一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个新的 identifier ,可指定 language tag 。若函数不存在,删除会抛出异常。

如果 language tag 是 JAVA 或者 SCALA ,则 identifier 是 UDF 实现类的全限定名。关于 JAVA/SCALA UDF 的实现,请参考 自定义函数

如果 language tag 是 PYTHON , 则 identifier 是 UDF 对象的全限定名,例如 pyflink.table.tests.test_udf.add。关于 PYTHON UDF 的实现,请参考 Python UDFs

TEMPORARY

修改一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。

TEMPORARY SYSTEM

修改一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。

IF EXISTS

若函数不存在,则不进行任何操作。

LANGUAGE JAVA|SCALA|PYTHON

Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。