HiveServer2 Endpoint

Flink SQL Gateway supports deploying as a HiveServer2 Endpoint, which is compatible with the HiveServer2 wire protocol and allows users to interact with the Flink SQL Gateway (e.g. submit Hive SQL) using existing Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on.

Setting Up

Before starting the SQL Gateway with the HiveServer2 Endpoint, please prepare the required dependencies.

Configure HiveServer2 Endpoint

The HiveServer2 Endpoint is not the default endpoint for the SQL Gateway. You can configure the SQL Gateway to use the HiveServer2 Endpoint by starting it with

  $ ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=<path to hive conf>

or by adding the following configuration to conf/flink-conf.yaml (replace <path to hive conf> with your Hive conf path).

  sql-gateway.endpoint.type: hiveserver2
  sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir: <path to hive conf>

Connecting to HiveServer2

After starting the SQL Gateway, you are able to submit SQL with Apache Hive Beeline.

  $ ./beeline
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in [jar:file:/Users/ohmeatball/Work/hive-related/apache-hive-2.3.9-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in [jar:file:/usr/local/Cellar/hadoop/3.2.1_1/libexec/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  Beeline version 2.3.9 by Apache Hive
  beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl
  Connecting to jdbc:hive2://localhost:10000/default;auth=noSasl
  Enter username for jdbc:hive2://localhost:10000/default:
  Enter password for jdbc:hive2://localhost:10000/default:
  Connected to: Apache Flink (version 1.16)
  Driver: Hive JDBC (version 2.3.9)
  Transaction isolation: TRANSACTION_REPEATABLE_READ
  0: jdbc:hive2://localhost:10000/default> CREATE TABLE Source (
  . . . . . . . . . . . . . . . . . . . .> a INT,
  . . . . . . . . . . . . . . . . . . . .> b STRING
  . . . . . . . . . . . . . . . . . . . .> );
  +---------+
  | result  |
  +---------+
  | OK      |
  +---------+
  0: jdbc:hive2://localhost:10000/default> CREATE TABLE Sink (
  . . . . . . . . . . . . . . . . . . . .> a INT,
  . . . . . . . . . . . . . . . . . . . .> b STRING
  . . . . . . . . . . . . . . . . . . . .> );
  +---------+
  | result  |
  +---------+
  | OK      |
  +---------+
  0: jdbc:hive2://localhost:10000/default> INSERT INTO Sink SELECT * FROM Source;
  +-----------------------------------+
  |              job id               |
  +-----------------------------------+
  | 55ff290b57829998ea6e9acc240a0676  |
  +-----------------------------------+
  1 row selected (2.427 seconds)

Endpoint Options

Below are the options supported when configuring a HiveServer2 Endpoint instance, either in conf/flink-conf.yaml or via dynamic parameters at startup.

sql-gateway.endpoint.type
  Required: yes | Default: "rest" | Type: List<String>
  Specify which endpoint to use; here it should be 'hiveserver2'.

sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir
  Required: yes | Default: (none) | Type: String
  URI to your Hive conf dir containing hive-site.xml. The URI needs to be supported by the Hadoop FileSystem. If the URI is relative, i.e. without a scheme, the local file system is assumed. If the option is not specified, hive-site.xml is searched for in the class path.

sql-gateway.endpoint.hiveserver2.catalog.default-database
  Required: no | Default: "default" | Type: String
  The default database to use when the catalog is set as the current catalog.

sql-gateway.endpoint.hiveserver2.catalog.name
  Required: no | Default: "hive" | Type: String
  Name for the pre-registered Hive catalog.

sql-gateway.endpoint.hiveserver2.module.name
  Required: no | Default: "hive" | Type: String
  Name for the pre-registered Hive module.

sql-gateway.endpoint.hiveserver2.thrift.exponential.backoff.slot.length
  Required: no | Default: 100 ms | Type: Duration
  Binary exponential backoff slot time for Thrift clients during login to HiveServer2, for retries until hitting the Thrift client timeout.

sql-gateway.endpoint.hiveserver2.thrift.host
  Required: no | Default: (none) | Type: String
  The server address of the HiveServer2 host to be used for communication. Default is empty, which means binding to localhost. This is only necessary if the host has multiple network addresses.

sql-gateway.endpoint.hiveserver2.thrift.login.timeout
  Required: no | Default: 20 s | Type: Duration
  Timeout for Thrift clients during login to HiveServer2.

sql-gateway.endpoint.hiveserver2.thrift.max.message.size
  Required: no | Default: 104857600 | Type: Long
  Maximum message size in bytes a HiveServer2 server will accept.

sql-gateway.endpoint.hiveserver2.thrift.port
  Required: no | Default: 10000 | Type: Integer
  The port of the HiveServer2 endpoint.

sql-gateway.endpoint.hiveserver2.thrift.worker.keepalive-time
  Required: no | Default: 1 min | Type: Duration
  Keepalive time for an idle worker thread. When the number of workers exceeds the minimum, excess threads are killed after this time interval.

sql-gateway.endpoint.hiveserver2.thrift.worker.threads.max
  Required: no | Default: 512 | Type: Integer
  The maximum number of Thrift worker threads.

sql-gateway.endpoint.hiveserver2.thrift.worker.threads.min
  Required: no | Default: 5 | Type: Integer
  The minimum number of Thrift worker threads.

HiveServer2 Protocol Compatibility

The Flink SQL Gateway with the HiveServer2 Endpoint aims to provide the same experience as the HiveServer2 of Apache Hive. Therefore, the HiveServer2 Endpoint automatically initializes the environment to give Hive users a more consistent experience:

  • it creates the Hive Catalog as the default catalog;
  • it uses Hive built-in functions by loading the Hive function module and placing it first in the function module list;
  • it switches to the Hive dialect (table.sql-dialect = hive);
  • it switches to batch execution mode (execution.runtime-mode = BATCH);
  • it executes DML statements (e.g. INSERT INTO) in blocking mode, one by one (table.dml-sync = true).

With these defaults in place, you can submit Hive SQL in the familiar Hive style while it executes in the Flink environment.
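
The initialized session can be inspected from any Hive client. Below is a minimal sketch using Hive JDBC; the host and port are placeholder assumptions, and the exact output depends on your setup.

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class InspectSession {
      public static void main(String[] args) throws Exception {
          // Placeholder address: replace with your gateway's host and port.
          try (Connection connection = DriverManager.getConnection(
                          "jdbc:hive2://localhost:10000/default;auth=noSasl");
                  Statement statement = connection.createStatement()) {
              // The session starts in the Hive dialect; switch to the default
              // dialect so the Flink-specific SHOW statements below parse.
              statement.execute("SET table.sql-dialect=default");
              // Should print the pre-registered Hive catalog (default name: "hive").
              try (ResultSet rs = statement.executeQuery("SHOW CURRENT CATALOG")) {
                  while (rs.next()) {
                      System.out.println("current catalog: " + rs.getString(1));
                  }
              }
              // The Hive module is loaded and placed first in the module list.
              try (ResultSet rs = statement.executeQuery("SHOW MODULES")) {
                  while (rs.next()) {
                      System.out.println("module: " + rs.getString(1));
                  }
              }
          }
      }
  }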

Clients & Tools

The HiveServer2 Endpoint is compatible with the HiveServer2 wire protocol. Therefore, tools that work with Hive SQL also work with the SQL Gateway with the HiveServer2 Endpoint. Currently, Hive JDBC, Hive Beeline, DBeaver, Apache Superset and so on are tested to be able to connect to the Flink SQL Gateway with the HiveServer2 Endpoint and submit SQL.

Hive JDBC

The SQL Gateway is compatible with HiveServer2, so you can write a program that uses Hive JDBC to connect to it. To build the program, add the following dependency to your project's pom.xml.

  <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
  </dependency>

After reimporting the dependencies, you can use the following program to connect and list the tables in the Hive Catalog.

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class JdbcConnection {
      public static void main(String[] args) throws Exception {
          try (
                  // Please replace the JDBC URI with your actual host, port and database.
                  Connection connection = DriverManager.getConnection(
                          "jdbc:hive2://{host}:{port}/{database};auth=noSasl");
                  Statement statement = connection.createStatement()) {
              statement.execute("SHOW TABLES");
              ResultSet resultSet = statement.getResultSet();
              while (resultSet.next()) {
                  System.out.println(resultSet.getString(1));
              }
          }
      }
  }
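
To run the program, put the hive-jdbc jar (and its transitive dependencies) on the classpath. If your environment does not register the driver automatically, load it explicitly with Class.forName("org.apache.hive.jdbc.HiveDriver") before opening the connection.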

DBeaver

DBeaver uses Hive JDBC to connect to HiveServer2, so DBeaver can connect to the Flink SQL Gateway to submit Hive SQL. Considering the API compatibility, you can connect to the Flink SQL Gateway as you would to HiveServer2. Please refer to the guidance on how to use DBeaver to connect to the Flink SQL Gateway with the HiveServer2 Endpoint.

Attention Currently, the HiveServer2 Endpoint doesn't support authentication. Please use the following JDBC URL when connecting with DBeaver:

  jdbc:hive2://{host}:{port}/{database};auth=noSasl

After the setup, you can explore Flink with DBeaver.

Apache Superset

Apache Superset is a powerful data exploration and visualization platform. With the API compatibility, you can connect to the Flink SQL Gateway as if it were Hive. Please refer to the guidance for more details.

Attention Currently, the HiveServer2 Endpoint doesn't support authentication. Please use the following URL when connecting from Apache Superset:

  hive://hive@{host}:{port}/{database}?auth=NOSASL

Streaming SQL

Flink is a unified batch and streaming engine. You can switch to streaming execution with the following SQL statements:

  SET table.sql-dialect=default;
  SET execution.runtime-mode=streaming;
  SET table.dml-sync=false;

After that, the environment is ready to parse Flink SQL, optimize with the streaming planner, and submit jobs in async mode.

Notice: The RowKind in the HiveServer2 API is always INSERT. Therefore, the HiveServer2 Endpoint doesn't support presenting CDC data.
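
For illustration, here is a minimal sketch of the same mode switch performed through Hive JDBC. It assumes the Source and Sink tables from the Beeline session above exist and that the gateway listens on localhost:10000 (both are assumptions to adapt to your setup):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class StreamingJdbc {
      public static void main(String[] args) throws Exception {
          // Placeholder address: replace with your gateway's host and port.
          try (Connection connection = DriverManager.getConnection(
                          "jdbc:hive2://localhost:10000/default;auth=noSasl");
                  Statement statement = connection.createStatement()) {
              // Switch the session from the Hive-style defaults to streaming mode.
              statement.execute("SET table.sql-dialect=default");
              statement.execute("SET execution.runtime-mode=streaming");
              statement.execute("SET table.dml-sync=false");

              // With table.dml-sync=false the INSERT returns with a job id
              // instead of blocking until the (possibly unbounded) job finishes.
              statement.execute("INSERT INTO Sink SELECT * FROM Source");
              try (ResultSet rs = statement.getResultSet()) {
                  while (rs.next()) {
                      System.out.println("job id: " + rs.getString(1));
                  }
              }
          }
      }
  }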

Supported Types

The HiveServer2 Endpoint is currently built on Hive 2 and supports all types available in Hive 2. For Hive-compatible tables, the HiveServer2 Endpoint obeys the same rules as the HiveCatalog to convert Flink types to Hive types and serialize them into Thrift objects. Please refer to the HiveCatalog for the type mappings.
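
As a quick way to see the resulting Hive types on the client side, the following sketch reads the JDBC result set metadata of a query. It assumes the Source table created in the Beeline session above and a gateway at localhost:10000:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.ResultSetMetaData;
  import java.sql.Statement;

  public class DescribeTypes {
      public static void main(String[] args) throws Exception {
          try (Connection connection = DriverManager.getConnection(
                          "jdbc:hive2://localhost:10000/default;auth=noSasl");
                  Statement statement = connection.createStatement();
                  ResultSet rs = statement.executeQuery("SELECT * FROM Source")) {
              // The metadata reflects the Hive types derived from the Flink schema.
              ResultSetMetaData meta = rs.getMetaData();
              for (int i = 1; i <= meta.getColumnCount(); i++) {
                  System.out.println(
                          meta.getColumnName(i) + ": " + meta.getColumnTypeName(i));
              }
          }
      }
  }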