Catalogs

Catalogs provide metadata, such as databases, tables, partitions, views, and functions, along with the information needed to access data stored in a database or other external systems.

One of the most crucial aspects of data processing is managing metadata. It may be transient, like temporary tables or UDFs registered against the table environment, or permanent, like the metadata in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL queries.

Catalogs let users reference existing metadata in their data systems and automatically map it to Flink's corresponding metadata. For example, Flink can map JDBC tables to Flink tables automatically, so users don't have to manually rewrite DDL in Flink. Catalogs greatly simplify the steps required to connect Flink to users' existing systems and significantly improve the user experience.

Catalog Types

GenericInMemoryCatalog

The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session.
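A minimal sketch of registering one is shown below; the catalog name my_catalog and the table environment setup are illustrative and mirror the examples later on this page.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Everything registered in this catalog lives only for the lifetime of the session.
GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog("my_catalog");
tableEnv.registerCatalog("my_catalog", inMemoryCatalog);
tableEnv.useCatalog("my_catalog");
```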

JdbcCatalog

The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol. PostgresCatalog is the only implementation of the JDBC Catalog at the moment. See the JdbcCatalog documentation for more details on setting up the catalog.
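As a rough sketch, a Postgres-backed catalog can be registered as follows; the connection values below are placeholders, and the exact constructor arguments are described in the JdbcCatalog documentation.

```java
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;

String name            = "my_jdbc_catalog";
String defaultDatabase = "mydb";
String username        = "postgres";
String password        = "example-password";       // placeholder credentials
String baseUrl         = "jdbc:postgresql://localhost:5432/";

// Register a catalog backed by the Postgres instance behind baseUrl;
// tableEnv is a TableEnvironment as in the examples further down.
JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog(name, jdbcCatalog);
```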

HiveCatalog

The HiveCatalog serves two purposes: as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink's Hive documentation provides full details on setting up the catalog and interfacing with an existing Hive installation.

Warning: The Hive Metastore stores all meta-object names in lower case. This is unlike GenericInMemoryCatalog, which is case-sensitive.

User-Defined Catalog

Catalogs are pluggable, and users can develop custom catalogs by implementing the Catalog interface. To use custom catalogs with the SQL CLI, users should develop both a catalog and its corresponding catalog factory by implementing the CatalogFactory interface.

The catalog factory defines a set of properties for configuring the catalog when the SQL CLI bootstraps. These properties are passed to a discovery service, which tries to match them to a CatalogFactory and instantiate a corresponding catalog instance.
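A minimal sketch of such a factory follows; the class name, the type value, and the property key are hypothetical, and the exact factory interface depends on the Flink version in use.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.CatalogFactory;

public class MyCatalogFactory implements CatalogFactory {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        // must match the `type` configured for the catalog in the SQL CLI YAML
        context.put("type", "my_catalog");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // hypothetical property understood by this catalog
        return Arrays.asList("my-catalog.endpoint");
    }

    @Override
    public Catalog createCatalog(String name, Map<String, String> properties) {
        // For illustration, back the catalog with an in-memory implementation;
        // a real factory would construct the user's own Catalog implementation here.
        return new GenericInMemoryCatalog(name);
    }
}
```

The factory is discovered via Java's Service Provider Interface, so its fully qualified class name must also be listed in a META-INF/services file; the exact service file name depends on the Flink version.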

How to Create and Register Flink Tables to Catalog

Using SQL DDL

Users can use SQL DDL to create tables in catalogs in both Table API and SQL.

Java

```java
TableEnvironment tableEnv = ...

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in the current catalog and database.
```

Scala

```scala
val tableEnv = ...

// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

tableEnv.listTables() // should return the tables in the current catalog and database.
```

Python

```python
from pyflink.table.catalog import HiveCatalog

# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

# Register the catalog
t_env.register_catalog("myhive", catalog)

# Create a catalog database
t_env.execute_sql("CREATE DATABASE mydb WITH (...)")

# Create a catalog table
t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

# should return the tables in the current catalog and database.
t_env.list_tables()
```

SQL Client

```sql
-- the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
```

For detailed information, please check out Flink SQL CREATE DDL.

Using Java, Scala or Python

Users can use Java, Scala or Python to create catalog tables programmatically.

Java

```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.descriptors.Kafka;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));

// Create a catalog table
TableSchema schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build();

catalog.createTable(
        new ObjectPath("mydb", "mytable"),
        new CatalogTableImpl(
            schema,
            new Kafka()
                .version("0.11")
                // ...
                .startFromEarliest()
                .toProperties(),
            "my comment"
        ),
        false
    );

List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
```

Scala

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors.Kafka

val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)

// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))

// Create a catalog table
val schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build()

catalog.createTable(
        new ObjectPath("mydb", "mytable"),
        new CatalogTableImpl(
            schema,
            new Kafka()
                .version("0.11")
                // ...
                .startFromEarliest()
                .toProperties(),
            "my comment"
        ),
        false
    )

val tables = catalog.listTables("mydb") // tables should contain "mytable"
```

Python

```python
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

# Register the catalog
t_env.register_catalog("myhive", catalog)

# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)

# Create a catalog table
table_schema = TableSchema.builder() \
    .field("name", DataTypes.STRING()) \
    .field("age", DataTypes.INT()) \
    .build()

table_properties = Kafka() \
    .version("0.11") \
    .start_from_earliest() \
    .to_properties()

catalog_table = CatalogBaseTable.create_table(
    schema=table_schema, properties=table_properties, comment="my comment")

catalog.create_table(
    ObjectPath("mydb", "mytable"),
    catalog_table,
    False)

# tables should contain "mytable"
tables = catalog.list_tables("mydb")
```

Catalog API

Note: only catalog program APIs are listed here. Users can achieve much of the same functionality with SQL DDL. For detailed DDL information, please refer to SQL CREATE DDL.

Database operations

Java/Scala

```java
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get database
catalog.getDatabase("mydb");

// check if a database exists
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases();
```

Python

```python
from pyflink.table.catalog import CatalogDatabase

# create database
catalog_database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", catalog_database, False)

# drop database
catalog.drop_database("mydb", False)

# alter database
catalog.alter_database("mydb", catalog_database, False)

# get database
catalog.get_database("mydb")

# check if a database exists
catalog.database_exists("mydb")

# list databases in a catalog
catalog.list_databases()
```

Table operations

Java/Scala

```java
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// get table
catalog.getTable(new ObjectPath("mydb", "mytable"));

// check if a table exists or not
catalog.tableExists(new ObjectPath("mydb", "mytable"));

// list tables in a database
catalog.listTables("mydb");
```

Python

```python
from pyflink.table import *
from pyflink.table.catalog import CatalogBaseTable, ObjectPath
from pyflink.table.descriptors import Kafka

table_schema = TableSchema.builder() \
    .field("name", DataTypes.STRING()) \
    .field("age", DataTypes.INT()) \
    .build()

table_properties = Kafka() \
    .version("0.11") \
    .start_from_earliest() \
    .to_properties()

catalog_table = CatalogBaseTable.create_table(schema=table_schema, properties=table_properties, comment="my comment")

# create table
catalog.create_table(ObjectPath("mydb", "mytable"), catalog_table, False)

# drop table
catalog.drop_table(ObjectPath("mydb", "mytable"), False)

# alter table
catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)

# rename table
catalog.rename_table(ObjectPath("mydb", "mytable"), "my_new_table")

# get table
catalog.get_table(ObjectPath("mydb", "mytable"))

# check if a table exists or not
catalog.table_exists(ObjectPath("mydb", "mytable"))

# list tables in a database
catalog.list_tables("mydb")
```

View operations

Java/Scala

```java
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable(new ObjectPath("mydb", "myview"));

// check if a view exists or not
catalog.tableExists(new ObjectPath("mydb", "myview"));

// list views in a database
catalog.listViews("mydb");
```

Python

```python
from pyflink.table import *
from pyflink.table.catalog import CatalogBaseTable, ObjectPath

table_schema = TableSchema.builder() \
    .field("name", DataTypes.STRING()) \
    .field("age", DataTypes.INT()) \
    .build()

catalog_table = CatalogBaseTable.create_view(
    original_query="select * from t1",
    expanded_query="select * from test-catalog.db1.t1",
    schema=table_schema,
    properties={},
    comment="This is a view"
)

# create view
catalog.create_table(ObjectPath("mydb", "myview"), catalog_table, False)

# drop view
catalog.drop_table(ObjectPath("mydb", "myview"), False)

# alter view
catalog.alter_table(ObjectPath("mydb", "myview"), catalog_table, False)

# rename view
catalog.rename_table(ObjectPath("mydb", "myview"), "my_new_view", False)

# get view
catalog.get_table(ObjectPath("mydb", "myview"))

# check if a view exists or not
catalog.table_exists(ObjectPath("mydb", "myview"))

# list views in a database
catalog.list_views("mydb")
```

Partition operations

Java/Scala

```java
// create partition
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exists or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a given partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
```

Python

```python
from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec, CatalogPartition

catalog_partition = CatalogPartition.create_instance({}, "my partition")
catalog_partition_spec = CatalogPartitionSpec({"third": "2010", "second": "bob"})

# create partition
catalog.create_partition(
    ObjectPath("mydb", "mytable"),
    catalog_partition_spec,
    catalog_partition,
    False)

# drop partition
catalog.drop_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec, False)

# alter partition
catalog.alter_partition(
    ObjectPath("mydb", "mytable"),
    catalog_partition_spec,
    catalog_partition,
    False)

# get partition
catalog.get_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec)

# check if a partition exists or not
catalog.partition_exists(ObjectPath("mydb", "mytable"), catalog_partition_spec)

# list partitions of a table
catalog.list_partitions(ObjectPath("mydb", "mytable"))

# list partitions of a table under a given partition spec
catalog.list_partitions(ObjectPath("mydb", "mytable"), catalog_partition_spec)
```

Function operations

Java/Scala

```java
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction(new ObjectPath("mydb", "myfunc"));

// check if a function exists or not
catalog.functionExists(new ObjectPath("mydb", "myfunc"));

// list functions in a database
catalog.listFunctions("mydb");
```

Python

```python
from pyflink.table.catalog import ObjectPath, CatalogFunction

catalog_function = CatalogFunction.create_instance(class_name="my.python.udf")

# create function
catalog.create_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

# drop function
catalog.drop_function(ObjectPath("mydb", "myfunc"), False)

# alter function
catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

# get function
catalog.get_function(ObjectPath("mydb", "myfunc"))

# check if a function exists or not
catalog.function_exists(ObjectPath("mydb", "myfunc"))

# list functions in a database
catalog.list_functions("mydb")
```

Table API and SQL for Catalog

Registering a Catalog

Users have access to a default in-memory catalog named default_catalog, which is always created automatically. This catalog contains a single database called default_database. Users can also register additional catalogs into an existing Flink session.

Java/Scala

```java
tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));
```

Python

```python
t_env.register_catalog("myCatalog", catalog)
```

All catalogs defined using YAML must provide a type property that specifies the type of catalog. The following types are supported out of the box.

| Catalog         | Type Value        |
|-----------------|-------------------|
| GenericInMemory | generic_in_memory |
| Hive            | hive              |
```yaml
catalogs:
  - name: myCatalog
    type: custom_catalog
    hive-conf-dir: ...
```

Changing the Current Catalog And Database

Flink will always search for tables, views, and UDFs in the current catalog and database.

Java/Scala

```java
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
```

Python

```python
t_env.use_catalog("myCatalog")
t_env.use_database("myDb")
```

SQL Client

```sql
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDb;
```

Metadata from catalogs that are not the current catalog is accessible by providing fully qualified names in the form catalog.database.object.

Java/Scala

```java
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
```

Python

```python
t_env.from_path("not_the_current_catalog.not_the_current_db.my_table")
```

SQL Client

```sql
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
```

List Available Catalogs

Java/Scala

```java
tableEnv.listCatalogs();
```

Python

```python
t_env.list_catalogs()
```

SQL Client

```sql
Flink SQL> SHOW CATALOGS;
```

List Available Databases

Java/Scala

```java
tableEnv.listDatabases();
```

Python

```python
t_env.list_databases()
```

SQL Client

```sql
Flink SQL> SHOW DATABASES;
```

List Available Tables

Java/Scala

```java
tableEnv.listTables();
```

Python

```python
t_env.list_tables()
```

SQL Client

```sql
Flink SQL> SHOW TABLES;
```