Catalogs

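A catalog provides metadata such as databases, tables, partitions, views, and functions, along with the information needed to access data stored in a database or other external system.

Managing metadata is one of the most crucial aspects of data processing. Metadata may be transient, such as temporary tables or UDFs registered against the TableEnvironment, or persistent, such as the metadata in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL queries.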

Catalog Types

GenericInMemoryCatalog

GenericInMemoryCatalog is an in-memory implementation of a catalog. All of its metadata is available only for the lifetime of the session.

JdbcCatalog

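JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol. Postgres Catalog and MySQL Catalog are currently the only two implementations of the JDBC catalog. See the JdbcCatalog documentation for details on configuring the JDBC catalog.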

HiveCatalog

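HiveCatalog serves two purposes: as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink's Hive documentation provides full details on setting up HiveCatalog and accessing existing Hive metadata.

Warning: The Hive Metastore stores all metadata object names in lower case, whereas GenericInMemoryCatalog is case-sensitive.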
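The difference in case handling matters when the same object name is used against both catalog types. The following is a minimal sketch in plain Java, not Flink code: CaseSensitiveNames and LowerCasingNames are hypothetical stand-ins that only model the naming behavior described in the warning, and the sketch assumes that lookups against the lower-casing registry are normalized in the same way as registrations.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical model of a case-sensitive registry (the GenericInMemoryCatalog behavior). */
class CaseSensitiveNames {
    private final Set<String> names = new HashSet<>();

    void register(String name) {
        names.add(name);
    }

    boolean exists(String name) {
        return names.contains(name);
    }
}

/** Hypothetical model of a registry that lower-cases every name (the Hive Metastore behavior). */
class LowerCasingNames {
    private final Set<String> names = new HashSet<>();

    void register(String name) {
        names.add(name.toLowerCase(Locale.ROOT));
    }

    // Assumption of this sketch: lookups are normalized like registrations.
    boolean exists(String name) {
        return names.contains(name.toLowerCase(Locale.ROOT));
    }
}
```

After registering "MyTable" in both models, a lookup of "mytable" misses in the case-sensitive model and succeeds in the lower-casing one, which is why names that differ only in case should be avoided when metadata moves between the two catalog types.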

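User-Defined Catalog

Catalogs are pluggable: users can develop custom catalogs by implementing the Catalog interface. To use a custom catalog in the SQL CLI, users must also implement the corresponding CatalogFactory interface for that catalog.

The CatalogFactory defines a set of properties used to configure the catalog when the SQL CLI starts. These properties are passed to a discovery service, which tries to match them to a CatalogFactory and instantiate the corresponding catalog.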
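The matching step can be pictured with a small model. This is a simplified sketch in plain Java under stated assumptions: SimpleCatalogFactory and SimpleFactoryDiscovery are hypothetical names, factories are passed in as a list rather than discovered from the classpath, and the only property used for matching is type. It does not reproduce the Flink interfaces.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a catalog factory; not the Flink interface. */
interface SimpleCatalogFactory {
    /** Value of the "type" property this factory responds to. */
    String identifier();

    /** Builds a textual description of the catalog from the given properties. */
    String create(String name, Map<String, String> properties);
}

class SimpleFactoryDiscovery {
    /** Returns the single factory whose identifier equals the "type" property. */
    static SimpleCatalogFactory find(
            List<SimpleCatalogFactory> factories, Map<String, String> properties) {
        String type = properties.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Missing required property 'type'");
        }
        SimpleCatalogFactory match = null;
        for (SimpleCatalogFactory factory : factories) {
            if (factory.identifier().equals(type)) {
                if (match != null) {
                    throw new IllegalStateException("Multiple factories for type '" + type + "'");
                }
                match = factory;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("No factory found for type '" + type + "'");
        }
        return match;
    }
}
```

Rejecting both the "no match" and the "ambiguous match" cases keeps the configuration unambiguous: each type value must map to exactly one factory.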

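Since Flink v1.16, TableEnvironment provides a user class loader that keeps class loading behavior consistent across table programs, SQL Client, and SQL Gateway. This class loader manages all user jars, including jar resources added through ADD JAR or CREATE FUNCTION .. USING JAR .. statements. A user-defined catalog should load classes with this user class loader instead of Thread.currentThread().getContextClassLoader(); otherwise a ClassNotFoundException may be thrown. The user class loader is available through CatalogFactory.Context#getClassLoader.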
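A minimal sketch of the recommended pattern, using only the JDK: the class loader is passed in explicitly and used for the lookup instead of the thread context class loader. UserClassLoading and loadWith are hypothetical names; in a real catalog factory the loader argument would be the one obtained from CatalogFactory.Context#getClassLoader.

```java
/** Hypothetical helper; shows loading a class through an explicitly supplied class loader. */
class UserClassLoading {
    /**
     * Loads and initializes the named class with the given class loader
     * rather than Thread.currentThread().getContextClassLoader().
     */
    static Class<?> loadWith(ClassLoader userClassLoader, String className)
            throws ClassNotFoundException {
        return Class.forName(className, true, userClassLoader);
    }
}
```

Passing the loader as a parameter makes the dependency visible at the call site, so a class that exists only in a user jar is looked up where that jar is actually registered.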

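Time Travel Interface in Catalogs

Since version 1.18, Flink supports time travel queries on the historical data of a table. To query historical data, the catalog that the table belongs to must implement the getTable(ObjectPath tablePath, long timestamp) method, as shown below: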

    public class MyCatalogSupportTimeTravel implements Catalog {

        @Override
        public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
                throws TableNotExistException {
            // Build a schema corresponding to the specific time point.
            Schema schema = buildSchema(timestamp);
            // Set parameters to read data at the corresponding time point.
            Map<String, String> options = buildOptions(timestamp);
            // Build CatalogTable
            CatalogTable catalogTable =
                    CatalogTable.of(schema, "", Collections.emptyList(), options, timestamp);
            return catalogTable;
        }
    }

    public class MyDynamicTableFactory implements DynamicTableSourceFactory {

        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            final ReadableConfig configuration =
                    Configuration.fromMap(context.getCatalogTable().getOptions());
            // Get snapshot from CatalogTable
            final Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
            // Build DynamicTableSource using snapshot options.
            final DynamicTableSource dynamicTableSource = buildDynamicSource(configuration, snapshot);
            return dynamicTableSource;
        }
    }
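How a catalog maps the timestamp to stored data is left to the implementation. One plausible strategy, sketched here in plain Java, is to return the latest snapshot committed at or before the requested time. SnapshotIndex is a hypothetical class, the timestamp is assumed to be in epoch milliseconds, and the "at or before" rule is an assumption of this sketch rather than something Flink prescribes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical snapshot index: commit time in epoch millis -> snapshot id. */
class SnapshotIndex {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    /** Records that a snapshot was committed at the given time. */
    void commit(long timestampMillis, String snapshotId) {
        snapshots.put(timestampMillis, snapshotId);
    }

    /** Returns the latest snapshot committed at or before the timestamp, if any. */
    Optional<String> snapshotAsOf(long timestampMillis) {
        Map.Entry<Long, String> entry = snapshots.floorEntry(timestampMillis);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

A helper such as buildOptions(timestamp) in the listing above could use an index like this to decide which snapshot the table source should read; a timestamp earlier than the first commit yields no snapshot, which a catalog would typically report as an error.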

Using SQL DDL

Users can use SQL DDL to create tables in catalogs, both through the Table API and in the SQL Client.

Java

    TableEnvironment tableEnv = ...;

    // Create a HiveCatalog
    Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog);

    // Create a catalog database
    tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

    // Create a catalog table
    tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

    tableEnv.listTables(); // should return the tables in current catalog and database.

Scala

    val tableEnv = ...

    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)

    // Create a catalog database
    tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")

    // Create a catalog table
    tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

    tableEnv.listTables() // should return the tables in current catalog and database.

Python

    from pyflink.table.catalog import HiveCatalog

    # Create a HiveCatalog
    catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

    # Register the catalog
    t_env.register_catalog("myhive", catalog)

    # Create a catalog database
    t_env.execute_sql("CREATE DATABASE mydb WITH (...)")

    # Create a catalog table
    t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

    # should return the tables in current catalog and database.
    t_env.list_tables()

SQL Client

    // the catalog should have been registered via yaml file
    Flink SQL> CREATE DATABASE mydb WITH (...);
    Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
    Flink SQL> SHOW TABLES;
    mytable

For more details, see Flink SQL CREATE DDL.

Using Java, Scala, or Python

Users can also create catalog tables programmatically with Java, Scala, or Python.

Java

    import org.apache.flink.table.api.*;
    import org.apache.flink.table.catalog.*;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    import java.util.List;

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Create a HiveCatalog
    Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog);

    // Create a catalog database; the last argument is ignoreIfExists
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

    // Create a catalog table
    final Schema schema = Schema.newBuilder()
            .column("name", DataTypes.STRING())
            .column("age", DataTypes.INT())
            .build();

    tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
            .schema(schema)
            // …
            .build());

    List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"

Scala

    import org.apache.flink.table.api._
    import org.apache.flink.table.catalog._
    import org.apache.flink.table.catalog.hive.HiveCatalog

    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)

    // Create a catalog database; the last argument is ignoreIfExists
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false)

    // Create a catalog table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING())
      .column("age", DataTypes.INT())
      .build()

    tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
      .schema(schema)
      // …
      .build())

    val tables = catalog.listTables("mydb") // tables should contain "mytable"

Python

    from pyflink.table import *
    from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable

    settings = EnvironmentSettings.in_batch_mode()
    t_env = TableEnvironment.create(settings)

    # Create a HiveCatalog
    catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

    # Register the catalog
    t_env.register_catalog("myhive", catalog)

    # Create a catalog database
    database = CatalogDatabase.create_instance({"k1": "v1"}, None)
    catalog.create_database("mydb", database)

    # Create a catalog table
    schema = Schema.new_builder() \
        .column("name", DataTypes.STRING()) \
        .column("age", DataTypes.INT()) \
        .build()

    t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
        .schema(schema)
        # …
        .build())

    # tables should contain "mytable"
    tables = catalog.list_tables("mydb")

Catalog API

Note: Only the programmatic catalog APIs are listed here. Users can achieve much of the same functionality with SQL DDL. For details on DDL, see SQL CREATE DDL.

Database operations

Java/Scala

    // create database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

    // drop database
    catalog.dropDatabase("mydb", false);

    // alter database
    catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

    // get database
    catalog.getDatabase("mydb");

    // check if a database exists
    catalog.databaseExists("mydb");

    // list databases in a catalog
    catalog.listDatabases();

Python

    from pyflink.table.catalog import CatalogDatabase

    # create database
    catalog_database = CatalogDatabase.create_instance({"k1": "v1"}, None)
    catalog.create_database("mydb", catalog_database, False)

    # drop database
    catalog.drop_database("mydb", False)

    # alter database
    catalog.alter_database("mydb", catalog_database, False)

    # get database
    catalog.get_database("mydb")

    # check if a database exists
    catalog.database_exists("mydb")

    # list databases in a catalog
    catalog.list_databases()
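The trailing boolean arguments in the calls above (ignoreIfExists for create, ignoreIfNotExists for drop and alter) decide whether a conflicting state is an error or a silent no-op. The plain-Java model below illustrates that contract. DatabaseRegistry is a hypothetical class, and IllegalStateException stands in for the catalog-specific exceptions that a real catalog would throw.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the database operations; not a Flink class. */
class DatabaseRegistry {
    private final Map<String, Map<String, String>> databases = new HashMap<>();

    /** An existing name is an error unless ignoreIfExists is true, in which case nothing changes. */
    void createDatabase(String name, Map<String, String> properties, boolean ignoreIfExists) {
        if (databases.containsKey(name)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Database " + name + " already exists");
        }
        databases.put(name, new HashMap<>(properties));
    }

    /** A missing name is an error unless ignoreIfNotExists is true. */
    void dropDatabase(String name, boolean ignoreIfNotExists) {
        if (!databases.containsKey(name)) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new IllegalStateException("Database " + name + " does not exist");
        }
        databases.remove(name);
    }

    boolean databaseExists(String name) {
        return databases.containsKey(name);
    }
}
```

Passing false, as the examples in this section do, surfaces conflicts immediately; passing true makes the call idempotent, which is convenient in setup scripts that may run more than once.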

Table operations

Java/Scala

    // create table
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

    // drop table
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

    // alter table
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

    // rename table
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

    // get table
    catalog.getTable(new ObjectPath("mydb", "mytable"));

    // check if a table exists
    catalog.tableExists(new ObjectPath("mydb", "mytable"));

    // list tables in a database
    catalog.listTables("mydb");

Python

    from pyflink.table import *
    from pyflink.table.catalog import CatalogBaseTable, ObjectPath
    from pyflink.table.descriptors import Kafka

    table_schema = TableSchema.builder() \
        .field("name", DataTypes.STRING()) \
        .field("age", DataTypes.INT()) \
        .build()

    table_properties = Kafka() \
        .version("0.11") \
        .start_from_earliest() \
        .to_properties()

    catalog_table = CatalogBaseTable.create_table(
        schema=table_schema, properties=table_properties, comment="my comment")

    # create table
    catalog.create_table(ObjectPath("mydb", "mytable"), catalog_table, False)

    # drop table
    catalog.drop_table(ObjectPath("mydb", "mytable"), False)

    # alter table
    catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)

    # rename table
    catalog.rename_table(ObjectPath("mydb", "mytable"), "my_new_table", False)

    # get table
    catalog.get_table(ObjectPath("mydb", "mytable"))

    # check if a table exists
    catalog.table_exists(ObjectPath("mydb", "mytable"))

    # list tables in a database
    catalog.list_tables("mydb")

View operations

Java/Scala

    // create view
    catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

    // drop view
    catalog.dropTable(new ObjectPath("mydb", "myview"), false);

    // alter view
    catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

    // rename view
    catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

    // get view
    catalog.getTable(new ObjectPath("mydb", "myview"));

    // check if a view exists
    catalog.tableExists(new ObjectPath("mydb", "myview"));

    // list views in a database
    catalog.listViews("mydb");

Python

    from pyflink.table import *
    from pyflink.table.catalog import CatalogBaseTable, ObjectPath

    table_schema = TableSchema.builder() \
        .field("name", DataTypes.STRING()) \
        .field("age", DataTypes.INT()) \
        .build()

    catalog_table = CatalogBaseTable.create_view(
        original_query="select * from t1",
        expanded_query="select * from test-catalog.db1.t1",
        schema=table_schema,
        properties={},
        comment="This is a view"
    )

    # create view
    catalog.create_table(ObjectPath("mydb", "myview"), catalog_table, False)

    # drop view
    catalog.drop_table(ObjectPath("mydb", "myview"), False)

    # alter view
    catalog.alter_table(ObjectPath("mydb", "myview"), catalog_table, False)

    # rename view
    catalog.rename_table(ObjectPath("mydb", "myview"), "my_new_view", False)

    # get view
    catalog.get_table(ObjectPath("mydb", "myview"))

    # check if a view exists
    catalog.table_exists(ObjectPath("mydb", "myview"))

    # list views in a database
    catalog.list_views("mydb")

Partition operations

Java/Scala

    // create partition
    catalog.createPartition(
            new ObjectPath("mydb", "mytable"),
            new CatalogPartitionSpec(...),
            new CatalogPartitionImpl(...),
            false);

    // drop partition
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

    // alter partition
    catalog.alterPartition(
            new ObjectPath("mydb", "mytable"),
            new CatalogPartitionSpec(...),
            new CatalogPartitionImpl(...),
            false);

    // get partition
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

    // check if a partition exists
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

    // list partitions of a table
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));

    // list partitions of a table under a given partition spec
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

    // list partitions of a table by expression filter
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));

Python

    from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec, CatalogPartition

    catalog_partition = CatalogPartition.create_instance({}, "my partition")
    catalog_partition_spec = CatalogPartitionSpec({"third": "2010", "second": "bob"})

    # create partition
    catalog.create_partition(
        ObjectPath("mydb", "mytable"),
        catalog_partition_spec,
        catalog_partition,
        False)

    # drop partition
    catalog.drop_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec, False)

    # alter partition
    catalog.alter_partition(
        ObjectPath("mydb", "mytable"),
        catalog_partition_spec,
        catalog_partition,
        False)

    # get partition
    catalog.get_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec)

    # check if a partition exists
    catalog.partition_exists(ObjectPath("mydb", "mytable"), catalog_partition_spec)

    # list partitions of a table
    catalog.list_partitions(ObjectPath("mydb", "mytable"))

    # list partitions of a table under a given partition spec
    catalog.list_partitions(ObjectPath("mydb", "mytable"), catalog_partition_spec)
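A partition spec is essentially a map from partition column to value, and listing partitions "under a given partition spec" can be read as keeping every partition whose full spec contains all pairs of the partial spec. The sketch below models that reading in plain Java. PartitionFilter is a hypothetical helper, and the matching rule is an assumption about typical catalog behavior, not a copy of any Flink implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that filters full partition specs by a partial spec. */
class PartitionFilter {
    /** Keeps the partitions whose spec contains every key/value pair of the partial spec. */
    static List<Map<String, String>> matching(
            List<Map<String, String>> partitions, Map<String, String> partialSpec) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> partition : partitions) {
            if (partition.entrySet().containsAll(partialSpec.entrySet())) {
                result.add(partition);
            }
        }
        return result;
    }
}
```

With partitions keyed by "third" and "second" as in the Python example, the partial spec {"third": "2010"} selects every partition of that year regardless of the value of "second", and an empty partial spec selects all partitions.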

Function operations

Java/Scala

    // create function
    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

    // drop function
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

    // alter function
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

    // get function
    catalog.getFunction(new ObjectPath("mydb", "myfunc"));

    // check if a function exists
    catalog.functionExists(new ObjectPath("mydb", "myfunc"));

    // list functions in a database
    catalog.listFunctions("mydb");

Python

    from pyflink.table.catalog import ObjectPath, CatalogFunction

    catalog_function = CatalogFunction.create_instance(class_name="my.python.udf")

    # create function
    catalog.create_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

    # drop function
    catalog.drop_function(ObjectPath("mydb", "myfunc"), False)

    # alter function
    catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

    # get function
    catalog.get_function(ObjectPath("mydb", "myfunc"))

    # check if a function exists
    catalog.function_exists(ObjectPath("mydb", "myfunc"))

    # list functions in a database
    catalog.list_functions("mydb")

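Working with Catalogs through the Table API and SQL Client

Registering a Catalog

Users have access to a default in-memory catalog named default_catalog, which always contains a default database named default_database. Users can also register additional catalogs into an existing Flink session.

Java/Scala

    tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));

Python

    t_env.register_catalog("myCatalog", catalog)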

YAML

Catalogs defined in YAML must provide a type property that specifies the catalog type. The following types are supported out of the box.

Catalog          Type Value
GenericInMemory  generic_in_memory
Hive             hive

    catalogs:
      - name: myCatalog
        type: custom_catalog
        hive-conf-dir: ...

Changing the Current Catalog and Database

Flink always searches for tables, views, and UDFs in the current catalog and database.

Java/Scala

    tableEnv.useCatalog("myCatalog");
    tableEnv.useDatabase("myDb");

Python

    t_env.use_catalog("myCatalog")
    t_env.use_database("myDb")

SQL

    Flink SQL> USE CATALOG myCatalog;
    Flink SQL> USE myDb;

Metadata from catalogs other than the current one is accessible by providing a fully qualified name in the form catalog.database.object.

Java/Scala

    tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");

Python

    t_env.from_path("not_the_current_catalog.not_the_current_db.my_table")

SQL

    Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
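The interplay between the current catalog, the current database, and fully qualified names can be modeled as a small expansion rule: missing leading parts are filled in from the current context. The following is a simplified sketch in plain Java. IdentifierResolver is a hypothetical class, and for brevity it splits on every dot and ignores backtick-quoted identifiers, which a real SQL parser would have to handle.

```java
/** Hypothetical helper that models name resolution; not part of the Flink API. */
class IdentifierResolver {
    private final String currentCatalog;
    private final String currentDatabase;

    IdentifierResolver(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    /** Expands a 1-, 2-, or 3-part name into {catalog, database, object}. */
    String[] resolve(String path) {
        // Simplification: splits on every dot and ignores backtick-quoted identifiers.
        String[] parts = path.split("\\.");
        switch (parts.length) {
            case 1:
                return new String[] {currentCatalog, currentDatabase, parts[0]};
            case 2:
                return new String[] {currentCatalog, parts[0], parts[1]};
            case 3:
                return parts;
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts: " + path);
        }
    }
}
```

With default_catalog and default_database as the current context, "my_table" expands to default_catalog.default_database.my_table, while a three-part name is used as given and bypasses the current context entirely.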

List Available Catalogs

Java/Scala

    tableEnv.listCatalogs();

Python

    t_env.list_catalogs()

SQL

    Flink SQL> show catalogs;

List Available Databases

Java/Scala

    tableEnv.listDatabases();

Python

    t_env.list_databases()

SQL

    Flink SQL> show databases;

List Available Tables

Java/Scala

    tableEnv.listTables();

Python

    t_env.list_tables()

SQL

    Flink SQL> show tables;

Catalog Modification Listener

Flink supports registering custom listeners for catalog modifications, such as database and table DDL. Flink creates a CatalogModificationEvent for each DDL operation and notifies the registered CatalogModificationListener. You can implement a listener that performs custom operations when it receives an event, such as reporting the information to an external metadata system.

Implement Catalog Listener

There are two interfaces for the catalog modification listener: CatalogModificationListenerFactory, which creates the listener, and CatalogModificationListener, which receives and processes the events. You need to implement both interfaces; an example follows.

    /** Factory used to create a {@link CatalogModificationListener} instance. */
    public class YourCatalogListenerFactory implements CatalogModificationListenerFactory {

        /** The identifier for the customized listener factory; you can choose the name yourself. */
        private static final String IDENTIFIER = "your_factory";

        @Override
        public String factoryIdentifier() {
            return IDENTIFIER;
        }

        @Override
        public CatalogModificationListener createListener(Context context) {
            // Create the http client from the context; createHttpClient is a placeholder
            // for your own helper and is not provided by Flink.
            HttpClient client = createHttpClient(context);
            return new YourCatalogListener(client);
        }
    }

    /** Customized catalog modification listener. */
    public class YourCatalogListener implements CatalogModificationListener {

        private final HttpClient client;

        YourCatalogListener(HttpClient client) {
            this.client = client;
        }

        @Override
        public void onEvent(CatalogModificationEvent event) {
            // Report the database and table information via http client.
        }
    }

Register Catalog Listener

After implementing the catalog modification factory and listener above, you can register them with the table environment.

    Configuration configuration = new Configuration();

    // Add the factory identifier; you can set multiple listeners in the configuration.
    configuration.set(TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS, Arrays.asList("your_factory"));
    TableEnvironment env = TableEnvironment.create(
            EnvironmentSettings.newInstance()
                    .withConfiguration(configuration)
                    .build());

    // Create/Alter/Drop database and table.
    env.executeSql("CREATE TABLE ...").await();

For sql-gateway, you can add the option table.catalog-modification.listeners to flink-conf.yaml and then start the gateway. Alternatively, you can use SET to specify the listener for DDL, for example in sql-client or jdbc-driver.

    Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
    Flink SQL> CREATE TABLE test_table(...);

Catalog Store

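The Catalog Store saves the configuration of catalogs. Once a Catalog Store is configured, the catalogs created in a session are persisted to the external system behind the store, so previously created catalogs can still be retrieved from the Catalog Store even after the session is rebuilt.

Configuring the Catalog Store

Users can configure the Catalog Store in different ways: through the Table API or through YAML configuration.

Register a Catalog Store in the Table API using a Catalog Store instance: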

    // Initialize a catalog store
    CatalogStore catalogStore = new FileCatalogStore("file://path/to/catalog/store/");

    // Set up the catalog store
    final EnvironmentSettings settings =
            EnvironmentSettings.newInstance().inBatchMode()
                    .withCatalogStore(catalogStore)
                    .build();

    final TableEnvironment tableEnv = TableEnvironment.create(settings);

Register a Catalog Store in the Table API using configuration:

    // Set up the configuration
    Configuration configuration = new Configuration();
    configuration.setString("table.catalog-store.kind", "file");
    configuration.setString("table.catalog-store.file.path", "file://path/to/catalog/store/");

    // Pass the configuration to the environment settings
    final EnvironmentSettings settings =
            EnvironmentSettings.newInstance().inBatchMode()
                    .withConfiguration(configuration)
                    .build();

    final TableEnvironment tableEnv = TableEnvironment.create(settings);

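In SQL Gateway, it is recommended to configure the Catalog Store in flink-conf.yaml so that all sessions can automatically use the catalogs that have already been created. The configuration format is shown below; typically you need to set the kind of the Catalog Store along with any other parameters the store requires.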

    table.catalog-store.kind: file
    table.catalog-store.file.path: /path/to/catalog/store/
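The two keys follow a visible naming pattern: table.catalog-store.kind selects the store, and parameters for that store appear under table.catalog-store.<kind>. (here file.path). The plain-Java sketch below shows how such keys could be split into the kind and the store-specific options. CatalogStoreOptions is a hypothetical helper, the prefix rule is inferred from the two keys shown above, and failing on a missing kind is a choice of this sketch, not a statement about Flink's defaults.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that splits flat configuration keys; not a Flink class. */
class CatalogStoreOptions {
    static final String PREFIX = "table.catalog-store.";

    /** Returns the configured store kind; a missing kind is treated as an error in this sketch. */
    static String kind(Map<String, String> conf) {
        String kind = conf.get(PREFIX + "kind");
        if (kind == null) {
            throw new IllegalArgumentException("Missing option " + PREFIX + "kind");
        }
        return kind;
    }

    /** Returns the options under table.catalog-store.<kind>. with that prefix removed. */
    static Map<String, String> storeOptions(Map<String, String> conf) {
        String storePrefix = PREFIX + kind(conf) + ".";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(storePrefix)) {
                result.put(entry.getKey().substring(storePrefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Applied to the two lines above, the kind is file and the store-specific options contain a single entry, path, which is the parameter listed for FileCatalogStore.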

Catalog Store Types

Flink has two built-in Catalog Stores, GenericInMemoryCatalogStore and FileCatalogStore. Users can also implement their own Catalog Store.

GenericInMemoryCatalogStore

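GenericInMemoryCatalogStore is an in-memory implementation of the Catalog Store. All catalog configurations are available only for the lifetime of the session, and the configurations saved in the store are cleaned up automatically when the session is rebuilt.

Parameter  Description
kind       The type of Catalog Store to use; must be 'generic_in_memory' here.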
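The session-scoped behavior follows directly from keeping everything in a map that lives and dies with the store instance. The sketch below models this in plain Java. InMemoryStoreModel is a hypothetical class whose method names mirror the store operations used later in this page, catalog configurations are reduced to string maps, and rejecting a duplicate name in storeCatalog is a choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of a catalog store; not the Flink implementation. */
class InMemoryStoreModel {
    private final Map<String, Map<String, String>> catalogs = new HashMap<>();

    /** Saves a catalog configuration; duplicates are rejected in this sketch. */
    void storeCatalog(String catalogName, Map<String, String> options) {
        if (catalogs.containsKey(catalogName)) {
            throw new IllegalStateException("Catalog " + catalogName + " already exists");
        }
        catalogs.put(catalogName, new HashMap<>(options));
    }

    /** Removes a catalog configuration; a missing name is an error unless ignored. */
    void removeCatalog(String catalogName, boolean ignoreIfNotExists) {
        if (catalogs.remove(catalogName) == null && !ignoreIfNotExists) {
            throw new IllegalStateException("Catalog " + catalogName + " does not exist");
        }
    }

    Optional<Map<String, String>> getCatalog(String catalogName) {
        return Optional.ofNullable(catalogs.get(catalogName));
    }

    Set<String> listCatalogs() {
        return new TreeSet<>(catalogs.keySet());
    }

    boolean contains(String catalogName) {
        return catalogs.containsKey(catalogName);
    }
}
```

Creating a new InMemoryStoreModel corresponds to rebuilding the session: the new instance starts empty, so nothing stored through the previous instance is visible anymore.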

FileCatalogStore

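FileCatalogStore saves the user's catalog configurations to files. Using FileCatalogStore requires specifying the directory in which the catalog configurations are saved. Each catalog is stored in its own file, named after the catalog.

The following example directory structure shows how catalog configurations are saved with FileCatalogStore: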

    - /path/to/save/the/catalog/
      - catalog1.yaml
      - catalog2.yaml
      - catalog3.yaml

Parameter  Description
kind       The type of Catalog Store to use; must be 'file' here.
path       The path where the Catalog Store is saved; must be a valid directory. Currently only local directories are supported.
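The one-file-per-catalog layout can be reproduced with a few lines of standard Java file I/O. The sketch below is a model of the layout only: DirectoryCatalogStore is a hypothetical class, the catalog configuration is treated as an opaque string, and the .yaml suffix is taken from the directory listing above. It is not the FileCatalogStore implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

/** Hypothetical model of a directory-backed catalog store: one <name>.yaml file per catalog. */
class DirectoryCatalogStore {
    private static final String SUFFIX = ".yaml";
    private final Path directory;

    DirectoryCatalogStore(Path directory) {
        this.directory = directory;
    }

    /** Writes the configuration to <directory>/<catalogName>.yaml. */
    void storeCatalog(String catalogName, String yamlContent) {
        try {
            Files.createDirectories(directory);
            Files.write(
                    directory.resolve(catalogName + SUFFIX),
                    yamlContent.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean contains(String catalogName) {
        return Files.isRegularFile(directory.resolve(catalogName + SUFFIX));
    }

    /** Derives the catalog names from the file names in the directory. */
    Set<String> listCatalogs() {
        Set<String> names = new TreeSet<>();
        if (!Files.isDirectory(directory)) {
            return names;
        }
        try (Stream<Path> files = Files.list(directory)) {
            files.map(path -> path.getFileName().toString())
                    .filter(fileName -> fileName.endsWith(SUFFIX))
                    .forEach(fileName ->
                            names.add(fileName.substring(0, fileName.length() - SUFFIX.length())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }
}
```

Because the state lives in the directory rather than in the object, a second DirectoryCatalogStore created over the same path sees the catalogs written by the first, which is the property that lets catalogs survive a rebuilt session.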

User-Defined Catalog Store

The Catalog Store is extensible: users can define a custom Catalog Store by implementing the CatalogStore interface. To use the Catalog Store in the SQL CLI or SQL Gateway, the corresponding CatalogStoreFactory interface must also be implemented for it.

    public class CustomCatalogStoreFactory implements CatalogStoreFactory {

        public static final String IDENTIFIER = "custom-kind";

        // Used to connect to external storage systems
        private CustomClient client;

        @Override
        public CatalogStore createCatalogStore() {
            return new CustomCatalogStore(client);
        }

        @Override
        public void open(Context context) throws CatalogException {
            // initialize the resources, such as http client
            client = initClient(context);
        }

        @Override
        public void close() throws CatalogException {
            // release the resources
        }

        @Override
        public String factoryIdentifier() {
            // catalog store kind identifier
            return IDENTIFIER;
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            // define the required options
            Set<ConfigOption<?>> options = new HashSet<>();
            options.add(OPTION_1);
            options.add(OPTION_2);
            return options;
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            // define the optional options; none in this example
            return Collections.emptySet();
        }
    }

    public class CustomCatalogStore extends AbstractCatalogStore {

        private final CustomClient client;

        public CustomCatalogStore(CustomClient client) {
            this.client = client;
        }

        @Override
        public void storeCatalog(String catalogName, CatalogDescriptor catalog)
                throws CatalogException {
            // store the catalog
        }

        @Override
        public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
                throws CatalogException {
            // remove the catalog descriptor
        }

        @Override
        public Optional<CatalogDescriptor> getCatalog(String catalogName) {
            // retrieve the catalog configuration and build the catalog descriptor;
            // placeholder result until implemented
            return Optional.empty();
        }

        @Override
        public Set<String> listCatalogs() {
            // list all catalogs; placeholder result until implemented
            return Collections.emptySet();
        }

        @Override
        public boolean contains(String catalogName) {
            // check whether the catalog exists; placeholder result until implemented
            return false;
        }
    }