Wait the light to fall

Joining

焉知非鱼

Joining

窗口连接(Join)

窗口连接(window join)将两个流的元素连接起来,这两个流有一个共同的键,并且位于同一个窗口中。这些窗口可以通过使用窗口分配器来定义,并对两个流的元素进行评估。

然后,来自双方的元素被传递到一个用户定义的 JoinFunction 或 FlatJoinFunction 中,用户可以发出符合加入标准的结果。

一般的用法可以归纳为以下几点。

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

关于语义的一些说明:

  • 两个流中元素的成对组合的创建就像一个内部连接,这意味着一个流中的元素如果没有另一个流中的相应元素与之连接,就不会发出。
  • 那些被加入的元素将以各自窗口中最大的时间戳作为它们的时间戳。例如,一个以 [5, 10) 为边界的窗口将导致加入的元素以9作为它们的时间戳。

在下面的章节中,我们将使用一些示例性的场景来概述不同类型的窗口连接是如何进行的。

滚动窗口连接 #

当执行滚动窗口连接时,所有具有共同的键和共同的滚动窗口的元素都会以成对组合的方式进行连接,并传递给 JoinFunctionFlatJoinFunction。因为这表现得像一个内连接,所以一个流的元素如果在其滚动窗口中没有来自另一个流的元素,就不会被发出去!这就是为什么我们要把一个流的元素加入到滚动窗口中。

img

如图所示,我们定义了一个大小为2毫秒的滚动窗口,其结果是 [0,1][2,3],…形式的窗口。图中显示了每个窗口中所有元素的配对组合,这些元素将被传递给 JoinFunction。请注意,在翻滚窗口 [6,7] 中,没有任何元素发出,因为绿色流中没有元素存在,要与橙色元素⑥和⑦连接。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }

滑动窗连接 #

在执行滑动窗口连接时,所有具有共同键和共同滑动窗口的元素都会以成对组合的方式连接,并传递给 JoinFunctionFlatJoinFunction。一个流的元素如果在当前的滑动窗口中没有来自另一个流的元素,则不会被发出! 请注意,有些元素可能在一个滑动窗口中被加入,但在另一个滑动窗口中却没有!

img

在这个例子中,我们使用的是大小为两毫秒的滑动窗口,并将它们滑动一毫秒,结果是滑动窗口 [-1,0],[0,1],[1,2],[2,3],…。x轴下面的加入元素就是每个滑动窗口传递给 JoinFunction 的元素。这里你也可以看到,例如橙色的②与绿色的③在窗口 [2,3] 中连接,但与窗口 [1,2] 中的任何元素都没有连接。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply { (e1, e2) => e1 + "," + e2 }

会议窗口连接 #

当执行会话窗口连接时,所有具有相同键的元素,当"组合"满足会话标准时,将以成对组合的方式连接,并传递给 JoinFunctionFlatJoinFunction。同样,这也是执行内部连接,所以如果有一个会话窗口只包含来自一个流的元素,就不会有输出。

img

在这里,我们定义了一个会话窗口加入,其中每个会话被至少1ms的间隙所分割。有三个会话,在前两个会话中,两个流中的加入元素都会传递给 JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有加入!在第三个会话中,绿色流中没有元素。

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => e1 + "," + e2 }

间隔连接 #

间隔连接将两个流的元素(我们暂且称它们为A和B)用一个共同的键连接起来,流B中的元素的时间戳与流A中元素的时间戳处于一个相对的时间间隔。

这也可以更正式地表达为 b.timestamp∈[a.timestamp + lowerBound; a.timestamp + upperBound]a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B的元素,它们有一个共同的键。下界和上界都可以是负的或正的,只要下界总是小于或等于上界。区间连接目前只执行内连接。

当一对元素传递给 ProcessJoinFunction 时,它们将被分配为两个元素中较大的时间戳(可以通过 ProcessJoinFunction.Context 访问)。

注意:间隔连接目前只支持事件时间。

img

在上面的例子中,我们将两个流"橙色"和"绿色"连接起来,下界为-2毫秒,上界为+1毫秒。默认情况下,这些边界是包容的,但可以应用 .lowerBoundExclusive().upperBoundExclusive 来改变行为。

再次使用更正式的符号,这将被翻译为:

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

as indicated by the triangles.

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
         out.collect(left + "," + right); 
        }
      });
    });

原文连接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html