Wait the light to fall

使用状态

焉知非鱼

Working With State

在本节中,您将了解 Flink 为编写有状态程序提供的 API。请看 Stateful Stream Processing 来了解有状态流处理背后的概念。

Keyed DataStream #

如果要使用 keyed state,首先需要在 DataStream 上指定一个键,这个键应该用来分隔(partition)状态(也包括流中的记录本身)。你可以在 DataStream 上使用 keyBy(KeySelector) 指定一个键。这将产生一个 KeyedDataStream,然后允许使用 keyed state 的操作。

键选择函数将一条记录作为输入,并返回该记录的键。键可以是任何类型的,并且必须从确定性计算中导出。

Flink 的数据模型不是基于键值对的。因此,您不需要将数据集类型物理地打包成键和值。键是"虚拟"的:它们被定义为实际数据上的函数,以指导分组操作符。

下面的例子显示了一个键选择函数,它只是返回对象的字段。

// 普通的 case 类
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

元组键和表达式键 #

Flink 还有两种定义键的方法:元组键和表达式键。有了它,你可以使用元组字段索引或表达式来指定键,用于选择对象的字段。我们今天不推荐使用这些,但你可以参考 DataStream 的 Javadoc 来了解它们。严格来说,使用 KeySelector 函数更胜一筹:使用 Java lambdas,它们很容易使用,而且它们在运行时的开销可能更少。

使用 Keyed State #

keyed State 接口提供了对不同类型的状态的访问,这些状态的作用域都是当前输入元素的键。这意味着,这种类型的状态只能在 KeyedStream 上使用,它可以通过 stream.keyBy(...) 来创建。

现在,我们将首先看看不同类型的状态有哪些,然后我们会看看如何在程序中使用它们。可用的状态原语有:

  • ValueState<T>:它保留了一个可更新和检索的值(如上所述,作用域为输入元素的键,因此操作符所看到的每个键都可能有一个值)。这个值可以使用 update(T) 来设置,也可以使用 T value() 来检索。

  • ListState<T>:这保存了一个元素列表。你可以在所有当前存储的元素上追加元素和检索一个 Iterable。使用 add(T)addAll(List<T>) 添加元素,可以使用 Iterable<T> get() 检索 Iterable。你也可以用 update(List<T>) 覆盖现有的列表。

  • ReducingState<T>: 这保留了一个单一的值,代表所有添加到状态的值的集合。该接口类似于 ListState,但使用 add(T) 添加的元素会使用指定的 ReduceFunction 被化简成一个总计。

  • AggregatingState<IN,OUT>:这保留了一个单一的值,代表所有添加到状态的值的聚合。与 ReducingState 相反,aggregate 类型可能与添加到状态中的元素类型不同。接口与 ListState 相同,但使用 add(IN) 添加的元素会使用指定的 AggregateFunction 进行聚合。

  • MapState<UK, UV>: 它保存了一个映射列表。你可以将键值对放入状态中,并在所有当前存储的映射上检索一个 Iterable。使用 put(UK, UV)putAll(Map<UK, UV>) 可以添加映射。与用户键相关联的值可以使用 get(UK) 来检索。可以分别使用 entries()keys()values() 检索映射、键和值的可迭代视图。你也可以使用 isEmpty() 来检查这个映射是否包含任何键值映射。

所有类型的状态也都有一个方法 clear(),可以清除当前活动键的状态,也就是输入元素的键。

需要注意的是,这些状态对象只用于带状态的接口。状态不一定存储在里面,而可能驻留在磁盘或其他地方。第二件要记住的事情是,你从状态中得到的值取决于输入元素的键。因此,如果所涉及的键不同,你在用户函数的一次调用中得到的值可能与另一次调用中的值不同。

为了得到一个状态句柄,你必须创建一个 StateDescriptor。这里面包含了状态的名称(我们稍后会看到,你可以创建多个状态,而且它们必须有独特的名称,这样你才能引用它们),状态所拥有的值的类型,可能还有一个用户指定的函数,比如 ReduceFunction。根据你要检索的状态类型,你可以创建一个 ValueStateDescriptor、一个 ListStateDescriptor、一个 ReducingStateDescriptor 或一个 MapStateDescriptor

状态是使用 RuntimeContext 访问的,所以只有在富函数(rich functions)中才有可能。请看这里了解相关信息,但我们也会很快看到一个例子。RichFunction 中可用的 RuntimeContext 有这些方法来访问状态。

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这是一个 FlatMapFunction 的例子,它展示了所有的部分是如何结合在一起的。

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // 访问状态值
    val tmpCurrentSum = sum.value

    // 如果之前没有使用过,则为 null。
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // 更新次数
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // 更新状态
    sum.update(newSum)

    // 如果计数达到2,则发出平均数,并清除状态。
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleKeyedState")
}

这个例子实现了一个穷人的计数窗口。我们用第一个字段对元组进行 keyed 操作(在本例中,所有元组都有相同的键 1)。该函数将计数和运行的总和存储在一个 ValueState 中。一旦计数达到 2,它就会发出平均数并清除状态,这样我们就可以从 0 开始。注意,如果我们在第一个字段中的元组具有不同的值,那么这将为每个不同的输入键保持不同的状态值。

状态存活时间(TTL) #

可以为任何类型的 keyed state 分配一个生存时间(TTL)。如果配置了 TTL,并且状态值已经过期,存储的值将在尽力的基础上进行清理,这将在下面详细讨论。

所有状态集合类型都支持每个条目的 TTL。这意味着列表元素和映射条目独立过期。

为了使用状态 TTL,必须首先建立一个 StateTtlConfig 配置对象。然后可以通过传递配置在任何状态描述符中启用 TTL 功能。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

配置有几个选项需要考虑。

newBuilder 方法的第一个参数是强制性的,它是存活的时间值。

更新类型配置状态 TTL 何时被刷新(默认为 OnCreateAndWrite)。

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入访问时才会出现
  • StateTtlConfig.UpdateType.OnReadAndWrite - 也是在读的时候。

状态可见性配置如果过期值尚未清理,是否在读取访问时返回(默认为 NeverReturnExpired)。

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 过期值永不返回
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回。

NeverReturnExpired 的情况下,过期状态就像不存在一样,即使它仍然必须被删除。这个选项对于数据在 TTL 之后必须严格地成为不可读的访问状态的用例是很有用的,例如处理隐私敏感数据的应用程序。

另一个选项 ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。

注意:

  • 状态后端存储最后一次修改的时间戳和用户值,这意味着启用该功能会增加状态存储的消耗。Heap 状态后端在内存中存储了一个额外的 Java 对象,该对象有一个对用户状态对象的引用和一个原始的长值。RocksDB 状态后端每存储一个值、列表项或映射项增加8个字节。

  • 目前只支持参考处理时间的 TTL。

  • 试图使用启用 TTL 的描述符来恢复之前没有配置 TTL 的状态,或者反之,将导致兼容性失败和 StateMigrationException

  • TTL 配置不是检查点或保存点的一部分,而是 Flink 在当前运行的作业中如何处理的一种方式。

  • 带 TTL 的映射状态目前只有在用户值序列化器能够处理 null 值的情况下才支持 null 用户值。如果序列化器不支持空值,可以用 NullableSerializer 包装,代价是在序列化形式中多出一个字节。

过期状态的清理 #

默认情况下,过期的值会在读取时显式删除,如 ValueState#value,如果配置的状态后台支持,则会定期在后台进行垃圾回收。后台清理可以在 StateTtlConfig 中禁用。

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground
    .build

如果想对后台的一些特殊清理进行更精细的控制,可以按照下面的描述单独配置。目前,堆状态后台依靠增量清理,RocksDB 后台使用压实过滤器进行后台清理。

全快照中的清理 #

此外,您可以在拍摄完整状态快照的瞬间激活清理,这将减少其大小。在当前的实现下,本地状态不会被清理,但在从上一个快照恢复的情况下,它将不包括删除的过期状态。可以在 StateTtlConfig 中进行配置。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

此选项不适用于 RocksDB 状态后端的增量检查点。

注意:

  • 对于现有的作业,这个清理策略可以在 StateTtlConfig 中随时激活或停用,例如从保存点重新启动后。
增量清理 #

另一种选择是逐步触发一些状态条目的清理。触发器可以是每次状态访问或/和每次记录处理的回调。如果这种清理策略对某些状态是激活的,存储后端就会为这个状态的所有条目保留一个惰性的全局迭代器。每次触发增量清理时,迭代器都会被提前。对遍历过的状态条目进行检查,对过期的条目进行清理。

这个功能可以在 StateTtlConfig 中配置。

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build

这个策略有两个参数。第一个是每次清理触发的检查状态条目数。它总是在每次状态访问时触发。第二个参数定义是否在每次记录处理中额外触发清理。堆后端默认的后台清理每次记录处理检查5个条目而不进行清理。

注意:

  • 如果没有发生对状态的访问或者没有处理记录,过期状态将持续存在。
  • 增量清理所花费的时间会增加记录处理的延迟。
  • 目前,增量清理只在堆状态后端实现。对 RocksDB 的设置不会有影响。
  • 如果堆状态后端与同步快照一起使用,全局迭代器在迭代的时候会保留所有键的副本,因为它的具体实现不支持并发修改。那么启用这个功能会增加内存消耗。异步快照则不存在这个问题。
  • 对于现有的作业,这个清理策略可以在 StateTtlConfig 中随时激活或停用,例如从保存点重新启动后。
RocksDB 压缩过程中的清理 #

如果使用 RocksDB 状态后端,将调用 Flink 特定的压实过滤器进行后台清理。RocksDB 会定期运行异步压实来合并状态更新,减少存储量。Flink 压实过滤器通过 TTL 检查状态条目的过期时间戳,排除过期值。

这个功能可以在 StateTtlConfig 中配置。

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build

RocksDB 压实过滤器在处理一定数量的状态条目后,每次都会从 Flink 中查询当前的时间戳,用于检查过期情况,你可以改变它,并传递自定义值给 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法。更频繁地更新时间戳可以提高清理速度,但由于它使用了来自本地代码的 JNI 调用,因此降低了压缩性能。RocksDB 后台默认的清理方式是每次处理1000个条目后查询当前时间戳。

你可以通过激活 FlinkCompactionFilter 的调试级别来激活 RocksDB 过滤器原生代码的调试日志。

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 在压实过程中调用 TTL 过滤器会使其速度减慢。TTL 过滤器必须解析最后一次访问的时间戳,并检查每个被压缩的键的存储状态条目的到期时间。如果是集合状态类型(list 或 map),每个存储元素的检查也会被调用。
  • 如果该功能用于具有非固定字节长度元素的列表状态,则原生 TTL 过滤器必须额外调用每个至少第一个元素已过期的状态条目中元素在 JNI 上的 Flink java 类型序列化器,以确定下一个未过期元素的偏移。
  • 对于现有的作业,这种清理策略可以在 StateTtlConfig 中随时激活或停用,例如从保存点重新启动后。

Scala DataStream API 中的状态 #

除了上面描述的接口外,Scala API 还为 KeyedStream 上具有单个 ValueState 的有状态 map()flatMap() 函数提供了快捷方式。用户函数在 Option 中获取 ValueState 的当前值,并且必须返回一个更新的值,该值将用于更新状态。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Operator State #

Operator State(或 non-keyed state)是指绑定到一个并行操作符实例的状态。Kafka 连接器是 Flink 中使用 Operator State 的一个很好的激励例子。Kafka 消费者的每个并行实例都维护着一个主题分区和偏移的映射作为其 Operator State。

Operator State 接口支持在并行操作符实例之间重新分配状态,当并行性发生变化时。有不同的方案来进行这种重新分配。

在典型的有状态的 Flink 应用中,你不需要操作符状态。它主要是一种特殊类型的状态,用于源/接收器实现和你没有键的情况下,可以通过它来分隔状态。

广播状态 #

Broadcast State 是 Operator State 的一种特殊类型。引入它是为了支持这样的用例:一个流的记录(records)需要被广播到所有下游任务,它们被用来在所有子任务中保持相同的状态。然后在处理第二个流的记录时可以访问这个状态。作为一个广播状态可以自然出现的例子,我们可以想象一个低吞吐量的流,其中包含一组规则,我们希望对来自另一个流的所有元素进行评估。考虑到上述类型的用例,广播状态与其余运算符状态的不同之处在于。

  • 它有一个 map 格式。
  • 它只适用于有广播流和非广播流作为输入的特定操作符,以及
  • 这样的操作符可以拥有多个不同名称的广播状态。

使用 Operator State #

要使用运算符状态,有状态函数可以实现 CheckpointedFunction 接口。

CheckpointedFunction #

CheckpointedFunction 接口提供了对不同重分配方案的 non-keyed 的访问。它需要实现两个方法。

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

每当需要执行一个检查点时,就会调用 snapshotState()。与之对应的 initializeState(),在每次用户定义的函数被初始化时都会被调用,不管是在函数首次初始化时,还是在函数实际从早期的检查点恢复时。鉴于此,initializeState() 不仅是初始化不同类型状态的地方,也是包含状态恢复逻辑的地方。

目前,支持列表式操作符状态。状态有望成为一个可序列化对象的 List,彼此独立,因此在重新缩放时有资格重新分配。换句话说,这些对象是 non-keyed state 可以重新分配的最细粒度。根据状态访问方法的不同,定义了以下重分布方案。

  • 均分重分配: 每个操作符都会返回一个状态元素列表。整个状态在逻辑上是所有列表的连接(concatenation)。在还原/再分配时,列表被平均分成有多少个并行操作符就有多少个子列表。每个操作符都会得到一个子列表,这个子列表可以是空的,也可以包含一个或多个元素。举个例子,如果在并行度为1的情况下,一个操作符的检查点状态包含元素1和元素2,当把并行度增加到2时,元素1可能最终进入操作符实例0,而元素2将进入操作符实例1。

  • 联盟再分配。每个操作符都会返回一个状态元素列表。整个状态在逻辑上是所有 List 的连接(concatenation)。在还原/再分配时,每个操作符都会得到完整的状态元素列表。如果你的列表可能有很高的基数(cardinality),请不要使用这个功能。检查点元数据将为每个列表条目存储一个偏移,这可能会导致 RPC 帧大小或内存外错误。

下面是一个有状态的 SinkFunction 的例子,它使用 CheckpointedFunction 来缓冲元素,然后再将它们发送到外界。它演示了基本的均分重分配列表状态。

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: Context): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if(context.isRestored) {
      for(element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }

}

initializeState 方法的参数是一个 FunctionInitializationContext。它用于初始化 non-keyed “容器”。这些容器是一个 ListState 类型的容器,在检查点时,non-keyed 对象将被存储在那里。

请注意如何初始化状态,类似于 keyed state,用一个 StateDescriptor 来初始化,这个 StateDescriptor 包含了状态名称和状态所持有的值的类型信息。

val descriptor = new ListStateDescriptor[(String, Long)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Long)]() {})
)

checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态访问方法的命名约定包含其重分配模式,然后是其状态结构。例如,如果要在还原时使用 union 重分配方案的列表状态,则使用 getUnionListState(descriptor) 访问状态。如果方法名中不包含重分配模式,例如 getListState(descriptor),则仅仅意味着将使用基本的均分重分配方案。

在初始化容器后,我们使用上下文的 isRestored() 方法来检查是否在故障后恢复。如果为真,即我们正在恢复,则应用还原逻辑。

如修改后的 BufferingSink 的代码所示,在状态初始化过程中恢复的这个 ListState 被保存在一个类变量中,以便将来在 snapshotState() 中使用。在那里,ListState 会被清除掉之前检查点所包含的所有对象,然后用我们要检查点的新对象来填充。

顺便说一下, keyed state 也可以在 initializeState() 方法中初始化。这可以使用提供的 FunctionInitializationContext 来完成。

有状态的源函数 #

与其他操作符相比,有状态的源需要更多的小心。为了使状态和输出集合的更新是原子性的(对于失败/恢复时的精确一次性语义来说是必需的),用户需要从源的上下文中获得一个锁。

class CounterSource
       extends RichParallelSourceFunction[Long]
       with CheckpointedFunction {

  @volatile
  private var isRunning = true

  private var offset = 0L
  private var state: ListState[Long] = _

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock

    while (isRunning) {
      // output and state update are atomic
      lock.synchronized({
        ctx.collect(offset)

        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false
  
  override def initializeState(context: FunctionInitializationContext): Unit = {
    state = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Long]("state", classOf[Long]))

    for (l <- state.get().asScala) {
      offset = l
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    state.clear()
    state.add(offset)
  }
}

一些运算符可能需要检查点被 Flink 完全承认时的信息来与外界沟通。在这种情况下,请参见 org.apache.flink.runtime.state.CheckpointListener 接口。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html