Wait the light to fall

Spark 结构化流之旅

焉知非鱼

http://vishnuviswanath.com/spark_structured_streaming.html

img

Structured Streaming 是 Apache Spark 的流式引擎, 可用于进行近实时分析。 在这篇博客中,我们通过一个非常简单的用例来探索结构化流。 想象一下,你开始一段旅程,需要检查车辆是否超速。 我们将创建一个简单的近实时流应用程序来计算每几秒钟车辆的平均速度,同时谈论 SlidingWindowTumblingWindowEventTimeProcessingTimeWatermarksKafka Source&Sink。 此博客中使用的所有代码都可以在我的 Github仓库中找到。

基于流的微批处理

在我们进入用例之前,让我们先看看 Apache Spark 中流式传输的工作原理。 Spark 中的结构化流,与其前身(DStream)类似,使用微批处理进行流式处理。 也就是说,spark 等待非常小的间隔,例如 1 秒(或甚至 0 秒 - 即,尽快),并将在该间隔期间接收的所有事件批量聚集成微批。 然后,Driver 程序将调度此微批处理在 Executors 中作为 Tasks 执行。 完成微批处理后,将收集下一批并再次调度。 频繁进行该调度以给出流式执行的印象。

img

Kafka Source

我们将读取 Kafka topic - cars 中的事件。 为此,我们需要将 format 设置为 “kafka”,用 boker 地址设置 kafka.bootstrap.server 并使用 “subscribe” 选项提供 topic 名。

val df: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "cars")
      //.schema(schema)  : we cannot set a schema for kafka source. Kafka source has a fixed schema of (key, value)
      .load()

为了模拟向我们发送传感器数据的车辆,我们将创建一个 Kafka producer,将 id,速度,加速度和时间戳写入“cars” topic。 代码可以在这里找到 RandomCarsKafkaProducer.scala。 请注意,此处的时间戳称为 EventTime,因为它是在其源处生成事件(消息)的时间。

img

注意:如果你需要设置本地 Kafka broker, 这儿有相关说明。

接下来,我们将原始数据解析为 case class,以便我们有一个可以使用的结构。

case class CarEvent(carId: String, speed: Option[Int], acceleration: Option[Double], timestamp: Timestamp)

object CarEvent {
	def apply(rawStr: String): CarEvent = {
	  val parts = rawStr.split(",")
	  CarEvent(parts(0), Some(Integer.parseInt(parts(1))), Some(java.lang.Double.parseDouble(parts(2))), new Timestamp(parts(3).toLong))
	}
}
val cars: Dataset[CarEvent] = df
  .selectExpr("CAST(value AS STRING)")
  .map(r  CarEvent(r.getString(0)))

这产生 CarEvent 类型的 DataSet。

执行聚合

我们就从找出每辆车的平均速度开始。 这可以通过在 carId 上执行 groupby 并应用 avg 聚合函数来完成。

val aggregates = cars
    .groupBy("carId")
    .agg(
      "speed"  "avg"
    )

这计算了每个微批次中收到的事件的平均速度。 在结构化流中,可以使用触发器(Triggers)控制微批次的间隔。 Spark 的 Trigger 概念与 FlinkApex 等一次性事件(event-at-a-time)流处理系统略有不同。 在 Spark 中,触发器设置为指定在检查新数据是否可用之前等待的时间。 如果未设置触发器,Spark 将在上一次微批处理执行完成后立即检查新数据的可用性。 而在一次事件系统中,当新数据进入时,它会在窗口的内部状态中被收集,直到触发器触发为止。

那很简单! 但是如果我们想要计算过去 5 秒内车辆的平均速度怎么办呢? 此外,我们希望根据事件的 EventTime 计算它(即,基于事件发生在源的时间,而不是基于它在系统中处理的时间。)如果你不知道什么是 EventTime,请继续阅读。

EventTime & Processing Time

EventTime 是在其源处生成事件的时间,而 ProcessingTime 是系统处理该事件的时间。 一些流处理系统还有一个时间,即 IngestionTime - 事件/消息被摄入系统的时间。 了解 EventTime 和 ProcessingTime 之间的区别非常重要。

img

上图中的红点是来自车辆的消息,然后通过 Kafka topic 流向 Spark 的 Kafka 源,然后在 task 执行期间到达 executor。 这些点之间可能会有轻微的延迟(或者如果存在任何网络连接问题,可能会有很长的延迟)。 源中的时间是所谓的 EventTime,executor 处的时间就是所谓的 ProcessingTime。 您可以将摄取时间视为首次在 Kafka 源中读入系统的时间(IngestionTime 与 spark 无关)。

现在您已经了解了不同的时间特征,让我们回到计算过去 5 秒内汽车平均速度的用例。 为此,我们需要根据 EventTime 将事件分组为5秒间隔时间组。 这种分组称为窗口。

Windows

在 Spark 中,通过在 groupBy 子句中添加一个额外的键(窗口)来完成窗口化。 对于每条消息,其 EventTime(传感器生成的时间戳)用于标识消息所属的窗口。 基于窗口类型(滚动/滑动),事件可能属于一个或多个窗口。 要了解如何,我们需要首先了解 TumblingWindow 和 SlidingWindow 是什么。

滚动窗口和滑动窗口 #

翻滚窗口是一个不重叠的窗口,翻过每个“window-size”。 例如,对于大小为4秒的滚动窗口,可能有 [00:00至00:04), [00:04:00:08), [00:08:00:12) 等窗口(在这里忽略日, 小时等)。 如果传入的事件具有 EventTime 00:05,则该事件将被分配窗口 - [00:04至00:08)

SlidingWindow 是给定大小(例如4秒)的窗口,每隔给定间隔(例如2秒)滑动一次。 这意味着滑动窗口可能与另一个窗口重叠。 对于大小为4秒的窗口,每2秒钟可以滑动窗口[00:00到00:04), [00:02到00:06), [00:04到00:08) 等等。请注意, 窗口1和2在这里重叠。 如果 EventTime 00:05 的事件进入,则该事件将属于窗口 [00:02至00:06) 和 [00:04至00:08)。

img

为了进行窗口化,Spark 添加了一个名为 “window” 的新列,并将提供的 “timestamp” 列分解为一行或多行(基于其值和窗口的大小和滑动),并在该列上执行 groupby。 这隐含地将属于时间间隔的所有事件拉入相同的“窗口”。

侧注:滚动窗口也可以被认为是滑动窗口,其滑动间隔与窗口大小相同。 即,每4秒滑动4秒的滑动窗口与4秒大小的滚动窗口相同。 事实上,这正是 Spark 内部所做的。

在这里,我们根据 ‘window’和 carId 对汽车 DataSet 进行分组。 请注意,window() 是 Spark 中返回 Column 的函数。

//a tumbling window of size 4 seconds
val aggregates = cars
	.groupBy(window($"timestamp","4 seconds"), $"carId")
	.agg(avg("speed").alias("speed"))
	.where("speed > 70")

//a sliding window of size 4 seconds that slides every 2 seconds can be created using cars.groupBy(window($"timestamp","4 seconds","2 seconds"), $"carId")

这将生成 carId,avg 速度和相应时间窗口的 DataFrame。 例如输出:

  • Batch 1
    • [2018-01-21 00:50:00, 2018-01-21 00:50:04] car1 75.0

输出模式

这个难题的最后(几乎)部分是将我们生成的结果输出到接收器 - Kafka topic。 Spark 提供三种输出模式 - Complete, Update and Append。 每种模式的不同之处在于 Spark 如何在处理微批次后更新状态并输出结果。

img

在每个微批处理期间,Spark 会更新上一批中某些键的值,有些是新的,有些则保持不变。 在“Complete”模式下,将输出所有行,在“Update”模式下,仅输出新行和更新的行。 Append 模式略有不同,在 Append 模式下,不会有任何更新的行,它只输出新行。

Kafka Sink

写入Kafka非常简单 - 将 format 设置为“kafka”,使用选项 kafka.bootstrap.server 将接收器指向 Kafka broker,并设置选项 topic 以告知要写入到哪个 Kafka topic。 Kafka sink 期望数据中存在字段 value。 我们可以利用 Spark SQL 的 selectExpr 将字段 ‘speed’ 转换为 ‘value’,并将其转换为 String。 key 是可选的,但如果您有多个分区并希望跨分区分发数据,则需要它。 使用 Kafka 接收器时必须使用 checkpointLocation,它可以启用故障恢复并且只需一次处理。

val writeToKafka = aggregates
  .selectExpr("CAST(carId AS STRING) AS key", "CAST(speed AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic", "fastcars")
  .option("checkpointLocation", "/tmp/sparkcheckpoint/")
  .queryName("kafka spark streaming kafka")
  .outputMode("update")
  .start()

运行应用程序的输出将如下所示:

  • Batch: 1
    • [2018-01-21 00:50:00, 2018-01-21 00:50:04] car1 75.0
  • Batch: 2
    • [2018-01-21 00:50:04, 2018-01-21 00:50:08] car2 20.0
    • [2018-01-21 00:50:12, 2018-01-21 00:50:16] car2 20.0
    • [2018-01-21 00:50:00, 2018-01-21 00:50:04] car1 62.5

注意,结构化流式 API 隐式地维护聚合函数的批次的 state (即状态),即,在上面的示例中,在第二批次中计算的平均速度将是在第一批和第二批期间接收的事件的平均值。 因此,作为用户,您不必进行自定义状态管理。 但随着时间的推移,维持一个大的状态的成本也随之增加,没有人愿意永远保持这种状态。 这可以使用水印来实现。

水印

在 Spark 中,Watermark 用于根据当前最大事件时间决定何时清除状态。 根据您指定的 delay,Watermark 落后于目前为止所见的最大事件时间。 例如,如果 dealy 是 3 秒并且当前最大事件时间是 10:00:45 那么水印是 10:00:42。 这意味着 Spark 将保持结束时间小于 10:00:42 的窗口状态。

val aggregates = cars
  .withWatermark("timestamp", "3 seconds") //set watermark using timestamp filed with a max delay of 3s.
  .groupBy(window($"timestamp","4 seconds"), $"carId")
  .agg(avg("speed").alias("speed"))
  .where("speed > 70")

要理解的一个微妙但重要的细节是,当使用基于 EventTime 的处理时,只有在收到具有更高时间戳值的消息/事件时,时间才会进行。 可以把它想象成 Spark 中的时钟,但不像普通时钟每秒钟都滴答(基于 ProcessingTime),这个时钟只有在你收到一个时间戳更高的事件时才会移动。

让我们看一个示例,看看当有迟到的消息时它是如何工作的。 我们将关注 [10:00到10:10) 之间的单个窗口和5秒的最大延迟。 即 .withWatermark("timestamp", "5 seconds")

img

时间戳 10:00 到达的事件,落在窗口 [10:00,10:10),水印更新为时间戳 - 5

  • 时间戳为10:02的事件在源处生成,但是被延迟。 这个事件应该落入窗口[10:00,10:10)
  • 时间戳10:04的事件在10:05到达晚了,但是这仍然落在窗口[10:00,10:10),因为当前的水印是 09:55,这是<窗口结束时间。 水印更新至 10:04 - 00:05 = 09:59。
  • 时间戳为10:16的事件到达,这将水印更新为 10:11。 (此事件将落入窗口[10:10,10:20),但此处不相关)。
  • 时间戳为10:02的延迟事件到达,但窗口 [10:00,10:10) 被清除,因此该事件将被删除。

设置水印将确保状态不会永远增长。 另外,请注意其中一个晚到的事件是如何被处理而另一个被忽略的(因为为时已晚)。

结论

我们构建了一个简单的流应用程序,同时解释了 EventTime 处理,窗口化,水印,输出模式以及如何读写 Kafka。 我的 Github 仓库中提供了此代码和更多示例代码。 我希望这能更好地了解 Spark Structured Streaming 中的一些新功能。 如果评论中有任何问题,请告诉我。 谢谢阅读!

继续阅读