HiveCatalog

Hive Metastore has evolved into the de facto metadata hub in the Hadoop ecosystem over the years. Many companies run a single Hive Metastore service instance in production to manage all of their metadata, either Hive metadata or non-Hive metadata, as the source of truth.

For users who have both Hive and Flink deployments, HiveCatalog enables them to use Hive Metastore to manage Flink’s metadata.

For users who have just a Flink deployment, HiveCatalog is the only persistent catalog provided out-of-box by Flink. Without a persistent catalog, users of Flink SQL CREATE DDL have to repeatedly create meta-objects like a Kafka table in each session, which wastes a lot of time. HiveCatalog fills this gap by empowering users to create tables and other meta-objects only once, and reference and manage them with convenience later on across sessions.

Set up HiveCatalog

Dependencies

Setting up a HiveCatalog in Flink requires the same dependencies as those of an overall Flink-Hive integration.

Configuration

Setting up a HiveCatalog in Flink requires the same configuration as that of an overall Flink-Hive integration.
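
Note that, besides the yaml registration shown in the example below, recent Flink versions also let you register a HiveCatalog directly with DDL. The following is only a sketch, assuming your Flink version ships the CREATE CATALOG statement; the catalog name and configuration directory are placeholders:

  -- A sketch, assuming a Flink version that supports the CREATE CATALOG DDL;
  -- older versions register catalogs in the SQL CLI yaml file instead
  -- (see step 2 of the example below). Name and path are placeholders.
  CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive-conf'  -- directory containing hive-site.xml
  );
  USE CATALOG myhive;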

How to use HiveCatalog

Once configured properly, HiveCatalog should just work out of the box. Users can create Flink meta-objects with DDL, and should see them immediately afterwards.
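
For example, a quick way to check this in the SQL Client is to create a database and list it back right away (the database name here is just an example, not part of the walkthrough below):

  Flink SQL> CREATE DATABASE mydb;
  Flink SQL> SHOW DATABASES;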

HiveCatalog can be used to handle two kinds of tables: Hive-compatible tables and generic tables. Hive-compatible tables are those stored in a Hive-compatible way, in terms of both metadata and data in the storage layer. Therefore, Hive-compatible tables created via Flink can be queried from the Hive side.

Generic tables, on the other hand, are specific to Flink. When creating generic tables with HiveCatalog, we're just using HMS to persist the metadata. While these tables are visible to Hive, it's unlikely Hive is able to understand the metadata. Therefore, using such tables in Hive leads to undefined behavior.

Flink uses the property 'is_generic' to tell whether a table is Hive-compatible or generic. When creating a table with HiveCatalog, it's by default considered generic. If you'd like to create a Hive-compatible table, make sure to set is_generic to false in your table properties.
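
For example, a minimal sketch of such a DDL; the table name and schema are placeholders, and the essential part is the is_generic property:

  -- Sketch: mark the table as Hive-compatible via its table properties.
  -- Table and column names are placeholders.
  CREATE TABLE hive_users (
    name STRING,
    age  INT
  ) WITH (
    'is_generic' = 'false'
  );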

As stated above, generic tables shouldn't be used from Hive. In the Hive CLI, you can call DESCRIBE FORMATTED for a table and decide whether it's generic or not by checking the is_generic property. Generic tables will have is_generic=true.
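
For example, in the Hive CLI (a full sample of this output appears in the walkthrough below):

  hive> DESCRIBE FORMATTED mykafka;
  -- if "is_generic  true" shows up under Table Parameters, leave the table to Flink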

Example

We will walk through a simple example here.

step 1: set up a Hive Metastore

Have a Hive Metastore running.

Here, we set up a local Hive Metastore with our hive-site.xml file in local path /opt/hive-conf/hive-site.xml. We have some configs like the following:

  <configuration>
     <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
        <description>metadata is stored in a MySQL server</description>
     </property>
     <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>MySQL JDBC driver class</description>
     </property>
     <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>...</value>
        <description>user name for connecting to mysql server</description>
     </property>
     <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>...</value>
        <description>password for connecting to mysql server</description>
     </property>
     <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value>
        <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
     </property>
     <property>
        <name>hive.metastore.schema.verification</name>
        <value>true</value>
     </property>
  </configuration>

Test the connection to the HMS with the Hive CLI. Running some commands, we can see that we have a database named default and there is no table in it.

  hive> show databases;
  OK
  default
  Time taken: 0.032 seconds, Fetched: 1 row(s)
  hive> show tables;
  OK
  Time taken: 0.028 seconds, Fetched: 0 row(s)

step 2: configure Flink cluster and SQL CLI

Add all Hive dependencies to the /lib dir in the Flink distribution, and modify the SQL CLI's yaml config file sql-cli-defaults.yaml as follows:

  execution:
      planner: blink
      type: streaming
      ...
      current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
      current-database: mydatabase

  catalogs:
     - name: myhive
       type: hive
       hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
       hive-version: 2.3.4
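
Alternatively, if you leave current-catalog and current-database out of the yaml file, you can switch to the catalog from within a running session:

  Flink SQL> USE CATALOG myhive;
  Flink SQL> SHOW TABLES;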

step 3: set up a Kafka cluster

Bootstrap a local Kafka 2.3.0 cluster with a topic named "test", and produce some simple data to the topic as tuples of name and age.

  localhost$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  >tom,15
  >john,21

These messages can be seen by starting a Kafka console consumer.

  localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  tom,15
  john,21

step 4: start SQL Client, and create a Kafka table with Flink SQL DDL

Start the Flink SQL Client, create a simple Kafka 2.3.0 table via DDL, and verify its schema.

  Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
     'connector.type' = 'kafka',
     'connector.version' = 'universal',
     'connector.topic' = 'test',
     'connector.properties.zookeeper.connect' = 'localhost:2181',
     'connector.properties.bootstrap.servers' = 'localhost:9092',
     'format.type' = 'csv',
     'update-mode' = 'append'
  );
  [INFO] Table has been created.

  Flink SQL> DESCRIBE mykafka;
  root
   |-- name: STRING
   |-- age: INT

Verify the table is also visible to Hive via the Hive CLI, and note that the table has the property is_generic=true:

  hive> show tables;
  OK
  mykafka
  Time taken: 0.038 seconds, Fetched: 1 row(s)

  hive> describe formatted mykafka;
  OK
  # col_name            data_type           comment

  # Detailed Table Information
  Database:             default
  Owner:                null
  CreateTime:           ......
  LastAccessTime:       UNKNOWN
  Retention:            0
  Location:             ......
  Table Type:           MANAGED_TABLE
  Table Parameters:
          flink.connector.properties.bootstrap.servers  localhost:9092
          flink.connector.properties.zookeeper.connect  localhost:2181
          flink.connector.topic                         test
          flink.connector.type                          kafka
          flink.connector.version                       universal
          flink.format.type                             csv
          flink.generic.table.schema.0.data-type        VARCHAR(2147483647)
          flink.generic.table.schema.0.name             name
          flink.generic.table.schema.1.data-type        INT
          flink.generic.table.schema.1.name             age
          flink.update-mode                             append
          is_generic                                    true
          transient_lastDdlTime                         ......

  # Storage Information
  SerDe Library:        org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  InputFormat:          org.apache.hadoop.mapred.TextInputFormat
  OutputFormat:         org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
  Compressed:           No
  Num Buckets:          -1
  Bucket Columns:       []
  Sort Columns:         []
  Storage Desc Params:
          serialization.format  1
  Time taken: 0.158 seconds, Fetched: 36 row(s)

step 5: run Flink SQL to query the Kafka table

Run a simple select query from the Flink SQL Client in a Flink cluster, either standalone or yarn-session.

  Flink SQL> select * from mykafka;

step 6: produce more messages in the Kafka topic

Produce some more messages in the Kafka topic; the console consumer now shows all of them:

  localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  tom,15
  john,21
  kitty,30
  amy,24
  kaiky,18

You should now see results produced by Flink in the SQL Client, as:

  SQL Query Result (Table)
  Refresh: 1 s                 Page: Last of 1

  name       age
  tom        15
  john       21
  kitty      30
  amy        24
  kaiky      18

Supported Types

HiveCatalog supports all Flink types for generic tables.
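
For instance, the sketch below creates a generic table whose timestamp precision has no exact Hive counterpart; since HMS merely stores the schema as string properties, the DDL is accepted. The table name, topic, and connector properties are placeholders modeled on the Kafka table from the walkthrough above:

  -- Sketch: generic tables may use any Flink type, e.g. TIMESTAMP(3),
  -- even though Hive's TIMESTAMP is fixed at precision 9 (see the notes below).
  CREATE TABLE generic_ts_demo (
    name STRING,
    ts   TIMESTAMP(3)
  ) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'demo',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'csv',
    'update-mode' = 'append'
  );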

For Hive-compatible tables, HiveCatalog needs to map Flink data types to corresponding Hive types as described in the following table:

  Flink Data Type      Hive Data Type
  -------------------  -------------------
  CHAR(p)              CHAR(p)
  VARCHAR(p)           VARCHAR(p)
  STRING               STRING
  BOOLEAN              BOOLEAN
  TINYINT              TINYINT
  SMALLINT             SMALLINT
  INT                  INT
  BIGINT               LONG
  FLOAT                FLOAT
  DOUBLE               DOUBLE
  DECIMAL(p, s)        DECIMAL(p, s)
  DATE                 DATE
  TIMESTAMP(9)         TIMESTAMP
  BYTES                BINARY
  ARRAY<T>             LIST<T>
  MAP<K, V>            MAP<K, V>
  ROW                  STRUCT

Something to note about the type mapping (a DDL sketch exercising several of these mappings follows the notes):

  • Hive’s CHAR(p) has a maximum length of 255
  • Hive’s VARCHAR(p) has a maximum length of 65535
  • Hive’s MAP only supports primitive key types while Flink’s MAP can be any data type
  • Hive’s UNION type is not supported
  • Hive’s TIMESTAMP always has precision 9 and doesn’t support other precisions. Hive UDFs, on the other hand, can process TIMESTAMP values with a precision <= 9.
  • Hive doesn’t support Flink’s TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, and MULTISET
  • Flink’s INTERVAL type cannot be mapped to Hive INTERVAL type yet
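
To tie the table and notes together, here is a hedged sketch of a Hive-compatible table that exercises several rows of the mapping; the names are placeholders, and is_generic=false means every column type must be mappable to a Hive type:

  -- Sketch: a Hive-compatible table; HiveCatalog maps each Flink type to the
  -- corresponding Hive type from the table above. Names are placeholders.
  CREATE TABLE hive_mapped_types (
    id      BIGINT,
    score   DECIMAL(10, 2),
    born    DATE,
    updated TIMESTAMP(9),      -- Hive's TIMESTAMP is always precision 9
    raw     BYTES,             -- maps to Hive BINARY
    tags    ARRAY<STRING>,
    attrs   MAP<STRING, INT>   -- keys must be primitive types for Hive
  ) WITH (
    'is_generic' = 'false'
  );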