Wait the light to fall

连续查询中的 Join

焉知非鱼

Join in Continuous Queries

连续查询中的 Join

在批处理数据时,连接是一种常见的、好理解的操作,用来连接两个关系的行。然而,在动态表上的连接的语义就不那么明显了,甚至是混乱的。

正因为如此,有几种方法可以使用 Table APISQL 实际执行连接。

关于语法的更多信息,请查看 Table APISQL 中的连接部分。

常规连接 #

常规联接是最通用的联接类型,联接输入的任何一条新记录或变化都是可见的,并影响整个联接结果。例如,如果左边有一条新记录,它将与右边所有以前和将来的记录一起连接。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

这些语义允许任何形式的更新(插入、更新、删除)输入表。

然而,这种操作有一个重要的含义:它需要将 join 输入的双方永远保持在 Flink 的状态中。因此,如果一个或两个输入表持续增长,资源使用量也会无限增长。

区间连接 #

区间联接是由联接谓词定义的,它检查输入记录的时间属性是否在一定的时间限制内,即时间窗口。

SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

与普通的 join 操作相比,这种 join 只支持带有时间属性的 append-only 表。由于时间属性是准单调递增的,所以 Flink 可以在不影响结果正确性的情况下,从其状态中删除旧值。

用临时表函数进行联接 #

使用时态表函数的连接,将一个只附加表(左输入/探针侧)与一个时态表(右输入/建立侧)连接起来,即一个随时间变化的表,并跟踪其变化。关于临时表的更多信息,请查看相应页面。

下面的例子显示了一个只附加表 Orders,它应该与不断变化的货币汇率表 RatesHistory 连接。

Orders 是一个只附加表,表示给定金额和给定货币的付款。例如在 10:15 有一个金额为 2 欧元的订单。

SELECT * FROM Orders;

rowtime amount currency
======= ====== =========
10:15        2 Euro
10:30        1 US Dollar
10:32       50 Yen
10:52        3 Euro
11:04        5 US Dollar

RatesHistory 代表了一个不断变化的对日元(汇率为 1)的货币汇率附加表。例如,从 09:00 到 10:45,欧元对日元的汇率是 114,从 10:45 到 11:15 是 116。从 10:45 到 11:15 是 116。

SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

我们想计算所有订单的金额,并将其换算成一种通用货币(日元)。

例如,我们想使用给定行时间(114)的适当换算率换算以下订单。

rowtime amount currency
======= ====== =========
10:15        2 Euro

如果不使用临时表的概念,就需要写一个类似的查询。

SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);

在临时表函数 Rates over RatesHistory 的帮助下,我们可以将这样的查询用 SQL 表达为:

SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

来自探针侧的每条记录将与构建侧表在探针侧记录的相关时间属性时的版本连接。为了支持更新(覆盖)构建侧表的先前值,表必须定义一个主键。

在我们的例子中,来自 Orders 的每条记录将与 Rates 的版本在时间 o.rowtime 连接。货币字段之前已经被定义为 Rates 的主键,在我们的例子中用来连接两个表。如果查询使用的是处理时间的概念,那么在执行操作时,新添加的订单将始终与 Rates 的最新版本连接。

常规的连接不同,这意味着如果在构建端有新的记录,不会影响之前的连接结果。这又使得 Flink 可以限制必须保留在状态中的元素数量。

区间联接相比,时间表联接并没有定义一个时间窗口,在这个时间窗口的范围内,记录将被加入。来自探针侧的记录总是在时间属性指定的时间与构建侧的版本进行连接。因此,构建侧的记录可能是任意的旧记录。随着时间的流逝,记录(对于给定的主键)以前的和不再需要的版本将从状态中删除。

这样的行为使得时间表连接成为用关系术语来表达流丰富的一个很好的候选。

使用方法 #

定义了临时表函数之后,我们就可以开始使用它了。时间表函数的使用方法可以和普通表函数的使用方法一样。

下面的代码片段解决了我们的动机问题,即从订单表中转换货币。

SELECT
  SUM(o_amount * r_rate) AS amount
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
// scala
val result = orders
    .joinLateral(rates('o_proctime), 'r_currency === 'o_currency)
    .select(('o_amount * 'r_rate).sum as 'amount)

注意:在查询配置中定义的状态保留还没有实现时序连接。这意味着计算查询结果所需的状态可能会根据历史表的不同主键的数量而无限增长。

处理时间的 Temporal 连接 #

有了处理时间时间属性,就不可能将过去的时间属性作为参数传递给时序表函数。根据定义,它总是当前的时间戳。因此,对处理时间时间表函数的调用将始终返回底层表的最新已知版本,底层历史表的任何更新也将立即覆盖当前值。

只有构建侧记录的最新版本(相对于定义的主键)才会保存在状态中。构建侧的更新不会对之前发出的连接结果产生影响。

我们可以把处理时的时空联接看成一个简单的 HashMap<K,V>,它存储了来自构建侧的所有记录。当来自构建侧的新记录与之前的某个记录具有相同的键时,旧的值只是简单地被覆盖。来自探针侧的每条记录总是根据 HashMap 的最近/当前状态进行评估。

事件时间的 Temporal 连接 #

有了事件时间属性(即行时间属性),就可以将过去的时间属性传递给时间表函数。这样就可以在一个共同的时间点上连接两个表。

与处理时间的时空连接相比,时空表不仅保留了状态下构建方记录的最新版本(相对于定义的主键),而且还存储了自上次水印以来的所有版本(通过时间来识别)。

例如,一个事件时间时间戳为 12:30:00 的传入行被追加到探针侧表中,根据临时表的概念,它与构建侧表中时间为 12:30:00 的版本连接。因此,传入的行只与时间戳小于或等于 12:30:00 的行连接,并根据主键应用更新,直到这个时间点。

根据事件时间的定义,水印允许联接操作在时间上向前移动,并丢弃不再需要的构建表的版本,因为预计不会有时间戳较低或相等的传入行。

用时间表进行联接 #

带时态表的连接将一个任意表(左输入/探针侧)与一个时态表(右输入/建立侧)连接起来,即一个随时间变化的外部维度表。关于时间表的详细信息,请查看相应页面。

注意: 用户不能使用任意表作为时间表,而是需要使用一个由 LookupableTableSource 支持的表。一个 LookupableTableSource 只能作为一个时态表用于时态连接。有关如何定义 LookupableTableSource 的详细信息,请参见页面。

下面的示例显示了一个 Orders 流,它应该与不断变化的货币汇率表 LatestRates 进行连接。

LatestRates 是一个维度表,它是以最新的汇率来实现的。在时间 10:15、10:30、10:52,LatestRates 的内容如下。

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1


10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

时间 10:15 和 10:30 的 LastestRates 内容相等。欧元汇率在 10:52 从 114 变为 116。

订单是一个只附加的表,表示给定金额和给定货币的支付。例如在 10:15 有一个金额为 2 欧元的订单。

SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52

我们想计算所有订单的金额,并将其兑换成一种通用货币(日元)。

例如,我们想使用 LatestRates 中的最新汇率来转换以下订单。结果将是:

amount currency     rate   amout*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52

在时间表连接的帮助下,我们可以将这样的查询用 SQL 表达为。

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

来自探针侧的每一条记录都将与构建侧表的当前版本相连接。在我们的例子中,查询使用的是处理时间的概念,所以在执行操作时,新追加的订单将始终与最新版本的 LatestRates 连接。需要注意的是,结果并不是处理时间的确定性。

常规联接相比,尽管构建端发生了变化,但时态表联接之前的结果不会受到影响。另外,时态表连接操作符非常轻量级,不保留任何状态。

区间联接相比,时态表联接不定义记录联接的时间窗口。在处理时,来自探针侧的记录总是与构建侧的最新版本连接。因此,构建侧的记录可能是任意旧的。

时态表函数连接和时态表连接的动机都是一样的,但在 SQL 语法和运行时的实现上却有所不同。

  • 时间表函数 join 的 SQL 语法是 join UDTF,而时间表 join 使用 SQL:2011 中引入的标准时间表语法。
  • 时态表函数 join 的实现实际上是将两个流连接起来并保持状态,而时态表 join 只是接收唯一的输入流,并根据记录中的键查找外部数据库。
  • 时态表函数联接通常用于联接变更日志流,而时态表联接通常用于联接外部表(即维表)。

这样的行为使得时态表连接成为用关系术语来表达流丰富的一个很好的候选。

在未来,时态表连接将支持时态表函数连接的特性,即支持时态连接 changelog 流。

使用方法 #

时间表连接的语法如下。

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.column-name1

目前只支持 INNER JOIN 和 LEFT JOIN。在 temporal 表后应跟上 FOR SYSTEM_TIME AS OF table1.proctime。proctime 是 table1 的处理时间属性。这意味着它在处理时间对时间表进行快照,当从左表连接每一条记录时,它就会对时间表进行快照。

例如,在定义了 temporal 表之后,我们可以按以下方式使用。

SELECT
  SUM(o_amount * r_rate) AS amount
FROM
  Orders
  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
  ON r_currency = o_currency

注意: 这只在 Blink 计划器中支持。

注意: 目前只在 SQL 中支持,在 Table API 中还不支持。

注意: Flink 目前不支持事件时间的表连接。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html