Wait the light to fall

Create 语句

焉知非鱼

Create Statements

CREATE 语句

CREATE 语句用于在当前或指定的目录中注册一个表/视图/函数。注册的表/视图/函数可以在 SQL 查询中使用。

Flink SQL 目前支持以下 CREATE 语句。

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

运行一条 CREATE 语句 #

CREATE 语句可以用 TableEnvironment 的 executeSql()方法执行,也可以在 SQL CLI 中执行。executeSql()方法对于一个成功的 CREATE 操作会返回’OK’,否则会抛出一个异常。

下面的例子展示了如何在 TableEnvironment 和 SQL CLI 中运行 CREATE 语句。

// Scala
val settings = EnvironmentSettings.newInstance()...
val tableEnv = TableEnvironment.create(settings)

// SQL query with a registered table
// register a table named "Orders"
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// Execute insert SQL with a registered table
// register a TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
// run an insert SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...

CREATE TABLE #

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
   
<column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
  
<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

用给定的名称创建一个表。如果目录中已经存在同名表,则抛出一个异常。

计算列

计算列是使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它是由一个非查询表达式生成的,这个表达式使用同一张表中的其他列,而不是实际存储在表中。例如,计算列可以定义为 cost AS price * quantity。表达式可以包含物理列、常量、函数或变量的任意组合。表达式不能包含子查询。

计算列在 Flink 中通常用于在 CREATE TABLE 语句中定义时间属性。可以通过 proc AS PROCTIME() 使用系统 proctime() 函数轻松定义一个处理时间属性。另一方面,计算列可以用来派生事件时间列,因为事件时间列可能需要从现有的字段中派生出来,比如原来的字段不是 TIMESTAMP(3)类型,或者嵌套在 JSON 字符串中。

注意:

  • 在源表上定义的计算列是在从源表读取后计算出来的,它可以用在下面的 SELECT 查询语句中。
  • 计算列不能作为 INSERT 语句的目标。在 INSERT 语句中,SELECT 子句的模式应该与没有计算列的目标表的模式相匹配。

WATERMARK

WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

rowtime_column_name 定义了一个现有的列,该列被标记为表的事件时间属性。这个列的类型必须是 TIMESTAMP(3),并且是模式中的顶层列。它可以是一个计算列。

watermark_strategy_expression 定义了水印生成策略。它允许任意的非查询表达式,包括计算列,来计算水印。表达式的返回类型必须是 TIMESTAMP(3),它表示自 Epoch 以来的时间戳。只有当返回的水印是非空的,并且它的值大于之前发出的本地水印时,才会发出水印(以保留升水印的契约)。水印生成表达式由框架对每条记录进行评估。框架将定期发射最大的生成水印。如果当前的水印仍然与上一个水印相同,或者是空的,或者返回的水印值小于上一次发射的水印值,那么将不会发射新的水印。水印是在 pipeline.auto-watermark-interval 配置定义的时间间隔内发出的。如果水印间隔为 0ms,如果生成的水印不是空的,并且大于最后一个水印,则每条记录都会发出水印。

当使用事件时间语义时,表必须包含事件时间属性和水印策略。

Flink 提供了几种常用的水印策略。

  • 严格的升序时间戳。WATERMARK FOR rowtime_column AS rowtime_column。

  • 发出迄今为止观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。

  • 升序时间戳。WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND.

  • 发出迄今为止观察到的最大时间戳的水印减 1。时间戳等于或小于最大时间戳的行不会迟到。

  • 绑定出顺序性的时间戳。WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit.

发出水印,水印是最大观察到的时间戳减去指定的延迟,例如:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是 5 秒的延迟水印策略。

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
PRIMARY KEY

PRIMARY KEY

Flink 利用优化的一个提示。它告诉我们一个表或视图的一列或一组列是唯一的,它们不包含空值。主键中的两列都不能为空。因此,主键可以唯一地识别表中的某一行。

主键约束既可以和列定义一起声明(列约束),也可以作为单行(表约束)。对于这两种情况,只能将其声明为一个单子。如果你同时定义了多个主键约束,就会抛出一个异常。

有效性检查

SQL 标准规定,一个约束可以是 ENFORCED 或 NOT ENFORCED。这控制了约束检查是否会在输入/输出数据上执行。Flink 并不拥有数据,因此我们要支持的唯一模式是 NOT ENFORCED 模式。用户要确保查询强制执行密钥的完整性。

Flink 会假设主键的正确性,假设列的空性与主键的列对齐。连接器应该确保这些是对齐的。

注意事项: 在 CREATE TABLE 语句中,创建主键约束会改变列的可空性,也就是说,有主键约束的列是不可空的。

PARTITIONED BY

按指定的列对创建的表进行分区。如果该表被用作文件系统汇,则会为每个分区创建一个目录。

WITH OPTIONS

表属性用于创建表源/接收器。这些属性通常用于查找和创建底层连接器。

表达式 key1=val1 的键和值都应该是字符串文字。关于不同连接器的所有支持的表属性,请参见连接到外部系统中的详细信息。

注释:表名可以有三种格式。表名可以有三种格式。

  1. catalog_name.db_name.table_name
  2. db_name.table_name
  3. table_name。

对于 catalog_name.db_name.table_name,表将被注册到元存储中,目录名为 “catalog_name”,数据库名为 “db_name”;对于 db_name.table_name,表将被注册到执行表环境的当前目录中,数据库名为 “db_name”;对于 table_name,表将被注册到执行表环境的当前目录和数据库中。

注意事项: 用 CREATE TABLE 语句注册的表既可以作为表源,也可以作为表汇,在 DMLs 中没有引用之前,我们不能决定它是作为表源还是表汇使用。

LIKE 子句

LIKE 子句是 SQL 特征的变体/组合(特征 T171,“表定义中的 LIKE 子句"和特征 T173,“表定义中的扩展 LIKE 子句”)。该子句可用于根据现有表的定义创建一个表。此外,用户还可以扩展原表或排除其中的某些部分。与 SQL 标准不同的是,该子句必须在 CREATE 语句的顶层定义。这是因为该子句适用于定义的多个部分,而不仅仅是模式部分。

你可以使用该子句来重用(并可能覆盖)某些连接器属性,或者为外部定义的表添加水印。例如,您可以为 Apache Hive 中定义的表添加水印。

请考虑下面的示例语句。

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- Add watermark definition
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- Overwrite the startup-mode
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

由此产生的 Orders_with_watermark 表将等同于用以下语句创建的表。

CREATE TABLE Orders_with_watermark (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

可以用同类选项控制表功能的合并逻辑。

您可以控制以下的合并行为:

  • CONSTRAINTS - 主键和唯一键等约束条件。
  • GENERATED-计算列
  • OPTIONS - 描述连接器和格式属性的连接器选项。
  • PARTITIONS - 表的分区
  • WATERMARKS - 水印声明

有三种不同的合并策略。

  • INCLUDING - 包括源表的特征,对重复的条目失败,例如,如果两个表中都存在相同键的选项。
  • EXCLUDING - 不包含源表的给定特征。
  • OVERWRITING - 包括源表的特征,用新表的属性覆盖源表的重复条目,例如,如果两个表中都存在具有相同键的选项,则将使用当前语句中的选项。

此外,可以使用 INCLUDING/EXCLUDING ALL 选项来指定如果没有定义特定的策略应该是什么,即如果使用 EXCLUDING ALL INCLUDING WATERMARKS,则只从源表中包含水印。

例子:

-- A source table stored in a filesystem
CREATE TABLE Orders_in_file (
    user BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
    
)
PARTITIONED BY user 
WITH ( 
    'connector' = 'filesystem'
    'path' = '...'
);

-- A corresponding table we want to store in kafka
CREATE TABLE Orders_in_kafka (
    -- Add watermark definition
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector': 'kafka'
    ...
)
LIKE Orders_in_file (
    -- Exclude everything besides the computed columns which we need to generate the watermark for.
    -- We do not want to have the partitions or filesystem options as those do not apply to kafka. 
    EXCLUDING ALL
    INCLUDING GENERATED
);

如果您没有提供同类选项,则默认使用 INCLUDING ALL OVERWRITING OPTIONS

注意: 您无法控制合并物理字段的行为。这些字段将被合并,就像您应用 INCLUDING 策略一样。

CREATE CATALOG #

CREATE CATALOG catalog_name
  WITH (key1=val1, key2=val2, ...)

用给定的目录属性创建一个目录。如果已经存在同名的目录,则会产生异常。

WITH OPTIONS

目录属性,用于存储与本目录相关的额外信息。表达式 key1=val1 的键和值都应该是字符串文字。

更多详情请查看目录

CREATE DATABASE #

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

用给定的数据库属性创建一个数据库,如果目录中已经存在同名的数据库,则抛出异常。如果目录中已经存在相同名称的数据库,则会抛出异常。

IF NOT EXISTS

如果数据库已经存在,则不会发生任何事情。

WITH OPTIONS

数据库属性,用于存储与本数据库相关的额外信息。表达式 key1=val1 的键和值都应该是字符串文字。

CREATE VIEW #

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [{columnName [, columnName ]* }] [COMMENT view_comment]
  AS query_expression

用给定的查询表达式创建一个视图,如果目录中已经存在同名的视图,则抛出异常。如果目录中已经存在同名的视图,则会抛出异常。

TEMPORARY

创建具有目录和数据库命名空间并覆盖视图的临时视图。

IF NOT EXISTS

如果视图已经存在,则不会发生任何事情。

CREATE FUNCTION #

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

创建一个目录函数,该函数具有目录和数据库的名称空间,并带有标识符和可选的语言标签。如果目录中已经存在同名函数,则会抛出一个异常。

如果语言标签是 JAVA/SCALA,标识符是 UDF 的完整 classpath。关于 Java/Scala UDF 的实现,请参考 User-defined Functions 了解详情。

如果语言标签是 PYTHON,标识符是 UDF 的完全限定名,例如 pyflink.table.test.test_udf.add。关于 Python UDF 的实现,更多细节请参考 Python UDFs

TEMPORARY

创建具有目录和数据库命名空间并覆盖目录功能的临时目录功能。

TEMPORARY SYSTEM

创建没有命名空间并覆盖内置函数的临时系统函数。

IF NOT EXISTS

如果函数已经存在,则不会发生任何事情。

LANGUAGE JAVA|SCALA|PYTHON

语言标签,用于指示 Flink 运行时如何执行函数。目前只支持 JAVA、SCALA 和 PYTHON,函数的默认语言是 JAVA。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html