Wait the light to fall

查询

焉知非鱼

Queries

查询

SELECT 语句和 VALUES 语句是用 TableEnvironment 的 sqlQuery()方法指定的。该方法将 SELECT 语句(或 VALUES 语句)的结果作为一个表返回。表可以在后续的 SQL 和 Table API 查询中使用,可以转换为 DataSet 或 DataStream,也可以写入 TableSink。SQL 和 Table API 查询可以无缝混合,并进行整体优化,转化为一个程序。

为了在 SQL 查询中访问一个表,必须在 TableEnvironment 中注册。表可以从 TableSourceTableCREATE TABLE 语句DataStream 或 DataSet 中注册。另外,用户也可以在 TableEnvironment 中注册目录来指定数据源的位置。

为了方便起见,Table.toString()会自动在其 TableEnvironment 中以唯一的名称注册表,并返回名称。所以,Table 对象可以直接内联到 SQL 查询中,如下例所示。

注意:包含不支持的 SQL 特性的查询会导致 TableException。批量表和流式表的 SQL 支持的功能在下面的章节中列出。

指定查询 #

下面的例子显示了如何在注册表和内联表上指定 SQL 查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// create and register a TableSink
val schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT())

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders")

// run an INSERT SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

执行查询 #

可以通过 TableEnvironment.executeSql()方法执行 SELECT 语句或 VALUES 语句,将内容收集到本地。该方法将 SELECT 语句(或 VALUES 语句)的结果作为 TableResult 返回。与 SELECT 语句类似,可以使用 Table.execute()方法执行 Table 对象,将查询的内容收集到本地客户端。TableResult.collect()方法返回一个可关闭的行迭代器。除非收集完所有的结果数据,否则选择作业不会结束。我们应该通过 CloseableIterator#close()方法主动关闭作业,避免资源泄露。我们也可以通过 TableResult.print()方法将选择结果打印到客户端控制台。TableResult 中的结果数据只能被访问一次。因此,collect()和 print()不能相继被调用。

对于流式作业,TableResult.collect()方法或 TableResult.print()方法可以保证端到端的精确一次记录传递。这需要启用检查点机制。默认情况下,检查点机制是被禁用的。要启用检查点,我们可以通过 TableConfig 设置检查点属性(详见检查点配置)。所以一条结果记录只有在其对应的检查点完成后才能被客户端访问。

注意事项 对于流媒体模式,现在只支持只追加查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv = StreamTableEnvironment.create(env, settings)
// enable checkpointing
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

// execute SELECT statement
val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
val it = tableResult1.collect()
try while (it.hasNext) {
  val row = it.next
  // handle row
}
finally it.close() // close the iterator to avoid resource leak

// execute Table
val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
tableResult2.print()

语法 #

Flink 使用 Apache Calcite 解析 SQL,它支持标准的 ANSI SQL。

下面的 BNF-语法描述了在批处理和流式查询中支持的 SQL 特性的超集。操作部分显示了支持的特性的例子,并指出哪些特性只支持批处理或流式查询。

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'

Flink SQL 对标识符(表名、属性名、函数名)使用了类似 Java 的词汇策略。

无论标识符是否被引用,它们的大小写都会被保留。 之后,标识符会被大小写敏感地匹配。 与 Java 不同的是,回标允许标识符包含非字母数字字符(例如:“SELECT a ASmy fieldFROM t”)。 字符串必须用单引号括起来(例如,SELECT ‘Hello World’)。重复一个单引号进行转义(例如,SELECT ‘It’s me.’)。字符串中支持 Unicode 字符。如果需要明确的 unicode 码点,请使用以下语法。

使用反斜杠(\)作为转义字符(默认)。SELECT U&’\263A’ 使用自定义转义字符。SELECT U&’#263A’ UESCAPE ‘#’。

Operations #

Scan, Projection 和 Filter #

  • Scan / Select / As(Batch/Streaming)
SELECT * FROM Orders

SELECT a, c AS d FROM Orders
  • Where / Filter(Batch/Streaming)
SELECT * FROM Orders WHERE b = 'red'

SELECT * FROM Orders WHERE a % 2 = 0
  • 用户定义标量函数 (Scalar UDF)(Batch/Streaming)

UDF 必须在 TableEnvironment 中注册。关于如何指定和注册标量 UDF 的详细信息,请参见 UDF 文档

SELECT PRETTY_PRINT(user) FROM Orders

聚合 #

  • GroupBy 聚合(Batch/Streaming/Result Updating)

注意:流表上的 GroupBy 会产生更新结果。详情请参见动态表流概念页面。

SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
  • GroupBy 窗口聚合(Batch/Streaming)

使用分组窗口来计算每个组的单一结果行。更多细节请参见分组窗口部分。

SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
  • Over 窗口聚合(Streaming)

注意:所有的聚合必须定义在同一个窗口上,即相同的分区、排序和范围。目前,只支持对 CURRENT ROW 范围的 PRECEDING(UNBOUNDED 和 bounded)窗口。还不支持带 FOLLOWING 的范围。ORDER BY 必须在单个时间属性上指定。

SELECT COUNT(amount) OVER (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
  • Distinct(Batch/Streaming/Result Updating)
SELECT DISTINCT users FROM Orders

注意:对于流式查询,计算查询结果所需的状态可能会根据不同字段的数量而无限增长。请提供一个有效的保留时间间隔的查询配置,以防止过大的状态大小。详情请看查询配置

  • Grouping sets, Rollup, Cube(Batch/Streaming/Result Updating)
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))

注:流式模式分组集、Rollup 和 Cube 仅在 Blink 计划器中支持。

  • Having(Batch/Streaming)
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
  • 用户定义聚合函数 (UDAGG)(Batch/Streaming)

UDAGG 必须在 TableEnvironment 中注册。关于如何指定和注册 UDAGG 的细节,请参见 UDF 文档

SELECT MyAggregate(amount)
FROM Orders
GROUP BY users

Joins #

  • Inner Equi-join(Batch/Streaming)

目前,只支持等价连接,即至少有一个带有平等谓词的共轭条件的连接,不支持任意的交叉连接或θ连接。不支持任意的交叉连接或θ连接。

注意:连接的顺序没有被优化。表的连接顺序是按照 FROM 子句中指定的顺序进行的。确保指定表的顺序不会产生交叉连接(笛卡尔乘积),因为交叉连接不支持,会导致查询失败。

SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id

注意:对于流式查询,计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供一个具有有效保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置

  • Outer Equi-join(Batch/Streaming/Result Updating)

目前,只支持 equi-joins 连接,即至少有一个带有平等谓词的共轭条件的连接,不支持任意的交叉连接或θ连接。不支持任意的交叉连接或θ连接。

注意:连接的顺序没有被优化。表的连接顺序是按照 FROM 子句中指定的顺序进行的。确保指定表的顺序不会产生交叉连接(笛卡尔乘积),因为交叉连接不支持,会导致查询失败。

SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

注意:对于流式查询,计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供一个具有有效保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置

  • Interval Join(Batch/Streaming)

注:区间连接是常规连接的一个子集,可以用流式处理。

一个区间连接至少需要一个等价连接谓词和一个连接条件,以限制双方的时间。这样的条件可以由两个合适的范围谓词(<,<=,>=,>)、一个 BETWEEN 谓词或一个比较两个输入表的相同类型的时间属性(即处理时间或事件时间)的单一平等谓词来定义。

例如,以下谓词是有效的区间连接条件。

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
  • ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

上面的例子中,如果在收到订单 4 小时后才发货,那么就会将所有的订单与其对应的货物加入。

  • 将数组扩展为关系(Batch/Streaming)

还不支持 Unnesting With ORDINALITY。

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
  • Join with Table Function (UDTF)(Batch/Streaming)

用表格函数的结果连接一个表格。左表(外表)的每一行都与表函数的相应调用所产生的所有行相连接。

用户定义表函数(UDTF)必须在之前注册。关于如何指定和注册 UDTF 的细节,请参见 UDF 文档

Inner Join

左表(外表)的一行,如果它的表函数调用返回一个空的结果,就会被删除。

SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

Left Outer Join

如果表函数调用返回的结果为空,则保留相应的外行,并将结果用空值填充。

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

注意:目前,只有字面意义上的 “TRUE “被支持为针对横向表的左外连接的谓词。

  • Join with Temporal Table Function(Streaming)

时间表是跟踪随时间变化的表。

时间表函数提供了对时间表在特定时间点的状态的访问。使用时态表函数连接表的语法与使用表函数连接相同。

注意:目前只支持与时态表的内部连接。

假设 Rates 是一个时间表函数,连接可以用 SQL 表达如下。

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency

更多信息请查看更详细的时间表概念说明。

  • Join with Temporal Table(Batch/Streaming)

时间表是跟踪随时间变化的表。时间表提供了对时间表在特定时间点的版本的访问。

只支持与处理时间的时态表进行内联和左联。

下面的例子假设 LatestRates 是一个时间表,它是以最新的速率来具体化的。

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

更多信息请查看更详细的时间表概念描述。

仅支持 Blink 计划器。

集合运算 #

  • Union(Batch)
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION
    (SELECT user FROM Orders WHERE b = 0)
)
  • UnionAll(Batch/Streaming)
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION ALL
    (SELECT user FROM Orders WHERE b = 0)
)
  • Intersect / Except(Batch)
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  INTERSECT
    (SELECT user FROM Orders WHERE b = 0)
)
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  EXCEPT
    (SELECT user FROM Orders WHERE b = 0)
)
  • In(Batch/Streaming)

如果给定表的子查询中存在表达式,则返回 true。子查询表必须由一列组成。该列必须与表达式具有相同的数据类型。

SELECT user, amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

注意:对于流式查询,该操作被重写为加入和分组操作。计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供有效的保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置

  • Exists(Batch/Streaming)

如果子查询至少返回一条记录,则返回 true。只有当操作可以被重写成联接和分组操作时才支持。

SELECT user, amount
FROM Orders
WHERE product EXISTS (
    SELECT product FROM NewProducts
)

注意:对于流式查询,该操作被重写为加入和分组操作。计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供有效的保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置

OrderBy 和 Limit #

  • Order By

批量流注:流查询的结果必须主要按升序时间属性进行排序。支持其他排序属性。

SELECT *
FROM Orders
ORDER BY orderTime
  • Limit(Batch)

注意:LIMIT 子句需要一个 ORDER BY 子句。

SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3

Top-N #

注意 Top-N 只在 Blink planner 中支持。

Top-N 查询要求按列排序的 N 个最小或最大的值。最小值和最大值集都被认为是 Top-N 查询。当需要从批处理/流处理表中只显示 N 条最底层或最上层的记录时,Top-N 查询非常有用。这个结果集可以用于进一步分析。

Flink 使用 OVER 窗口子句和过滤条件的组合来表达 Top-N 查询。借助 OVER window PARTITION BY 子句的强大功能,Flink 还支持每组 Top-N。例如,每个类别中实时销售量最大的前五个产品。对于批处理表和流处理表的 SQL,都支持 Top-N 查询。

下面是 TOP-N 语句的语法。

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

参数说明:

  • ROW_NUMBER()。根据分区内行的顺序,给每一行分配一个唯一的、连续的数字,从 1 开始。目前,我们只支持 ROW_NUMBER 作为 over window 函数。在未来,我们将支持 RANK()和 DENSE_RANK()。
  • PARTITION BY col1[,col2…]。指定分区列。每个分区将有一个 Top-N 的结果。
  • ORDER BY col1[asc|desc][,col2[asc|desc]…]:指定排序列。指定排序列。不同列的排序方向可以不同。
  • WHERE rownum <= N:为了让 Flink 识别这个查询是 Top-N 查询,需要 rownum <= N。N 代表将保留 N 条最小或最大的记录。
  • [AND 条件]。在 where 子句中可以自由添加其他条件,但其他条件只能与 rownum <= N 使用 AND 连接组合。

流模式下的注意点: TopN 查询是结果更新。Flink SQL 会根据顺序键对输入的数据流进行排序,所以如果前 N 条记录发生了变化,变化后的记录会作为回撤/更新记录发送到下游。建议使用支持更新的存储作为 Top-N 查询的汇。另外,如果 Top N 记录需要存储在外部存储中,结果表应该与 Top-N 查询的唯一键相同。

Top-N 查询的唯一键是分区列和 rownum 列的组合。Top-N 查询也可以得出上游的唯一键。以下面的工作为例,假设 product_id 是 ShopSales 的唯一键,那么 Top-N 查询的唯一键是[category,rownum]和[product_id]。

下面的例子展示了如何在流表上使用 Top-N 指定 SQL 查询。这个例子是为了得到我们上面提到的 “每个类别实时销量最大的前五个产品”。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
// register the DataStream under the name "ShopSales"
tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales")


// select top-5 products per category which have the maximum sales.
val result1 = tableEnv.sqlQuery(
    """
      |SELECT *
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
      |   FROM ShopSales)
      |WHERE row_num <= 5
    """.stripMargin)

无排名输出优化 #

如上所述,rownum 字段将作为唯一键的一个字段写入结果表,这可能导致很多记录被写入结果表。例如,当排名 9 的记录(比如产品-1001)更新,其排名升级为 1 时,排名 1~9 的所有记录都会作为更新消息输出到结果表。如果结果表接收的数据过多,就会成为 SQL 作业的瓶颈。

优化的方式是在 Top-N 查询的外侧 SELECT 子句中省略 rownum 字段。这样做是合理的,因为 Top N 记录的数量通常不多,因此消费者可以自己快速排序。如果没有 rownum 字段,在上面的例子中,只需要将改变的记录(product-1001)发送到下游,这样可以减少很多结果表的 IO。

下面的例子展示了如何用这种方式优化上面的 Top-N 例子。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
// register the DataStream under the name "ShopSales"
tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales")


// select top-5 products per category which have the maximum sales.
val result1 = tableEnv.sqlQuery(
    """
      |SELECT product_id, category, product_name, sales  -- omit row_num field in the output
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
      |   FROM ShopSales)
      |WHERE row_num <= 5
    """.stripMargin)

流模式下的注意点: 为了将上述查询输出到外部存储中,并得到正确的结果,外部存储必须与 Top-N 查询具有相同的唯一键,在上面的示例查询中,如果 product_id 是查询的唯一键,那么外部表也应该以 product_id 作为唯一键。在上面的示例查询中,如果 product_id 是查询的唯一键,那么外部表也应该以 product_id 作为唯一键。

重复数据删除 #

注意 重复数据删除只在 Blink planner 中支持。

重复数据删除就是删除一组列上重复的行,只保留第一条或最后一条。在某些情况下,上游 ETL 作业并不是端到端完全对接的,这可能会导致在故障切换时,sink 中有重复的记录。但是,重复的记录会影响到下游分析作业(如 SUM、COUNT)的正确性。所以在进一步分析之前需要进行重复数据删除。

Flink 使用 ROW_NUMBER()来删除重复记录,就像 Top-N 查询的方式一样。理论上,重复数据删除是 Top-N 的一个特例,N 为 1,按处理时间或事件时间排序。

下面是重复数据删除语句的语法。

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

参数说明:

  • ROW_NUMBER()。为每一行指定一个唯一的、连续的编号,从 1 开始。
  • PARTITION BY col1[,col2…]: 指定分区列,即重复复制键。
  • ORDER BY time_attr[asc|desc]。指定排序列,必须是时间属性。目前只支持 proctime 属性。未来将支持 Rowtime 属性。用 ASC 排序表示保留第一行,用 DESC 排序表示保留最后一行。
  • WHERE rownum = 1:为了让 Flink 识别这个查询是重复数据删除,需要 rownum = 1。

下面的例子展示了如何在流表上指定使用重复数据删除的 SQL 查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.createTemporaryView("Orders", ds, $"order_id", $"user", $"product", $"number", $"proctime".proctime)

// remove duplicate rows on order_id and keep the first occurrence row,
// because there shouldn't be two orders with the same order_id.
val result1 = tableEnv.sqlQuery(
    """
      |SELECT order_id, user, product, number
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num
      |   FROM Orders)
      |WHERE row_num = 1
    """.stripMargin)

Group Windows #

组窗口是在 SQL 查询的 GROUP BY 子句中定义的。就像使用常规的 GROUP BY 子句的查询一样,使用包含组窗口函数的 GROUP BY 子句的查询是为每个组计算一条结果行。在批处理表和流式表上的 SQL 支持以下组窗口函数。

分组窗口函数 描述
TUMBLE(time_attr, interval) 定义一个滚动时间窗口。滚动时间窗口将行分配到具有固定持续时间(间隔)的非重叠的连续窗口。例如,一个 5 分钟的时间窗口可以将行以 5 分钟的间隔进行分组。滚动窗口可以在事件时间(流+批次)或处理时间(流)上定义。
HOP(time_attr, interval, interval) 定义一个跳转时间窗口(在表 API 中称为滑动窗口)。跳跃时间窗口有一个固定的持续时间(第二个间隔参数),并按指定的跳跃间隔(第一个间隔参数)进行跳转。如果跳转间隔小于窗口大小,则跳转窗口是重叠的。因此,可以将行分配到多个窗口。例如,15 分钟大小的跳转窗口和 5 分钟的跳转间隔将每行分配给 3 个 15 分钟大小的不同窗口,这些窗口以 5 分钟的间隔进行评估。滚动窗口可以在事件时间(流+批处理)或处理时间(流)上定义。
SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有固定的持续时间,但其边界由不活动的时间间隔定义,即如果在定义的间隙期内没有事件出现,则会话窗口关闭。例如,有 30 分钟间隙的会话窗口在 30 分钟不活动后观察到一行时开始(否则该行将被添加到现有的窗口中),如果在 30 分钟内没有行被添加,则关闭。会话窗口可以在事件时间(流+批处理)或处理时间(流)上工作。

时间属性 #

对于流表的 SQL 查询,组窗口函数的 time_attr 参数必须引用一个有效的时间属性,该属性指定行的处理时间或事件时间。请参阅时间属性的文档,了解如何定义时间属性。

对于批处理表上的 SQL,组窗口函数的 time_attr 参数必须是类型为 TIMESTAMP 的属性。

选择组窗口的开始和结束时间戳 #

可以通过以下辅助功能选择组窗口的开始和结束时间戳以及时间属性。

Auxiliary 函数 Description
TUMBLE_START(time_attr, interval),HOP_START(time_attr, interval, interval),SESSION_START(time_attr, interval) 返回对应的滚动、跳跃或会话窗口的包容下界的时间戳。
TUMBLE_END(time_attr, interval),HOP_END(time_attr, interval, interval),SESSION_END(time_attr, interval) 返回对应的翻滚、跳跃或会话窗口的专属上界的时间戳。注意:在后续的基于时间的操作中,如区间连接分组窗口或 over 窗口聚合中,不能将专属上界时间戳作为行时间属性使用。
TUMBLE_ROWTIME(time_attr, interval),HOP_ROWTIME(time_attr, interval, interval),SESSION_ROWTIME(time_attr, interval) 返回对应的翻滚、跳跃或会话窗口的包容上界的时间戳。产生的属性是一个行时间属性,可以用于后续的基于时间的操作,如区间连接分组窗口或窗口聚合
TUMBLE_PROCTIME(time_attr, interval),HOP_PROCTIME(time_attr, interval, interval),SESSION_PROCTIME(time_attr, interval) 返回一个 proctime 属性,该属性可用于后续基于时间的操作,如区间连接分组窗口或过窗口聚合

注意:在调用辅助函数时,必须使用与 GROUP BY 子句中的组窗口函数完全相同的参数。

下面的例子展示了如何在流式表上使用组窗口指定 SQL 查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount", $"proctime".proctime, $"rowtime".rowtime)

// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
    """.stripMargin)

// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")

// compute every hour the SUM(amount) of the last 24 hours in event-time
val result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
      |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
    """.stripMargin)

模式识别 #

  • MATCH_RECOGNIZE(Streaming)

根据 MATCH_RECOGNIZE ISO 标准在流表中搜索给定模式。这使得在 SQL 查询中表达复杂事件处理(CEP)逻辑成为可能。

更详细的描述,请参见检测表中模式的专门页面。

SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
  PARTITION BY userid
  ORDER BY proctime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    C.id AS cid
  PATTERN (A B C)
  DEFINE
    A AS name = 'a',
    B AS name = 'b',
    C AS name = 'c'
) AS T