Wait the light to fall

检测表中的模式

焉知非鱼

Detecting Patterns in Tables

检测表格中的模式 搜索一组事件模式是一个常见的用例,特别是在数据流的情况下。Flink 自带复杂事件处理(CEP)库,可以在事件流中进行模式检测。此外,Flink 的 SQL API 提供了一种关系型的查询表达方式,有大量的内置函数和基于规则的优化,可以开箱即用。

2016 年 12 月,国际标准化组织(ISO)发布了新版本的 SQL 标准,其中包括 SQL 中的行模式识别(ISO/IEC TR 19075-5:2016)。它允许 Flink 使用 MATCH_RECOGNIZE 子句整合 CEP 和 SQL API,用于 SQL 中的复杂事件处理。

MATCH_RECOGNIZE 子句可以实现以下任务。

对使用 partition by 和 order by 子句的数据进行逻辑分区和排序。 使用 PATTERN 子句定义要寻找的行的模式。这些模式使用类似于正则表达式的语法。 行模式变量的逻辑成分在 DEFINE 子句中指定。 在 MEASURES 子句中定义措施,这些措施是在 SQL 查询的其他部分中可用的表达式。 下面的例子说明了基本模式识别的语法。

SELECT T.aid, T.bid, T.cid
FROM MyTable
    MATCH_RECOGNIZE (
      PARTITION BY userid
      ORDER BY proctime
      MEASURES
        A.id AS aid,
        B.id AS bid,
        C.id AS cid
      PATTERN (A B C)
      DEFINE
        A AS name = 'a',
        B AS name = 'b',
        C AS name = 'c'
    ) AS T

本页将更详细地解释每个关键字,并将说明更复杂的例子。

注意 Flink 对 MATCH_RECOGNIZE 子句的实现是完整标准的一个子集。只有那些在下面的章节中记录的功能得到了支持。根据社区反馈,可能会支持更多的功能,也请看一下已知的限制。

介绍和示例 安装指南 模式识别功能内部使用了 Apache Flink 的 CEP 库。为了能够使用 MATCH_RECOGNIZE 子句,需要将该库作为一个依赖项添加到你的 Maven 项目中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.11.0</version>
</dependency>

另外,你也可以将依赖关系添加到集群 classpath 中(更多信息请参见依赖关系部分)。

如果你想在 SQL 客户端中使用 MATCH_RECOGNIZE 子句,你不需要做任何事情,因为所有的依赖关系都是默认的。

SQL 语义 每个 MATCH_RECOGNIZE 查询都由以下子句组成。

PARTITION BY - 定义表的逻辑分区;类似于 GROUP BY 操作。

MEASURES - 定义子句的输出;类似于 SELECT 子句。 ONE ROW PER MATCH - 输出模式,定义每次匹配应该产生多少行。 AFTER MATCH SKIP–指定下一个匹配应该从哪里开始;这也是控制一个事件可以属于多少个不同匹配的方法。 PATTERN - 允许使用类似于正则表达式的语法来构建搜索的模式。 DEFINE - 这一部分定义了模式变量必须满足的条件。 注意 目前,MATCH_RECOGNIZE 子句只能应用于追加表。此外,它也总是产生一个追加表。

例子 在我们的例子中,我们假设已经注册了一个 Ticker 表。该表包含股票在某一特定时间点的价格。

该表的模式如下:

Ticker
     |-- symbol: String                           # symbol of the stock
     |-- price: Long                              # price of the stock
     |-- tax: Long                                # tax liability of the stock
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when the change to those values happened

为了简化,我们只考虑单只股票 ACME 的传入数据。一个行情可以类似于下表,其中行是连续追加的。

symbol         rowtime         price    tax
======  ====================  ======= =======
'ACME'  '01-Apr-11 10:00:00'   12      1
'ACME'  '01-Apr-11 10:00:01'   17      2
'ACME'  '01-Apr-11 10:00:02'   19      1
'ACME'  '01-Apr-11 10:00:03'   21      3
'ACME'  '01-Apr-11 10:00:04'   25      2
'ACME'  '01-Apr-11 10:00:05'   18      1
'ACME'  '01-Apr-11 10:00:06'   15      1
'ACME'  '01-Apr-11 10:00:07'   14      2
'ACME'  '01-Apr-11 10:00:08'   24      2
'ACME'  '01-Apr-11 10:00:09'   25      2
'ACME'  '01-Apr-11 10:00:10'   19      1

现在的任务是寻找单一行情的价格不断下降的时期。为此,可以写一个类似的查询。

SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST PRICE_UP
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
        DEFINE
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;

该查询按符号列对 Ticker 表进行分区,并按行时间属性进行排序。

PATTERN 子句指定我们感兴趣的模式是以 START_ROW 事件为起点,然后是一个或多个 PRICE_DOWN 事件,最后是 PRICE_UP 事件。如果能找到这样的模式,下一个模式匹配将在最后一个 PRICE_UP 事件中寻找,如 AFTER MATCH SKIP TO LAST 子句所示。

DEFINE 子句指定了 PRICE_DOWN 和 PRICE_UP 事件需要满足的条件。虽然 START_ROW 模式变量并不存在,但它有一个隐含的条件,这个条件总是被评估为 TRUE。

模式变量 PRICE_DOWN 被定义为价格小于满足 PRICE_DOWN 条件的最后一行的价格。对于初始情况或者没有满足 PRICE_DOWN 条件的最后一行,这一行的价格应该小于模式中前一行的价格(由 START_ROW 引用)。

模式变量 PRICE_UP 被定义为价格大于满足 PRICE_DOWN 条件的最后一行的价格的行。

该查询为股票价格连续下跌的每个时期产生一条汇总行。

输出行的具体表示方法在查询的 MEASURES 部分定义。输出行的数量由 ONE ROW PER MATCH 输出模式定义。

 symbol       start_tstamp       bottom_tstamp         end_tstamp
=========  ==================  ==================  ==================
ACME       01-APR-11 10:00:04  01-APR-11 10:00:07  01-APR-11 10:00:08

结果一行描述了从 01-APR-11 10:00:04 开始的价格下降期,在 01-APR-11 10:00:07 达到最低价,在 01-APR-11 10:00:08 再次上涨。

分割 可以在分区数据中寻找模式,例如,单个股票或特定用户的趋势。这可以使用 partition by 子句来表达。该子句类似于使用 GROUP BY 进行聚合。

注意 强烈建议对输入的数据进行分区,否则 MATCH_RECOGNIZE 子句将被翻译成一个非平行操作符,以确保全局排序。

事件的顺序 Apache Flink 允许根据时间来搜索模式;无论是处理时间还是事件时间。

在事件时间的情况下,事件在被传递到内部模式状态机之前会被排序。因此,产生的输出将是正确的,不管行被附加到表中的顺序如何。相反,模式是按照每行包含的时间所指定的顺序来评估的。

MATCH_RECOGNIZE 子句假设时间属性以升序作为 ORDER BY 子句的第一个参数。

对于 Ticker 表的例子,像 ORDER BY rowtime ASC, price DESC 这样的定义是有效的,但是 ORDER BY price, rowtime 或者 ORDER BY rowtime DESC, price ASC 是无效的。

定义和测量 DEFINE 和 MEASURES 关键字的含义类似于简单 SQL 查询中的 WHERE 和 SELECT 子句。

MEASURES 子句定义了匹配模式的输出中会包含哪些内容。它可以投射列和定义评估的表达式。产生的行数取决于输出模式的设置。

DEFINE 子句指定了行必须满足的条件,以便将其分类到相应的模式变量。如果没有为模式变量定义条件,那么将使用一个默认条件,该条件对每条记录的评价为真。

关于这些子句中可以使用的表达式的更详细解释,请看事件流导航部分。

聚合 聚合可以在 DEFINE 和 MEASURES 子句中使用。同时支持内置和自定义的用户定义函数。

聚合函数被应用于映射到匹配的行的每个子集。为了了解这些子集是如何被评估的,请看一下事件流导航部分。

下面这个例子的任务是找到一个股票平均价格不低于某个阈值的最长时间段。它显示了 MATCH_RECOGNIZE 可以如何通过聚合来表达。这个任务可以用下面的查询来执行。

SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            FIRST(A.rowtime) AS start_tstamp,
            LAST(A.rowtime) AS end_tstamp,
            AVG(A.price) AS avgPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A+ B)
        DEFINE
            A AS AVG(A.price) < 15
    ) MR;

给定这个查询和以下输入值:

symbol         rowtime         price    tax
======  ====================  ======= =======
'ACME'  '01-Apr-11 10:00:00'   12      1
'ACME'  '01-Apr-11 10:00:01'   17      2
'ACME'  '01-Apr-11 10:00:02'   13      1
'ACME'  '01-Apr-11 10:00:03'   16      3
'ACME'  '01-Apr-11 10:00:04'   25      2
'ACME'  '01-Apr-11 10:00:05'   2       1
'ACME'  '01-Apr-11 10:00:06'   4       1
'ACME'  '01-Apr-11 10:00:07'   10      2
'ACME'  '01-Apr-11 10:00:08'   15      2
'ACME'  '01-Apr-11 10:00:09'   25      2
'ACME'  '01-Apr-11 10:00:10'   25      1
'ACME'  '01-Apr-11 10:00:11'   30      1

只要事件的平均价格不超过 15,查询就会将事件累积为模式变量 A 的一部分。例如,这样的超限事件发生在 01-4-11 10:00:04。接下来的时期在 01-4-11 10:00:11 再次超过 15 的平均价格。因此,所述查询的结果将是:。

 symbol       start_tstamp       end_tstamp          avgPrice
=========  ==================  ==================  ============
ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
ACME       01-APR-11 10:00:05  01-APR-11 10:00:10     13.5

注意 聚合可以应用于表达式,但只有当它们引用一个单一的模式变量时才可以。因此 SUM(A.price * A.tax)是有效的,但是 AVG(A.price * B.tax)不是。

注意不支持 DISTINCT 聚合。

定义一个模式 MATCH_RECOGNIZE 子句允许用户在事件流中搜索模式,使用一种强大的、富有表现力的语法,这种语法与广泛使用的正则表达式语法有些相似。

每个模式都是由基本的构件构成的,称为模式变量,可以对其应用运算符(量化符和其他修饰符)。整个模式必须用括号括起来。

一个模式的例子可以是这样的。

PATTERN (A B+ C* D)

我们可以使用以下操作符。

并集 – 像(A B)这样的模式意味着 A 和 B 之间的相邻性是严格的,因此,中间不能有没有映射到 A 或 B 的行。 定量符–修改可以映射到模式变量的行数。

* — 0 or more rows
+ — 1 or more rows
? — 0 or 1 rows
{ n } — exactly n rows (n > 0)
{ n, } — n or more rows (n ≥ 0)
{ n, m } — between n and m (inclusive) rows (0 ≤ n ≤ m, 0 < m)
{ , m } — between 0 and m (inclusive) rows (m > 0)

注意 不支持可能产生空匹配的模式。这类模式的例子有 PATTERN (A*)、PATTERN (A?B*)、PATTERN (A{0,} B{0,} C*)等。

贪婪和不情愿的量化器 每个量化器可以是贪婪的(默认行为)或勉强的。贪婪的量化器试图匹配尽可能多的记录,而不情愿的量化器试图匹配尽可能少的记录。

为了说明两者的区别,我们可以查看下面的示例,在这个示例中,一个贪婪的量化器被应用于 B 变量。

SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            C.price AS lastPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C)
        DEFINE
            A AS A.price > 10,
            B AS B.price < 15,
            C AS C.price > 12
    )

鉴于我们有以下输入。

 symbol  tax   price          rowtime
======= ===== ======== =====================
 XYZ     1     10       2018-09-17 10:00:02
 XYZ     2     11       2018-09-17 10:00:03
 XYZ     1     12       2018-09-17 10:00:04
 XYZ     2     13       2018-09-17 10:00:05
 XYZ     1     14       2018-09-17 10:00:06
 XYZ     2     16       2018-09-17 10:00:07

上述模式将产生以下输出。

 symbol   lastPrice
======== ===========
 XYZ      16

同样的查询,将 B* 修改为 B* 吗,即 B*应该是不愿意的,会产生。

 symbol   lastPrice
======== ===========
 XYZ      13
 XYZ      16

模式变量 B 只匹配到价格为 12 的行,而不是吞掉价格为 12、13、14 的行。

注意 对于一个模式的最后一个变量,不可能使用贪婪的量化符。因此,像(A B*)这样的模式是不允许的。这可以通过引入一个人为的状态(如 C)来轻松解决,这个状态具有 B 的否定条件,所以你可以使用这样的查询。

PATTERN (A B* C)
DEFINE
    A AS condA(),
    B AS condB(),
    C AS NOT condB()

注意 目前不支持可选的勉强量化符(A??或 A{0,1}?)。

时间限制 特别是对于流式使用案例,通常要求一个模式在给定的时间内完成。这允许限制 Flink 必须在内部维护的整体状态大小,即使在贪婪的量化器的情况下。

因此,Flink SQL 支持额外的(非标准 SQL)WITHIN 子句来定义模式的时间约束。该子句可以定义在 PATTERN 子句之后,并以毫秒为间隔进行解析。

如果一个潜在匹配的第一个事件和最后一个事件之间的时间长于给定的值,这样的匹配将不会被追加到结果表中。

注意 一般鼓励使用 within 子句,因为它有助于 Flink 进行有效的内存管理。一旦达到阈值,底层状态可以被修剪。

注意 然而,WITHIN 子句不是 SQL 标准的一部分。推荐的处理时间限制的方式可能会在未来发生变化。

在下面的查询示例中说明了 WITHIN 子句的使用。

SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            C.rowtime AS dropTime,
            A.price - C.price AS dropDiff
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
        DEFINE
            B AS B.price > A.price - 10
            C AS C.price < A.price - 10
    )

查询检测到在 1 小时的时间间隔内发生的价格下跌 10。

假设该查询用于分析以下行情数据。

symbol         rowtime         price    tax
======  ====================  ======= =======
'ACME'  '01-Apr-11 10:00:00'   20      1
'ACME'  '01-Apr-11 10:20:00'   17      2
'ACME'  '01-Apr-11 10:40:00'   18      1
'ACME'  '01-Apr-11 11:00:00'   11      3
'ACME'  '01-Apr-11 11:20:00'   14      2
'ACME'  '01-Apr-11 11:40:00'   9       1
'ACME'  '01-Apr-11 12:00:00'   15      1
'ACME'  '01-Apr-11 12:20:00'   14      2
'ACME'  '01-Apr-11 12:40:00'   24      2
'ACME'  '01-Apr-11 13:00:00'   1       2
'ACME'  '01-Apr-11 13:20:00'   19      1

查询将产生以下结果。

symbol         dropTime         dropDiff
======  ====================  =============
'ACME'  '01-Apr-11 13:00:00'      14

结果行表示价格从 15(在 4 月 1 日 12:00:00)下降到 1(在 4 月 1 日 13:00:00)。dropDiff 列包含了价格差。

请注意,即使价格也以更高的数值下降,例如,下降 11(在 01-Apr-11 10:00:00 和 01-Apr-11 11:40:00 之间),这两个事件之间的时间差大于 1 小时。因此,它们不会产生匹配。

输出模式 输出模式描述了每找到一个匹配的记录应该发出多少行。SQL 标准描述了两种模式。

ALL ROWS PER MATCH
ONE ROW PER MATCH.

目前,唯一支持的输出模式是 ONE ROW PER MATCH,对于每一个找到的匹配项,总会产生一个输出汇总行。

输出行的模式将是[分区列]+[措施列]按该特定顺序的连接。

下面的例子显示了一个定义为查询的输出。

SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            FIRST(A.price) AS startPrice,
            LAST(A.price) AS topPrice,
            B.price AS lastPrice
        ONE ROW PER MATCH
        PATTERN (A+ B)
        DEFINE
            A AS LAST(A.price, 1) IS NULL OR A.price > LAST(A.price, 1),
            B AS B.price < LAST(A.price)
    )

对于以下输入行:

 symbol   tax   price          rowtime
======== ===== ======== =====================
 XYZ      1     10       2018-09-17 10:00:02
 XYZ      2     12       2018-09-17 10:00:03
 XYZ      1     13       2018-09-17 10:00:04
 XYZ      2     11       2018-09-17 10:00:05

查询将产生以下输出。

 symbol   startPrice   topPrice   lastPrice
======== ============ ========== ===========
 XYZ      10           13         11

模式识别是按符号列进行分区的。尽管在 MEASURES 子句中没有明确提到,但在结果的开头会添加分区列。

模式导航 DEFINE 和 MEASURES 子句允许在(可能)匹配模式的行列表中进行导航。

本节将讨论这种用于声明条件或产生输出结果的导航。

模式变量引用 模式变量引用允许引用映射到 DEFINE 或 MEASURES 子句中特定模式变量的一组行。

例如,表达式 A.price 描述了迄今为止映射到 A 的一组行,再加上当前行,如果我们尝试将当前行与 A 进行匹配。如果 DEFINE/MEASURES 子句中的表达式需要单行(例如 A.price 或 A.price>10),则选择属于相应集合的最后一个值。

如果没有指定模式变量(例如 SUM(price)),表达式会引用默认的模式变量*,它引用模式中的所有变量。换句话说,它创建了一个迄今为止映射到任何变量的所有行加上当前行的列表。

例子

要想了解更透彻的例子,可以看看下面的模式和相应的条件。

PATTERN (A B+)
DEFINE
  A AS A.price > 10,
  B AS B.price > A.price AND SUM(price) < 100 AND SUM(B.price) < 80

下表描述了如何评估每个传入事件的这些条件。

该表由以下几栏组成:

# - the row identifier that uniquely identifies an incoming row in the lists [A.price]/[B.price]/[price].
price - the price of the incoming row.
[A.price]/[B.price]/[price] - describe lists of rows which are used in the DEFINE clause to evaluate conditions.
Classifier - the classifier of the current row which indicates the pattern variable the row is mapped to.
A.price/B.price/SUM(price)/SUM(B.price) - describes the result after those expressions have been evaluated.
#	price	Classifier	[A.price]	[B.price]	[price]	A.price	B.price	SUM(price)	SUM(B.price)
#1	10	-> A	#1	-	-	10	-	-	-
#2	15	-> B	#1	#2	#1, #2	10	15	25	15
#3	20	-> B	#1	#2, #3	#1, #2, #3	10	20	45	35
#4	31	-> B	#1	#2, #3, #4	#1, #2, #3, #4	10	31	76	66
#5	35		#1	#2, #3, #4, #5	#1, #2, #3, #4, #5	10	35	111	101

从表中可以看出,第一行被映射到模式变量 A,随后的行被映射到模式变量 B,但是最后一行不满足 B 的条件,因为所有映射行的 SUM(价格)和 B 中所有行的总和超过了指定的阈值。

逻辑偏移 逻辑偏移可以在映射到特定模式变量的事件中进行导航。这可以用两个相应的函数来表示。

偏移函数 描述 LAST(variable.field, n) 返回事件中被映射到变量第 n 个最后元素的字段的值。从映射到的最后一个元素开始计算。

FIRST(variable.field, n) 返回事件中被映射到变量第 n 个元素的字段值。从映射到的第一个元素开始计算。

示例

为了更透彻的举例,可以看看下面的模式和相应的条件。

PATTERN (A B+)
DEFINE
  A AS A.price > 10,
  B AS (LAST(B.price, 1) IS NULL OR B.price > LAST(B.price, 1)) AND
       (LAST(B.price, 2) IS NULL OR B.price > 2 * LAST(B.price, 2))

下表描述了如何评估每个传入事件的这些条件。

该表由以下几栏组成:

price - the price of the incoming row.
Classifier - the classifier of the current row which indicates the pattern variable the row is mapped to.
LAST(B.price, 1)/LAST(B.price, 2) - describes the result after those expressions have been evaluated.
price	Classifier	LAST(B.price, 1)	LAST(B.price, 2)	Comment
10	-> A			
15	-> B	null	null	Notice that LAST(A.price, 1) is null because there is still nothing mapped to B.
20	-> B	15	null	
31	-> B	20	15	
35		31	20	Not mapped because 35 < 2 * 20.

使用默认的模式变量与逻辑偏移量也可能是有意义的。

在这种情况下,偏移量会考虑到目前为止映射的所有行。

PATTERN (A B? C)
DEFINE
  B AS B.price < 20,
  C AS LAST(price, 1) < C.price
price	Classifier	LAST(price, 1)	Comment
10	-> A		
15	-> B		
20	-> C	15	LAST(price, 1) is evaluated as the price of the row mapped to the B variable.

如果第二行没有映射到 B 变量,我们会有以下结果。

price	Classifier	LAST(price, 1)	Comment
10	-> A		
20	-> C	10	LAST(price, 1) is evaluated as the price of the row mapped to the A variable.

也可以在 first/last 函数的第一个参数中使用多个模式变量引用。这样,就可以写一个访问多列的表达式。但是,所有这些表达式必须使用同一个模式变量。换句话说,LAST/FIRST 函数的值必须在单行中计算。

因此,可以使用 LAST(A.price * A.tax),但不允许使用 LAST(A.price * B.tax)这样的表达式。

匹配后策略 AFTER MATCH SKIP 子句指定了在找到完整匹配后,在哪里开始一个新的匹配过程。

有四种不同的策略。

SKIP PAST LAST ROW - 在当前匹配的最后一行之后的下一行恢复模式匹配。 SKIP TO NEXT ROW - 从匹配起始行后的下一行开始继续搜索新的匹配。 SKIP TO LAST 变量–在映射到指定模式变量的最后一行恢复模式匹配。 SKIP TO FIRST 变量–在被映射到指定模式变量的第一行恢复模式匹配。 这也是一种指定一个事件可以属于多少个匹配的方式。例如,使用 SKIP PAST LAST ROW 策略,每个事件最多只能属于一个匹配。

例子

为了更好地理解这些策略之间的差异,可以看一下下面的例子。

对于以下输入行。

 symbol   tax   price         rowtime
======== ===== ======= =====================
 XYZ      1     7       2018-09-17 10:00:01
 XYZ      2     9       2018-09-17 10:00:02
 XYZ      1     10      2018-09-17 10:00:03
 XYZ      2     5       2018-09-17 10:00:04
 XYZ      2     17      2018-09-17 10:00:05
 XYZ      2     14      2018-09-17 10:00:06

我们用不同的策略评估以下查询。

SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            SUM(A.price) AS sumPrice,
            FIRST(rowtime) AS startTime,
            LAST(rowtime) AS endTime
        ONE ROW PER MATCH
        [AFTER MATCH STRATEGY]
        PATTERN (A+ C)
        DEFINE
            A AS SUM(A.price) < 30
    )

查询返回映射到 A 的所有行的价格总和,以及整体匹配的第一个和最后一个时间戳。

根据使用的 AFTER MATCH 策略,查询会产生不同的结果。

AFTER MATCH SKIP PAST ROW(跳过最后一行)

 symbol   sumPrice        startTime              endTime
======== ========== ===================== =====================
 XYZ      26         2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ      17         2018-09-17 10:00:05   2018-09-17 10:00:06

第一个结果与 1 号,2 号,3 号,4 号行相匹配。

第二个结果与#5, #6 行相匹配。

匹配后跳转到下一行。

 symbol   sumPrice        startTime              endTime
======== ========== ===================== =====================
 XYZ      26         2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ      24         2018-09-17 10:00:02   2018-09-17 10:00:05
 XYZ      15         2018-09-17 10:00:03   2018-09-17 10:00:05
 XYZ      22         2018-09-17 10:00:04   2018-09-17 10:00:06
 XYZ      17         2018-09-17 10:00:05   2018-09-17 10:00:06

同样,第一个结果对 1 号、2 号、3 号、4 号行进行匹配。

与之前的策略相比,接下来的匹配中又包含了 2 号行的匹配。因此,第二个结果与行#2,#3,#4,#5 相匹配。

第三个结果与 3 号,4 号,5 号行相匹配。

第四个结果与行#4,#5,#6 相匹配。

最后一个结果与行#5,#6 匹配。

匹配后跳转到最后一行。

 symbol   sumPrice        startTime              endTime
======== ========== ===================== =====================
 XYZ      26         2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ      15         2018-09-17 10:00:03   2018-09-17 10:00:05
 XYZ      22         2018-09-17 10:00:04   2018-09-17 10:00:06
 XYZ      17         2018-09-17 10:00:05   2018-09-17 10:00:06

同样,第一个结果针对 1 号、2 号、3 号、4 号行进行匹配。

与之前的策略相比,接下来的匹配只包括 3 号行(映射到 A 行),再次进行匹配。因此,第二个结果与行#3,#4,#5 相匹配。

第三个结果与#4,#5,#6 行相匹配。

最后一个结果与行#5,#6 匹配,因此第三个结果与行#4,#5,#6 匹配。

匹配后跳转到第一行 A。

这个组合会产生一个运行时异常,因为我们总是试图在上一个比赛开始的地方开始一个新的比赛。这将产生一个无限循环,因此是被禁止的。

我们必须记住,在使用 SKIP TO FIRST/LAST 变量策略的情况下,有可能没有记录映射到该变量上(例如模式 A*)。在这种情况下,将抛出一个运行时异常,因为标准要求有一条有效的记录来继续匹配。

时间属性 为了在 MATCH_RECOGNIZE 之上应用一些后续的查询,可能需要使用时间属性。为了选择这些属性,有两个函数可用。

功能描述 MATCH_ROWTIME() 返回被映射到给定模式的最后一行的时间戳。

所得到的属性是一个 rowtime 属性,它可以被用于后续的基于时间的操作,如区间连接和组窗口或窗口聚合。

MATCH_PROCTIME() 返回一个 proctime 属性,该属性可用于后续基于时间的操作,如区间连接和组窗口或窗口聚合。

控制内存消耗 在编写 MATCH_RECOGNIZE 查询时,内存消耗是一个重要的考虑因素,因为潜在的匹配空间是以类似广度优先的方式建立的。考虑到这一点,必须确保模式能够完成。最好是有合理数量的行映射到匹配中,因为它们必须适应内存。

例如,模式不能有一个没有上限的量化器,接受每一行。这样的模式可以是这样的。

PATTERN (A B+ C)
DEFINE
  A as A.price > 10,
  C as C.price > 20

该查询将把每一条进入的记录映射到 B 变量上,因此永远不会结束。这个查询可以通过否定 C 的条件来解决。

PATTERN (A B+ C)
DEFINE
  A as A.price > 10,
  B as B.price <= 20,
  C as C.price > 20

或者通过使用勉强的定量器。

PATTERN (A B+? C)
DEFINE
  A as A.price > 10,
  C as C.price > 20

注意 请注意,MATCH_RECOGNIZE 子句不使用配置的状态保留时间。人们可能希望使用 WITHIN 子句来达到这个目的。

已知限制 Flink 对 MATCH_RECOGNIZE 子句的实现是一项持续的努力,目前还不支持 SQL 标准的一些功能。

不支持的功能包括

模式表达式。 模式组–这意味着,例如量化符不能应用于模式的子序列。因此,(A (B C)+)不是有效的模式。 改变–像 PATTERN((A B | C D) E)这样的模式,这意味着在寻找 E 行之前必须先找到一个子序列 A B 或 C D。 PERMUTE 运算符–相当于它所应用的所有变量的排列组合,例如 PATTERN(PERMUTE (A, B, C))=PATTERN(A B C | A C B | B A C | B A C | C B A | C B A)。 锚 - ^, $,表示一个分区的开始/结束,这些在流媒体环境中没有意义,将不被支持。 排除 - PATTERN ({- A -} B) 意味着 A 将被查找,但不会参与输出。这只对 ALL ROWS PER MATCH 模式有效。 不情愿的可选量化符–PATTERN A?? 只支持贪婪的可选量化符。 ALL ROWS PER MATCH 输出模式–它为每一条参与创建发现匹配的记录产生一条输出行。这也意味着。 MEASURES 子句唯一支持的语义是 FINAL。 CLASSIFIER 函数,该函数返回某行被映射到的模式变量,目前还不支持。 SUBSET - 允许创建模式变量的逻辑组,并在 DEFINE 和 MEASURES 子句中使用这些组。 物理偏移–PREV/NEXT,它索引所有看到的事件,而不是只索引那些被映射到模式变量的事件(如逻辑偏移情况)。 提取时间属性–目前没有可能为后续基于时间的操作获取时间属性。 MATCH_RECOGNIZE 只支持 SQL。在 Table API 中没有等价物。 聚合。 不支持不同的聚合。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/match_recognize.html