Wait the light to fall

执行模式(批/流)

焉知非鱼

Execution Mode(Batch/Streaming)

执行模式(流/批)

DataStream API 支持不同的运行时执行模式,你可以根据用例的要求和作业的特点从中选择。

DataStream API 有一种"经典"的执行行为,我们称之为 STREAMING 执行模式。这应该用于需要连续增量处理并预计无限期保持在线的无边界作业。

此外,还有一种批式执行模式,我们称之为 BATCH 执行模式。这种执行作业的方式更容易让人联想到批处理框架,如 MapReduce。这应该用于有边界的作业,对于这些作业,你有一个已知的固定输入,并且不会连续运行。

Apache Flink 对流和批处理的统一方法意味着,无论配置何种执行模式,在有界输入上执行的 DataStream 应用都会产生相同的最终结果。重要的是要注意这里的 final 是什么意思:在 streaming 模式下执行的作业可能会产生增量更新(想想数据库中的 upserts),而 batch 作业在最后只会产生一个最终结果。如果解释正确的话,最终的结果是一样的,但是到达那里的方式可能是不同的。

通过启用 BATCH 执行,我们允许 Flink 应用额外的优化,而这些优化只有在我们知道我们的输入是有边界的情况下才能进行。例如,可以使用不同的 join/aggregation 策略,此外还可以使用不同的 shuffle 实现,允许更高效的任务调度和故障恢复行为。下面我们将介绍一些执行行为的细节。

什么时候可以/应该使用 BATCH 执行模式? #

BATCH 执行模式只能用于有边界的 Job/Link 程序。边界性是数据源的一个属性,它告诉我们在执行之前,来自该数据源的所有输入是否都是已知的,或者是否会有新的数据出现,可能是无限的。而一个作业,如果它的所有源都是有界的,则是有界的,否则就是无界的。

另一方面,STREAMING 执行模式既可以用于有界作业,也可以用于无界作业。

作为经验法则,当你的程序是有界的时候,你应该使用 BATCH 执行模式,因为这样会更有效率。当你的程序是无边界的时候,你必须使用 STREAMING 执行模式,因为只有这种模式足够通用,能够处理连续的数据流。

一个明显的例外情况是,当你想使用一个有界作业来引导一些作业状态,然后你想在一个无界作业中使用。例如,通过使用 STREAMING 模式运行一个有界作业,取一个保存点,然后在一个无界作业上恢复该保存点。这是一个非常特殊的用例,当我们允许将保存点作为 BATCH 执行作业的额外输出时,这个用例可能很快就会过时。

另一个可能使用 STREAMING 模式运行有边界作业的情况是为最终将在无边界源中运行的代码编写测试时。对于测试来说,在这些情况下使用有界源可能更自然。

配置 BATCH 执行模式 #

执行模式可以通过 execute.runtim-mode 设置来配置。有三种可能的值:

  • STREAMING: 经典的 DataStream 执行模式(默认)
  • BATCH: 在 DataStream API 上进行批量式执行
  • AUTOMATIC:让系统根据源的边界性来决定

这可以通过 bin/flink run ... 的命令行参数进行配置,或者在创建/配置 StreamExecutionEnvironment 时进行编程。

下面是如何通过命令行配置执行模式:

$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

这个例子展示了如何在代码中配置执行模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

注意:我们建议用户不要在程序中设置运行模式,而是在提交应用程序时使用命令行进行设置。保持应用程序代码的免配置可以让程序更加灵活,因为同一个应用程序可以在任何执行模式下执行。

执行行为 #

本节概述了 BATCH 执行模式的执行行为,并与 STREAMING 执行模式进行了对比。详细内容请参考介绍该功能的 FLIP-134FLIP-140

任务调度和网络洗牌(Shuffle) #

Flink 作业(job)由不同的操作(operation)组成,这些操作在数据流图中连接在一起。系统决定如何在不同的进程/机器(TaskManager)上安排这些操作的执行,以及如何在它们之间洗牌(发送)数据。

多个操作/运算符可以使用一种称为链式的功能链在一起。Flink 认为作为调度单位的一组一个或多个(链式)运算符(operators )被称为任务(task)。通常,子任务(subtask)一词用来指在多个 TaskManager 上并行运行的单个任务实例,但我们在这里只使用任务(task)一词。

任务调度和网络洗牌对于 BATCHSTREAMING 执行模式的工作方式不同。主要是由于我们知道我们的输入数据在 BATCH 执行模式下是有边界的,这使得 Flink 可以使用更高效的数据结构和算法。

我们将用这个例子来解释任务调度和网络传输的差异。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> source = env.fromElements(...);

source.name("source")
	.map(...).name("map1")
	.map(...).name("map2")
	.rebalance()
	.map(...).name("map3")
	.map(...).name("map4")
	.keyBy((value) -> value)
	.map(...).name("map5")
	.map(...).name("map6")
	.sinkTo(...).name("sink");

暗示操作之间1对1连接模式的操作,如 map()flatMap()filter(),可以直接将数据转发到下一个操作,这使得这些操作可以链在一起。这意味着 Flink 通常不会在它们之间插入网络洗牌。

keyBy()rebalance() 等操作则需要在不同的任务并行实例之间进行数据洗牌。这就会引起网络洗牌。

对于上面的例子,Flink 会把操作分组为这样的任务。

  • Task1: source, map1 和 map2
  • Task2: map3, map4
  • Task3: map5, map6 和 sink

而我们在任务1和2,以及任务2和3之间进行网络洗牌。这是该作业的可视化表示。

img

STREAMING 执行模式 #

STREAMING 执行模式下,所有任务需要一直在线/运行。这使得 Flink 可以通过整个管道立即处理新的记录,而我们需要的是连续和低延迟的流处理。这也意味着分配给一个任务的 TaskManagers 需要有足够的资源来同时运行所有的任务。

网络洗牌是流水线式的,这意味着记录会被立即发送到下游任务,并在网络层上进行一些缓冲。同样,这也是需要的,因为当处理连续的数据流时,在任务(或任务的管道)之间没有自然的数据点(时间点)可以物化。这与 BATCH 执行模式形成了鲜明的对比,在 BATCH 执行模式下,中间的结果可以被具体化,如下所述。

BATCH 执行模式 #

BATCH 执行模式下,一个作业的任务可以被分离成可以一个接一个执行的阶段。我们之所以能做到这一点,是因为输入是有边界的,因此 Flink 可以在进入下一个阶段之前完全处理管道的一个阶段。在上面的例子中,工作会有三个阶段,对应着被洗牌障碍分开的三个任务。

分阶段处理并不是像上面针对 STREAMING 模式所解释的那样,立即向下游任务发送记录,而是需要 Flink 将任务的中间结果物化到一些非永续存储中,让下游任务在上游任务已经下线后再读取。这将增加处理的延迟,但也会带来其他有趣的特性。首先,这允许 Flink 在故障发生时回溯到最新的可用结果,而不是重新启动整个任务。另一个副作用是,BATCH 作业可以在更少的资源上执行(就 TaskManagers 的可用槽而言),因为系统可以一个接一个地顺序执行任务。

TaskManagers 将至少在下游任务没有消耗它们的情况下保留中间结果。(从技术上讲,它们将被保留到消耗的流水线区域产生它们的输出为止)。在这之后,只要空间允许,它们就会被保留,以便在失败的情况下,可以回溯到前面提到的结果。

状态后端/状态 #

在 STREAMING 模式下,Flink 使用 StateBackend 来控制状态的存储方式和检查点的工作方式。

在 BATCH 模式下,配置的状态后端被忽略。取而代之的是,keyed 操作的输入按键分组(使用排序),然后我们依次处理一个键的所有记录。这样就可以同时只保留一个键的状态。当转到下一个键时,一个给定键的状态将被丢弃。

关于这方面的背景信息,请参见 FLIP-140

事件时间/水印 #

在支持事件时间方面,Flink 的流运行时建立在一个悲观的假设上,即事件可能会出现顺序外,即一个时间戳t的事件可能会在一个时间戳t+1的事件之后出现。正因为如此,系统永远无法确定在给定的时间戳T下,未来不会再有时间戳 t<T 的元素出现。为了摊平这种失序性对最终结果的影响,同时使系统实用,在 STREAMING 模式下,Flink 使用了一种名为 Watermarks 的启发式方法。一个带有时间戳T的水印标志着没有时间戳 t<T 的元素会跟随。

在 BATCH 模式下,输入的数据集是事先已知的,不需要这样的启发式,因为至少可以按照时间戳对元素进行排序,从而按照时间顺序进行处理。对于熟悉流的读者来说,在 BATCH 中,我们可以假设"完美的水印"。

鉴于上述情况,在 BATCH 模式下,我们只需要在输入的末尾有一个与每个键相关的 MAX_WATERMARK,如果输入流没有键,则在输入的末尾有一个。基于这个方案,所有注册的定时器都会在时间结束时触发,用户定义的 WatermarkAssigners 或 WatermarkStrategies 会被忽略。

处理时间 #

处理时间是指在处理记录的具体实例上,处理记录的机器上的挂钟时间。根据这个定义,我们看到,基于处理时间的计算结果是不可重复的。这是因为同一条记录被处理两次,会有两个不同的时间戳。

尽管如此,在 STREAMING 模式下使用处理时间还是很有用的。原因与流媒体管道经常实时摄取其无限制的输入有关,所以事件时间和处理时间之间存在相关性。此外,由于上述原因,在 STREAMING 模式下,事件时间的1h往往可以几乎是1h的处理时间,也就是挂钟时间。所以使用处理时间可以用于早期(不完全)发射,给出预期结果的提示。

在批处理世界中,这种相关性并不存在,因为在批处理世界中,输入的数据集是静态的,是预先知道的。鉴于此,在 BATCH 模式中,我们允许用户请求当前的处理时间,并注册处理时间计时器,但与事件时间的情况一样,所有的计时器都要在输入结束时发射。

在概念上,我们可以想象,在作业执行过程中,处理时间不会提前,当整个输入处理完毕后,我们会快进到时间结束。

故障恢复 #

在 STREAMING 执行模式下,Flink 使用检查点进行故障恢复。请看一下检查点文档,了解关于这个和如何配置它的实践文档。关于通过状态快照进行容错,也有一个比较入门的章节,从更高的层面解释了这些概念。

Checkpointing 用于故障恢复的特点之一是,Flink 在发生故障时,会从检查点重新启动所有正在运行的任务。这可能比我们在 BATCH 模式下所要做的事情更昂贵(如下文所解释),这也是如果你的任务允许的话应该使用 BATCH 执行模式的原因之一。

在 BATCH 执行模式下,Flink 会尝试并回溯到之前的处理阶段,对于这些阶段,仍然有中间结果。潜在地,只有失败的任务(或它们在图中的前辈)才需要重新启动,与从检查点重新启动所有任务相比,可以提高作业的处理效率和整体处理时间。

重要的考虑因素 #

与经典的 STREAMING 执行模式相比,在 BATCH 模式下,有些东西可能无法按照预期工作。一些功能的工作方式会略有不同,而其他功能则不支持。

BATCH 模式下的行为变化。

  • “滚动"操作,如 reduce()sum(),会对 STREAMING 模式下每一条新记录发出增量更新。在 BATCH 模式下,这些操作不是"滚动”。它们只发出最终结果。

BATCH 模式下不支持的:

自定义操作符应谨慎执行,否则可能会有不恰当的行为。更多细节请参见下面的补充说明。

检查点 #

如上所述,批处理程序的故障恢复不使用检查点。

重要的是要记住,因为没有检查点,某些功能如 CheckpointListener,以及因此,Kafka 的 EXACTLY_ONCE 模式或 StreamingFileSink 的 OnCheckpointRollingPolicy 将无法工作。如果你需要一个在 BATCH 模式下工作的事务型接收器,请确保它使用 FLIP-143 中提出的统一接收器 API。

你仍然可以使用所有的状态原语,只是用于故障恢复的机制会有所不同。

广播状态 #

引入这个特性是为了让用户实现这样的用例:一个"控制"流需要被广播到所有下游任务,而广播的元素,例如规则,需要应用到另一个流的所有输入元素。

在这种模式下,Flink 不提供关于读取输入的顺序的保证。像上面这样的用例在流媒体世界中是有意义的,因为在这个世界中,作业预计会运行很长时间,而输入数据是事先不知道的。在这些设置中,需求可能会随着时间的推移而改变,这取决于输入的数据。

但在批处理世界中,我们认为这种用例没有太大意义,因为输入(包括元素和控制流)是静态的,而且是预先知道的。

我们计划在未来为BATCH处理支持这种模式的变化,即完全先处理广播端。

编写自定义操作符 #

注意:自定义操作符是 Apache Flink 的一种高级使用模式。对于大多数的使用情况,可以考虑使用(keyed-)过程函数来代替。

在编写自定义操作符时,记住 BATCH 执行模式的假设是很重要的。否则,一个在 STREAMING 模式下运行良好的操作符可能会在 BATCH 模式下产生错误的结果。操作符永远不会被限定在一个特定的键上,这意味着他们看到了 Flink 试图利用的 BATCH 处理的一些属性。

首先你不应该在一个操作符内缓存最后看到的水印。在 BATCH 模式下,我们会逐个键处理记录。因此,水印会在每个键之间从 MAX_VALUE 切换到 MIN_VALUE。你不应该认为水印在一个操作符中总是上升的。出于同样的原因,定时器将首先按键的顺序发射,然后按每个键内的时间戳顺序发射。此外,不支持手动更改键的操作。