Wait the light to fall

Apache Spark 2.4.0 特性 - Watermark Configuration

焉知非鱼

有关 Apache Spark 2.4.0 功能的系列继续进行。在上周发现了桶剪枝之后,现在该切换到结构化流模块并查看其主要进展了。

这篇文章介绍了 2.4.0 版本中添加的水位配置功能。第一部分解释了它的变化。第二个给出了实现细节,其中显示了用于保证向后兼容性的策略之一。最后一部分包含说明水位配置功能的测试用例。

最小或最大水位? #

2.3.0 版可以连接流。如 Apache Spark 结构化流中的流之间的内联中所述,这样做不是一件容易的事,因为联接的流可能具有不同的延迟。水位是控制哪些数据可以联接的方法之一,即使它来得很晚。在 2.4.0 发行版之前,定义了多个水位时,默认情况下使用最小值。它使我们能够以最慢的流移动,并且不会丢失太多数据:

img

通过 Apache Spark 2.4.0 中引入的更改,我们可以在这种情况下配置使用的水位,并采用最小或最大水位。 当然,选择最大值将导致应用程序以最快的速度向前移动,从而丢失更多数据:

img

水位配置实现 #

水位管理是通过 spark.sql.streaming.multipleWatermarkPolicy 配置条目实现的。它采用“最小”或“最大”值之一。它们中的每一个都导致相应的 MultipleWatermarkPolicy 实例的初始化:MinWatermark 和 MaxWatermark。两者都附带一种解决使用的水位值的方法。根据使用的策略,此方法仅是对 Seq 的 min 或 max 方法的调用。负责调用已配置策略的类为 WatermarkTracker。

向后兼容性由 MultipleWatermarkPolicy 条目的默认值 ‘min’ 保证。除非您明确更改此值,否则您应该能够在新的 Spark 版本上运行旧的流传输管道,而不会出现问题。甚至那些从 Apache Spark 2.4.0 之前的检查点恢复的管道。而且由于我们在谈论检查点,所以要记住的一件事很重要。从检查点还原管道后,我们无法修改水位策略。任何更改都将被忽略。

最大水位示例 #

为了看到这个新功能的实际效果,我们将执行2个测试用例。前者将具有最小水位,因此可以接受更多迟到数据:

object FirstWatermark {
  var FirstKnownValue = ""
}
  def launchDataInjection(mainEventsStream: MemoryStream[MainEvent],
                          joinedEventsStream: MemoryStream[JoinedEvent], query: StreamingQuery): Unit = {
    new Thread(new Runnable() {
      override def run(): Unit = {
        val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
        var key = 0
        val processingTimeFrom1970 = 10000L // 10 sec
        stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
        FirstWatermark.FirstKnownValue = query.lastProgress.eventTime.get("watermark")
        key = 2
        // We send keys: 2, 3, 4, 5, 6  in late to see watermark applied
        var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query) - 5000L
        while (query.isActive) {
          if (key % 2 == 0) {
            stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime)
          } else {
            mainEventsStream.addData(MainEvent(s"key${key}", startingTime, new Timestamp(startingTime)))
          }
          startingTime += 1000L
          key += 1
        }
      }
    }).start()
  }
 
  "min watermark" should "be used when the min policy is configured" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark Structured Streaming min").config("spark.sql.streaming.multipleWatermarkPolicy", "min")
      .master("local[3]").getOrCreate() // 3 executors are required to good execution of this test, at least 4 cores should be available
    import sparkSession.implicits._
 
    val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
    val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
    val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
      .withWatermark("mainEventTimeWatermark", "4 seconds")
    val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
      .withWatermark("joinedEventTimeWatermark", "8 seconds")
 
    val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
      mainEventsDataset("mainEventTimeWatermark") >= joinedEventsDataset("joinedEventTimeWatermark"),
      "leftOuter")
 
    val query = stream.writeStream.trigger(Trigger.ProcessingTime(3000L)).foreach(RowProcessor).start()
 
    while (!query.isActive) {}
    launchDataInjection(mainEventsStream, joinedEventsStream, query)
    query.awaitTermination(150000)
 
    // Do not assert on the watermark directly - it can change depending on the execution environment
    // We simply check the first known watermark value after the change
    FirstWatermark.FirstKnownValue shouldEqual "1970-01-01T00:00:02.000Z"
  }

最小水位的使用涉及不太积极的拒绝策略。 在这里,仅早于 1970-01-01T00:00:02.000Z 的消息将被忽略。 以下使用 max 策略的测试用例不是这种情况:

"max watermark" should "be used when the max policy is configured" in {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark Structured Streaming min").config("spark.sql.streaming.multipleWatermarkPolicy", "max")
    .master("local[3]").getOrCreate() // 3 executors are required to good execution of this test, at least 4 cores should be available
  import sparkSession.implicits._
 
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  // Max = 06, hence it takes 4 seconds
  // Min = 02, hence it takes 8 seconds
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
    .withWatermark("mainEventTimeWatermark", "4 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
    .withWatermark("joinedEventTimeWatermark", "8 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    mainEventsDataset("mainEventTimeWatermark") >= joinedEventsDataset("joinedEventTimeWatermark"),
    "leftOuter")
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(3000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  launchDataInjection(mainEventsStream, joinedEventsStream, query)
  query.awaitTermination(150000)
 
  FirstWatermark.FirstKnownValue shouldEqual "1970-01-01T00:00:06.000Z"
}

如您所见,任何早于 1970-01-01T00:00:06.000Z 的消息都将被丢弃。 此值对应于处理中的第一个水位,由最大水位策略解析。

水位配置为基于 Apache Spark 的流管道增加了更多灵活性。 在 2.4.0 发行版之前,引擎始终在联接的流中选择最小水位。 现在由用户来定义联接策略。