Catalogs
Catalogs provide metadata, such as databases, tables, partitions, views, and functions, as well as the information needed to access data stored in a database or other external systems.
One of the most crucial aspects of data processing is managing metadata. It may be transient metadata, like temporary tables or UDFs registered against the table environment, or permanent metadata, like that in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL queries.
A catalog enables users to reference existing metadata in their data systems and automatically maps it to Flink’s corresponding metadata. For example, Flink can map JDBC tables to Flink tables automatically, so users don’t have to rewrite DDLs in Flink manually. Catalogs greatly simplify the steps required to get started with Flink on top of users’ existing systems and improve the overall user experience.
Catalog Types
GenericInMemoryCatalog
The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session.
JdbcCatalog
The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol. Postgres Catalog and MySQL Catalog are the only two implementations of JDBC Catalog at the moment. See the JdbcCatalog documentation for more details on setting up the catalog.
HiveCatalog
The HiveCatalog serves two purposes: as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink’s Hive documentation provides full details on setting up the catalog and interfacing with an existing Hive installation.
The Hive Metastore stores all meta-object names in lower case. This is unlike GenericInMemoryCatalog, which is case-sensitive.
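To make the difference concrete, here is a toy sketch in plain Python, not Flink code. `NameStore` is a hypothetical stand-in that either keeps names exactly as given (like GenericInMemoryCatalog) or folds them to lower case (like a catalog backed by the Hive Metastore). Under lower-casing, `MyTable` and `mytable` refer to the same object.

```python
class NameStore:
    """Hypothetical toy model of a catalog's name handling (not a Flink API)."""

    def __init__(self, lower_case_names):
        self.lower_case_names = lower_case_names
        self.objects = {}

    def _key(self, name):
        # A Hive-Metastore-like store folds every name to lower case.
        return name.lower() if self.lower_case_names else name

    def create(self, name, definition):
        key = self._key(name)
        if key in self.objects:
            raise ValueError(f"object '{key}' already exists")
        self.objects[key] = definition

    def exists(self, name):
        return self._key(name) in self.objects

    def names(self):
        return sorted(self.objects)


# Case-sensitive store: two distinct objects.
in_memory_like = NameStore(lower_case_names=False)
in_memory_like.create("MyTable", "v1")
in_memory_like.create("mytable", "v2")

# Lower-casing store: any spelling reaches the same object.
hive_like = NameStore(lower_case_names=True)
hive_like.create("MyTable", "v1")
print(hive_like.names())            # ['mytable']
print(hive_like.exists("MYTABLE"))  # True
```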
User-Defined Catalog
Catalogs are pluggable and users can develop custom catalogs by implementing the Catalog interface.
In order to use custom catalogs with Flink SQL, users should implement a corresponding catalog factory by implementing the CatalogFactory interface. The factory is discovered using Java’s Service Provider Interfaces (SPI). Classes that implement this interface can be added to META-INF/services/org.apache.flink.table.factories.Factory in JAR files. The provided factory identifier will be used for matching against the required type property in a SQL CREATE CATALOG DDL statement.
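The matching step can be pictured with the toy model below. It is plain Python, not Flink's implementation: the factory classes, `discover_catalog_factory`, and `create_catalog` are hypothetical, and the `DISCOVERED_FACTORIES` list stands in for what SPI discovery would load from the service files (which list fully qualified factory class names, one per line). As assumed here, lookup fails when no factory, or more than one, declares the requested identifier.

```python
class CatalogFactory:
    """Hypothetical stand-in for a catalog factory (not the Flink interface)."""

    identifier = ""

    def create_catalog(self, name, options):
        raise NotImplementedError


class InMemoryFactory(CatalogFactory):
    identifier = "generic_in_memory"

    def create_catalog(self, name, options):
        return ("in-memory catalog", name, dict(options))


class MyCustomFactory(CatalogFactory):
    identifier = "my_custom_catalog"  # hypothetical identifier

    def create_catalog(self, name, options):
        return ("custom catalog", name, dict(options))


# Stand-in for the factories SPI would find on the classpath.
DISCOVERED_FACTORIES = [InMemoryFactory(), MyCustomFactory()]


def discover_catalog_factory(identifier, factories=DISCOVERED_FACTORIES):
    matches = [f for f in factories if f.identifier == identifier]
    if not matches:
        raise LookupError(f"no catalog factory for identifier '{identifier}'")
    if len(matches) > 1:
        raise LookupError(f"multiple catalog factories for identifier '{identifier}'")
    return matches[0]


def create_catalog(name, with_options):
    """Mimics CREATE CATALOG name WITH (...): 'type' selects the factory."""
    options = dict(with_options)
    factory = discover_catalog_factory(options.pop("type"))
    # In this toy, the remaining options are handed to the factory.
    return factory.create_catalog(name, options)


my_catalog = create_catalog("mycat", {"type": "my_custom_catalog", "some-option": "x"})
print(my_catalog)  # ('custom catalog', 'mycat', {'some-option': 'x'})
```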
Since Flink v1.16, TableEnvironment introduces a user class loader to provide consistent class loading behavior in table programs, the SQL Client, and the SQL Gateway. The user class loader manages all user JARs, such as those added by ADD JAR or CREATE FUNCTION .. USING JAR .. statements. User-defined catalogs should replace Thread.currentThread().getContextClassLoader() with the user class loader to load classes; otherwise, a ClassNotFoundException may be thrown. The user class loader can be accessed via CatalogFactory.Context#getClassLoader.
How to Create and Register Flink Tables to Catalog
Using SQL DDL
Users can use SQL DDL to create tables in catalogs in both Table API and SQL.
Java
TableEnvironment tableEnv = ...;

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// Register the catalog and make it the current catalog
tableEnv.registerCatalog("myhive", catalog);
tableEnv.useCatalog("myhive");

// Create a catalog database and make it the current database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
tableEnv.useDatabase("mydb");

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in current catalog and database.
Scala
val tableEnv = ...

// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog and make it the current catalog
tableEnv.registerCatalog("myhive", catalog)
tableEnv.useCatalog("myhive")

// Create a catalog database and make it the current database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
tableEnv.useDatabase("mydb")

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

tableEnv.listTables() // should return the tables in current catalog and database.
Python
from pyflink.table.catalog import HiveCatalog

# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

# Register the catalog and make it the current catalog
t_env.register_catalog("myhive", catalog)
t_env.use_catalog("myhive")

# Create a catalog database and make it the current database
t_env.execute_sql("CREATE DATABASE mydb WITH (...)")
t_env.use_database("mydb")

# Create a catalog table
t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

# should return the tables in current catalog and database.
t_env.list_tables()
SQL Client
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
For detailed information, please check out Flink SQL CREATE DDL.
Using Java, Scala or Python
Users can use Java, Scala or Python to create catalog tables programmatically.
Java
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.util.List;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database (false: fail if it already exists)
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// Create a catalog table
final Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();

tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
    .schema(schema)
    // …
    .build());

List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
Scala
import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog

val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database (false: fail if it already exists)
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false)

// Create a catalog table
val schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build()

tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
    .schema(schema)
    // …
    .build())

val tables = catalog.listTables("mydb") // tables should contain "mytable"
Python
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable

settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(settings)

# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

# Register the catalog
t_env.register_catalog("myhive", catalog)

# Create a catalog database (False: fail if it already exists)
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database, False)

# Create a catalog table
schema = Schema.new_builder() \
    .column("name", DataTypes.STRING()) \
    .column("age", DataTypes.INT()) \
    .build()

t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
    .schema(schema)
    # …
    .build())

# tables should contain "mytable"
tables = catalog.list_tables("mydb")
Catalog API
Note: only the programmatic catalog APIs are listed here. Users can achieve much of the same functionality with SQL DDL. For detailed DDL information, please refer to SQL CREATE DDL.
Database operations
Java/Scala
// create database (ignoreIfExists = false: fail if "mydb" already exists)
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database (ignoreIfNotExists = false: fail if "mydb" does not exist)
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get database
catalog.getDatabase("mydb");

// check if a database exists
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases();
Python
from pyflink.table.catalog import CatalogDatabase

# create database
catalog_database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", catalog_database, False)

# drop database
catalog.drop_database("mydb", False)

# alter database
catalog.alter_database("mydb", catalog_database, False)

# get database
catalog.get_database("mydb")

# check if a database exists
catalog.database_exists("mydb")

# list databases in a catalog
catalog.list_databases()
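Most of these methods end with a boolean flag (in the Java API, `ignoreIfExists` on create and `ignoreIfNotExists` on drop and alter). The sketch below shows what that flag decides, using a dict-backed toy in plain Python rather than PyFlink. `ToyCatalog` and the two error classes are hypothetical stand-ins for a real catalog and for Flink's `DatabaseAlreadyExistException` and `DatabaseNotExistException`.

```python
class DatabaseAlreadyExistError(Exception):
    """Hypothetical stand-in for DatabaseAlreadyExistException."""


class DatabaseNotExistError(Exception):
    """Hypothetical stand-in for DatabaseNotExistException."""


class ToyCatalog:
    """Hypothetical dict-backed model of the create/drop flags (not PyFlink)."""

    def __init__(self):
        self.databases = {}

    def create_database(self, name, properties, ignore_if_exists):
        if name in self.databases:
            if ignore_if_exists:
                return  # keep the existing database untouched
            raise DatabaseAlreadyExistError(name)
        self.databases[name] = dict(properties)

    def drop_database(self, name, ignore_if_not_exists):
        if name not in self.databases:
            if ignore_if_not_exists:
                return  # nothing to drop
            raise DatabaseNotExistError(name)
        del self.databases[name]


catalog = ToyCatalog()
catalog.create_database("mydb", {"k1": "v1"}, False)
catalog.create_database("mydb", {"k1": "other"}, True)  # no-op: already exists
catalog.drop_database("missing_db", True)               # no-op: does not exist
print(catalog.databases)  # {'mydb': {'k1': 'v1'}}
```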
Table operations
Java/Scala
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// get table
catalog.getTable(new ObjectPath("mydb", "mytable"));

// check if a table exists or not
catalog.tableExists(new ObjectPath("mydb", "mytable"));

// list tables in a database
catalog.listTables("mydb");
Python
from pyflink.table import *
from pyflink.table.catalog import CatalogBaseTable, ObjectPath

table_schema = TableSchema.builder() \
    .field("name", DataTypes.STRING()) \
    .field("age", DataTypes.INT()) \
    .build()

# connector options as plain key/value pairs (remaining Kafka options elided)
table_properties = {
    "connector": "kafka",
    "scan.startup.mode": "earliest-offset",
    # …
}

catalog_table = CatalogBaseTable.create_table(schema=table_schema, properties=table_properties, comment="my comment")

# create table
catalog.create_table(ObjectPath("mydb", "mytable"), catalog_table, False)

# drop table
catalog.drop_table(ObjectPath("mydb", "mytable"), False)

# alter table
catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)

# rename table
catalog.rename_table(ObjectPath("mydb", "mytable"), "my_new_table", False)

# get table
catalog.get_table(ObjectPath("mydb", "mytable"))

# check if a table exists or not
catalog.table_exists(ObjectPath("mydb", "mytable"))

# list tables in a database
catalog.list_tables("mydb")
View operations
Java/Scala
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable(new ObjectPath("mydb", "myview"));

// check if a view exists or not
catalog.tableExists(new ObjectPath("mydb", "myview"));

// list views in a database
catalog.listViews("mydb");
Python
from pyflink.table import *
from pyflink.table.catalog import CatalogBaseTable, ObjectPath

table_schema = TableSchema.builder() \
    .field("name", DataTypes.STRING()) \
    .field("age", DataTypes.INT()) \
    .build()

catalog_table = CatalogBaseTable.create_view(
    original_query="select * from t1",
    expanded_query="select * from `test-catalog`.`db1`.`t1`",
    schema=table_schema,
    properties={},
    comment="This is a view"
)

# create view
catalog.create_table(ObjectPath("mydb", "myview"), catalog_table, False)

# drop view
catalog.drop_table(ObjectPath("mydb", "myview"), False)

# alter view
catalog.alter_table(ObjectPath("mydb", "myview"), catalog_table, False)

# rename view
catalog.rename_table(ObjectPath("mydb", "myview"), "my_new_view", False)

# get view
catalog.get_table(ObjectPath("mydb", "myview"))

# check if a view exists or not
catalog.table_exists(ObjectPath("mydb", "myview"))

# list views in a database
catalog.list_views("mydb")
Partition operations
Java/Scala
// create partition
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exists or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a given partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));
Python
from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec, CatalogPartition

catalog_partition = CatalogPartition.create_instance({}, "my partition")

catalog_partition_spec = CatalogPartitionSpec({"third": "2010", "second": "bob"})

# create partition
catalog.create_partition(
    ObjectPath("mydb", "mytable"),
    catalog_partition_spec,
    catalog_partition,
    False)

# drop partition
catalog.drop_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec, False)

# alter partition
catalog.alter_partition(
    ObjectPath("mydb", "mytable"),
    catalog_partition_spec,
    catalog_partition,
    False)

# get partition
catalog.get_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec)

# check if a partition exists or not
catalog.partition_exists(ObjectPath("mydb", "mytable"), catalog_partition_spec)

# list partitions of a table
catalog.list_partitions(ObjectPath("mydb", "mytable"))

# list partitions of a table under a given partition spec
catalog.list_partitions(ObjectPath("mydb", "mytable"), catalog_partition_spec)
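Listing partitions "under a given partition spec" allows a partial spec: a partition matches when its full spec contains every key/value pair of the given one. The plain-Python sketch below models that rule with dictionaries. It is an illustration under that assumption, not PyFlink code, and `list_partitions` here is a hypothetical helper rather than the catalog method.

```python
def list_partitions(partitions, partial_spec=None):
    """Return the full partition specs that contain every pair in partial_spec."""
    if not partial_spec:
        return list(partitions)
    return [
        spec for spec in partitions
        if all(spec.get(key) == value for key, value in partial_spec.items())
    ]


# Partition specs of a table partitioned by (third, second), as in the example above.
partitions = [
    {"third": "2010", "second": "bob"},
    {"third": "2010", "second": "alice"},
    {"third": "2011", "second": "bob"},
]

print(list_partitions(partitions, {"third": "2010"}))
# [{'third': '2010', 'second': 'bob'}, {'third': '2010', 'second': 'alice'}]
print(len(list_partitions(partitions)))  # 3
```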
Function operations
Java/Scala
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction(new ObjectPath("mydb", "myfunc"));

// check if a function exists or not
catalog.functionExists(new ObjectPath("mydb", "myfunc"));

// list functions in a database
catalog.listFunctions("mydb");
Python
from pyflink.table.catalog import ObjectPath, CatalogFunction

catalog_function = CatalogFunction.create_instance(class_name="my.python.udf")

# create function
catalog.create_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

# drop function
catalog.drop_function(ObjectPath("mydb", "myfunc"), False)

# alter function
catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

# get function
catalog.get_function(ObjectPath("mydb", "myfunc"))

# check if a function exists or not
catalog.function_exists(ObjectPath("mydb", "myfunc"))

# list functions in a database
catalog.list_functions("mydb")
Table API and SQL for Catalog
Registering a Catalog
Users have access to a default in-memory catalog named default_catalog, which is always created. By default, this catalog has a single database called default_database. Users can also register additional catalogs into an existing Flink session.
Java/Scala
tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));
Python
t_env.register_catalog("myCatalog", catalog)
YAML
All catalogs defined using YAML must provide a type property that specifies the type of catalog. The following types are supported out of the box.
| Catalog | Type Value |
|---|---|
| GenericInMemory | generic_in_memory |
| Hive | hive |
catalogs:
  - name: myCatalog
    type: hive
    hive-conf-dir: ...
Changing the Current Catalog And Database
Flink will always search for tables, views, and UDFs in the current catalog and database.
Java/Scala
tableEnv.useCatalog("myCatalog");tableEnv.useDatabase("myDb");
Python
t_env.use_catalog("myCatalog")t_env.use_database("myDb")
SQL
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDb;
Metadata from catalogs other than the current catalog is accessible by providing fully qualified names in the form catalog.database.object.
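The resolution rule can be sketched in plain Python: a one-part name is completed with the current catalog and database, a two-part name (database.object) with the current catalog, and a three-part name is used as is. Once the catalog is chosen, the catalog-level API addresses the object by database and object name only, which is why its methods take an `ObjectPath`. `Session`, `ObjectIdentifier`, and `object_path` are hypothetical names for illustration, and escaped identifiers containing dots are deliberately ignored.

```python
from typing import NamedTuple


class ObjectIdentifier(NamedTuple):
    """Hypothetical fully qualified name: catalog.database.object."""
    catalog: str
    database: str
    name: str

    def object_path(self) -> str:
        # Catalog methods take only "database.object"; the catalog is already chosen.
        return f"{self.database}.{self.name}"


class Session:
    """Hypothetical model of a session's current catalog and database."""

    def __init__(self):
        # Every session starts in the built-in default catalog and database.
        self.current_catalog = "default_catalog"
        self.current_database = "default_database"

    def resolve(self, path: str) -> ObjectIdentifier:
        # Naive split on dots; escaped identifiers such as `a.b` are not handled.
        parts = path.split(".")
        if len(parts) == 1:
            return ObjectIdentifier(self.current_catalog, self.current_database, parts[0])
        if len(parts) == 2:
            return ObjectIdentifier(self.current_catalog, parts[0], parts[1])
        if len(parts) == 3:
            return ObjectIdentifier(parts[0], parts[1], parts[2])
        raise ValueError(f"cannot resolve '{path}'")


session = Session()
print(session.resolve("my_table"))
# ObjectIdentifier(catalog='default_catalog', database='default_database', name='my_table')

# State after: USE CATALOG myCatalog; USE myDb;
session.current_catalog = "myCatalog"
session.current_database = "myDb"
print(session.resolve("other_db.my_table"))
# ObjectIdentifier(catalog='myCatalog', database='other_db', name='my_table')
print(session.resolve("not_the_current_catalog.not_the_current_db.my_table").object_path())
# not_the_current_db.my_table
```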
Java/Scala
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
Python
t_env.from_path("not_the_current_catalog.not_the_current_db.my_table")
SQL
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
List Available Catalogs
Java/Scala
tableEnv.listCatalogs();
Python
t_env.list_catalogs()
SQL
Flink SQL> show catalogs;
List Available Databases
Java/Scala
tableEnv.listDatabases();
Python
t_env.list_databases()
SQL
Flink SQL> show databases;
List Available Tables
Java/Scala
tableEnv.listTables();
Python
t_env.list_tables()
SQL
Flink SQL> show tables;