Modules
Modules allow users to extend Flink’s built-in objects, such as defining functions that behave like Flink built-in functions. They are pluggable, and while Flink provides a few pre-built modules, users can write their own.
For example, users can define their own geo functions and plug them into Flink as built-in functions for use in Flink SQL and the Table API. As another example, users can load the off-the-shelf Hive module to use Hive built-in functions as Flink built-in functions.
Furthermore, a module can provide built-in table source and sink factories which disable Flink’s default discovery mechanism based on Java’s Service Provider Interfaces (SPI), or influence how connectors of temporary tables should be created without a corresponding catalog.
Module Types
CoreModule
CoreModule contains all of Flink’s system (built-in) functions and is loaded and enabled by default.
HiveModule
The HiveModule provides Hive built-in functions as Flink’s system functions to SQL and Table API users. Flink’s Hive documentation provides full details on setting up the module.
User-Defined Module
Users can develop custom modules by implementing the Module interface. To use custom modules in SQL CLI, users should develop both a module and its corresponding module factory by implementing the ModuleFactory interface.
A module factory defines a set of properties for configuring the module when the SQL CLI bootstraps. Properties are passed to a discovery service where the service tries to match the properties to a ModuleFactory and instantiate a corresponding module instance.
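The property-based discovery described above can be pictured with a small toy model. This is only a sketch in plain Python, not Flink's discovery service: `ToyModuleFactory`, `discover_module`, the `geo` module type, and the `srid` property are hypothetical names, and matching on a single `type` key is an assumption made for illustration.

```python
from typing import Callable, Dict, List

Properties = Dict[str, str]


class ToyModuleFactory:
    """Hypothetical stand-in for a module factory (not Flink's ModuleFactory)."""

    def __init__(self, module_type: str, create: Callable[[Properties], object]) -> None:
        self.module_type = module_type  # value of the "type" property this factory handles
        self.create = create            # builds a module instance from the properties


def discover_module(properties: Properties, factories: List[ToyModuleFactory]) -> object:
    """Pick the single factory whose type matches the properties and build a module."""
    matches = [f for f in factories if f.module_type == properties.get("type")]
    if len(matches) != 1:
        raise LookupError(f"expected exactly one matching factory, found {len(matches)}")
    return matches[0].create(properties)


# Modules are represented by plain strings to keep the sketch self-contained.
factories = [
    ToyModuleFactory("core", lambda props: "CoreModule()"),
    ToyModuleFactory("geo", lambda props: f"GeoModule(srid={props['srid']})"),
]

module = discover_module({"type": "geo", "srid": "4326"}, factories)
```

In this sketch, a set of properties that matches no factory (or more than one) raises an error instead of silently picking a module.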
Module Lifecycle and Resolution Order
A module can be loaded, enabled, disabled, and unloaded. When TableEnvironment first loads a module, it enables the module by default. Flink supports multiple modules and keeps track of the loading order to resolve metadata. Flink resolves functions only among enabled modules. For example, when two modules each contain a function of the same name, there are three cases.
- If both of the modules are enabled, then Flink resolves the function according to the resolution order of the modules.
- If one of them is disabled, then Flink resolves the function to the enabled module.
- If both of the modules are disabled, then Flink cannot resolve the function.
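The three cases above can be mimicked with a small toy model. This is only a sketch in plain Python, not Flink's implementation: the `resolve_function` helper, the dictionary layout, and the sample function name `my_func` are all hypothetical.

```python
from typing import Dict, List, Optional, Set


def resolve_function(
    name: str,
    used_modules: List[str],
    loaded_modules: Dict[str, Set[str]],
) -> Optional[str]:
    """Return the name of the first enabled module that provides `name`.

    `used_modules` lists the enabled modules in resolution order;
    `loaded_modules` maps every loaded module to the functions it offers.
    """
    for module_name in used_modules:
        if name in loaded_modules[module_name]:
            return module_name
    return None  # no enabled module provides the function


# Two loaded modules that both define a function called "my_func".
loaded = {"core": {"my_func", "abs"}, "hive": {"my_func"}}

both_enabled = resolve_function("my_func", ["hive", "core"], loaded)  # order decides
one_enabled = resolve_function("my_func", ["core"], loaded)           # only core is enabled
none_enabled = resolve_function("my_func", [], loaded)                # nothing is enabled
```

Here `both_enabled` is `"hive"` because hive comes first in the declared order, `one_enabled` is `"core"`, and `none_enabled` is `None`.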
Users can change the resolution order by declaring modules in a different order. For example, USE MODULES hive, core tells Flink to look for functions in the Hive module first.
Users can also disable modules by not declaring them. For example, USE MODULES hive disables the core module (disabling the core module is strongly discouraged). Disabling a module does not unload it, and users can enable it again by using it. For example, USE MODULES core, hive brings back the core module and places it first. A module can be enabled only if it is already loaded; using an unloaded module throws an exception. Finally, users can unload a module.
The difference between disabling and unloading a module is that TableEnvironment still keeps the disabled modules, and users can list all loaded modules to view the disabled modules.
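The difference between disabling and unloading can be illustrated with a toy bookkeeping class. This is a sketch under stated assumptions, not Flink's module manager: `ToyModuleManager` and its methods are hypothetical, and modules are represented by placeholder objects.

```python
from typing import Dict, List, Tuple


class ToyModuleManager:
    """Toy model of the load / use / unload lifecycle (not Flink's code)."""

    def __init__(self) -> None:
        # Every loaded module, enabled or not. Values are placeholders.
        self.loaded: Dict[str, object] = {"core": object()}
        # Enabled modules, in resolution order.
        self.used: List[str] = ["core"]

    def load(self, name: str, module: object) -> None:
        if name in self.loaded:
            raise ValueError(f"module {name!r} is already loaded")
        self.loaded[name] = module
        self.used.append(name)  # a newly loaded module is enabled by default

    def use(self, *names: str) -> None:
        for name in names:
            if name not in self.loaded:
                raise ValueError(f"module {name!r} is not loaded")
        if len(set(names)) != len(names):
            raise ValueError("a module may be declared only once")
        self.used = list(names)  # modules left out stay loaded but are disabled

    def unload(self, name: str) -> None:
        if name not in self.loaded:
            raise ValueError(f"module {name!r} is not loaded")
        del self.loaded[name]
        if name in self.used:
            self.used.remove(name)

    def list_modules(self) -> List[str]:
        return list(self.used)

    def list_full_modules(self) -> List[Tuple[str, bool]]:
        disabled = [n for n in self.loaded if n not in self.used]
        return [(n, True) for n in self.used] + [(n, False) for n in disabled]


manager = ToyModuleManager()
manager.load("hive", object())
manager.use("hive")                          # disables core without unloading it
after_disable = manager.list_full_modules()
manager.unload("hive")                       # hive is gone; core is still loaded
after_unload = manager.list_full_modules()
```

After `use("hive")` the full listing still contains core, marked as not used; after `unload("hive")` only the disabled core module remains, and a later `use("hive")` fails because hive is no longer loaded.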
Namespace
Objects provided by modules are considered part of Flink’s system (built-in) objects; thus, they don’t have any namespaces.
How to Load, Unload, Use and List Modules
Using SQL
Users can use SQL to load/unload/use/list modules in both Table API and SQL CLI.
Java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Show initially loaded and enabled modules
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// +-------------+------+

// Load a hive module
tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')");

// Show all enabled modules
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+

// Show all loaded modules with both name and use status
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// |        hive | true |
// +-------------+------+

// Change resolution order
tableEnv.executeSql("USE MODULES hive, core");
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// |        core |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        hive | true |
// |        core | true |
// +-------------+------+

// Disable core module
tableEnv.executeSql("USE MODULES hive");
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

// Unload hive module; core stays loaded but disabled
tableEnv.executeSql("UNLOAD MODULE hive");
tableEnv.executeSql("SHOW MODULES").print();
// Empty set
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core | false |
// +-------------+-------+
Scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.inStreamingMode()
val tableEnv = TableEnvironment.create(settings)

// Show initially loaded and enabled modules
tableEnv.executeSql("SHOW MODULES").print()
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print()
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// +-------------+------+

// Load a hive module
tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')")

// Show all enabled modules
tableEnv.executeSql("SHOW MODULES").print()
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+

// Show all loaded modules with both name and use status
tableEnv.executeSql("SHOW FULL MODULES").print()
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// |        hive | true |
// +-------------+------+

// Change resolution order
tableEnv.executeSql("USE MODULES hive, core")
tableEnv.executeSql("SHOW MODULES").print()
// +-------------+
// | module name |
// +-------------+
// |        hive |
// |        core |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print()
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        hive | true |
// |        core | true |
// +-------------+------+

// Disable core module
tableEnv.executeSql("USE MODULES hive")
tableEnv.executeSql("SHOW MODULES").print()
// +-------------+
// | module name |
// +-------------+
// |        hive |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print()
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

// Unload hive module; core stays loaded but disabled
tableEnv.executeSql("UNLOAD MODULE hive")
tableEnv.executeSql("SHOW MODULES").print()
// Empty set
tableEnv.executeSql("SHOW FULL MODULES").print()
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core | false |
// +-------------+-------+
Python
from pyflink.table import EnvironmentSettings, TableEnvironment

# environment configuration
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Show initially loaded and enabled modules
t_env.execute_sql("SHOW MODULES").print()
# +-------------+
# | module name |
# +-------------+
# |        core |
# +-------------+
t_env.execute_sql("SHOW FULL MODULES").print()
# +-------------+------+
# | module name | used |
# +-------------+------+
# |        core | true |
# +-------------+------+

# Load a hive module
t_env.execute_sql("LOAD MODULE hive WITH ('hive-version' = '...')")

# Show all enabled modules
t_env.execute_sql("SHOW MODULES").print()
# +-------------+
# | module name |
# +-------------+
# |        core |
# |        hive |
# +-------------+

# Show all loaded modules with both name and use status
t_env.execute_sql("SHOW FULL MODULES").print()
# +-------------+------+
# | module name | used |
# +-------------+------+
# |        core | true |
# |        hive | true |
# +-------------+------+

# Change resolution order
t_env.execute_sql("USE MODULES hive, core")
t_env.execute_sql("SHOW MODULES").print()
# +-------------+
# | module name |
# +-------------+
# |        hive |
# |        core |
# +-------------+
t_env.execute_sql("SHOW FULL MODULES").print()
# +-------------+------+
# | module name | used |
# +-------------+------+
# |        hive | true |
# |        core | true |
# +-------------+------+

# Disable core module
t_env.execute_sql("USE MODULES hive")
t_env.execute_sql("SHOW MODULES").print()
# +-------------+
# | module name |
# +-------------+
# |        hive |
# +-------------+
t_env.execute_sql("SHOW FULL MODULES").print()
# +-------------+-------+
# | module name |  used |
# +-------------+-------+
# |        hive |  true |
# |        core | false |
# +-------------+-------+

# Unload hive module; core stays loaded but disabled
t_env.execute_sql("UNLOAD MODULE hive")
t_env.execute_sql("SHOW MODULES").print()
# Empty set
t_env.execute_sql("SHOW FULL MODULES").print()
# +-------------+-------+
# | module name |  used |
# +-------------+-------+
# |        core | false |
# +-------------+-------+
SQL Client
-- Show initially loaded and enabled modules
Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set
Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        core | true |
+-------------+------+
1 row in set

-- Load a hive module
Flink SQL> LOAD MODULE hive WITH ('hive-version' = '...');

-- Show all enabled modules
Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
|        hive |
+-------------+
2 rows in set

-- Show all loaded modules with both name and use status
Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        core | true |
|        hive | true |
+-------------+------+
2 rows in set

-- Change resolution order
Flink SQL> USE MODULES hive, core;
Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        hive |
|        core |
+-------------+
2 rows in set
Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        hive | true |
|        core | true |
+-------------+------+
2 rows in set

-- Disable core module
Flink SQL> USE MODULES hive;
Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        hive |
+-------------+
1 row in set
Flink SQL> SHOW FULL MODULES;
+-------------+-------+
| module name |  used |
+-------------+-------+
|        hive |  true |
|        core | false |
+-------------+-------+
2 rows in set

-- Unload hive module; core stays loaded but disabled
Flink SQL> UNLOAD MODULE hive;
Flink SQL> SHOW MODULES;
Empty set
Flink SQL> SHOW FULL MODULES;
+-------------+-------+
| module name |  used |
+-------------+-------+
|        core | false |
+-------------+-------+
1 row in set
YAML
All modules defined using YAML must provide a type property that specifies the type. The following types are supported out of the box.
| Module | Type Value |
|---|---|
| CoreModule | core |
| HiveModule | hive |
modules:
  - name: core
    type: core
  - name: hive
    type: hive
When using SQL, the module name is used to perform module discovery. It is parsed as a simple identifier and is case-sensitive.
Using Java, Scala or Python
Users can use Java, Scala or Python to load/unload/use/list modules programmatically.
Java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Show initially loaded and enabled modules
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+
tableEnv.listFullModules();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// +-------------+------+

// Load a hive module
tableEnv.loadModule("hive", new HiveModule());

// Show all enabled modules
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+

// Show all loaded modules with both name and use status
tableEnv.listFullModules();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// |        hive | true |
// +-------------+------+

// Change resolution order
tableEnv.useModules("hive", "core");
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// |        core |
// +-------------+
tableEnv.listFullModules();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        hive | true |
// |        core | true |
// +-------------+------+

// Disable core module
tableEnv.useModules("hive");
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// +-------------+
tableEnv.listFullModules();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

// Unload hive module; core stays loaded but disabled
tableEnv.unloadModule("hive");
tableEnv.listModules();
// Empty set
tableEnv.listFullModules();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core | false |
// +-------------+-------+
Scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.module.hive.HiveModule

val settings = EnvironmentSettings.inStreamingMode()
val tableEnv = TableEnvironment.create(settings)

// Show initially loaded and enabled modules
tableEnv.listModules()
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+
tableEnv.listFullModules()
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// +-------------+------+

// Load a hive module
tableEnv.loadModule("hive", new HiveModule())

// Show all enabled modules
tableEnv.listModules()
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+

// Show all loaded modules with both name and use status
tableEnv.listFullModules()
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// |        hive | true |
// +-------------+------+

// Change resolution order
tableEnv.useModules("hive", "core")
tableEnv.listModules()
// +-------------+
// | module name |
// +-------------+
// |        hive |
// |        core |
// +-------------+
tableEnv.listFullModules()
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        hive | true |
// |        core | true |
// +-------------+------+

// Disable core module
tableEnv.useModules("hive")
tableEnv.listModules()
// +-------------+
// | module name |
// +-------------+
// |        hive |
// +-------------+
tableEnv.listFullModules()
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

// Unload hive module; core stays loaded but disabled
tableEnv.unloadModule("hive")
tableEnv.listModules()
// Empty set
tableEnv.listFullModules()
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core | false |
// +-------------+-------+
Python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.module import HiveModule

# environment configuration
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Show initially loaded and enabled modules
t_env.list_modules()
# +-------------+
# | module name |
# +-------------+
# |        core |
# +-------------+
t_env.list_full_modules()
# +-------------+------+
# | module name | used |
# +-------------+------+
# |        core | true |
# +-------------+------+

# Load a hive module
t_env.load_module("hive", HiveModule())

# Show all enabled modules
t_env.list_modules()
# +-------------+
# | module name |
# +-------------+
# |        core |
# |        hive |
# +-------------+

# Show all loaded modules with both name and use status
t_env.list_full_modules()
# +-------------+------+
# | module name | used |
# +-------------+------+
# |        core | true |
# |        hive | true |
# +-------------+------+

# Change resolution order
t_env.use_modules("hive", "core")
t_env.list_modules()
# +-------------+
# | module name |
# +-------------+
# |        hive |
# |        core |
# +-------------+
t_env.list_full_modules()
# +-------------+------+
# | module name | used |
# +-------------+------+
# |        hive | true |
# |        core | true |
# +-------------+------+

# Disable core module
t_env.use_modules("hive")
t_env.list_modules()
# +-------------+
# | module name |
# +-------------+
# |        hive |
# +-------------+
t_env.list_full_modules()
# +-------------+-------+
# | module name |  used |
# +-------------+-------+
# |        hive |  true |
# |        core | false |
# +-------------+-------+

# Unload hive module; core stays loaded but disabled
t_env.unload_module("hive")
t_env.list_modules()
# Empty set
t_env.list_full_modules()
# +-------------+-------+
# | module name |  used |
# +-------------+-------+
# |        core | false |
# +-------------+-------+