SQL Client

Flink’s Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers.

The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

Animated demo of the Flink SQL Client CLI running table programs on a cluster

Getting Started

This section describes how to set up and run your first Flink SQL program from the command line.

The SQL Client is bundled in the regular Flink distribution and is thus runnable out of the box. It requires only a running Flink cluster where table programs can be executed. For more information about setting up a Flink cluster, see the Cluster & Deployment section. If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command:

  ./bin/start-cluster.sh

Starting the SQL Client CLI

The SQL Client scripts are also located in Flink's bin directory. In the future, there will be two ways to start the SQL Client CLI: as an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment, only the embedded mode is supported, and it is the default mode. You can start the CLI by calling:

  ./bin/sql-client.sh

or explicitly use embedded mode:

  ./bin/sql-client.sh embedded

Running SQL Queries

Once the CLI has been started, you can use the HELP command to list all available SQL statements. To validate your setup and cluster connection, enter your first SQL query and press the Enter key to execute it:

  SELECT 'Hello World';

This query requires no table source and produces a single-row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the Q key.

The CLI supports three modes for maintaining and visualizing results.

The table mode materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI:

  SET sql-client.execution.result-mode=table;

The changelog mode does not materialize results and visualizes the result stream that is produced by a continuous query consisting of insertions (+) and retractions (-).

  SET sql-client.execution.result-mode=changelog;

The tableau mode is closer to a traditional database client: it displays the results directly on the screen in a tabular format. What is displayed depends on the query execution type (execution.runtime-mode, formerly execution.type).

  SET sql-client.execution.result-mode=tableau;

Note that when you use this mode with a streaming query, the result is printed continuously on the console. If the input data of the query is bounded, the job terminates after Flink has processed all input data, and the printing stops automatically. Otherwise, to terminate a running query, press CTRL-C; this stops both the job and the printing.

You can use the following query to see all the result modes in action:

  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

This query performs a bounded word count.

In changelog mode, the visualized changelog should be similar to:

  + Bob, 1
  + Alice, 1
  + Greg, 1
  - Bob, 1
  + Bob, 2
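The retraction of "Bob, 1" follows from how a continuous aggregation updates its result. The Python sketch below is a simplified model for illustration, not Flink's implementation. For each incoming row it updates a per-key count. If a result for that key was already emitted, it retracts the old row before inserting the new one. The function name continuous_count is hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Change = Tuple[str, str, int]  # (kind, name, cnt); kind is "+" (insert) or "-" (retract)


def continuous_count(names: Iterable[str]) -> List[Change]:
    """Model a streaming GROUP BY name with COUNT(*) that emits a retract changelog."""
    counts: Dict[str, int] = {}
    changelog: List[Change] = []
    for name in names:
        previous = counts.get(name)
        if previous is not None:
            # The old aggregate for this key is no longer valid: retract it first.
            changelog.append(("-", name, previous))
        counts[name] = (previous or 0) + 1
        changelog.append(("+", name, counts[name]))
    return changelog


for kind, name, cnt in continuous_count(["Bob", "Alice", "Greg", "Bob"]):
    print(f"{kind} {name}, {cnt}")
```

Running it prints the same five changes as the changelog above.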

In table mode, the visualized result table is continuously updated until the table program ends with:

  Bob, 2
  Alice, 1
  Greg, 1
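Conceptually, table mode applies the changelog shown above to an in-memory table. The minimal Python sketch below assumes that rows are compared as whole tuples and that a retraction removes one identical row. The helper materialize is hypothetical and ignores row order.

```python
from collections import Counter
from typing import Iterable, Tuple

Row = Tuple[str, int]


def materialize(changelog: Iterable[Tuple[str, Row]]) -> Counter:
    """Apply '+' (insert) and '-' (retract) changes to a multiset of rows."""
    table: Counter = Counter()
    for kind, row in changelog:
        if kind == "+":
            table[row] += 1
        elif kind == "-":
            if table[row] == 0:
                raise ValueError(f"cannot retract a row that is not present: {row}")
            table[row] -= 1
            if table[row] == 0:
                del table[row]  # keep only rows that are currently in the result
        else:
            raise ValueError(f"unknown change kind: {kind!r}")
    return table


changes = [("+", ("Bob", 1)), ("+", ("Alice", 1)), ("+", ("Greg", 1)),
           ("-", ("Bob", 1)), ("+", ("Bob", 2))]
print(sorted(materialize(changes)))  # [('Alice', 1), ('Bob', 2), ('Greg', 1)]
```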

In tableau mode, if you ran the query in streaming mode, the displayed result would be:

  +-----+----------------------+----------------------+
  | +/- |                 name |                  cnt |
  +-----+----------------------+----------------------+
  |   + |                  Bob |                    1 |
  |   + |                Alice |                    1 |
  |   + |                 Greg |                    1 |
  |   - |                  Bob |                    1 |
  |   + |                  Bob |                    2 |
  +-----+----------------------+----------------------+
  Received a total of 5 rows

And if you ran the query in batch mode, the displayed result would be:

  +-------+-----+
  |  name | cnt |
  +-------+-----+
  | Alice |   1 |
  |   Bob |   2 |
  |  Greg |   1 |
  +-------+-----+
  3 rows in set

All these result modes can be useful while prototyping SQL queries. In all of them, results are stored in the Java heap memory of the SQL Client. To keep the CLI responsive, the changelog mode shows only the latest 1000 changes. The table mode allows navigating through bigger results that are limited only by the available main memory and the configured maximum number of rows (sql-client.execution.max-table-result.rows).
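One way to picture the bound on the changelog view is a fixed-size buffer that drops the oldest change when a new one arrives. The Python sketch below uses collections.deque and assumes the limit of 1000 changes mentioned above. The class name ChangelogBuffer is hypothetical.

```python
from collections import deque
from typing import Deque, List, Tuple

Change = Tuple[str, Tuple]  # (kind, row)

MAX_VISIBLE_CHANGES = 1000  # limit stated for the changelog mode


class ChangelogBuffer:
    """Keeps only the most recent changes; appending beyond capacity drops the oldest."""

    def __init__(self, capacity: int = MAX_VISIBLE_CHANGES) -> None:
        self._changes: Deque[Change] = deque(maxlen=capacity)

    def append(self, change: Change) -> None:
        self._changes.append(change)  # deque(maxlen=...) discards from the left when full

    def visible(self) -> List[Change]:
        return list(self._changes)


buffer = ChangelogBuffer()
for i in range(2500):
    buffer.append(("+", (f"user-{i}", 1)))
print(len(buffer.visible()), buffer.visible()[0])  # 1000 ('+', ('user-1500', 1))
```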

Attention Queries that are executed in a batch environment can only be retrieved using the table or tableau result mode.

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The configuration section explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

Configuration

SQL Client startup options

The SQL Client can be started with the following optional CLI options. They are discussed in detail in the subsequent paragraphs.

  ./bin/sql-client.sh --help
  Mode "embedded" (default) submits Flink jobs from the local machine.
    Syntax: [embedded] [OPTIONS]
    "embedded" mode options:
     -d,--defaults <environment file>      Deprecated feature: the environment
                                           properties with which every new
                                           session is initialized. Properties
                                           might be overwritten by session
                                           properties.
     -e,--environment <environment file>   Deprecated feature: the environment
                                           properties to be imported into the
                                           session. It might overwrite default
                                           environment properties.
     -f,--file <script file>               Script file that should be executed.
                                           In this mode, the client will not
                                           open an interactive terminal.
     -h,--help                             Show the help message with
                                           descriptions of all options.
     -hist,--history <History file path>   The file which you want to save the
                                           command history into. If not
                                           specified, we will auto-generate one
                                           under your user's home directory.
     -i,--init <initialization file>       Script file that used to init the
                                           session context. If get error in
                                           execution, the sql client will exit.
                                           Notice it's not allowed to add query
                                           or insert into the init file.
     -j,--jar <JAR file>                   A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
                                           execution of statements such as
                                           functions, table sources, or sinks.
                                           Can be used multiple times.
     -l,--library <JAR directory>          A JAR file directory with which every
                                           new session is initialized. The files
                                           might contain user-defined classes
                                           needed for the execution of
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
     -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                           archive files will be extracted to
                                           the working directory of python UDF
                                           worker. Currently only zip-format is
                                           supported. For each archive file, a
                                           target directory be specified. If the
                                           target directory name is specified,
                                           the archive file will be extracted to
                                           a directory with the
                                           specified name. Otherwise, the
                                           archive file will be extracted to a
                                           directory with the same name of the
                                           archive file. The files uploaded via
                                           this option are accessible via
                                           relative path. '#' could be used as
                                           the separator of the archive file
                                           path and the target directory name.
                                           Comma (',') could be used as the
                                           separator to specify multiple archive
                                           files. This option can be used to
                                           upload the virtual environment, the
                                           data files used in Python UDF (e.g.:
                                           --pyArchives
                                           file:///tmp/py37.zip,file:///tmp/data.zip#data
                                           --pyExecutable
                                           py37.zip/py37/bin/python). The data
                                           files could be accessed in Python
                                           UDF, e.g.: f = open('data/data.txt',
                                           'r').
     -pyexec,--pyExecutable <arg>          Specify the path of the python
                                           interpreter used to execute the
                                           python UDF worker (e.g.:
                                           --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on Python 3.6+,
                                           Apache Beam (version == 2.27.0), Pip
                                           (version >= 7.1.0) and SetupTools
                                           (version >= 37.0.0). Please ensure
                                           that the specified environment meets
                                           the above requirements.
     -pyfs,--pyFiles <pythonFiles>         Attach custom files for job.
                                           The standard resource file suffixes
                                           such as .py/.egg/.zip/.whl or
                                           directory are all supported. These
                                           files will be added to the PYTHONPATH
                                           of both the local client and the
                                           remote python UDF worker. Files
                                           suffixed with .zip will be extracted
                                           and added to PYTHONPATH. Comma (',')
                                           could be used as the separator to
                                           specify multiple files (e.g.:
                                           --pyFiles
                                           file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                           defines the third-party dependencies.
                                           These dependencies will be installed
                                           and added to the PYTHONPATH of the
                                           python UDF worker. A directory which
                                           contains the installation packages of
                                           these dependencies could be specified
                                           optionally. Use '#' as the separator
                                           if the optional parameter exists
                                           (e.g.: --pyRequirements
                                           file:///tmp/requirements.txt#file:///tmp/cached_dir).
     -s,--session <session identifier>     The identifier for a session.
                                           'default' is the default identifier.
     -u,--update <SQL update statement>    Deprecated Experimental (for testing
                                           only!) feature: Instructs the SQL
                                           Client to immediately execute the
                                           given update statement after starting
                                           up. The process is shut down after
                                           the statement has been submitted to
                                           the cluster and returns an
                                           appropriate return code. Currently,
                                           this feature is only supported for
                                           INSERT INTO statements that declare
                                           the target sink table.Please use
                                           option -f to submit update statement.

SQL Client Configuration

  • sql-client.execution.max-table-result.rows (Batch and Streaming; default: 1000000; type: Integer): The number of rows to cache when in the table mode. If the number of rows exceeds the specified value, the oldest rows are evicted first (FIFO).
  • sql-client.execution.result-mode (Batch and Streaming; default: TABLE; type: Enum, possible values: [TABLE, CHANGELOG, TABLEAU]): Determines how query results are displayed. The available values are 'table', 'tableau', and 'changelog'. The 'table' mode materializes results in memory and visualizes them in a regular, paginated table representation. The 'changelog' mode does not materialize results and visualizes the result stream that is produced by a continuous query. The 'tableau' mode displays the results directly on the screen in a tabular format, similar to a traditional database client.
  • sql-client.verbose (Batch and Streaming; default: false; type: Boolean): Determines whether to print verbose output to the console. If set to true, the full exception stack trace is printed; otherwise, only the error cause is printed.
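A small Python model can illustrate the FIFO behavior of sql-client.execution.max-table-result.rows. It is a sketch built on assumptions and is not the SQL Client's code. Rows are kept in arrival order, the oldest row is evicted once the limit is exceeded, and a retraction for a row that was already evicted is ignored. BoundedTableResult is a hypothetical name.

```python
from collections import deque
from typing import Deque, Tuple

Row = Tuple[str, int]


class BoundedTableResult:
    """Materialized rows capped at max_rows; the oldest rows are evicted first (FIFO)."""

    def __init__(self, max_rows: int) -> None:
        self.max_rows = max_rows
        self.rows: Deque[Row] = deque()

    def insert(self, row: Row) -> None:
        self.rows.append(row)
        while len(self.rows) > self.max_rows:
            self.rows.popleft()  # evict the oldest row

    def retract(self, row: Row) -> None:
        try:
            self.rows.remove(row)  # removes the first matching row
        except ValueError:
            pass  # the row was already evicted; this sketch simply ignores it


result = BoundedTableResult(max_rows=3)
for i in range(1, 6):
    result.insert((f"row-{i}", i))
result.retract(("row-1", 1))  # already evicted, ignored
result.retract(("row-4", 4))
print(list(result.rows))  # [('row-3', 3), ('row-5', 5)]
```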

Initialize Session Using SQL Files

A SQL query needs a configuration environment in which it is executed. The SQL Client supports the -i startup option to execute an initialization SQL file that sets up the environment when the SQL Client starts. The initialization SQL file can use DDL statements to define available catalogs, table sources and sinks, user-defined functions, and other properties required for execution and deployment.

An example of such a file is presented below.

  -- Define available catalogs
  CREATE CATALOG MyCatalog
    WITH (
      'type' = 'hive'
    );
  USE CATALOG MyCatalog;
  -- Define available database
  CREATE DATABASE MyDatabase;
  USE MyDatabase;
  -- Define TABLE
  CREATE TABLE MyTable(
    MyField1 INT,
    MyField2 STRING
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/something',
    'format' = 'csv'
  );
  -- Define VIEW
  CREATE VIEW MyCustomView AS SELECT MyField2 FROM MyTable;
  -- Define user-defined functions here.
  CREATE FUNCTION myUDF AS 'foo.bar.AggregateUDF';
  -- Properties that change the fundamental execution behavior of a table program.
  SET table.planner = blink; -- planner: either 'blink' (default) or 'old'
  SET execution.runtime-mode = streaming; -- execution mode either 'batch' or 'streaming'
  SET sql-client.execution.result-mode = table; -- available values: 'table', 'changelog' and 'tableau'
  SET sql-client.execution.max-table-result.rows = 10000; -- optional: maximum number of maintained rows
  SET parallelism.default = 1; -- optional: Flink's parallelism (1 by default)
  SET pipeline.auto-watermark-interval = 200; -- optional: interval for periodic watermarks
  SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism
  SET table.exec.state.ttl = 1000; -- optional: table program's idle state time
  SET restart-strategy = fixed-delay;
  -- Configuration options for adjusting and tuning table programs.
  SET table.optimizer.join-reorder-enabled = true;
  SET table.exec.spill-compression.enabled = true;
  SET table.exec.spill-compression.block-size = 128kb;

This configuration:

  • connects to a Hive catalog and uses MyCatalog as the current catalog with MyDatabase as the current database of the catalog,
  • defines a table MyTable that can read data from a CSV file,
  • defines a view MyCustomView that declares a virtual table using a SQL query,
  • defines a user-defined function myUDF that can be instantiated using the class name,
  • uses the blink planner in streaming mode for running statements and a parallelism of 1,
  • runs exploratory queries in the table result mode,
  • and makes some planner adjustments around join reordering and spilling via configuration options.

When using the -i <init.sql> option to initialize a SQL Client session, the following statements are allowed in an initialization SQL file:

  • DDL (CREATE/DROP/ALTER),
  • USE CATALOG/DATABASE,
  • LOAD/UNLOAD MODULE,
  • SET command,
  • RESET command.
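Before passing a file to -i, you could check it against this list with a rough pre-check such as the Python sketch below. The sketch is hypothetical and much simpler than Flink's SQL parser. It strips -- comments and splits on semicolons, so it breaks on semicolons or -- inside string literals. It also looks only at the leading keyword of each statement.

```python
from typing import List

# Leading keywords of the statement kinds listed as allowed in an init file.
ALLOWED_FIRST_WORDS = {"CREATE", "DROP", "ALTER", "USE", "SET", "RESET"}
MODULE_COMMANDS = {"LOAD", "UNLOAD"}  # allowed only as LOAD MODULE / UNLOAD MODULE


def split_statements(script: str) -> List[str]:
    """Naive split: drop '--' comments, then split on ';' (breaks on ';' or '--' in literals)."""
    code = "\n".join(line.split("--", 1)[0] for line in script.splitlines())
    return [stmt.strip() for stmt in code.split(";") if stmt.strip()]


def is_allowed(statement: str) -> bool:
    words = statement.upper().split()
    if words[0] in ALLOWED_FIRST_WORDS:
        return True
    return words[0] in MODULE_COMMANDS and len(words) > 1 and words[1] == "MODULE"


def disallowed_statements(script: str) -> List[str]:
    """Return the statements an init file should not contain, e.g. SELECT or INSERT."""
    return [stmt for stmt in split_statements(script) if not is_allowed(stmt)]


init_sql = """
SET parallelism.default = 1; -- optional: Flink's parallelism
CREATE TABLE t (a INT) WITH ('connector' = 'datagen');
LOAD MODULE hive;
SELECT * FROM t;
INSERT INTO s SELECT * FROM t;
"""
print(disallowed_statements(init_sql))  # ['SELECT * FROM t', 'INSERT INTO s SELECT * FROM t']
```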

To execute queries or INSERT statements, use the interactive mode or the -f option.

Attention If the SQL Client encounters an error during initialization, it exits with an error message.

Dependencies

The SQL Client does not require setting up a Java project using Maven or SBT. Instead, you can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR file separately (using --jar) or define entire library directories (using --library). For connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files can be downloaded for each release from the Maven central repository.

The full list of offered SQL JARs and documentation about how to use them can be found on the connection to external systems page.

Use SQL Client to submit jobs

The SQL Client allows users to submit jobs either from the interactive command line or by executing a SQL file with the -f option.

In both modes, the SQL Client can parse and execute all types of SQL statements supported by Flink.

Interactive Command Line

In the interactive command line, the SQL Client reads user input and executes a statement once it encounters a terminating semicolon (;).
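This behavior can be modeled as a buffer that is flushed whenever an input line ends with a semicolon. The Python sketch below is a simplification. The real client also has to handle semicolons inside string literals and comments. read_statements is a hypothetical name.

```python
from typing import Iterable, Iterator, List


def read_statements(lines: Iterable[str]) -> Iterator[str]:
    """Buffer input lines and yield a statement each time a line ends with ';'."""
    buffer: List[str] = []
    for line in lines:
        buffer.append(line)
        if line.rstrip().endswith(";"):
            statement = "\n".join(buffer).strip()
            buffer.clear()
            yield statement[:-1].rstrip()  # drop the terminating ';'


typed = ["SELECT name,", "  COUNT(*) AS cnt", "FROM t GROUP BY name;", "SET sql-client.verbose = true;"]
for stmt in read_statements(typed):
    print(repr(stmt))
```

Lines without a trailing semicolon stay in the buffer, which mirrors how the CLI keeps prompting for more input.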

The SQL Client prints a success message if the statement is executed successfully and an error message if it fails. By default, the error message contains only the error cause. To print the full exception stack trace for debugging, set sql-client.verbose to true with the command SET sql-client.verbose = true;.

Execute SQL Files

The SQL Client can execute a SQL script file with the -f option. It executes the statements in the file one by one and prints an execution message for each executed statement. Once a statement fails, the SQL Client exits and the remaining statements are not executed.
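The fail-fast behavior of -f comes down to the loop below. This Python sketch uses a stand-in executor. run_script and fake_execute are hypothetical names, and the real client prints its own messages for each statement.

```python
from typing import Callable, List, Optional, Tuple


def run_script(statements: List[str],
               execute: Callable[[str], None]) -> Tuple[List[str], Optional[str]]:
    """Run statements in order; stop at the first failure and skip the remaining ones.

    Returns the statements that succeeded and the statement that failed (or None).
    """
    executed: List[str] = []
    for stmt in statements:
        try:
            execute(stmt)
        except Exception:
            return executed, stmt  # the rest of the script is not executed
        executed.append(stmt)
    return executed, None


def fake_execute(stmt: str) -> None:
    """Stand-in executor: fails for any statement that references a missing table."""
    if "missing_table" in stmt:
        raise RuntimeError("Object 'missing_table' not found")


executed, failed = run_script(
    ["SET pipeline.name = demo",
     "INSERT INTO missing_table SELECT 1",
     "SET parallelism.default = 2"],
    fake_execute,
)
print(executed, failed)
```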

An example of such a file is presented below.

  CREATE TEMPORARY TABLE users (
    user_id BIGINT,
    user_name STRING,
    user_level STRING,
    region STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'csv',
    'value.format' = 'avro'
  );
  -- set sync mode
  SET table.dml-sync=true;
  -- set the job name
  SET pipeline.name=SqlJob;
  -- set the YARN queue that the job is submitted to
  SET yarn.application.queue=root;
  -- set the job parallelism
  SET parallelism.default=100;
  -- restore from the specific savepoint path
  SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
  INSERT INTO pageviews_enriched
  SELECT *
  FROM pageviews AS p
  LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
  ON p.user_id = u.user_id;

This configuration:

  • defines a temporal table source users that reads from a Kafka topic using the upsert-kafka connector,
  • sets properties such as the job name,
  • sets the savepoint path,
  • submits a SQL job that restores its state from the specified savepoint path.

Attention Unlike the interactive mode, the SQL Client stops execution and exits when it encounters an error.

Execute a set of SQL statements

The SQL Client executes each INSERT INTO statement as a separate Flink job. However, this is sometimes not optimal because parts of the pipeline could be reused. The SQL Client supports the STATEMENT SET syntax to execute a set of SQL statements; this is equivalent to StatementSet in the Table API. The STATEMENT SET syntax encloses one or more INSERT INTO statements. All statements in a STATEMENT SET block are holistically optimized and executed as a single Flink job. Joint optimization and execution allows for reusing common intermediate results and can therefore significantly improve the efficiency of executing multiple queries.

Syntax

  BEGIN STATEMENT SET;
    -- one or more INSERT INTO statements
    { INSERT INTO|OVERWRITE <select_statement>; }+
  END;

Attention The statements enclosed in the STATEMENT SET must be separated by a semicolon (;).
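Statements map to jobs as follows. Each standalone INSERT becomes its own job, and all INSERTs between BEGIN STATEMENT SET and END become one job. The Python model below captures only these job boundaries and does not model the joint optimization. plan_jobs is a hypothetical helper that takes statements without their trailing semicolons.

```python
from typing import List, Optional


def plan_jobs(statements: List[str]) -> List[List[str]]:
    """Group INSERT statements into jobs.

    A standalone INSERT becomes its own job; all INSERTs between
    BEGIN STATEMENT SET and END become a single job.
    """
    jobs: List[List[str]] = []
    current_set: Optional[List[str]] = None  # open statement set, if any
    for stmt in statements:
        normalized = " ".join(stmt.upper().split())
        if normalized == "BEGIN STATEMENT SET":
            current_set = []
        elif normalized == "END":
            if current_set is None:
                raise ValueError("END without BEGIN STATEMENT SET")
            jobs.append(current_set)
            current_set = None
        elif normalized.startswith("INSERT"):
            if current_set is None:
                jobs.append([stmt])
            else:
                current_set.append(stmt)
        elif current_set is not None:
            raise ValueError(f"only INSERT statements are allowed in a statement set: {stmt}")
        # DDL and SET outside a statement set do not create jobs in this sketch
    if current_set is not None:
        raise ValueError("statement set was not closed with END")
    return jobs


jobs = plan_jobs([
    "INSERT INTO pageview SELECT page_id, count(1) FROM pageviews GROUP BY page_id",
    "BEGIN STATEMENT SET",
    "INSERT INTO pageview SELECT page_id, count(1) FROM pageviews GROUP BY page_id",
    "INSERT INTO uniqueview SELECT page_id, count(distinct user_id) FROM pageviews GROUP BY page_id",
    "END",
])
print([len(job) for job in jobs])  # [1, 2]
```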

SQL CLI

  Flink SQL> CREATE TABLE pageviews (
  >   user_id BIGINT,
  >   page_id BIGINT,
  >   viewtime TIMESTAMP,
  >   proctime AS PROCTIME()
  > ) WITH (
  >   'connector' = 'kafka',
  >   'topic' = 'pageviews',
  >   'properties.bootstrap.servers' = '...',
  >   'format' = 'avro'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> CREATE TABLE pageview (
  >   page_id BIGINT,
  >   cnt BIGINT
  > ) WITH (
  >   'connector' = 'jdbc',
  >   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  >   'table-name' = 'pageview'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> CREATE TABLE uniqueview (
  >   page_id BIGINT,
  >   cnt BIGINT
  > ) WITH (
  >   'connector' = 'jdbc',
  >   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  >   'table-name' = 'uniqueview'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> BEGIN STATEMENT SET;
  [INFO] Begin a statement set.
  Flink SQL> INSERT INTO pageview
  > SELECT page_id, count(1)
  > FROM pageviews
  > GROUP BY page_id;
  [INFO] Add SQL update statement to the statement set.
  Flink SQL> INSERT INTO uniqueview
  > SELECT page_id, count(distinct user_id)
  > FROM pageviews
  > GROUP BY page_id;
  [INFO] Add SQL update statement to the statement set.
  Flink SQL> END;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: 6b1af540c0c0bb3fcfcad50ac037c862

SQL File

  CREATE TABLE pageviews (
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    proctime AS PROCTIME()
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'pageviews',
    'properties.bootstrap.servers' = '...',
    'format' = 'avro'
  );
  CREATE TABLE pageview (
    page_id BIGINT,
    cnt BIGINT
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'pageview'
  );
  CREATE TABLE uniqueview (
    page_id BIGINT,
    cnt BIGINT
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'uniqueview'
  );
  BEGIN STATEMENT SET;
  INSERT INTO pageview
  SELECT page_id, count(1)
  FROM pageviews
  GROUP BY page_id;
  INSERT INTO uniqueview
  SELECT page_id, count(distinct user_id)
  FROM pageviews
  GROUP BY page_id;
  END;

Execute DML statements sync/async

By default, the SQL Client executes DML statements asynchronously. That means the SQL Client submits a job for the DML statement to a Flink cluster and does not wait for the job to finish, so it can submit multiple jobs at the same time. This is useful for streaming jobs, which are generally long-running.

SQL Client makes sure that a statement is successfully submitted to the cluster. Once the statement is submitted, the CLI will show information about the Flink job.

  Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
  [INFO] Table update statement has been successfully submitted to the cluster:
  Cluster ID: StandaloneClusterId
  Job ID: 6f922fe5cba87406ff23ae4a7bb79044

Attention The SQL Client does not track the status of the running Flink job after submission. The CLI process can be shut down after the submission without affecting the detached query. Flink’s restart strategy takes care of fault tolerance. A query can be cancelled using Flink’s web interface, command line, or REST API.

However, batch users more commonly need each DML statement to wait until the previous one has finished. To execute DML statements synchronously, set the table.dml-sync option to true in the SQL Client.

  Flink SQL> SET table.dml-sync = true;
  [INFO] Session property has been set.
  Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] Execute statement in sync mode. Please wait for the execution finish...
  [INFO] Complete execution of the SQL update statement.

Attention If you want to terminate the job, just type CTRL-C to cancel the execution.
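The difference between the two modes is whether the client waits for the job. The Python sketch below uses concurrent.futures. fake_job is a hypothetical stand-in that just sleeps, and the dml_sync flag plays the role of table.dml-sync. Unlike the real SQL Client, which does not track a job after asynchronous submission, this sketch keeps the futures so that the example can collect their results.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Union


def fake_job(name: str, seconds: float) -> str:
    """Stand-in for a DML job: sleeps, then reports completion."""
    time.sleep(seconds)
    return f"{name} finished"


def submit(pool: ThreadPoolExecutor, name: str, seconds: float,
           dml_sync: bool) -> Union[str, Future]:
    future = pool.submit(fake_job, name, seconds)
    if dml_sync:
        return future.result()  # sync: block until the job has finished
    return future  # async: return right after submission


with ThreadPoolExecutor(max_workers=4) as pool:
    # Async: both jobs are submitted without waiting for either to finish.
    pending = [submit(pool, f"job-{i}", 0.2, dml_sync=False) for i in (1, 2)]
    # Sync: this call returns only once job-3 has completed.
    sync_result = submit(pool, "job-3", 0.05, dml_sync=True)
    async_results = [f.result() for f in pending]

print(sync_result, async_results)
```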

Start a SQL Job from a savepoint

Flink supports starting a job from a specified savepoint. In the SQL Client, you can use the SET command to specify the savepoint path.

  Flink SQL> SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
  [INFO] Session property has been set.
  -- all the following DML statements will be restored from the specified savepoint path
  Flink SQL> INSERT INTO ...

When the path to savepoint is specified, Flink will try to restore the state from the savepoint when executing all the following DML statements.

Because the specified savepoint path will affect all the following DML statements, you can use RESET command to reset this config option, i.e. disable restoring from savepoint.

  Flink SQL> RESET execution.savepoint.path;
  [INFO] Session property has been reset.
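SET and RESET both act on session state that every later statement reads. The Python sketch below models that scoping with a hypothetical Session class. Its submit_dml method only reports which savepoint path a job would restore from, and the table names in the usage are made up.

```python
from typing import Dict, Optional

SAVEPOINT_KEY = "execution.savepoint.path"


class Session:
    """Session-scoped properties: a SET value applies to every later statement until RESET."""

    def __init__(self) -> None:
        self.properties: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self.properties[key] = value

    def reset(self, key: str) -> None:
        self.properties.pop(key, None)  # back to the default (no value)

    def submit_dml(self, statement: str) -> Optional[str]:
        """Return the savepoint path the (hypothetical) job would restore from, if any."""
        return self.properties.get(SAVEPOINT_KEY)


session = Session()
session.set(SAVEPOINT_KEY, "/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab")
first = session.submit_dml("INSERT INTO sink_a SELECT * FROM source_a")
second = session.submit_dml("INSERT INTO sink_b SELECT * FROM source_b")
session.reset(SAVEPOINT_KEY)
third = session.submit_dml("INSERT INTO sink_c SELECT * FROM source_c")
print(first, second, third)
```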

For more details about creating and managing savepoints, please refer to Job Lifecycle Management.

Define a Custom Job Name

The SQL Client supports defining a job name for queries and DML statements through the SET command.

  Flink SQL> SET pipeline.name = 'kafka-to-hive';
  [INFO] Session property has been set.
  -- all the following DML statements will use the specified job name.
  Flink SQL> INSERT INTO ...

Because the specified job name will affect all the following queries and DML statements, you can also use RESET command to reset this configuration, i.e. use default job names.

  Flink SQL> RESET pipeline.name;
  [INFO] Session property has been reset.

If the option pipeline.name is not specified, SQL Client will generate a default name for the submitted job, e.g. insert-into_<sink_table_name> for INSERT INTO statements.
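This naming rule amounts to a simple fallback. In the Python sketch below, the helper job_name is hypothetical and covers only INSERT INTO statements. The parameter sink_table_name stands for the <sink_table_name> placeholder in the text.

```python
from typing import Dict


def job_name(properties: Dict[str, str], sink_table_name: str) -> str:
    """Use pipeline.name if it is set; otherwise fall back to insert-into_<sink_table_name>."""
    return properties.get("pipeline.name") or f"insert-into_{sink_table_name}"


session_properties: Dict[str, str] = {}
print(job_name(session_properties, "MyTableSink"))     # insert-into_MyTableSink
session_properties["pipeline.name"] = "kafka-to-hive"  # SET pipeline.name = 'kafka-to-hive';
print(job_name(session_properties, "MyTableSink"))     # kafka-to-hive
del session_properties["pipeline.name"]                # RESET pipeline.name;
print(job_name(session_properties, "MyTableSink"))     # insert-into_MyTableSink
```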

Compatibility

For backward compatibility, the SQL Client still supports initialization with an environment YAML file and allows you to SET the keys defined in YAML files. When such a key is set, the SQL Client prints a warning message.

  Flink SQL> SET execution.type = batch;
  [WARNING] The specified key 'execution.type' is deprecated. Please use 'execution.runtime-mode' instead.
  [INFO] Session property has been set.
  -- all the following statements will be executed in batch mode
  Flink SQL> INSERT INTO ...

When you use the SET command without arguments to print the properties, the SQL Client prints all of them. Deprecated keys are marked with the '[DEPRECATED]' prefix.

  Flink SQL> SET;
  execution.runtime-mode=batch
  sql-client.execution.result-mode=table
  table.planner=blink
  [DEPRECATED] execution.planner=blink
  [DEPRECATED] execution.result-mode=table
  [DEPRECATED] execution.type=batch
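The warning and the [DEPRECATED] listing can be modeled with a mapping from old keys to new ones. The mapping in this Python sketch is inferred from the examples above: the execution.type warning, plus the paired execution.planner/table.planner and execution.result-mode/sql-client.execution.result-mode entries in the SET output. It is not the complete list. The assumption that setting an old key also sets its replacement comes from that listing, and the function names are hypothetical.

```python
from typing import Dict, List

# Old YAML-era keys and their replacements, inferred from the examples in this section.
DEPRECATED_KEYS: Dict[str, str] = {
    "execution.type": "execution.runtime-mode",
    "execution.planner": "table.planner",
    "execution.result-mode": "sql-client.execution.result-mode",
}


def set_property(props: Dict[str, str], key: str, value: str) -> List[str]:
    """Store a property; a deprecated key also sets its replacement and yields a warning."""
    messages: List[str] = []
    if key in DEPRECATED_KEYS:
        new_key = DEPRECATED_KEYS[key]
        messages.append(f"[WARNING] The specified key '{key}' is deprecated. "
                        f"Please use '{new_key}' instead.")
        props[new_key] = value
    props[key] = value
    messages.append("[INFO] Session property has been set.")
    return messages


def list_properties(props: Dict[str, str]) -> List[str]:
    """Current keys first, then deprecated keys marked with [DEPRECATED]."""
    current = sorted(f"{k}={v}" for k, v in props.items() if k not in DEPRECATED_KEYS)
    old = sorted(f"[DEPRECATED] {k}={v}" for k, v in props.items() if k in DEPRECATED_KEYS)
    return current + old


props: Dict[str, str] = {}
for line in set_property(props, "execution.type", "batch"):
    print(line)
print("\n".join(list_properties(props)))
```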

For more information about environment files, refer to the documentation of previous versions.

Limitations & Future

The current SQL Client only supports embedded mode. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway, see more in FLIP-24 and FLIP-91.