Wait the light to fall

检查点

焉知非鱼

Checkpointing

Flink 中的每一个函数和操作符都可以是有状态的(详情请看使用状态)。有状态的函数在单个元素/事件的处理过程中存储数据,使得状态成为任何类型的更复杂操作的关键构建模块。

为了使状态具有容错性,Flink 需要对状态进行 checkpoint。检查点允许 Flink 恢复流中的状态和位置,使应用程序具有与无故障执行相同的语义。

关于流式容错的文档详细描述了 Flink 的流式容错机制背后的技术。

前提条件 #

Flink 的检查点机制与流和状态的持久存储交互。一般来说,它需要:

  • 一个能在一定时间内重放记录(replay records)的持久(或耐用)数据源。这种源的例子是持久性消息队列(如 Apache Kafka、RabbitMQ、Amazon Kinesis、Google PubSub)或文件系统(如 HDFS、S3、GFS、NFS、Ceph…)。
  • 状态的持久性存储,通常是一个分布式文件系统(如 HDFS、S3、GFS、NFS、Ceph…)。

启用和配置检查点 #

默认情况下,检查点被禁用。要启用检查点,在 StreamExecutionEnvironment 上调用 enableCheckpointing(n),其中 n 是检查点间隔,单位为毫秒。

检查点的其他参数包括:

  • exactly-once vs. at-least-once:你可以选择向 enableCheckpointing(n) 方法传递一个模式,以便在两个保证级别之间进行选择。对于大多数应用来说,exactly-once 是比较好的。At-least-once 可能适用于某些超低延迟(持续几毫秒)的应用。

  • 检查点超时。如果一个正在进行中的检查点没有完成,那么它被中止的时间。

  • 检查点之间的最小时间。为了确保流应用在检查点之间有一定的进度,可以定义检查点之间需要经过多少时间。例如,如果这个值设置为5000,那么下一个检查点将在上一个检查点完成后不早于5秒开始,无论检查点持续时间和检查点间隔如何。请注意,这意味着检查点间隔永远不会小于这个参数。

通过定义"检查点之间的时间"(time between checkpoints)通常比检查点间隔更容易配置应用程序,因为"检查点之间的时间"不容易受到检查点有时可能比平均时间长的事实的影响(例如,如果目标存储系统暂时缓慢)。

请注意,这个值也意味着并发检查点的数量为1。

  • 并发检查点的数量。默认情况下,当一个检查点仍在进行时,系统不会触发另一个检查点。这可以确保拓扑不会在检查点上花费太多时间,而使处理流的工作没有进展。可以允许多个重叠的检查点,这对于那些有一定处理延迟(例如因为函数调用外部服务,需要一些时间来响应),但仍然希望做非常频繁的检查点(100s毫秒),以便在故障时重新处理很少的管道来说是很有意思的。

当定义了检查点之间的最小时间时,不能使用这个选项。

  • 外部化检查点。您可以配置周期性检查点,使其在外部持久化。外部化的检查点会将它们的元数据写入持久化存储中,当作业失败时不会自动清理。这样一来,如果你的工作失败了,你身边就会有一个检查点来恢复。关于外部化检查点的部署说明中有更多细节。

  • Fail/checkpoint 错误时继续执行任务。这决定了如果在执行任务的检查点过程中出现错误,任务是否会失败。这是默认行为。另外,当禁用该功能时,任务将简单地拒绝向检查点协调器提供检查点并继续运行。

  • 更喜欢用于恢复的检查点。这决定了即使有更近的保存点可用时,任务是否会回退到最新的检查点,以减少恢复时间。

  • 不对齐的检查点。你可以启用不对齐的检查点,以大大减少背压下的检查点时间。仅适用于精确的一次检查点,且并发检查点数量为1。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// 高级选项:

// 设置模式为 exactly-once (这是默认的)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// enables the experimental unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()

相关配置选项 #

更多的参数和/或默认值可以通过 conf/flink-conf.yaml 来设置(参见配置的完整指南)。

默认值 类型 描述
state.backend (none) String 用于存储和 checkpoint 状态的状态后端。
state.backend.async true Boolean 状态后端是否应该在可能的情况下使用异步快照方法的选项,可配置。有些状态后端可能不支持异步快照,或者只支持异步快照,而忽略这个选项。
state.backend.fs.memory-threshold 20 kb MemorySize 状态数据文件的最小尺寸。小于这个大小的所有状态块都内嵌存储在根检查点元数据文件中。该配置的最大内存阈值为1MB。
state.backend.fs.write-buffer-size 4096 Integer 写入文件系统的检查点流的默认写缓冲区大小。实际的写缓冲区大小是由这个选项和选项 ‘state.backend.fs.memory-threshold’ 的最大值决定的。
state.backend.incremental false Boolean 如果可能,状态后端是否应该创建增量检查点。对于增量检查点,只存储与前一个检查点的差异,而不是完整的检查点状态。一旦启用,在 Web UI 中显示的状态大小或从 rest API 中获取的状态大小只代表 delta 检查点大小,而不是完整的检查点大小。一些状态后端可能不支持增量检查点而忽略这个选项。
state.backend.local-recovery false Boolean 这个选项可以配置这个状态后端的本地恢复。默认情况下,本地恢复是被停用的。本地恢复目前只覆盖 keyed state 后端。目前,MemoryStateBackend 不支持本地恢复,忽略此选项。
state.checkpoints.dir (none) String 在 Flink 支持的文件系统中,用于存储检查点数据文件和元数据的默认目录。该存储路径必须可以从所有参与进程/节点(即所有 TaskManager 和 JobManager)访问。
state.checkpoints.num-retained 1 Integer 保留已完成的检查点的最大数量。
state.savepoints.dir (none) String 保存点的默认目录。由将保存点写入文件系统的状态后端(MemoryStateBackend, FsStateBackend, RocksDBStateBackend)使用。
taskmanager.state.local.root-dirs (none) String 配置参数,定义本地恢复中存储基于文件的状态的根目录。本地恢复目前只覆盖 keyed state 后端。目前,MemoryStateBackend 不支持本地恢复,忽略这个选项。

选择状态后端 #

Flink 的检查点机制在定时器和有状态的操作符中存储所有状态的一致快照,包括连接器、窗口和任何用户定义的状态。检查点存储的位置(例如,JobManager内存、文件系统、数据库)取决于配置的状态后端。

默认情况下,状态保存在 TaskManager 的内存中,检查点保存在 JobManager 的内存中。为了正确地持久化大状态,Flink 支持各种方法在其他状态后端存储和检查点状态。状态后端的选择可以通过 StreamExecutionEnvironment.setStateBackend(...) 进行配置。

有关可用的状态后端以及作业范围(job-wide)和集群范围(cluster-wide)配置选项的更多细节,请参见状态后端

迭代作业中的状态检查点 #

Flink 目前只为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊标志:env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

请注意,循环边缘中飞行中的记录(以及与之相关的状态变化)将在失败时丢失。

重新启动策略 #

Flink 支持不同的重启策略,这些策略可以控制作业(job)在发生故障时如何重启。更多信息,请参阅重启策略