Hive 方言

从 1.11.0 开始,在使用 Hive 方言时,Flink 允许用户用 Hive 语法来编写 SQL 语句。通过提供与 Hive 语法的兼容性,我们旨在改善与 Hive 的互操作性,并减少用户需要在 Flink 和 Hive 之间切换来执行不同语句的情况。

使用 Hive 方言

Flink 目前支持两种 SQL 方言: defaulthive。你需要先切换到 Hive 方言,然后才能使用 Hive 语法编写。下面介绍如何使用 SQL 客户端和 Table API 设置方言。 还要注意,你可以为执行的每个语句动态切换方言。无需重新启动会话即可使用其他方言。

SQL 客户端

SQL 方言可以通过 table.sql-dialect 属性指定。因此你可以通过 SQL 客户端 yaml 文件中的 configuration 部分来设置初始方言。

  1. execution:
  2. type: batch
  3. result-mode: table
  4. configuration:
  5. table.sql-dialect: hive

你同样可以在 SQL 客户端启动后设置方言。

  1. Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
  2. [INFO] Session property has been set.
  3. Flink SQL> set table.sql-dialect=default; -- to use default dialect
  4. [INFO] Session property has been set.

Table API

你可以使用 Table API 为 TableEnvironment 设置方言。

Java

  1. EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
  2. TableEnvironment tableEnv = TableEnvironment.create(settings);
  3. // to use hive dialect
  4. tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  5. // to use default dialect
  6. tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

Python

  1. from pyflink.table import *
  2. settings = EnvironmentSettings.in_batch_mode()
  3. t_env = TableEnvironment.create(settings)
  4. # to use hive dialect
  5. t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
  6. # to use default dialect
  7. t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)

DDL

本章节列出了 Hive 方言支持的 DDL 语句。我们主要关注语法。你可以参考 Hive 文档 了解每个 DDL 语句的语义。

CATALOG

Show

  1. SHOW CURRENT CATALOG;

DATABASE

Show

  1. SHOW DATABASES;

Create

  1. CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
  2. [COMMENT database_comment]
  3. [LOCATION fs_path]
  4. [WITH DBPROPERTIES (property_name=property_value, ...)];

Alter

Update Properties
  1. ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...);
Update Owner
  1. ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role;
Update Location
  1. ALTER (DATABASE|SCHEMA) database_name SET LOCATION fs_path;

Drop

  1. DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];

Use

  1. USE database_name;

TABLE

Show

  1. SHOW TABLES;

Create

  1. CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
  2. [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])]
  3. [COMMENT table_comment]
  4. [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  5. [
  6. [ROW FORMAT row_format]
  7. [STORED AS file_format]
  8. ]
  9. [LOCATION fs_path]
  10. [TBLPROPERTIES (property_name=property_value, ...)]
  11. row_format:
  12. : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char]
  13. [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
  14. [NULL DEFINED AS char]
  15. | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)]
  16. file_format:
  17. : SEQUENCEFILE
  18. | TEXTFILE
  19. | RCFILE
  20. | ORC
  21. | PARQUET
  22. | AVRO
  23. | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
  24. column_constraint:
  25. : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]
  26. table_constraint:
  27. : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]

Alter

Rename
  1. ALTER TABLE table_name RENAME TO new_table_name;
Update Properties
  1. ALTER TABLE table_name SET TBLPROPERTIES (property_name = property_value, property_name = property_value, ... );
Update Location
  1. ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION fs_path;

如果指定了 partition_spec,那么必须完整,即具有所有分区列的值。如果指定了,该操作将作用在对应分区上而不是表上。

Update File Format
  1. ALTER TABLE table_name [PARTITION partition_spec] SET FILEFORMAT file_format;

如果指定了 partition_spec,那么必须完整,即具有所有分区列的值。如果指定了,该操作将作用在对应分区上而不是表上。

Update SerDe Properties
  1. ALTER TABLE table_name [PARTITION partition_spec] SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties];
  2. ALTER TABLE table_name [PARTITION partition_spec] SET SERDEPROPERTIES serde_properties;
  3. serde_properties:
  4. : (property_name = property_value, property_name = property_value, ... )

如果指定了 partition_spec,那么必须完整,即具有所有分区列的值。如果指定了,该操作将作用在对应分区上而不是表上。

Add Partitions
  1. ALTER TABLE table_name ADD [IF NOT EXISTS] (PARTITION partition_spec [LOCATION fs_path])+;
Drop Partitions
  1. ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...];
Add/Replace Columns
  1. ALTER TABLE table_name
  2. ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
  3. [CASCADE|RESTRICT]
Change Column
  1. ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type
  2. [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];

Drop

  1. DROP TABLE [IF EXISTS] table_name;

VIEW

Create

  1. CREATE VIEW [IF NOT EXISTS] view_name [(column_name, ...) ]
  2. [COMMENT view_comment]
  3. [TBLPROPERTIES (property_name = property_value, ...)]
  4. AS SELECT ...;

Alter

注意: 变更视图只在 Table API 中有效,SQL 客户端不支持。

Rename
  1. ALTER VIEW view_name RENAME TO new_view_name;
Update Properties
  1. ALTER VIEW view_name SET TBLPROPERTIES (property_name = property_value, ... );
Update As Select
  1. ALTER VIEW view_name AS select_statement;

Drop

  1. DROP VIEW [IF EXISTS] view_name;

FUNCTION

Show

  1. SHOW FUNCTIONS;

Create

  1. CREATE FUNCTION function_name AS class_name;

Drop

  1. DROP FUNCTION [IF EXISTS] function_name;

DML & DQL Beta

Hive 方言支持常用的 Hive DMLDQL 。 下表列出了一些 Hive 方言支持的语法。

为了实现更好的语法和语义的兼容,强烈建议使用 HiveModule 并将其放在 Module 列表的首位,以便在函数解析时优先使用 Hive 内置函数。

Hive 方言不再支持 Flink SQL 语法 。 若需使用 Flink 语法,请切换到 default 方言。

以下是一个使用 Hive 方言的示例。

  1. Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
  2. [INFO] Execute statement succeed.
  3. Flink SQL> use catalog myhive;
  4. [INFO] Execute statement succeed.
  5. Flink SQL> load module hive;
  6. [INFO] Execute statement succeed.
  7. Flink SQL> use modules hive,core;
  8. [INFO] Execute statement succeed.
  9. Flink SQL> set table.sql-dialect=hive;
  10. [INFO] Session property has been set.
  11. Flink SQL> select explode(array(1,2,3)); -- call hive udtf
  12. +-----+
  13. | col |
  14. +-----+
  15. | 1 |
  16. | 2 |
  17. | 3 |
  18. +-----+
  19. 3 rows in set
  20. Flink SQL> create table tbl (key int,value string);
  21. [INFO] Execute statement succeed.
  22. Flink SQL> insert overwrite table tbl values (5,'e'),(1,'a'),(1,'a'),(3,'c'),(2,'b'),(3,'c'),(3,'c'),(4,'d');
  23. [INFO] Submitting SQL update statement to the cluster...
  24. [INFO] SQL update statement has been successfully submitted to the cluster:
  25. Flink SQL> select * from tbl cluster by key; -- run cluster by
  26. 2021-04-22 16:13:57,005 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input paths to process : 1
  27. +-----+-------+
  28. | key | value |
  29. +-----+-------+
  30. | 1 | a |
  31. | 1 | a |
  32. | 5 | e |
  33. | 2 | b |
  34. | 3 | c |
  35. | 3 | c |
  36. | 3 | c |
  37. | 4 | d |
  38. +-----+-------+
  39. 8 rows in set

注意

以下是使用 Hive 方言的一些注意事项。

  • Hive 方言只能用于操作 Hive 对象,并要求当前 Catalog 是一个 HiveCatalog
  • Hive 方言只支持 db.table 这种两级的标识符,不支持带有 Catalog 名字的标识符。
  • 虽然所有 Hive 版本支持相同的语法,但是一些特定的功能是否可用仍取决于你使用的Hive 版本。例如,更新数据库位置 只在 Hive-2.4.0 或更高版本支持。
  • 执行 DML 和 DQL 时应该使用 HiveModule