SQL Client Beta

Flink’s Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers.

The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

[Animated demo of the Flink SQL Client CLI running table programs on a cluster]

Attention The SQL Client is in an early development phase. Even though the application is not production-ready yet, it can be quite a useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway.

Getting Started

This section describes how to set up and run your first Flink SQL program from the command line.

The SQL Client is bundled in the regular Flink distribution and thus runnable out-of-the-box. It requires only a running Flink cluster where table programs can be executed. For more information about setting up a Flink cluster, see the Cluster & Deployment part. If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command:

    ./bin/start-cluster.sh

Starting the SQL Client CLI

The SQL Client scripts are also located in the binary directory of Flink. In the future, users will have two ways of starting the SQL Client CLI: either by starting an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment, only the embedded mode is supported. You can start the CLI by calling:

    ./bin/sql-client.sh embedded

By default, the SQL Client will read its configuration from the environment file located in ./conf/sql-client-defaults.yaml. See the configuration part for more information about the structure of environment files.
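
For example, both a defaults file and an additional session-specific environment file can be passed explicitly when starting the CLI (the session file name below is just an illustration; see the configuration section for the available options):

    ./bin/sql-client.sh embedded -d conf/sql-client-defaults.yaml -e my-session.yaml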

Running SQL Queries

Once the CLI has been started, you can use the HELP command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the Enter key to execute it:

    SELECT 'Hello World';

This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the Q key.

The CLI supports three modes for maintaining and visualizing results.

The table mode materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI:

    SET execution.result-mode=table;

The changelog mode does not materialize results and visualizes the result stream that is produced by a continuous query consisting of insertions (+) and retractions (-).

    SET execution.result-mode=changelog;

The tableau mode is closer to a traditional database client and prints the results directly to the screen in a tabular format. The displayed content depends on the query execution type (execution.type).

    SET execution.result-mode=tableau;

Note that when you use this mode with a streaming query, the result is continuously printed to the console. If the input data of the query is bounded, the job terminates after Flink has processed all input data and printing stops automatically. Otherwise, if you want to terminate a running query, press CTRL-C; this cancels the job and stops the printing.

You can use the following query to see all the result modes in action:

    SELECT name, COUNT(*) AS cnt
    FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
    GROUP BY name;

This query performs a bounded word count example.

In changelog mode, the visualized changelog should be similar to:

    + Bob, 1
    + Alice, 1
    + Greg, 1
    - Bob, 1
    + Bob, 2

In table mode, the visualized result table is continuously updated until the table program ends with:

    Bob, 2
    Alice, 1
    Greg, 1

In tableau mode, if you ran the query in streaming mode, the displayed result would be:

    +-----+----------------------+----------------------+
    | +/- |                 name |                  cnt |
    +-----+----------------------+----------------------+
    |   + |                  Bob |                    1 |
    |   + |                Alice |                    1 |
    |   + |                 Greg |                    1 |
    |   - |                  Bob |                    1 |
    |   + |                  Bob |                    2 |
    +-----+----------------------+----------------------+
    Received a total of 5 rows

And if you ran the query in batch mode, the displayed result would be:

    +-------+-----+
    |  name | cnt |
    +-------+-----+
    | Alice |   1 |
    |   Bob |   2 |
    |  Greg |   1 |
    +-------+-----+
    3 rows in set

All these result modes can be useful during the prototyping of SQL queries. In all these modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through bigger results that are only limited by the available main memory and the configured maximum number of rows (max-table-result-rows).
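
For example, assuming the flattened execution.* keys from the environment file are also accepted by the SET command, the row limit of the table mode could be lowered for the current session like this:

    SET execution.max-table-result-rows=10000;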

Attention Queries that are executed in a batch environment can only be retrieved using the table or tableau result mode.

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the INSERT INTO statement. The configuration section explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.


Configuration

The SQL Client can be started with the following optional CLI arguments. They are discussed in detail in the subsequent paragraphs.

    ./bin/sql-client.sh embedded --help

    Mode "embedded" submits Flink jobs from the local machine.

      Syntax: embedded [OPTIONS]
      "embedded" mode options:
         -d,--defaults <environment file>      The environment properties with which every
                                               new session is initialized. Properties might
                                               be overwritten by session properties.
         -e,--environment <environment file>   The environment properties to be imported
                                               into the session. It might overwrite default
                                               environment properties.
         -h,--help                             Show the help message with descriptions of
                                               all options.
         -hist,--history <History file path>   The file which you want to save the command
                                               history into. If not specified, we will
                                               auto-generate one under your user's home
                                               directory.
         -j,--jar <JAR file>                   A JAR file to be imported into the session.
                                               The file might contain user-defined classes
                                               needed for the execution of statements such
                                               as functions, table sources, or sinks. Can
                                               be used multiple times.
         -l,--library <JAR directory>          A JAR file directory with which every new
                                               session is initialized. The files might
                                               contain user-defined classes needed for the
                                               execution of statements such as functions,
                                               table sources, or sinks. Can be used
                                               multiple times.
         -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                               archive files will be extracted to the
                                               working directory of the python UDF worker.
                                               Currently only zip-format is supported. For
                                               each archive file, a target directory can be
                                               specified. If a target directory name is
                                               specified, the archive file will be
                                               extracted to a directory with the specified
                                               name. Otherwise, the archive file will be
                                               extracted to a directory with the same name
                                               as the archive file. The files uploaded via
                                               this option are accessible via relative
                                               path. '#' could be used as the separator of
                                               the archive file path and the target
                                               directory name. Comma (',') could be used as
                                               the separator to specify multiple archive
                                               files. This option can be used to upload the
                                               virtual environment and the data files used
                                               in Python UDF (e.g.: --pyArchives
                                               file:///tmp/py37.zip,file:///tmp/data.zip#data
                                               --pyExecutable py37.zip/py37/bin/python).
                                               The data files could be accessed in Python
                                               UDF, e.g.: f = open('data/data.txt', 'r').
         -pyexec,--pyExecutable <arg>          Specify the path of the python interpreter
                                               used to execute the python UDF worker (e.g.:
                                               --pyExecutable /usr/local/bin/python3). The
                                               python UDF worker depends on Python 3.5+,
                                               Apache Beam (version == 2.19.0), Pip
                                               (version >= 7.1.0) and SetupTools
                                               (version >= 37.0.0). Please ensure that the
                                               specified environment meets the above
                                               requirements.
         -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job. These
                                               files will be added to the PYTHONPATH of
                                               both the local client and the remote python
                                               UDF worker. The standard python resource
                                               file suffixes such as .py/.egg/.zip or
                                               directory are all supported. Comma (',')
                                               could be used as the separator to specify
                                               multiple files (e.g.: --pyFiles
                                               file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
         -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                               defines the third-party dependencies. These
                                               dependencies will be installed and added to
                                               the PYTHONPATH of the python UDF worker. A
                                               directory which contains the installation
                                               packages of these dependencies could be
                                               specified optionally. Use '#' as the
                                               separator if the optional parameter exists
                                               (e.g.: --pyRequirements
                                               file:///tmp/requirements.txt#file:///tmp/cached_dir).
         -s,--session <session identifier>     The identifier for a session. 'default' is
                                               the default identifier.
         -u,--update <SQL update statement>    Experimental (for testing only!): Instructs
                                               the SQL Client to immediately execute the
                                               given update statement after starting up.
                                               The process is shut down after the statement
                                               has been submitted to the cluster and
                                               returns an appropriate return code.
                                               Currently, this feature is only supported
                                               for INSERT INTO statements that declare the
                                               target sink table.


Environment Files

A SQL query needs a configuration environment in which it is executed. The so-called environment files define available catalogs, table sources and sinks, user-defined functions, and other properties required for execution and deployment.

Every environment file is a regular YAML file. An example of such a file is presented below.

    # Define tables here such as sources, sinks, views, or temporal tables.

    tables:
      - name: MyTableSource
        type: source-table
        update-mode: append
        connector:
          type: filesystem
          path: "/path/to/something.csv"
        format:
          type: csv
          fields:
            - name: MyField1
              data-type: INT
            - name: MyField2
              data-type: VARCHAR
          line-delimiter: "\n"
          comment-prefix: "#"
        schema:
          - name: MyField1
            data-type: INT
          - name: MyField2
            data-type: VARCHAR
      - name: MyCustomView
        type: view
        query: "SELECT MyField2 FROM MyTableSource"

    # Define user-defined functions here.

    functions:
      - name: myUDF
        from: class
        class: foo.bar.AggregateUDF
        constructor:
          - 7.6
          - false

    # Define available catalogs

    catalogs:
      - name: catalog_1
        type: hive
        property-version: 1
        hive-conf-dir: ...
      - name: catalog_2
        type: hive
        property-version: 1
        default-database: mydb2
        hive-conf-dir: ...

    # Properties that change the fundamental execution behavior of a table program.

    execution:
      planner: blink                      # optional: either 'blink' (default) or 'old'
      type: streaming                     # required: execution mode either 'batch' or 'streaming'
      result-mode: table                  # required: either 'table', 'changelog' or 'tableau'
      max-table-result-rows: 1000000      # optional: maximum number of maintained rows in
                                          #   'table' mode (1000000 by default, smaller than 1 means unlimited)
      time-characteristic: event-time     # optional: 'processing-time' or 'event-time' (default)
      parallelism: 1                      # optional: Flink's parallelism (1 by default)
      periodic-watermarks-interval: 200   # optional: interval for periodic watermarks (200 ms by default)
      max-parallelism: 16                 # optional: Flink's maximum parallelism (128 by default)
      min-idle-state-retention: 0         # optional: table program's minimum idle state time
      max-idle-state-retention: 0         # optional: table program's maximum idle state time
      current-catalog: catalog_1          # optional: name of the current catalog of the session
                                          #   ('default_catalog' by default)
      current-database: mydb1             # optional: name of the current database of the current catalog
                                          #   (default database of the current catalog by default)
      restart-strategy:                   # optional: restart strategy
        type: fallback                    #   "fallback" to global restart strategy by default

    # Configuration options for adjusting and tuning table programs.
    # A full list of options and their default values can be found
    # on the dedicated "Configuration" page.

    configuration:
      table.optimizer.join-reorder-enabled: true
      table.exec.spill-compression.enabled: true
      table.exec.spill-compression.block-size: 128kb

    # Properties that describe the cluster to which table programs are submitted.

    deployment:
      response-timeout: 5000

This configuration:

  • defines an environment with a table source MyTableSource that reads from a CSV file,
  • defines a view MyCustomView that declares a virtual table using a SQL query,
  • defines a user-defined function myUDF that can be instantiated using the class name and two constructor parameters,
  • connects to two Hive catalogs and uses catalog_1 as the current catalog with mydb1 as the current database of the catalog,
  • uses the blink planner in streaming mode for running statements with event-time characteristic and a parallelism of 1,
  • runs exploratory queries in the table result mode,
  • and makes some planner adjustments around join reordering and spilling via configuration options.

Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (defaults environment file using --defaults) as well as on a per-session basis (session environment file using --environment). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in every session whereas the session environment file only declares a specific state retention time and parallelism. Both default and session environment files can be passed when starting the CLI application. If no default environment file has been specified, the SQL Client searches for ./conf/sql-client-defaults.yaml in Flink’s configuration directory.
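
A minimal session environment file along these lines could look as follows (the file name and property values are purely illustrative):

    # my-session.yaml
    execution:
      parallelism: 2                      # overrides the parallelism from the defaults file
      min-idle-state-retention: 3600000   # illustrative value for the minimum idle state time
      max-idle-state-retention: 7200000   # illustrative value for the maximum idle state time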

Attention Properties that have been set within a CLI session (e.g. using the SET command) have highest precedence:

    CLI commands > session environment file > defaults environment file

Restart Strategies

Restart strategies control how Flink jobs are restarted in case of a failure. Similar to global restart strategies for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.

The following strategies are supported:

    execution:
      # falls back to the global strategy defined in flink-conf.yaml
      restart-strategy:
        type: fallback

      # job fails directly and no restart is attempted
      restart-strategy:
        type: none

      # attempts a given number of times to restart the job
      restart-strategy:
        type: fixed-delay
        attempts: 3      # retries before job is declared as failed (default: Integer.MAX_VALUE)
        delay: 10000     # delay in ms between retries (default: 10 s)

      # attempts as long as the maximum number of failures per time interval is not exceeded
      restart-strategy:
        type: failure-rate
        max-failures-per-interval: 1   # retries in interval until failing (default: 1)
        failure-rate-interval: 60000   # measuring interval in ms for failure rate
        delay: 10000                   # delay in ms between retries (default: 10 s)


Dependencies

The SQL Client does not require setting up a Java project using Maven or SBT. Instead, you can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR file separately (using --jar) or define entire library directories (using --library). For connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files can be downloaded for each release from the Maven central repository.

The full list of offered SQL JARs and documentation about how to use them can be found on the connection to external systems page.
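
For instance, a directory of downloaded connector bundles and a custom UDF JAR (the paths below are placeholders) could be passed when starting the CLI:

    ./bin/sql-client.sh embedded \
      --library /path/to/sql-jars \
      --jar /path/to/my-udfs.jar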

The following example shows an environment file that defines a table source reading JSON data from Apache Kafka.

    tables:
      - name: TaxiRides
        type: source-table
        update-mode: append
        connector:
          property-version: 1
          type: kafka
          version: "0.11"
          topic: TaxiRides
          startup-mode: earliest-offset
          properties:
            bootstrap.servers: localhost:9092
            group.id: testGroup
        format:
          property-version: 1
          type: json
          schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
        schema:
          - name: rideId
            data-type: BIGINT
          - name: lon
            data-type: FLOAT
          - name: lat
            data-type: FLOAT
          - name: rowTime
            data-type: TIMESTAMP(3)
            rowtime:
              timestamps:
                type: "from-field"
                from: "rideTime"
              watermarks:
                type: "periodic-bounded"
                delay: "60000"
          - name: procTime
            data-type: TIMESTAMP(3)
            proctime: true

The resulting schema of the TaxiRides table contains most of the fields of the JSON schema. Furthermore, it adds a rowtime attribute rowTime and a processing-time attribute procTime.

Both connector and format allow for defining a property version (which is currently version 1) for future backwards compatibility.


User-defined Functions

The SQL Client allows users to create custom, user-defined functions to be used in SQL queries. Currently, such functions must be defined programmatically in Java/Scala classes or Python files.

In order to provide a Java/Scala user-defined function, you need to first implement and compile a function class that extends ScalarFunction, AggregateFunction or TableFunction (see User-defined Functions). One or more functions can then be packaged into a dependency JAR for the SQL Client.

In order to provide a Python user-defined function, you need to write a Python function and decorate it with the pyflink.table.udf.udf or pyflink.table.udf.udtf decorator (see Python UDFs). One or more functions can then be placed into a Python file. The Python file and related dependencies need to be specified via the configuration (see Python Configuration) in the environment file or via command-line options (see Command Line Usage).
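
For example, a Python file containing such functions (the path below is a placeholder) could be attached via the -pyfs option when starting the CLI:

    ./bin/sql-client.sh embedded -pyfs file:///path/to/my_udfs.py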

All functions must be declared in an environment file before being called. For each item in the list of functions, one must specify

  • a name under which the function is registered,
  • the source of the function using from (restricted to be class (Java/Scala UDF) or python (Python UDF) for now).

The Java/Scala UDF must specify:

  • the class which indicates the fully qualified class name of the function and an optional list of constructor parameters for instantiation.

The Python UDF must specify:

  • the fully-qualified-name which indicates the fully qualified name of the function, i.e. the “[module name].[object name]”.

    functions:
      - name: java_udf               # required: name of the function
        from: class                  # required: source of the function
        class: ...                   # required: fully qualified class name of the function
        constructor:                 # optional: constructor parameters of the function class
          - ...                      # optional: a literal parameter with implicit type
          - class: ...               # optional: full class name of the parameter
            constructor:             # optional: constructor parameters of the parameter's class
              - type: ...            # optional: type of the literal parameter
                value: ...           # optional: value of the literal parameter
      - name: python_udf             # required: name of the function
        from: python                 # required: source of the function
        fully-qualified-name: ...    # required: fully qualified name of the function

For a Java/Scala UDF, make sure that the order and types of the specified parameters strictly match one of the constructors of your function class.

Constructor Parameters

Depending on the user-defined function, it might be necessary to parameterize the implementation before using it in SQL statements.

As shown in the example before, when declaring a user-defined function, a class can be configured using constructor parameters in one of the following three ways:

A literal value with implicit type: The SQL Client will automatically derive the type according to the literal value itself. Currently, only values of BOOLEAN, INT, DOUBLE and VARCHAR are supported here. If the automatic derivation does not work as expected (e.g., you need a VARCHAR false), use explicit types instead.

    - true         # -> BOOLEAN (case sensitive)
    - 42           # -> INT
    - 1234.222     # -> DOUBLE
    - foo          # -> VARCHAR

A literal value with explicit type: Explicitly declare the parameter with type and value properties for type-safety.

    - type: DECIMAL
      value: 11111111111111111

The table below illustrates the supported Java parameter types and the corresponding SQL type strings.

Java type              SQL type
--------------------------------------
java.math.BigDecimal   DECIMAL
java.lang.Boolean      BOOLEAN
java.lang.Byte         TINYINT
java.lang.Double       DOUBLE
java.lang.Float        REAL, FLOAT
java.lang.Integer      INTEGER, INT
java.lang.Long         BIGINT
java.lang.Short        SMALLINT
java.lang.String       VARCHAR

More types (e.g., TIMESTAMP or ARRAY), primitive types, and null are not supported yet.

A (nested) class instance: Besides literal values, you can also create (nested) class instances for constructor parameters by specifying the class and constructor properties. This process can be recursively performed until all the constructor parameters are represented with literal values.

    - class: foo.bar.paramClass
      constructor:
        - StarryName
        - class: java.lang.Integer
          constructor:
            - class: java.lang.String
              constructor:
                - type: VARCHAR
                  value: 3


Catalogs

Catalogs can be defined as a set of YAML properties and are automatically registered with the environment upon starting the SQL Client.

Users can specify which catalog they want to use as the current catalog in SQL CLI, and which database of the catalog they want to use as the current database.

    catalogs:
      - name: catalog_1
        type: hive
        property-version: 1
        default-database: mydb2
        hive-conf-dir: <path of Hive conf directory>
      - name: catalog_2
        type: hive
        property-version: 1
        hive-conf-dir: <path of Hive conf directory>

    execution:
      ...
      current-catalog: catalog_1
      current-database: mydb1

For more information about catalogs, see Catalogs.
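
Within a running CLI session, the current catalog and database can also be switched with regular SQL statements, for example:

    USE CATALOG catalog_1;
    USE mydb1;
    SHOW TABLES;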

Detached SQL Queries

In order to define end-to-end SQL pipelines, SQL’s INSERT INTO statement can be used for submitting long-running, detached queries to a Flink cluster. These queries produce their results into an external system instead of the SQL Client. This allows for dealing with higher parallelism and larger amounts of data. The CLI itself does not have any control over a detached query after submission.

    INSERT INTO MyTableSink SELECT * FROM MyTableSource

The table sink MyTableSink has to be declared in the environment file. See the connection page for more information about supported external systems and their configuration. An example for an Apache Kafka table sink is shown below.

    tables:
      - name: MyTableSink
        type: sink-table
        update-mode: append
        connector:
          property-version: 1
          type: kafka
          version: "0.11"
          topic: OutputTopic
          properties:
            bootstrap.servers: localhost:9092
            group.id: testGroup
        format:
          property-version: 1
          type: json
          derive-schema: true
        schema:
          - name: rideId
            data-type: BIGINT
          - name: lon
            data-type: FLOAT
          - name: lat
            data-type: FLOAT
          - name: rideTime
            data-type: TIMESTAMP(3)

The SQL Client makes sure that a statement is successfully submitted to the cluster. Once the query is submitted, the CLI will show information about the Flink job.

    [INFO] Table update statement has been successfully submitted to the cluster:
    Cluster ID: StandaloneClusterId
    Job ID: 6f922fe5cba87406ff23ae4a7bb79044
    Web interface: http://localhost:8081

Attention The SQL Client does not track the status of the running Flink job after submission. The CLI process can be shut down after the submission without affecting the detached query. Flink’s restart strategy takes care of the fault-tolerance. A query can be cancelled using Flink’s web interface, command-line, or REST API.
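
For example, the detached job shown above could be cancelled from the command line using Flink’s regular CLI:

    ./bin/flink cancel 6f922fe5cba87406ff23ae4a7bb79044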


SQL Views

Views allow for defining virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general INSERT INTO or SELECT statement.

Views can either be defined in environment files or within the CLI session.

The following example shows how to define multiple views in a file. The views are registered in the order in which they are defined in the environment file. Reference chains such as view A depends on view B depends on view C are supported.

    tables:
      - name: MyTableSource
        # ...
      - name: MyRestrictedView
        type: view
        query: "SELECT MyField2 FROM MyTableSource"
      - name: MyComplexView
        type: view
        query: >
          SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
          FROM MyTableSource
          WHERE MyField2 > 200

Similar to table sources and sinks, views defined in a session environment file have highest precedence.

Views can also be created within a CLI session using the CREATE VIEW statement:

    CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;

Views created within a CLI session can also be removed again using the DROP VIEW statement:

    DROP VIEW MyNewView;

Attention The definition of views in the CLI is limited to the mentioned syntax above. Defining a schema for views or escaping whitespaces in table names will be supported in future versions.


Temporal Tables

A temporal table allows for a (parameterized) view on a changing history table that returns the content of a table at a specific point in time. This is especially useful for joining a table with the content of another table at a particular timestamp. More information can be found in the temporal table joins page.

The following example shows how to define a temporal table SourceTemporalTable:

    tables:
      # Define the table source (or view) that contains updates to a temporal table
      - name: HistorySource
        type: source-table
        update-mode: append
        connector: # ...
        format: # ...
        schema:
          - name: integerField
            data-type: INT
          - name: stringField
            data-type: STRING
          - name: rowtimeField
            data-type: TIMESTAMP(3)
            rowtime:
              timestamps:
                type: from-field
                from: rowtimeField
              watermarks:
                type: from-source

      # Define a temporal table over the changing history table with time attribute and primary key
      - name: SourceTemporalTable
        type: temporal-table
        history-table: HistorySource
        primary-key: integerField
        time-attribute: rowtimeField   # could also be a proctime field

As shown in the example, definitions of table sources, views, and temporal tables can be mixed with each other. They are registered in the order in which they are defined in the environment file. For example, a temporal table can reference a view which can depend on another view or table source.
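
As a rough sketch of how such a temporal table is typically used in a query (the Orders table and its fields are hypothetical and would need to be declared separately), a temporal table function join could look like:

    SELECT o.orderId, t.stringField
    FROM Orders AS o,
      LATERAL TABLE (SourceTemporalTable(o.orderTime)) AS t
    WHERE o.integerField = t.integerField;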


Limitations & Future

The current SQL Client implementation is in a very early development stage and might change in the future as part of the bigger Flink Improvement Proposal 24 (FLIP-24). Feel free to join the discussion and open issues about bugs and features that you find useful.
