Wait the light to fall

事件驱动型应用程序

焉知非鱼

Event-driven Applications

Process Functions #

介绍 #

ProcessFunction 将事件处理与定时器和状态结合起来,使其成为流处理应用的强大构件。这是用 Flink 创建事件驱动应用的基础。它与 RichFlatMapFunction 非常相似,但增加了定时器。

实例 #

如果你做过流分析培训中的实战练习,你会记得它使用 TumblingEventTimeWindow 来计算每个司机在每个小时内的小费之和,就像这样:

// compute the sum of the tips per hour for each driver
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
        .keyBy((TaxiFare fare) -> fare.driverId)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .process(new AddTips());

用 KeyedProcessFunction 做同样的事情是相当直接的,也是很有教育意义的。让我们先把上面的代码替换成这样:

// compute the sum of the tips per hour for each driver
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
        .keyBy((TaxiFare fare) -> fare.driverId)
        .process(new PseudoWindow(Time.hours(1)));

在这段代码中,一个名为 PseudoWindow 的 KeyedProcessFunction 被应用于一个 keyed 流,其结果是一个 DataStream<Tuple3<Long,Long,Float>>(就是使用 Flink 内置时间窗口的实现所产生的那种流)。

PseudoWindow 的整体轮廓是这样的形状:

// Compute the sum of the tips for each driver in hour-long windows.
// The keys are driverIds.
public static class PseudoWindow extends 
        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

    private final long durationMsec;

    public PseudoWindow(Time duration) {
        this.durationMsec = duration.toMilliseconds();
    }

    @Override
    // Called once during initialization.
    public void open(Configuration conf) {
        . . .
    }

    @Override
    // Called as each fare arrives to be processed.
    public void processElement(
            TaxiFare fare,
            Context ctx,
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {

        . . .
    }

    @Override
    // Called when the current watermark indicates that a window is now complete.
    public void onTimer(long timestamp, 
            OnTimerContext context, 
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {

        . . .
    }
}

需要注意的事情。

  • ProcessFunctions 有好几种类型–这是一个 KeyedProcessFunctions,但还有 CoProcessFunctions、BroadcastProcessFunctions 等。

  • KeyedProcessFunction 是 RichFunction的一种。作为一个 RichFunction,它可以访问在管理 keyed state 下工作所需的 opengetRuntimeContext 方法。

  • 有两个回调要实现:processElementonTimerprocessElement 在每次传入事件时被调用;onTimer 在定时器发射时被调用。这些定时器可以是事件时间,也可以是处理时间定时器。processElementonTimer 都提供了一个上下文对象,该对象可以用来与 TimerService 交互(除其他外)。两个回调都还传递了一个可以用来发出结果的 Collector。

open() 方法 #

// Keyed, managed state, with an entry for each window, keyed by the window's end time.
// There is a separate MapState object for each driver.
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {

    MapStateDescriptor<Long, Float> sumDesc =
            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
    sumOfTips = getRuntimeContext().getMapState(sumDesc);
}

由于票价事件可能会不按顺序到达,所以有时需要处理一个小时的事件,然后再完成前一个小时的结果计算。事实上,如果水印延迟比窗口长度长得多,那么可能会同时打开许多窗口,而不是只有两个。本实现通过使用 MapState 来支持这一点,MapState 将每个窗口结束的时间戳映射到该窗口的提示之和。

processElement() 方法 #

public void processElement(
        TaxiFare fare,
        Context ctx,
        Collector<Tuple3<Long, Long, Float>> out) throws Exception {

    long eventTime = fare.getEventTime();
    TimerService timerService = ctx.timerService();

    if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
    } else {
        // Round up eventTime to the end of the window containing this event.
        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

        // Schedule a callback for when the window has been completed.
        timerService.registerEventTimeTimer(endOfWindow);

        // Add this fare's tip to the running total for that window.
        Float sum = sumOfTips.get(endOfWindow);
        if (sum == null) {
            sum = 0.0F;
        }
        sum += fare.tip;
        sumOfTips.put(endOfWindow, sum);
    }
}

要考虑的事情:

  • 迟到的事件会怎样?在水印后面的事件(即迟到)会被丢弃。如果你想做一些比这更好的事情,可以考虑使用侧输出,这将在下一节解释。

  • 这个例子使用了一个 MapState,其中键是时间戳,并为同一个时间戳设置一个 Timer。这是一种常见的模式;它使得在定时器发射时查找相关信息变得简单而高效。

onTimer() 方法 #

public void onTimer(
        long timestamp, 
        OnTimerContext context, 
        Collector<Tuple3<Long, Long, Float>> out) throws Exception {

    long driverId = context.getCurrentKey();
    // Look up the result for the hour that just ended.
    Float sumOfTips = this.sumOfTips.get(timestamp);

    Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
    out.collect(result);
    this.sumOfTips.remove(timestamp);
}

观察:

  • 传递给 onTimer 的 OnTimerContext 上下文可以用来确定当前的键。

  • 我们的伪窗口是在当前水印到达每个小时结束时被触发的,此时调用 onTimer。这个 onTimer 方法从 sumOfTips 中删除了相关的条目,这样做的效果是无法容纳迟到的事件。这相当于在使用 Flink 的时间窗口时,将 allowLateness 设置为零。

性能方面的考虑 #

Flink 提供了针对 RocksDB 优化的 MapState 和 ListState 类型。在可能的情况下,应该使用这些类型来代替持有某种集合的 ValueState 对象。RocksDB 状态后端可以追加到 ListState,而不需要经过(去)序列化,对于 MapState,每个键/值对都是一个单独的 RocksDB 对象,因此 MapState 可以有效地被访问和更新。

侧输出 #

介绍 #

有几个很好的理由可以让 Flink operator 有一个以上的输出流,比如报告:

  • 异常
  • 畸形事件
  • 迟到事件
  • 操作警报,如与外部服务的连接超时。

侧输出是一种方便的方式。除了错误报告,侧输出也是实现流的多路分割的好方法。

例子 #

现在,您可以对上一节中被忽略的迟到事件做些什么了。

一个侧输出通道与一个 OutputTag<T> 相关联。这些标签具有与侧输出的 DataStream 的类型相对应的通用类型,它们有名称。

private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

上面展示的是一个静态的 OutputTag<TaxiFare>,它既可以在 PseudoWindow 的 processElement 方法中发出迟到事件时被引用。

if (eventTime <= timerService.currentWatermark()) {
    // This event is late; its window has already been triggered.
    ctx.output(lateFares, fare);
} else {
    . . .
}

并在访问这一侧输出的流时,在作业的 main 方法中输出:

// compute the sum of the tips per hour for each driver
SingleOutputStreamOperator hourlyTips = fares
        .keyBy((TaxiFare fare) -> fare.driverId)
        .process(new PseudoWindow(Time.hours(1)));

hourlyTips.getSideOutput(lateFares).print();

或者,您可以使用两个具有相同名称的 OutputTags 来引用同一侧面输出,但如果您这样做,它们必须具有相同的类型。

结束语 #

在这个例子中,你已经看到了如何使用 ProcessFunction 来重新实现一个直接的时间窗口。当然,如果 Flink 内置的窗口 API 满足你的需求,无论如何,请继续使用它。但如果你发现自己在考虑用 Flink 的窗口做一些变形,不要害怕推出自己的窗口。

此外,ProcessFunction 对于计算分析之外的许多其他用例也很有用。下面的实践练习提供了一个完全不同的例子。

ProcessFunction 的另一个常见用例是用于过期的陈旧状态。如果你回想一下 Rides 和 Fares 练习,其中使用 RichCoFlatMapFunction 来计算一个简单的连接,示例解决方案假设 TaxiRides 和 TaxiFares 是完美匹配的,每个 rideId 是一对一的。如果一个事件丢失了,同一乘车 ID 的其他事件将永远保持在状态。这可以替换为一个 KeyedCoProcessFunction 来实现,并且可以使用一个定时器来检测和清除任何陈旧的状态。

实践 #

与本节配套的实战练习是 Long Ride Alerts 练习

进一步阅读 #