Wait the light to fall

SQL 客户端

焉知非鱼

SQL Client

SQL Client #

Flink 的 Table & SQL API 使得它可以使用 SQL 语言编写的查询,但是这些查询需要嵌入到一个用 Java 或 Scala 编写的表程序中。而且,这些程序在提交给集群之前需要用构建工具打包。这或多或少限制了 Flink 对 Java/Scala 程序员的使用。

SQL Client 旨在提供一种简单的方式来编写、调试和提交表程序到 Flink 集群,而不需要任何一行 Java 或 Scala 代码。SQL Client CLI 允许在命令行上从运行的分布式应用中检索和可视化实时结果。

img

入门 #

本节介绍如何从命令行设置和运行第一个 Flink SQL 程序。

SQL Client 被捆绑在常规的 Flink 发行版中,因此可以开箱即运行。它只需要一个正在运行的 Flink 集群,在那里可以执行表程序。关于设置 Flink 集群的更多信息,请参见集群和部署部分。如果你只是想试用 SQL Client,也可以使用下面的命令用一个 worker 启动一个本地集群。

./bin/start-cluster.sh

启动 SQL 客户端 CLI #

SQL Client 脚本也位于 Flink 的二进制目录中。在未来,用户将有两种启动 SQL Client CLI 的可能性,一是通过启动一个嵌入式的独立进程,二是通过连接到一个远程 SQL Client 网关。目前只支持嵌入式模式。你可以通过调用来启动 CLI。

./bin/sql-client.sh embedded

默认情况下,SQL 客户端将从位于 ./conf/sql-client-defaults.yaml 的环境文件中读取其配置。有关环境文件结构的更多信息,请参见配置部分

运行 SQL 查询 #

一旦启动 CLI,您可以使用 HELP 命令列出所有可用的 SQL 语句。为了验证你的设置和集群连接,你可以输入第一个 SQL 查询,然后按回车键执行。

SELECT 'Hello World';

这个查询不需要表源(table source),并产生一个单行结果。CLI 将从集群中检索结果,并将其可视化。您可以按Q键关闭结果视图。

CLI 支持三种模式来维护和可视化结果。

table 模式将结果在内存中具体化,并以常规的、分页的表格表示方式将其可视化。可以通过在 CLI 中执行以下命令启用该模式。

SET execution.result-mode=table;

changelog 模式不将结果具体化,而是将由插入(+)和收回(-)组成的连续查询所产生的结果流可视化。

SET execution.result-mode=changelog;

tableau 模式更像是传统的方式,将结果以 tableau 的形式直接显示在屏幕上。显示内容会受到查询执行类型(execute.type)的影响。

SET execution.result-mode=tableau;

请注意,当您使用此模式进行流式查询时,结果将在控制台中连续打印。如果这个查询的输入数据是有边界的,那么在Flink处理完所有输入数据后,作业就会终止,打印也会自动停止。否则,如果你想终止一个正在运行的查询,在这种情况下只要输入CTRL-C键,作业和打印就会停止。

你可以使用下面的查询来查看所有的结果模式。

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

这个查询执行一个有界字数的例子。

在 changelog 模式下,可视化的 changelog 应该类似于。

+ Bob, 1
+ Alice, 1
+ Greg, 1
- Bob, 1
+ Bob, 2

在 table 模式下,可视化的结果表会持续更新,直到表程序结束。

Bob, 2
Alice, 1
Greg, 1

在tableau模式下,如果你在流模式下运行查询,显示的结果将是。

+-----+----------------------+----------------------+
| +/- |                 name |                  cnt |
+-----+----------------------+----------------------+
|   + |                  Bob |                    1 |
|   + |                Alice |                    1 |
|   + |                 Greg |                    1 |
|   - |                  Bob |                    1 |
|   + |                  Bob |                    2 |
+-----+----------------------+----------------------+
Received a total of 5 rows

而如果你在批处理模式下运行查询,显示的结果将是。

+-------+-----+
|  name | cnt |
+-------+-----+
| Alice |   1 |
|   Bob |   2 |
|  Greg |   1 |
+-------+-----+
3 rows in set

在 SQL 查询的原型设计过程中,所有这些结果模式都是有用的。在所有这些模式中,结果都存储在 SQL 客户端的 Java 堆内存中。为了保持CLI界面的响应性,changelog 模式只显示最新的1000个变化。表模式允许浏览更大的结果,这些结果仅受可用主内存和配置的最大行数(max-table-result-rows)的限制。

注意:在批处理环境中执行的查询,只能使用table或tableau结果模式进行检索。

在定义了一个查询后,可以将其作为一个长期运行的、分离的 Flink 作业提交给集群。为此,需要使用 INSERT INTO 语句指定存储结果的目标系统。配置部分解释了如何声明 table source 以读取数据,如何声明 table sink 以写入数据,以及如何配置其他表程序属性。

配置 #

SQL Client 可以通过以下可选的CLI命令来启动。这些命令将在后面的段落中详细讨论。

./bin/sql-client.sh embedded --help

Mode "embedded" submits Flink jobs from the local machine.

  Syntax: embedded [OPTIONS]
  "embedded" mode options:
     -d,--defaults <environment file>      The environment properties with which
                                           every new session is initialized.
                                           Properties might be overwritten by
                                           session properties.
     -e,--environment <environment file>   The environment properties to be
                                           imported into the session. It might
                                           overwrite default environment
                                           properties.
     -h,--help                             Show the help message with
                                           descriptions of all options.
     -hist,--history <History file path>   The file which you want to save the
                                           command history into. If not
                                           specified, we will auto-generate one
                                           under your user's home directory.
     -j,--jar <JAR file>                   A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
                                           execution of statements such as
                                           functions, table sources, or sinks.
                                           Can be used multiple times.
     -l,--library <JAR directory>          A JAR file directory with which every
                                           new session is initialized. The files
                                           might contain user-defined classes
                                           needed for the execution of
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
     -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                           archive files will be extracted to
                                           the working directory of python UDF
                                           worker. Currently only zip-format is
                                           supported. For each archive file, a
                                           target directory be specified. If the
                                           target directory name is specified,
                                           the archive file will be extracted to
                                           a name can directory with the
                                           specified name. Otherwise, the
                                           archive file will be extracted to a
                                           directory with the same name of the
                                           archive file. The files uploaded via
                                           this option are accessible via
                                           relative path. '#' could be used as
                                           the separator of the archive file
                                           path and the target directory name.
                                           Comma (',') could be used as the
                                           separator to specify multiple archive
                                           files. This option can be used to
                                           upload the virtual environment, the
                                           data files used in Python UDF (e.g.:
                                           --pyArchives
                                           file:///tmp/py37.zip,file:///tmp/data
                                           .zip#data --pyExecutable
                                           py37.zip/py37/bin/python). The data
                                           files could be accessed in Python
                                           UDF, e.g.: f = open('data/data.txt',
                                           'r').
     -pyexec,--pyExecutable <arg>          Specify the path of the python
                                           interpreter used to execute the
                                           python UDF worker (e.g.:
                                           --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on Python 3.5+,
                                           Apache Beam (version == 2.19.0), Pip
                                           (version >= 7.1.0) and SetupTools
                                           (version >= 37.0.0). Please ensure
                                           that the specified environment meets
                                           the above requirements.
     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
                                           These files will be added to the
                                           PYTHONPATH of both the local client
                                           and the remote python UDF worker. The
                                           standard python resource file
                                           suffixes such as .py/.egg/.zip or
                                           directory are all supported. Comma
                                           (',') could be used as the separator
                                           to specify multiple files (e.g.:
                                           --pyFiles
                                           file:///tmp/myresource.zip,hdfs:///$n
                                           amenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                           defines the third-party dependencies.
                                           These dependencies will be installed
                                           and added to the PYTHONPATH of the
                                           python UDF worker. A directory which
                                           contains the installation packages of
                                           these dependencies could be specified
                                           optionally. Use '#' as the separator
                                           if the optional parameter exists
                                           (e.g.: --pyRequirements
                                           file:///tmp/requirements.txt#file:///
                                           tmp/cached_dir).
     -s,--session <session identifier>     The identifier for a session.
                                           'default' is the default identifier.
     -u,--update <SQL update statement>    Experimental (for testing only!):
                                           Instructs the SQL Client to
                                           immediately execute the given update
                                           statement after starting up. The
                                           process is shut down after the
                                           statement has been submitted to the
                                           cluster and returns an appropriate
                                           return code. Currently, this feature
                                           is only supported for INSERT INTO
                                           statements that declare the target
                                           sink table.

环境文件 #

一个SQL查询需要一个配置环境来执行。所谓的环境文件定义了可用的目录(catalogs)、table source 和 sink、用户定义的函数以及执行和部署所需的其他属性。

每个环境文件都是一个常规的 YAML 文件。下面是这样一个文件的例子。

# Define tables here such as sources, sinks, views, or temporal tables.

tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          data-type: INT
        - name: MyField2
          data-type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        data-type: INT
      - name: MyField2
        data-type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"

# Define user-defined functions here.

functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false

# Define available catalogs

catalogs:
   - name: catalog_1
     type: hive
     property-version: 1
     hive-conf-dir: ...
   - name: catalog_2
     type: hive
     property-version: 1
     default-database: mydb2
     hive-conf-dir: ...

# Properties that change the fundamental execution behavior of a table program.

execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time
  current-catalog: catalog_1        # optional: name of the current catalog of the session ('default_catalog' by default)
  current-database: mydb1           # optional: name of the current database of the current catalog
                                    #   (default database of the current catalog by default)
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000

这份配置:

  • 定义了一个表源 MyTableSource 的环境,该表源从 CSV 文件中读取。
  • 定义了一个视图 MyCustomView,该视图使用 SQL 查询声明了一个虚拟表。
  • 定义了一个用户定义的函数 myUDF,可以使用类名和两个构造函数参数来实例化。
  • 连接两个 Hive 目录,并使用 catalog_1 作为当前目录,mydb1 作为目录的当前数据库。
  • 在流式模式下使用 blink planner,运行具有事件时间特性和并行度为1的语句。
  • 在 table 结果模式下运行探索性查询。
  • 并通过配置选项围绕连接重排序和溢出进行一些 planner 调整。

根据使用情况,一个配置可以被分割成多个文件。因此,可以为一般目的创建环境文件(使用 --defaults 创建默认环境文件),也可以为每个会话创建环境文件(使用 --environment 创建会话环境文件)。每一个 CLI 会话都会用默认属性和会话属性来初始化。例如,defaults 环境文件可以指定在每个会话中应该可以查询的所有 table source,而 session 环境文件只声明特定的状态保留时间和并行度。在启动CLI应用程序时,可以同时传递默认环境文件和会话环境文件。如果没有指定默认环境文件,SQL Client 会在 Flink 的配置目录下搜索 ./conf/sql-client-defaults.yaml

注意:在 CLI 会话中设置的属性(例如使用SET命令)具有最高优先权。

CLI commands > session environment file > defaults environment file

重启策略 #

重启策略控制 Flink 作业在发生故障时如何重启。与 Flink 集群的全局重启策略类似,可以在环境文件中声明一个更精细的重启配置。

支持以下策略。

execution:
  # falls back to the global strategy defined in flink-conf.yaml
  restart-strategy:
    type: fallback

  # job fails directly and no restart is attempted
  restart-strategy:
    type: none

  # attempts a given number of times to restart the job
  restart-strategy:
    type: fixed-delay
    attempts: 3      # retries before job is declared as failed (default: Integer.MAX_VALUE)
    delay: 10000     # delay in ms between retries (default: 10 s)

  # attempts as long as the maximum number of failures per time interval is not exceeded
  restart-strategy:
    type: failure-rate
    max-failures-per-interval: 1   # retries in interval until failing (default: 1)
    failure-rate-interval: 60000   # measuring interval in ms for failure rate
    delay: 10000                   # delay in ms between retries (default: 10 s)

依赖性 #

SQL Client 不需要使用 Maven 或 SBT 设置一个 Java 项目。相反,你可以将依赖关系作为常规的 JAR 文件传递给集群。你可以单独指定每个 JAR 文件(使用 --jar)或定义整个库目录(使用 --library)。对于连接到外部系统(如 Apache Kafka)的连接器和相应的数据格式(如JSON),Flink 提供了现成的 JAR bundles。这些 JAR 文件可以从 Maven 中央仓库为每个版本下载。

所提供的 SQL JAR 的完整列表和关于如何使用它们的文档可以在连接到外部系统页面上找到。

下面的例子显示了一个环境文件,它定义了一个从 Apache Kafka 读取 JSON 数据的 table source。

tables:
  - name: TaxiRides
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: TaxiRides
      startup-mode: earliest-offset
      properties:
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rowTime
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: "from-field"
            from: "rideTime"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
      - name: procTime
        data-type: TIMESTAMP(3)
        proctime: true

由此产生的 TaxiRide 表的模式(schema)包含了 JSON 模式(schema)的大部分字段。此外,它还增加了一个行时间属性 rowTime 和处理时间属性 procTime

connectorformat 都允许定义一个属性版本(目前是版本1),以便于将来向后兼容。

用户定义的函数 #

SQL Client 允许用户创建自定义的、用户定义的函数,以便在 SQL 查询中使用。目前,这些函数被限制在 Java/Scala 类或 Python 文件中以编程方式定义。

为了提供一个 Java/Scala 用户定义的函数,你需要首先实现和编译一个扩展 ScalarFunction、AggregateFunction 或 TableFunction 的函数类(见用户定义的函数)。然后可以将一个或多个函数打包到 SQL Client 的依赖 JAR 中。

为了提供一个 Python 用户定义函数,你需要编写一个 Python 函数,并用 pyflink.table.udf.udfpyflink.table.udf.udtf 装饰器来装饰它(见 Python UDFs)。然后可以将一个或多个函数放入一个 Python 文件中。Python 文件和相关的依赖关系需要通过环境文件中的配置 (参见 Python 配置) 或命令行选项 (参见命令行用法) 来指定。

所有函数在被调用之前必须在环境文件中声明。对于函数列表中的每一项,必须指定

  • 注册该函数的名称。
  • 函数的源,使用 from (暂时限制为 class (Java/Scala UDF) 或 python (Python UDF))。

Java/Scala UDF 必须指定:

  • class,表示函数的完全限定类名和一个可选的实例化构造参数列表。

Python 的 UDF 必须指定:

  • fully-qualified-name: 表示完全限定的名称,即函数的"[模块名].[对象名]"。
functions:
  - name: java_udf               # required: name of the function
    from: class                  # required: source of the function
    class: ...                   # required: fully qualified class name of the function
    constructor:                 # optional: constructor parameters of the function class
      - ...                      # optional: a literal parameter with implicit type
      - class: ...               # optional: full class name of the parameter
        constructor:             # optional: constructor parameters of the parameter's class
          - type: ...            # optional: type of the literal parameter
            value: ...           # optional: value of the literal parameter
  - name: python_udf             # required: name of the function
    from: python                 # required: source of the function 
    fully-qualified-name: ...    # required: fully qualified class name of the function      

对于 Java/Scala UDF,请确保指定参数的顺序和类型严格匹配你的函数类的一个构造函数。

构造函数参数 #

根据用户定义的函数,在 SQL 语句中使用它之前,可能需要对实现进行参数化。

如前面的例子所示,在声明用户定义函数时,可以通过以下三种方式之一使用构造函数参数来配置类。

一个隐含类型的字面值。SQL Client 会根据字面值本身自动推导出类型。目前,这里只支持 BOOLEAN、INT、DOUBLE 和 VARCHAR 的值。如果自动推导没有达到预期的效果(例如,你需要一个 VARCHAR false),请使用显式类型代替。

- true         # -> BOOLEAN (case sensitive)
- 42           # -> INT
- 1234.222     # -> DOUBLE
- foo          # -> VARCHAR

一个具有明确类型的字面值。明确声明参数的 typevalue 属性,以保证类型安全。

- type: DECIMAL
  value: 11111111111111111

下表说明了支持的 Java 参数类型和相应的 SQL 类型字符串。

Java type SQL type
java.math.BigDecimal DECIMAL
java.lang.Boolean BOOLEAN
java.lang.Byte TINYINT
java.lang.Double DOUBLE
java.lang.Float REAL, FLOAT
java.lang.Integer INTEGER, INT
java.lang.Long BIGINT
java.lang.Short SMALLINT
java.lang.String VARCHAR

目前还不支持更多的类型(如 TIMESTAMP 或 ARRAY)、原语类型和 null。

一个(嵌套的)类实例。除了字面值,你还可以通过指定 classconstructor 函数属性,为构造函数参数创建(嵌套)类实例。这个过程可以递归执行,直到所有的构造参数都用字面值表示。

- class: foo.bar.paramClass
  constructor:
    - StarryName
    - class: java.lang.Integer
      constructor:
        - class: java.lang.String
          constructor:
            - type: VARCHAR
              value: 3

Catalogs #

Catalogs 可以定义为一组 YAML 属性,在启动 SQL Client 时自动注册到环境中。

用户可以在 SQL CLI 中指定要使用哪个目录(catalog)作为当前目录(catalog),以及要使用该目录(catalog)的哪个数据库作为当前数据库。

catalogs:
   - name: catalog_1
     type: hive
     property-version: 1
     default-database: mydb2
     hive-conf-dir: <path of Hive conf directory>
   - name: catalog_2
     type: hive
     property-version: 1
     hive-conf-dir: <path of Hive conf directory>

execution:
   ...
   current-catalog: catalog_1
   current-database: mydb1

关于目录的更多信息,请参见目录

分离式 SQL 查询 #

为了定义端到端 SQL 管道,SQL 的 INSERT INTO 语句可以用于向 Flink 集群提交长期运行的、分离的查询。这些查询将其结果产生到外部系统中,而不是 SQL Client。这允许处理更高的并行性和更大的数据量。CLI 本身对提交后的分离查询没有任何控制。

INSERT INTO MyTableSink SELECT * FROM MyTableSource

表接收器 MyTableSink 必须在环境文件中声明。有关支持的外部系统及其配置的更多信息,请参见连接页面。下面是一个 Apache Kafka 表接收器的例子。

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

SQL 客户端确保语句成功提交到集群。一旦提交查询,CLI 将显示有关 Flink 作业的信息。

[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 6f922fe5cba87406ff23ae4a7bb79044
Web interface: http://localhost:8081

注意: SQL 客户端在提交后不会跟踪正在运行的 Flink 作业的状态。CLI 进程可以在提交后被关闭,而不影响分离查询。Flink 的重启策略照顾到了容错。可以使用 Flink 的 Web 界面、命令行或 REST API 来取消查询。

SQL 视图 #

视图允许从 SQL 查询中定义虚拟表。视图定义会被立即解析和验证。然而,实际的执行发生在提交一般的 INSERT INTO 或 SELECT 语句期间访问视图时。

视图可以在环境文件中定义,也可以在 CLI 会话中定义。

下面的例子显示了如何在一个文件中定义多个视图。视图是按照它们在环境文件中定义的顺序注册的。支持诸如视图 A 依赖于视图 B 依赖于视图 C 的引用链。

tables:
  - name: MyTableSource
    # ...
  - name: MyRestrictedView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"
  - name: MyComplexView
    type: view
    query: >
      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
      FROM MyTableSource
      WHERE MyField2 > 200      

与 table source 和 sink 类似,会话环境文件中定义的视图具有最高优先级。

视图也可以在 CLI 会话中使用 CREATE VIEW 语句创建。

CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;

在 CLI 会话中创建的视图也可以使用 DROP VIEW 语句再次删除。

DROP VIEW MyNewView;

注意: CLI 中视图的定义仅限于上述语法。在未来的版本中,将支持为视图定义模式或在表名中转义空格。

临时表 #

临时表允许对变化的历史表进行(参数化的)查看,该表在特定的时间点返回一个表的内容。这对于将一个表与另一个表在特定时间戳的内容连接起来特别有用。更多信息可以在临时表连接页面中找到。

下面的示例展示了如何定义一个临时表 SourceTemporalTable

tables:

  # Define the table source (or view) that contains updates to a temporal table
  - name: HistorySource
    type: source-table
    update-mode: append
    connector: # ...
    format: # ...
    schema:
      - name: integerField
        data-type: INT
      - name: stringField
        data-type: STRING
      - name: rowtimeField
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: rowtimeField
          watermarks:
            type: from-source

  # Define a temporal table over the changing history table with time attribute and primary key
  - name: SourceTemporalTable
    type: temporal-table
    history-table: HistorySource
    primary-key: integerField
    time-attribute: rowtimeField  # could also be a proctime field

如示例中所示,table source、视图和临时表的定义可以相互混合。它们按照在环境文件中定义的顺序进行注册。例如,一个临时表可以引用一个视图,该视图可以依赖于另一个视图或 table source。

限制和未来 #

目前的 SQL Client 只支持嵌入式模式。未来,社区计划通过提供基于 REST 的 SQL Client 网关来扩展其功能,详见 FLIP-24FLIP-91

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html