Catalogs Beta

Catalogs provide metadata, such as databases, tables, partitions, views, and functions, along with the information needed to access data stored in a database or other external systems.

One of the most crucial aspects of data processing is managing metadata. It may be transient metadata, such as temporary tables or UDFs registered against the table environment, or permanent metadata, like that in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL queries.

Catalog Types

GenericInMemoryCatalog

Flink sessions always have a built-in GenericInMemoryCatalog named default_catalog, which has a built-in default database named default_database. All temporary metadata, such as tables defined using TableEnvironment#registerTable, is registered to this catalog.

HiveCatalog

The HiveCatalog serves two purposes: as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink’s Hive documentation provides full details on setting up the catalog and interfacing with an existing Hive installation.

Warning: The Hive Metastore stores all meta-object names in lower case. This is unlike GenericInMemoryCatalog, which is case-sensitive.
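The practical effect of this difference can be illustrated with a small stand-alone model. The sketch below uses no Flink or Hive classes: CaseSensitiveStore and LowerCasingStore are hypothetical names that only mimic the two naming behaviors described in the warning, and the lower-casing of names on lookup is an assumption of the sketch.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-alone model of the two naming behaviors; not Flink or Hive code.
class CatalogNamingSketch {

    // Mimics a case-sensitive catalog: names are stored exactly as given.
    static class CaseSensitiveStore {
        private final Set<String> tables = new HashSet<>();

        void createTable(String name) {
            tables.add(name);
        }

        boolean tableExists(String name) {
            return tables.contains(name);
        }
    }

    // Mimics a store that lower-cases every object name.
    // Assumption of this sketch: lookups are lower-cased as well.
    static class LowerCasingStore {
        private final Set<String> tables = new HashSet<>();

        private static String normalize(String name) {
            return name.toLowerCase(Locale.ROOT);
        }

        void createTable(String name) {
            tables.add(normalize(name));
        }

        boolean tableExists(String name) {
            return tables.contains(normalize(name));
        }

        int tableCount() {
            return tables.size();
        }
    }

    public static void main(String[] args) {
        CaseSensitiveStore exact = new CaseSensitiveStore();
        exact.createTable("Orders");
        System.out.println("case-sensitive store finds 'orders': " + exact.tableExists("orders"));

        LowerCasingStore lower = new LowerCasingStore();
        lower.createTable("Orders");
        lower.createTable("ORDERS"); // collapses onto the same stored name
        System.out.println("lower-casing store finds 'orders': " + lower.tableExists("orders"));
        System.out.println("lower-casing store table count: " + lower.tableCount());
    }
}
```

In this model, two names that differ only by case are distinct objects in the case-sensitive store but collapse into one in the lower-casing store, which is the kind of mismatch to watch for when moving metadata between the two catalog types.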

User-Defined Catalog

Catalogs are pluggable, and users can develop custom catalogs by implementing the Catalog interface. To use custom catalogs in the SQL CLI, users should develop both a catalog and its corresponding catalog factory by implementing the CatalogFactory interface.

The catalog factory defines a set of properties for configuring the catalog when the SQL CLI bootstraps. The set of properties is passed to a discovery service, which tries to match the properties to a CatalogFactory and instantiate the corresponding catalog.
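The matching step can be sketched as a stand-alone model. This is not Flink's discovery service: SketchCatalogFactory, factoryFor, matches, and discover are hypothetical names, a String stands in for the catalog instance, and matching on a single "type" property is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model of property-based factory discovery; not Flink code.
class FactoryDiscoverySketch {

    // Simplified stand-in for a catalog factory. A String stands in for the catalog instance.
    interface SketchCatalogFactory {
        // Properties that must be present, with these exact values, for the factory to apply.
        Map<String, String> requiredContext();

        String createCatalog(String name, Map<String, String> properties);
    }

    // Builds a factory that claims every configuration whose "type" property equals the given value.
    static SketchCatalogFactory factoryFor(final String type) {
        return new SketchCatalogFactory() {
            @Override
            public Map<String, String> requiredContext() {
                return Collections.singletonMap("type", type);
            }

            @Override
            public String createCatalog(String name, Map<String, String> properties) {
                return type + ":" + name;
            }
        };
    }

    // A factory matches when every required key is present with the required value.
    static boolean matches(SketchCatalogFactory factory, Map<String, String> properties) {
        for (Map.Entry<String, String> required : factory.requiredContext().entrySet()) {
            if (!required.getValue().equals(properties.get(required.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Returns the single matching factory, or fails if none or several match.
    static SketchCatalogFactory discover(List<SketchCatalogFactory> factories,
                                         Map<String, String> properties) {
        List<SketchCatalogFactory> found = new ArrayList<>();
        for (SketchCatalogFactory factory : factories) {
            if (matches(factory, properties)) {
                found.add(factory);
            }
        }
        if (found.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one matching factory, found " + found.size());
        }
        return found.get(0);
    }

    public static void main(String[] args) {
        List<SketchCatalogFactory> factories = Arrays.asList(
            factoryFor("generic_in_memory"), factoryFor("hive"), factoryFor("custom_catalog"));

        Map<String, String> properties = new HashMap<>();
        properties.put("type", "custom_catalog");

        SketchCatalogFactory factory = discover(factories, properties);
        System.out.println(factory.createCatalog("myCatalog", properties));
    }
}
```

Failing when zero or several factories match is a design choice of the sketch; it keeps a misconfigured or ambiguous set of properties from silently selecting the wrong catalog.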

Catalog API

Registering a Catalog

Users can register additional catalogs into an existing Flink session.

    tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));

All catalogs defined using YAML must provide a type property that specifies the type of catalog. The following types are supported out of the box.

Catalog            Type Value
GenericInMemory    generic_in_memory
Hive               hive

    catalogs:
      - name: myCatalog
        type: custom_catalog
        hive-conf-dir: ...

Changing the Current Catalog And Database

Flink will always search for tables, views, and UDFs in the current catalog and database.

    tableEnv.useCatalog("myCatalog");
    tableEnv.useDatabase("myDb");

    Flink SQL> USE CATALOG myCatalog;
    Flink SQL> USE myDb;

Metadata from catalogs other than the current catalog is accessible by providing fully qualified names in the form catalog.database.object.

    tableEnv.scan("not_the_current_catalog", "not_the_current_db", "my_table");

    Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
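How a name is completed against the current catalog and database can be sketched with a stand-alone helper. NameResolutionSketch and resolve are hypothetical names, not part of Flink; the sketch also assumes that a two-part name is read as database.object and completed with the current catalog.

```java
// Hypothetical stand-alone model of name resolution; not Flink code.
class NameResolutionSketch {

    // Expands a one-, two-, or three-part name into catalog.database.object.
    // Missing leading parts are filled in from the current catalog and database.
    static String resolve(String currentCatalog, String currentDatabase, String... path) {
        switch (path.length) {
            case 1:
                return String.join(".", currentCatalog, currentDatabase, path[0]);
            case 2:
                // Assumption of this sketch: two parts mean database.object.
                return String.join(".", currentCatalog, path[0], path[1]);
            case 3:
                return String.join(".", path);
            default:
                throw new IllegalArgumentException(
                    "Expected 1 to 3 name parts, got " + path.length);
        }
    }

    public static void main(String[] args) {
        String catalog = "myCatalog";
        String database = "myDb";

        // An unqualified name is looked up in the current catalog and database.
        System.out.println(resolve(catalog, database, "my_table"));

        // A fully qualified name ignores the current catalog and database.
        System.out.println(resolve(catalog, database,
            "not_the_current_catalog", "not_the_current_db", "my_table"));
    }
}
```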

List Available Catalogs

    tableEnv.listCatalogs();

    Flink SQL> show catalogs;

List Available Databases

    tableEnv.listDatabases();

    Flink SQL> show databases;

List Available Tables

    tableEnv.listTables();

    Flink SQL> show tables;