SQL客户端测试版

Flink的Table&SQL API可以处理用SQL语言编写的查询,但是这些查询需要嵌入在用Java或Scala编写的表程序中。而且,这些程序需要在提交到集群之前使用构建工具打包。这或多或少地限制了Flink对Java / Scala程序员的使用。

SQL客户端旨在提供编写,调试,并提交表格程序到Flink集群的一个简单的方法没有的Java或Scala代码一行。SQL客户端CLI允许检索和命令行中运行分布式应用可视化实时结果。

在群集上运行表程序的Flink SQL Client CLI的动画演示

注意 SQL客户端处于早期开发阶段。即使应用程序还没有生产就绪,它可以是一个非常有用的工具,用于原型设计和Flink SQL。在未来,社区计划通过提供基于REST的SQL客户端网关来扩展其函数

入门

本节介绍如何从命令行设置和运行第一个Flink SQL程序。

SQL客户端捆绑在常规Flink分发中,因此可以开箱即用。它只需要一个正在运行的Flink集群,其中可以执行表程序。有关设置Flink群集的详细信息,请参阅群集和部署部分。如果您只想尝试SQL客户端,还可以使用以下命令启动具有一个worker的本地群集:

  1. ./bin/start-cluster.sh

启动SQL客户端CLI

SQL客户端脚本也位于Flink的二进制目录中。将来,用户可以通过启动嵌入式独立进程或连接到远程SQL客户端网关来启动SQL Client CLI。目前仅embedded支持模式。您可以通过调用以下命令启动CLI:

  1. ./bin/sql-client.sh embedded

默认情况下,SQL客户端将从位于的环境文件中读取其配置./conf/sql-client-defaults.yaml有关环境文件结构的更多信息,请参阅配置部分

运行SQL查询

启动CLI后,您可以使用该HELP命令列出所有可用的SQL语句。要验证您的设置和群集连接,您可以输入第一个SQL查询并Enter按键执行它:

  1. SELECT 'Hello World'

此查询不需要表源,并生成单行结果。CLI将从群集中检索结果并将其可视化。您可以通过按键关闭结果视图Q

CLI支持两种维护和可视化结果的模式

表模式物化在内存中的结果和可视化他们的常客,分页表表示。可以通过在CLI中执行以下命令来启用它:

  1. SET execution.result-mode=table

更改日志模式不实现结果和可视化,其由所产生的结果数据流的连续查询由插入的(+)和撤消(-)。

  1. SET execution.result-mode=changelog

您可以使用以下查询来查看两种结果模式:

  1. SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name

此查询执行有界字数计数示例。

更改日志模式下,可视化更改日志应类似于:

  1. + Bob, 1
  2. + Alice, 1
  3. + Greg, 1
  4. - Bob, 1
  5. + Bob, 2

表模式下,可视化结果表不断更新,直到表程序结束为:

  1. Bob, 2
  2. Alice, 1
  3. Greg, 1

在SQL查询的原型设计期间,两种结果模式都很有用。在这两种模式下,结果都存储在SQL客户端的Java堆内存中。为了使CLI界面保持响应,更改日志模式仅显示最新的1000个更改。表模式允许导航更大的结果,这些结果仅受可用主存储器和配置的最大行数max-table-result-rows)的限制。

注意只能使用table结果模式检索在批处理环境中执行的查询

定义查询后,可以将其作为长时间运行的分离Flink作业提交给集群。为此,需要使用INSERT INTO语句指定存储结果的目标系统配置部分解释了如何申报表源读取数据,如何申报表汇写入数据,以及如何配置其它表程序性能。

配置

可以使用以下可选CLI命令启动SQL Client。它们将在随后的章节中详细讨论。

  1. ./bin/sql-client.sh embedded --help
  2. Mode "embedded" submits Flink jobs from the local machine.
  3. Syntax: embedded [OPTIONS]
  4. "embedded" mode options:
  5. -d,--defaults <environment file> The environment properties with which
  6. every new session is initialized.
  7. Properties might be overwritten by
  8. session properties.
  9. -e,--environment <environment file> The environment properties to be
  10. imported into the session. It might
  11. overwrite default environment
  12. properties.
  13. -h,--help Show the help message with
  14. descriptions of all options.
  15. -j,--jar <JAR file> A JAR file to be imported into the
  16. session. The file might contain
  17. user-defined classes needed for the
  18. execution of statements such as
  19. functions, table sources, or sinks.
  20. Can be used multiple times.
  21. -l,--library <JAR directory> A JAR file directory with which every
  22. new session is initialized. The files
  23. might contain user-defined classes
  24. needed for the execution of
  25. statements such as functions, table
  26. sources, or sinks. Can be used
  27. multiple times.
  28. -s,--session <session identifier> The identifier for a session.
  29. 'default' is the default identifier.

环境文件

SQL查询需要一个执行它的配置环境。所谓的环境文件定义了可用的表源和接收器,外部目录,用户定义的函数以及执行和部署所需的其他属性。

每个环境文件都是常规的YAML文件下面给出了这种文件的一个例子。

  1. # Define table sources and sinks here.
  2. tables:
  3. - name: MyTableSource
  4. type: source
  5. update-mode: append
  6. connector:
  7. type: filesystem
  8. path: "/path/to/something.csv"
  9. format:
  10. type: csv
  11. fields:
  12. - name: MyField1
  13. type: INT
  14. - name: MyField2
  15. type: VARCHAR
  16. line-delimiter: "\n"
  17. comment-prefix: "#"
  18. schema:
  19. - name: MyField1
  20. type: INT
  21. - name: MyField2
  22. type: VARCHAR
  23. # Define table views here.
  24. views:
  25. - name: MyCustomView
  26. query: "SELECT MyField2 FROM MyTableSource"
  27. # Define user-defined functions here.
  28. functions:
  29. - name: myUDF
  30. from: class
  31. class: foo.bar.AggregateUDF
  32. constructor:
  33. - 7.6
  34. - false
  35. # Execution properties allow for changing the behavior of a table program.
  36. execution:
  37. type: streaming # required: execution mode either 'batch' or 'streaming'
  38. result-mode: table # required: either 'table' or 'changelog'
  39. max-table-result-rows: 1000000 # optional: maximum number of maintained rows in
  40. # 'table' mode (1000000 by default, smaller 1 means unlimited)
  41. time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default)
  42. parallelism: 1 # optional: Flink's parallelism (1 by default)
  43. periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  44. max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
  45. min-idle-state-retention: 0 # optional: table program's minimum idle state time
  46. max-idle-state-retention: 0 # optional: table program's maximum idle state time
  47. restart-strategy: # optional: restart strategy
  48. type: fallback # "fallback" to global restart strategy by default
  49. # Deployment properties allow for describing the cluster to which table programs are submitted to.
  50. deployment:
  51. response-timeout: 5000

这个配置:

  • 定义一个具有MyTableSource从CSV文件读取的表源的环境,
  • 定义一个MyCustomView使用SQL查询声明虚拟表的视图,
  • 定义了一个用户定义的函数myUDF,可以使用类名和两个构造函数参数进行实例化,
  • 为在此流式传输环境中执行的查询指定1的并行度,
  • 指定事件时间特征,和
  • table结果模式下运行查询。根据用例,可以将配置拆分为多个文件。因此,可以为一般用途(默认使用环境文件—defaults)以及每个会话(使用会话环境文件—environment)创建环境文件每个CLI会话都使用默认属性进行初始化,后跟会话属性。例如,默认环境文件可以指定在每个会话中可用于查询的所有表源,而会话环境文件仅声明特定的状态保存时间和并行性。启动CLI应用程序时,可以传递默认和会话环境文件。如果未指定默认环境文件,则SQL客户端将搜索./conf/sql-client-defaults.yaml 在Flink的配置目录中。

注意在CLI会话中设置的属性(例如,使用该SET命令)具有最高优先级:

  1. CLI commands > session environment file > defaults environment file

重启策略

重新启动策略控制在发生故障时如何重新启动Flink作业。Flink集群的全局重新启动策略类似,可以在环境文件中声明更细粒度的重新启动配置。

支持以下策略:

  1. execution:
  2. # falls back to the global strategy defined in flink-conf.yaml
  3. restart-strategy:
  4. type: fallback
  5. # job fails directly and no restart is attempted
  6. restart-strategy:
  7. type: none
  8. # attempts a given number of times to restart the job
  9. restart-strategy:
  10. type: fixed-delay
  11. attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE)
  12. delay: 10000 # delay in ms between retries (default: 10 s)
  13. # attempts as long as the maximum number of failures per time interval is not exceeded
  14. restart-strategy:
  15. type: failure-rate
  16. max-failures-per-interval: 1 # retries in interval until failing (default: 1)
  17. failure-rate-interval: 60000 # measuring interval in ms for failure rate
  18. delay: 10000 # delay in ms between retries (default: 10 s)

依赖

SQL客户端不需要使用Maven或SBT设置Java项目。相反,您可以将依赖项作为提交到集群的常规JAR文件传递。您可以单独指定每个JAR文件(使用—jar),也可以定义整个库目录(使用—library)。对于连接外部系统(如Apache Kafka)和相应数据格式(如JSON)的连接器,Flink提供了即用型JAR包这些JAR文件sql-jar以Maven中央存储库的每个版本为后缀并可以下载。

可以在与外部系统页面连接上找到提供的SQL JAR的完整列表以及有关如何使用它们的文档

以下示例显示了一个环境文件,该文件定义从Apache Kafka读取JSON数据的表源。

  1. tables:
  2. - name: TaxiRides
  3. type: source
  4. update-mode: append
  5. connector:
  6. property-version: 1
  7. type: kafka
  8. version: 0.11
  9. topic: TaxiRides
  10. startup-mode: earliest-offset
  11. properties:
  12. - key: zookeeper.connect
  13. value: localhost:2181
  14. - key: bootstrap.servers
  15. value: localhost:9092
  16. - key: group.id
  17. value: testGroup
  18. format:
  19. property-version: 1
  20. type: json
  21. schema: "ROW(rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP)"
  22. schema:
  23. - name: rideId
  24. type: LONG
  25. - name: lon
  26. type: FLOAT
  27. - name: lat
  28. type: FLOAT
  29. - name: rowTime
  30. type: TIMESTAMP
  31. rowtime:
  32. timestamps:
  33. type: "from-field"
  34. from: "rideTime"
  35. watermarks:
  36. type: "periodic-bounded"
  37. delay: "60000"
  38. - name: procTime
  39. type: TIMESTAMP
  40. proctime: true

生成的TaxiRide模式包含JSON模式的大多数字段。此外,它还添加了rowtime属性rowTime和processing-time属性procTime

双方connectorformat允许定义属性的版本(这是目前版本1),为未来的向后兼容性。

用户定义的函数

SQL客户端允许用户创建要在SQL查询中使用的自定义用户定义函数。目前,这些函数仅限于在Java / Scala类中以编程方式定义。

为了提供用户定义的函数,您需要首先实现和编译扩展的函数类ScalarFunctionAggregateFunction或者TableFunction(参见用户定义的函数)。然后可以将一个或多个函数打包到SQL客户端的依赖项JAR中。

在调用之前,必须在环境文件中声明所有函数。对于列表中的每个项目functions,必须指定

  • a name注册函数的,
  • 使用函数的来源from(限于class现在),
  • class指示函数的完全合格的类名和一个可选列表constructor的实例参数。
  1. functions:
  2. - name: ... # required: name of the function
  3. from: class # required: source of the function (can only be "class" for now)
  4. class: ... # required: fully qualified class name of the function
  5. constructor: # optimal: constructor parameters of the function class
  6. - ... # optimal: a literal parameter with implicit type
  7. - class: ... # optimal: full class name of the parameter
  8. constructor: # optimal: constructor parameters of the parameter's class
  9. - type: ... # optimal: type of the literal parameter
  10. value: ... # optimal: value of the literal parameter

确保指定参数的顺序和类型严格匹配函数类的一个构造函数。

构造函数参数

根据用户定义的函数,可能需要在SQL语句中使用它之前参数化实现。

如前面的示例所示,在声明用户定义的函数时,可以使用以下三种方法之一使用构造函数参数来配置类:

具有隐式类型的文字值: SQL客户端将根据文字值本身自动派生类型。目前,仅值BOOLEANINTDOUBLEVARCHAR在这里的支持。如果自动派生不能按预期工作(例如,您需要VARCHAR false),请改用显式类型。

  1. - true # -> BOOLEAN (case sensitive)
  2. - 42 # -> INT
  3. - 1234.222 # -> DOUBLE
  4. - foo # -> VARCHAR

具有显式类型的文字值:使用类型安全性typevalue属性显式声明参数

  1. - type: DECIMAL
  2. value: 11111111111111111

下表说明了受支持的Java参数类型和相应的SQL类型字符串。

Java类型SQL类型
java.math.BigDecimalDECIMAL
java.lang.BooleanBOOLEAN
java.lang.ByteTINYINT
java.lang.DoubleDOUBLE
java.lang.FloatREALFLOAT
java.lang.IntegerINTEGERINT
java.lang.LongBIGINT
java.lang.ShortSMALLINT
java.lang.StringVARCHAR

更多类型(例如,TIMESTAMPARRAY),原始类型,并且null尚不支持。

(嵌套)类实例:除文字值外,您还可以通过指定classconstructor属性为构造函数参数创建(嵌套)类实例可以递归地执行此过程,直到所有构造函数参数都使用文字值表示。

  1. - class: foo.bar.paramClass
  2. constructor:
  3. - StarryName
  4. - class: java.lang.Integer
  5. constructor:
  6. - class: java.lang.String
  7. constructor:
  8. - type: VARCHAR
  9. value: 3

分离的SQL查询

为了定义端到端SQL管道,SQL INSERT INTO语句可用于向Flink集群提交长时间运行的分离查询。这些查询将结果生成到外部系统而不是SQL客户端。这允许处理更高的并行性和更大量的数据。CLI本身在提交后对分离的查询没有任何控制权。

  1. INSERT INTO MyTableSink SELECT * FROM MyTableSource

MyTableSink必须在环境文件中声明表接收器有关支持的外部系统及其配置的详细信息,请参阅连接页面Apache Kafka表接收器的示例如下所示。

  1. tables:
  2. - name: MyTableSink
  3. type: sink
  4. update-mode: append
  5. connector:
  6. property-version: 1
  7. type: kafka
  8. version: 0.11
  9. topic: OutputTopic
  10. properties:
  11. - key: zookeeper.connect
  12. value: localhost:2181
  13. - key: bootstrap.servers
  14. value: localhost:9092
  15. - key: group.id
  16. value: testGroup
  17. format:
  18. property-version: 1
  19. type: json
  20. derive-schema: true
  21. schema:
  22. - name: rideId
  23. type: LONG
  24. - name: lon
  25. type: FLOAT
  26. - name: lat
  27. type: FLOAT
  28. - name: rideTime
  29. type: TIMESTAMP

SQL客户端确保将语句成功提交到群集。提交查询后,CLI将显示有关Flink作业的信息。

  1. [INFO] Table update statement has been successfully submitted to the cluster:
  2. Cluster ID: StandaloneClusterId
  3. Job ID: 6f922fe5cba87406ff23ae4a7bb79044
  4. Web interface: http://localhost:8081

注意 SQL Client在提交后不会跟踪正在运行的Flink作业的状态。提交后可以关闭CLI进程,而不会影响分离的查询。Flink的重启策略负责容错。可以使用Flink的Web界面,命令行或REST API取消查询。

SQL视图

视图允许从SQL查询定义虚拟表。视图定义将立即进行解析和验证。但是,在提交常规INSERT INTOSELECT语句期间访问视图时会发生实际执行

视图可以在环境文件中定义,也可以在CLI会话中定义。

以下示例显示如何在文件中定义多个视图:

  1. views:
  2. - name: MyRestrictedView
  3. query: "SELECT MyField2 FROM MyTableSource"
  4. - name: MyComplexView
  5. query: >
  6. SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
  7. FROM MyTableSource
  8. WHERE MyField2 > 200

与表源和接收器类似,会话环境文件中定义的视图具有最高优先级。

也可以使用以下CREATE VIEW语句在CLI会话中创建视图

  1. CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource

也可以使用以下DROP VIEW语句再次删除在CLI会话中创建的视图

  1. DROP VIEW MyNewView

注意视图的定义仅限于上面提到的语法。在将来的版本中,将支持为表名中的视图或转义空格定义模式。

局限与未来

当前的SQL客户端实施处于非常早期的开发阶段,并且可能会在未来作为更大的Flink改进提案24(FLIP-24)的一部分进行更改随意加入讨论,并打开有关您认为有用的错误和函数的问题。