Wait the light to fall

Process Function

焉知非鱼

Process Function

ProcessFunction #

ProcessFunction 是一种低级的流处理操作,可以访问所有(非循环)流应用的基本构件。

  • 事件(流元素)
  • 状态(容错,一致,只在 keyed 流上)。
  • 定时器(事件时间和处理时间,仅在 keyed 流上)

ProcessFunction 可以被认为是一个 FlatMapFunction,它可以访问 keyed 状态和定时器。它通过对输入流中收到的每个事件进行调用来处理事件。

对于容错状态,ProcessFunction 提供了对 Flink 的 keyed 状态的访问,通过 RuntimeContext 访问,类似于其他有状态函数访问 keyed 状态的方式。

定时器允许应用程序对处理时间和事件时间的变化做出反应。每次调用函数 processElement(...) 都会得到一个 Context 对象,它可以访问元素的事件时间时间戳,以及 TimerServiceTimerService 可以用来为未来的事件/处理时间实例注册回调。对于事件时间定时器,当当前水印提前到或超过定时器的时间戳时,就会调用 onTimer(...) 方法;而对于处理时间定时器,当挂钟时间达到指定时间时,就会调用 onTimer(...)。在该调用过程中,所有的状态又会被限定在创建定时器的键上,允许定时器对 keyed 状态进行操作。

注意:如果你想访问 keyed 状态和定时器,你必须在 keyed 流上应用 ProcessFunction

stream.keyBy(...).process(new MyProcessFunction())

低级连接(join) #

为了实现对两个输入的低级操作,应用程序可以使用 CoProcessFunctionKeyedCoProcessFunction。该函数与两个不同的输入绑定,并从两个不同的输入中获取对 processElement1(...)processElement2(...) 记录的单独调用。

实现低级联接(join)通常遵循这种模式。

  • 为一个输入(或两个输入)创建一个状态对象。
  • 从其输入中接收到元素时更新状态
  • 在接收到另一个输入的元素后,探测状态并产生加入的结果。

例如,您可能会将客户数据加入到金融交易中,同时为客户数据保留状态。如果你关心在面对失序事件时是否有完整的、确定性的加入,你可以使用一个定时器,当客户数据流的水印已经超过该交易的时间时,你可以为该交易评估并发出加入。

例子 #

在下面的例子中,KeyedProcessFunction 维护每个键的计数,并且每当一分钟过去(以事件时间为单位),该键没有更新时,就会发出一个键/计数对。

  • 计数、键和最后修改时间戳都存储在一个 ValueState 中,这个 ValueState 隐式地被键所限定。
  • 对于每条记录,KeyedProcessFunction 都会递增计数器并设置最后修改时间戳。
  • 该函数还将在未来一分钟内(以事件时间为单位)安排一次回调。
  • 在每次回调时,它都会将回调的事件时间戳与存储的计数的最后修改时间进行核对,如果两者匹配(即在该分钟内没有进一步的更新发生),则发出 key/count

注意: 这个简单的例子可以用会话窗口来实现。我们在这里使用 KeyedProcessFunction 来说明它提供的基本模式。

import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...

// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  * The data type stored in the state
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(
      value: (String, String), 
      ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, 
      out: Collector[(String, Long)]): Unit = {

    // initialize or retrieve/update the state
    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, lastModified) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(
      timestamp: Long, 
      ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext, 
      out: Collector[(String, Long)]): Unit = {

    state.value match {
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}

注意:在 Flink 1.4.0 之前,当从处理时间计时器调用时,ProcessFunction.onTimer() 方法将当前处理时间设置为事件时间戳。这种行为非常微妙,可能不会被用户发现。嗯,这是有害的,因为处理时间时间戳是不确定的,而且不与水印对齐。此外,用户实现的逻辑依赖于这个错误的时间戳极有可能是无意中的错误。所以我们决定修复它。升级到 1.4.0 后,使用这个错误的事件时间时间戳的 Flink 作业将失败,用户应该将他们的作业调整为正确的逻辑。

KeyedProcessFunction #

KeyedProcessFunction 作为 ProcessFunction 的扩展,在其 onTimer(...) 方法中提供了对定时器键的访问。

override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
  var key = ctx.getCurrentKey
  // ...
}

定时器 #

两种类型的定时器(处理时间和事件时间)都由 TimerService 内部维护,并排队执行。

TimerService 对每个键和时间戳的定时器进行重复复制,即每个键和时间戳最多只有一个定时器。如果同一个时间戳注册了多个定时器,onTimer() 方法将只被调用一次。

注意 Flink 同步调用 onTimer()processElement()。因此,用户不必担心状态的并发修改。

容错性 #

定时器是容错的,并与应用程序的状态一起检查点。在故障恢复或从保存点启动应用程序时,定时器将被恢复。

注意: 在恢复之前应该启动的检查点处理时间定时器将立即启动。这可能发生在应用程序从故障中恢复或从保存点启动时。

注意: 定时器总是异步检查点,除了 RocksDB 后端/与增量快照/与基于堆的定时器的组合(将用FLINK-10026解决)。注意,大量的定时器会增加检查点时间,因为定时器是检查点状态的一部分。请参阅 “定时器凝聚 “部分,了解如何减少定时器数量的建议。

定时器凝聚 #

由于 Flink 对每个键和时间戳只维护一个定时器,您可以通过降低定时器分辨率来凝聚定时器的数量。

对于1秒的定时器分辨率(事件或处理时间),您可以将目标时间四舍五入到整秒。定时器最多会提前1秒发射,但不会晚于要求的毫秒精度。因此,每个键和秒最多有一个定时器。

val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)

由于事件时间定时器只在有水印出现时才会启动,你也可以通过使用当前的水印来安排和凝聚这些定时器与下一个水印。

val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)

定时器也可以通过以下方式停止或删除。

停止一个处理时间的定时器。

val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)

停止事件时间定时器。

val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

注意: 如果没有注册给定时间戳的定时器,停止定时器没有效果。

原文连接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html