Wait the light to fall

FlinkCEP - Flink 的复杂事件处理

焉知非鱼

Flink Cep Complex Event Processing for Flink

FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库。它允许你在无尽的事件流中检测事件模式, 让你有机会掌握数据中的重要内容。

本页介绍了 Flink CEP 中可用的 API 调用。我们首先介绍 Pattern API, 它允许你指定你想在你的流中检测的模式, 然后介绍你如何检测并对匹配的事件序列采取行动。然后, 我们将介绍 CEP 库在处理事件时间的延迟时做出的假设, 以及如何将你的工作从旧版 Flink 迁移到 Flink-1.3。

入门

如果你想直接进入, 设置一个 Flink 程序, 并将 FlinkCEP 依赖关系添加到项目的 pom.xml 中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

信息:FlinkCEP 不是二进制发行版的一部分。请在这里查看如何与它链接进行集群执行。

现在你可以开始使用模式 API 编写你的第一个 CEP 程序了。

注意: 你想应用模式匹配的 DataStream 中的事件必须实现适当的 equals()hashCode() 方法, 因为 FlinkCEP 使用它们来比较和匹配事件。

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.process(
    new PatternProcessFunction[Event, Alert]() {
        override def processMatch(
              `match`: util.Map[String, util.List[Event]],
              ctx: PatternProcessFunction.Context,
              out: Collector[Alert]): Unit = {
            out.collect(createAlertFrom(pattern))
        }
    })

Pattern API

模式 API 允许你定义你想从输入流中提取的复杂模式序列。

每个复杂模式序列由多个简单模式组成, 即寻找具有相同属性的单个事件的模式。从现在开始, 我们将把这些简单模式称为模式, 而最终我们要在流中寻找的复杂模式序列, 就是模式序列。你可以把模式序列看成是这样的模式图, 根据用户指定的条件, 从一个模式过渡到下一个模式, 例如 event.getName().equals(“end”)。一个匹配是一个输入事件的序列, 它通过有效的模式转换序列, 访问复杂模式图的所有模式。

注意: 每个模式必须有一个唯一的名称, 你以后用它来识别匹配事件。

注意: 模式名称不能包含字符 “:"。

在本节的其余部分, 我们将首先介绍如何定义 单个模式, 然后介绍如何将单个模式组合成 复杂模式

单个模式 #

模式可以是单个模式, 也可以是循环模式。单元模式只接受一个事件, 而循环模式可以接受多个事件。在模式匹配符号中, 模式 “a b+ c?d”(或 “a”, 后面跟着一个或多个 “b”, 可选地跟着一个 “c”, 后面跟着一个 “d”), a、c? 和 d 是单个模式, 而 b+ 是循环模式。默认情况下, 模式是一个单个模式, 你可以通过使用量词将其转换为一个循环模式。每个模式可以有一个或多个条件, 基于这些条件, 它可以接受事件。

量词 #

在 FlinkCEP 中, 你可以使用这些方法来指定循环模式:pattern.oneOrMore(), 用于期望给定事件出现一次或多次的模式(例如前面提到的 b+);以及 pattern.times(#ofTimes), 用于期望给定事件出现的特定次数的模式, 例如 4 个 a;以及 pattern.times(#fromTimes, #toTimes), 用于期望给定事件的特定最小出现次数和最大出现次数的模式, 例如 2-4 个 a。

你可以使用 pattern.greedy() 方法使循环模式变得贪婪, 但你还不能使分组模式变得贪婪。你可以使用 pattern.option() 方法使所有模式, 不管是否循环, 都是可选的。

对于名为 start 的模式, 以下是有效的量词。

// expecting 4 occurrences
start.times(4)

// expecting 0 or 4 occurrences
start.times(4).optional()

// expecting 2, 3 or 4 occurrences
start.times(2, 4)

// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy()

// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional()

// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy()

// expecting 1 or more occurrences
start.oneOrMore()

// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy()

// expecting 0 or more occurrences
start.oneOrMore().optional()

// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy()

// expecting 2 or more occurrences
start.timesOrMore(2)

// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy()

// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional()

// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy()

条件

对于每个模式, 你可以指定一个条件, 传入的事件必须满足这个条件才能被"接受"到模式中, 例如, 它的值应该大于 5, 或者大于之前接受的事件的平均值。你可以通过 pattern.where()、pattern.or() 或 pattern.until() 方法来指定事件属性的条件。这些条件可以是 IterativeConditions 或 SimpleConditions。

迭代条件。这是最通用的条件类型。你可以通过这种方式指定一个条件, 该条件基于之前接受的事件的属性或其中一个子集的统计量来接受后续事件。

下面是一个迭代条件的代码, 如果一个名为 “middle” 的模式的名称以 “foo” 开头, 并且如果该模式之前接受的事件的价格加上当前事件的价格之和不超过 5.0 的值, 则接受该模式的下一个事件。迭代条件可以发挥强大的作用, 尤其是与循环模式相结合, 例如 oneOrMore()。

middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

注意:调用 ctx.getEventsForPattern(…) 可以为给定的潜在匹配找到所有之前接受的事件。这个操作的成本可能会有所不同, 所以在实现你的条件时, 尽量减少它的使用。

描述的上下文使人们也可以访问事件的时间特征。更多信息请看时间上下文。

简单条件。这种类型的条件扩展了前面提到的 IterativeCondition 类, 仅根据事件本身的属性来决定是否接受一个事件。

start.where(event => event.getName.startsWith("foo"))

最后, 你还可以通过 pattern.subtype(subClass) 方法将接受的事件类型限制为初始事件类型的一个子类型(这里是 Event)。

start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

组合条件。如上所示, 你可以将子类型条件与其他条件结合起来。这对每个条件都适用。你可以通过依次调用 where() 来任意组合条件。最后的结果将是各个条件的结果的逻辑 AND。要使用 OR 组合条件, 可以使用 or() 方法, 如下所示。

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

停止条件:如果是循环模式(oneOrMore() 和 oneOrMore().option()), 你也可以指定一个停止条件, 例如, 接受值大于 5 的事件, 直到值的总和小于 50。

为了更好地理解它, 请看下面的例子。给定:

像 “(a+ until b)” (一个或多个 “a” 直到 “b”) 这样的模式

输入事件的序列 “a1” “c” “a2” “b” “a3”

该库将输出结果: {a1 a2} {a1} {a2} {a3}.

正如你所看到的 {a1 a2 a3} 或 {a2 a3} 由于停止条件没有返回。

  • where(条件) - 定义当前模式的条件。要匹配模式, 一个事件必须满足条件。多个连续的 where() 子句会导致其条件被 AND 化。
pattern.where(event => ... /* some condition */)
  • or(条件) - 添加一个新的条件, 该条件与现有的条件相匹配。一个事件只有在通过至少一个条件的情况下才能与模式匹配。
pattern.where(event => ... /* some condition */)
       .or(event => ... /* alternative condition */)
  • until(条件) - 指定循环模式的停止条件。意思是如果发生了与给定条件相匹配的事件, 则不会再接受更多的事件进入模式。

仅与 oneOrMore() 结合使用。

注意:它允许在事件条件下对相应的模式进行清洗状态。

pattern.oneOrMore().until(event => ... /* some condition */)
  • subtype(subClass) - 为当前模式定义一个子类型条件。只有当一个事件属于这个子类型时, 它才能与模式相匹配。
pattern.subtype(classOf[SubEvent])
  • oneOrMore() - 指定该模式期望匹配事件至少出现一次。

默认情况下, 使用的是放宽的内部连续(在后续事件之间)。关于内部连续性的更多信息, 请参见 consecutive

注意:建议使用 until() 或 within() 来启用状态清除。

pattern.oneOrMore()
  • timesOrMore(#times) - 指定该模式期望一个匹配事件至少出现 #times 次。

默认情况下, 使用的是放宽的内部连续(在后续事件之间)。关于内部相邻性的更多信息, 请参见 consecutive

pattern.timesOrMore(2)
  • times(#ofTimes) - 指定该模式期望匹配事件的准确出现次数。

默认情况下, 使用的是放宽的内部连续性(在后续事件之间)。关于内部相邻性的更多信息, 请参见 consecutive

pattern.times(2)
  • times(#fromTimes, #toTimes) - 指定该模式期望匹配事件的 #fromTimes 和 #toTimes 之间出现。

默认情况下, 使用的是放宽的内部连续性(在后续事件之间)。关于内部相邻性的更多信息, 请参见 consecutive

pattern.times(2, 4)
  • optional() - 指定该模式是可选的, 即它可能根本不会出现。这适用于上述所有量词。
pattern.oneOrMore().optional()
  • greedy() - 指定该模式是贪婪的, 即会尽可能多的重复。这只适用于量词, 目前不支持组模式。
pattern.oneOrMore().greedy()

组合模式 #

现在你已经看到了单个模式的样子, 现在是时候看看如何将它们组合成一个完整的模式序列了。

一个模式序列必须从一个初始模式开始, 如下所示。

val start : Pattern[Event, _] = Pattern.begin("start")

下一步, 你可以通过指定它们之间所需的毗连条件, 将更多的模式附加到你的模式序列中。FlinkCEP 支持以下形式的事件之间的相邻性。

  • 严格相邻: 希望所有匹配的事件严格地一个接一个出现, 中间没有任何非匹配的事件。
  • Relaxed Contiguity: 忽略匹配事件之间出现的非匹配事件。
  • 非决定性的松弛相邻性(Non-Deterministic Relaxed Contiguity)。进一步放宽相邻性, 允许忽略一些匹配事件的额外匹配。

要在连续模式之间应用它们, 你可以使用:

  • next(), 用于严格相邻,
  • followedBy(), 用于松散相邻, 和
  • followedByAny(), 用于非确定性的松散相邻。

  • notNext(), 如果你不希望一个事件类型直接跟随另一个事件类型
  • notFollowedBy(), 如果你不想让一个事件类型位于两个其他事件类型之间的任何地方。

注意:模式序列不能以 notFollowedBy() 结束。

注意: NOT 模式不能在前面加上一个可选模式。

// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)

// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

松散毗连意味着只有第一个后续的匹配事件才会被匹配, 而对于非确定性的松散毗连, 同一开头会发出多个匹配。举个例子, 一个模式 “a b”, 给定事件序列 “a”, “c”, “b1”, “b2”, 将得到以下结果。

“a” 和 “b” 之间有严格的毗连性。{} (不匹配), “a” 后面的 “c” 会导致 “a” 被丢弃。

“a” 和 “b” 之间的松散相邻性。{a b1}, 因为松散连续性被看作是 “跳过非匹配事件, 直到下一个匹配事件”。

“a” 和 “b” 之间的非确定性松散相邻性。{a b1}, {a b2}, 因为这是最一般的形式。

也可以定义一个时间约束, 让模式有效。例如, 你可以通过 pattern.within() 方法定义一个模式应该在 10 秒内发生。处理时间和事件时间都支持时间模式。

注意: 模式序列只能有一个时间约束。如果在不同的单个模式上定义了多个这样的约束, 那么就采用最小的约束。

next.within(Time.seconds(10))

循环模式中的相邻性

你可以在循环模式中应用与上一节讨论的相同的相邻性条件。相邻性将被应用在这样一个模式中的元素之间。为了举例说明, 模式序列 “a b+ c”(“a” 后面跟着一个或多个 “b” 的任意(非确定的松散的)序列, 后面跟着一个 “c”), 输入 “a”、“b1”、“d1”、“b2”、“d2”、“b3”、“c”, 会有以下结果。

严格相邻性:{a b3 c} - “b1” 后面的 “d1” 会导致 “b1” 被丢弃, “b2” 也会因为 “d2” 而被丢弃。

放宽相邻性:{a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c} - “d” 被忽略。

非确定性的松弛相邻性:{a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c} - 注意{a b1 b3 c}, 这是 “b” 之间松弛相邻性的结果。

对于循环模式(例如 oneOrMore() 和 times()), 默认是放宽毗连性。如果你想要严格的相邻性, 你必须通过使用 continuous() 调用来明确指定, 如果你想要非确定性的松弛相邻性, 你可以使用 allowCombinations() 调用。

  • consecutive()

与 oneOrMore() 和 times() 一起使用, 并在匹配的事件之间施加严格的毗连性, 即任何不匹配的元素都会中断匹配(如 next())。

如果不应用, 则使用宽松的连续性(如 followedBy())。

例如, 像这样的模式。

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().consecutive()
  .followedBy("end1").where(_.getName().equals("b"))

将为一个输入序列生成以下匹配。C D A1 A2 A3 D A4 B

与连续应用。{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}。

不连续应用。{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}。

  • allowCombinations()

与 oneOrMore()和 times()一起使用, 并在匹配的事件之间施加非确定性的松散相邻性(如 followedByAny())。

如果不应用, 则使用松散的相邻性(如 followedBy())。

例如, 像这样的模式。

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().allowCombinations()
  .followedBy("end1").where(_.getName().equals("b"))

将为一个输入序列生成以下匹配。C D A1 A2 A3 D A4 B

启用组合。{C A1 B}、{C A1 A2 B}、{C A1 A3 B}、{C A1 A4 B}、{C A1 A2 A3 B}、{C A1 A2 A4 B}、{C A1 A3 A4 B}、{C A1 A2 A3 A4 B}、{C A1 A2 A3 A4 B}。

不启用组合。{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}。

模式组 #

也可以定义一个模式序列作为 begin、followBy、followByAny 和 next 的条件。该模式序列将被视为逻辑上的匹配条件, 并将返回一个 GroupPattern, 并且可以对 GroupPattern 应用 oneOrMore()、times(#ofTimes)、times(#fromTimes、#toTimes)、optional()、continuous()、allowCombinations()。

val start: Pattern[Event, _] = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)

// strict contiguity
val strict: Pattern[Event, _] = start.next(
    Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
    Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
    Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
  • begin(#name) - 定义一个起始模式。
val start = Pattern.begin[Event]("start")
  • begin(#pattern_sequence) - 定义一个起始模式。
val start = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
  • next(#name) - 添加一个新的模式。一个匹配事件必须直接接替前一个匹配事件(严格相邻)。
val next = start.next("middle")
  • next(#pattern_sequence) - 添加一个新的模式。一个匹配事件的序列必须直接接替前一个匹配事件(严格相邻)。
val next = start.next(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
  • followedBy(#name) - 添加一个新的模式。其他事件可以发生在一个匹配事件和上一个匹配事件之间(松散的相邻性)。
val followedBy = start.followedBy("middle")
  • followedBy(#pattern_sequence) - 添加一个新的模式。其他事件可以发生在一系列匹配事件和前一个匹配事件之间(放松的相邻性)。
val followedBy = start.followedBy(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
  • followedByAny(#name) - 添加一个新的模式。在一个匹配事件和上一个匹配事件之间可以发生其他事件, 并且对每一个备选匹配事件都会呈现备选匹配(非确定性的松散毗连性)。
val followedByAny = start.followedByAny("middle")
  • followedByAny(#pattern_sequence) - 添加一个新的模式。在一个匹配事件序列和前一个匹配事件之间可以发生其他事件, 并且将为每一个可供选择的匹配事件序列呈现备选匹配(非确定性的松散毗连性)。
val followedByAny = start.followedByAny(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)

notNext() - 添加一个新的否定模式。一个匹配(负值)事件必须直接接替前一个匹配事件(严格的相邻性), 以使部分匹配被丢弃。

val notNext = start.notNext("not")
  • notFollowedBy() - 添加一个新的负模式。即使在匹配(负值)事件和前一个匹配事件之间发生了其他事件, 部分匹配事件序列也会被丢弃(松散的相邻性)。
val notFollowedBy = start.notFollowedBy("not")

within(time) - 定义事件序列匹配模式的最大时间间隔。如果一个未完成的事件序列超过了这个时间, 它将被丢弃。

pattern.within(Time.seconds(10))

After Match Skip Strategy #

对于一个给定的模式, 同一个事件可能会被分配给多个成功的匹配。要控制一个事件将被分配到多少个匹配中, 你需要指定名为 AfterMatchSkipStrategy 的跳过策略。有五种类型的跳过策略, 如下所示。

  • NO_SKIP: 每一个可能的匹配都会被发出。
  • SKIP_TO_NEXT:丢弃每一个局部的匹配, 从相同的事件开始, 发射匹配开始。
  • SKIP_PAST_LAST_EVENT: 丢弃每一个在匹配开始后但结束前开始的部分匹配。
  • SKIP_TO_FIRST: 丢弃每个在匹配开始后但在 PatternName 的第一个事件发生之前开始的部分匹配。
  • SKIP_TO_LAST: 丢弃在匹配开始后但在 PatternName 的最后一个事件发生之前开始的每一个部分匹配。

注意, 当使用 SKIP_TO_FIRST 和 SKIP_TO_LAST 跳过策略时, 还应该指定一个有效的 PatternName。

例如, 对于给定的模式 b+ c 和数据流 b1 b2 b3 c, 这四种跳过策略的区别如下。

Skip Strategy 结果 描述
NO_SKIP b1 b2 b3 c b2 b3 c b3 c 找到匹配的 b1 b2 b3 c 后, 匹配过程不会丢弃任何结果。
SKIP_TO_NEXT b1 b2 b3 c b2 b3 c b3 c 找到匹配的 b1 b2 b3 c 后, 匹配过程不会丢弃任何结果, 因为没有其他匹配可以从 b1 开始。
SKIP_PAST_LAST_EVENT b1 b2 b3 c 在找到匹配的 b1 b2 b3 c 后, 匹配过程将放弃所有开始的部分匹配。
SKIP_TO_FIRST[b] b1 b2 b3 c b2 b3 c b3 c 找到匹配的 b1 b2 b3 c 后, 匹配过程会尝试丢弃所有在 b1 之前开始的部分匹配, 但没有这样的匹配。因此, 没有任何匹配结果会被丢弃。
SKIP_TO_LAST[b] b1 b2 b3 c b3 c 找到匹配的 b1 b2 b3 c 后, 匹配过程会尝试丢弃所有在 b3 之前开始的部分匹配。有一个这样的匹配 b2 b3 c。

还可以看看另一个例子, 以更好地了解 NO_SKIP 和 SKIP_TO_FIRST 的区别:模式: (a | b | c) (b | c) c+.greedy d 和序列: a b c1 c2 c3 d 那么结果将是:

Skip Strategy 结果 描述
NO_SKIP a b c1 c2 c3 d b c1 c2 c3 d c1 c2 c3 d 找到匹配的 a b c1 c2 c3 d 后, 匹配过程不会丢弃任何结果。
SKIP_TO_FIRST[c*] a b c1 c2 c3 d c1 c2 c3 d 在找到匹配的 a b c1 c2 c3 d 后, 匹配过程将丢弃所有在 c1 之前开始的部分匹配, 有一个这样的匹配 b c1 c2 c3 d。有一个这样的匹配 b c1 c2 c3 d。

为了更好地理解 NO_SKIP 和 SKIP_TO_NEXT 的区别, 请看下面的例子: Pattern: a b+ 和序列: a b1 b2 b3 那么结果将是:

Skip Strategy 结果 描述
NO_SKIP a b1 a b1 b2 a b1 b2 b3 找到匹配的 b1 后, 匹配过程不会丢弃任何结果。
SKIP_TO_NEXT a b1 在找到匹配的 b1 后, 匹配过程将丢弃从 a 开始的所有部分匹配, 这意味着既不能生成 b1 b2, 也不能生成 b1 b2 b3。

要指定使用哪种跳过策略, 只需调用 AfterMatchSkipStrategy 来创建一个 AfterMatchSkipStrategy。

功能 描述
AfterMatchSkipStrategy.noSkip() 创建一个 NO_SKIP 跳过策略
AfterMatchSkipStrategy.skipToNext() 创建一个 SKIP_TO_NEXT 跳过策略。
AfterMatchSkipStrategy.skipPastLastEvent() 创建一个 SKIP_PAST_LAST_EVENT 跳过策略。
AfterMatchSkipStrategy.skipToFirst(patternName) 用引用的模式名 patternName 创建一个 SKIP_TO_FIRST 跳过策略。
AfterMatchSkipStrategy.skipToLast(patternName) 用引用的模式名 patternName 创建一个 SKIP_TO_LAST 跳过策略。

然后通过调用跳过策略来应用于模式。

val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)

注意 对于 SKIP_TO_FIRST/LAST 有两个选项来处理没有元素映射到指定变量的情况。默认情况下, 将使用 NO_SKIP 策略。另一个选项是在这种情况下抛出异常。我们可以通过以下方式启用这个选项

AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()

检测模式 #

在指定了你要寻找的模式序列后, 现在是时候将其应用到你的输入流中以检测潜在的匹配。要针对你的模式序列运行事件流, 你必须创建一个 PatternStream。给定一个输入流输入、一个模式模式和一个可选的比较器比较器, 用于在 EventTime 的情况下对具有相同时间戳的事件或在同一时刻到达的事件进行排序, 你可以通过调用创建 PatternStream。

val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

输入流可以是 keyed 的, 也可以是 non-keyed 的, 这取决于你的使用情况。

注意: 在 non-keyed 流上应用模式将导致作业的并行度等于 1。

从模式中选择 #

一旦你获得了一个 PatternStream, 你就可以对检测到的事件序列进行转换。建议的方法是通过 PatternProcessFunction 来实现。

PatternProcessFunction 有一个 processMatch 方法, 它对每个匹配的事件序列都会被调用。它以 Map<String, List<IN>> 的形式接收匹配, 其中键是你的模式序列中每个模式的名称, 值是该模式的所有接受事件的列表(IN 是你的输入元素的类型)。给定模式的事件是按时间戳排序的。返回每个模式所接受的事件列表的原因是, 当使用循环模式(例如 oneToMany() 和 times())时, 一个给定模式可能会接受多个事件。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}

PatternProcessFunction 提供了对 Context 对象的访问。通过它, 我们可以访问与时间相关的特性, 如当前处理时间或当前匹配的时间戳(这是分配给匹配的最后一个元素的时间戳)。更多信息请看时间上下文。通过这个上下文, 我们还可以将结果发送到一个侧输出

处理超时的部分模式 #

当一个模式通过 within 关键字附加了一个窗口长度时, 部分事件序列有可能因为超过窗口长度而被丢弃。要对一个超时的部分匹配采取行动, 可以使用 TimedOutPartialMatchHandler 接口。该接口应该以混搭的方式使用。这意味着你可以在你的 PatternProcessFunction 中额外实现这个接口。TimedOutPartialMatchHandler 提供了额外的 processTimedOutMatch 方法, 该方法将为每个超时部分匹配调用。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}

注意: processTimedOutMatch 不给人访问主输出的机会。但你仍然可以通过 Context 对象, 通过侧输出来发出结果。

方便的 API #

前面提到的 PatternProcessFunction 是在 Flink 1.8 中引入的, 从那时起, 它就是推荐的与匹配交互的方式。人们仍然可以使用老式的 API, 比如 select/flatSelect, 内部会被翻译成 PatternProcessFunction。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)

在 CEP 库中的时间 #

处理事件时间的延迟 #

在 CEP 中, 处理元素的顺序很重要。为了保证元素在事件时间工作时以正确的顺序进行处理, 一个传入的元素最初会被放在一个缓冲区中, 在这个缓冲区中, 元素根据其时间戳按升序排序, 当一个水印到达时, 这个缓冲区中所有时间戳小于水印的元素都会被处理。这意味着水印之间的元素是按事件时间顺序处理的。

注意: 当在事件时间内工作时, 该库假定水印的正确性。

为了保证水印之间的元素按事件时间顺序处理, Flink 的 CEP 库假设水印的正确性, 并将时间戳小于最后看到的水印的元素视为迟到元素。迟到的元素不会被进一步处理。另外, 你可以指定一个 sideOutput 标签来收集最后一次看到的水印之后的迟到元素, 你可以这样使用。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val lateDataOutputTag = OutputTag[String]("late-data")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select{
          pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent()
      }

val lateData: DataStream[String] = result.getSideOutput(lateDataOutputTag)

时间上下文 #

PatternProcessFunction 以及 IterativeCondition 中, 用户可以访问一个实现 TimeContext 的上下文, 如下所示。

/**
 * Enables access to time related characteristics such as current processing time or timestamp of
 * currently processed element. Used in {@link PatternProcessFunction} and
 * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
 */
@PublicEvolving
public interface TimeContext {

	/**
	 * Timestamp of the element currently being processed.
	 *
	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
	 * will be set to the time when event entered the cep operator.
	 */
	long timestamp();

	/** Returns the current processing time. */
	long currentProcessingTime();
}

这个上下文让用户可以访问处理事件的时间特征(在 IterativeCondition 的情况下是传入记录, 在 PatternProcessFunction 的情况下是匹配)。调用 TimeContext#currentProcessingTime 总是给你当前处理时间的值, 这个调用应该比调用 System.currentTimeMillis()更可取。

在 TimeContext#timestamp() 的情况下, 返回的值等于 EventTime 中分配的时间戳。在 ProcessingTime 中, 这将等于所述事件进入 cep 运算符的时间点(或者在 PatternProcessFunction 的情况下生成匹配时)。这意味着该值将在对该方法的多次调用中保持一致。

例子 #

下面的例子是在事件的键控数据流上检测模式 start, middle(name = “error”) -> end(name = “critical”)。这些事件通过其 id 进行 keyed, 一个有效的模式必须在 10 秒内出现。整个处理过程是以事件时间来完成的。

val env : StreamExecutionEnvironment = ...

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))

从旧版本(1.3 前)迁移到 1.4 以上版本 #

迁移到 1.4+ 版本 #

在 Flink-1.4 中, CEP 库与 <= Flink 1.2 的向后兼容性被取消。不幸的是, 无法恢复曾经在 1.2.x 下运行的 CEP 作业。

迁移到 1.3.x #

Flink-1.3 中的 CEP 库有很多新的特性, 这导致了 API 的一些变化。在这里, 我们描述了为了能够在 Flink-1.3 中运行, 你需要对你的旧 CEP 作业进行的修改。在做了这些改变并重新编译你的作业后, 你将能够从旧版作业的保存点恢复执行, 也就是说, 不需要重新处理你过去的数据。

所需的更改是:

  • 改变你的条件(在 where(…) 子句中的条件)来扩展 SimpleCondition 类, 而不是实现 FilterFunction 接口。

  • 改变你的函数作为参数提供给 select(…) 和 flatSelect(…) 方法, 以期望与每个模式相关联的事件列表(Java 中为 List, Scala 中为 Iterable)。这是因为增加了循环模式后, 多个输入事件可以匹配一个(循环)模式。

  • Flink 1.1 和 1.2 中的 followBy() 暗示了非确定性的松散毗连性(见这里)。在 Flink 1.3 中, 这一点发生了变化, followBy() 意味着松散毗连, 而 followByAny() 应该在需要非确定性松散毗连的情况下使用。