Apache Spark 2.4.0 特性 - Watermark Configuration
— 焉知非鱼有关 Apache Spark 2.4.0 功能的系列继续进行。在上周发现了桶剪枝之后,现在该切换到结构化流模块并查看其主要进展了。
这篇文章介绍了 2.4.0 版本中添加的水位配置功能。第一部分解释了它的变化。第二个给出了实现细节,其中显示了用于保证向后兼容性的策略之一。最后一部分包含说明水位配置功能的测试用例。
最小或最大水位? #
2.3.0 版可以连接流。如 Apache Spark 结构化流中的流之间的内联中所述,这样做不是一件容易的事,因为联接的流可能具有不同的延迟。水位是控制哪些数据可以联接的方法之一,即使它来得很晚。在 2.4.0 发行版之前,定义了多个水位时,默认情况下使用最小值。它使我们能够以最慢的流移动,并且不会丢失太多数据:
通过 Apache Spark 2.4.0 中引入的更改,我们可以在这种情况下配置使用的水位,并采用最小或最大水位。 当然,选择最大值将导致应用程序以最快的速度向前移动,从而丢失更多数据:
水位配置实现 #
水位管理是通过 spark.sql.streaming.multipleWatermarkPolicy 配置条目实现的。它采用“最小”或“最大”值之一。它们中的每一个都导致相应的 MultipleWatermarkPolicy 实例的初始化:MinWatermark 和 MaxWatermark。两者都附带一种解决使用的水位值的方法。根据使用的策略,此方法仅是对 Seq 的 min 或 max 方法的调用。负责调用已配置策略的类为 WatermarkTracker。
向后兼容性由 MultipleWatermarkPolicy 条目的默认值 ‘min’ 保证。除非您明确更改此值,否则您应该能够在新的 Spark 版本上运行旧的流传输管道,而不会出现问题。甚至那些从 Apache Spark 2.4.0 之前的检查点恢复的管道。而且由于我们在谈论检查点,所以要记住的一件事很重要。从检查点还原管道后,我们无法修改水位策略。任何更改都将被忽略。
最大水位示例 #
为了看到这个新功能的实际效果,我们将执行2个测试用例。前者将具有最小水位,因此可以接受更多迟到数据:
object FirstWatermark {
var FirstKnownValue = ""
}
def launchDataInjection(mainEventsStream: MemoryStream[MainEvent],
joinedEventsStream: MemoryStream[JoinedEvent], query: StreamingQuery): Unit = {
new Thread(new Runnable() {
override def run(): Unit = {
val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
var key = 0
val processingTimeFrom1970 = 10000L // 10 sec
stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
FirstWatermark.FirstKnownValue = query.lastProgress.eventTime.get("watermark")
key = 2
// We send keys: 2, 3, 4, 5, 6 in late to see watermark applied
var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query) - 5000L
while (query.isActive) {
if (key % 2 == 0) {
stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime)
} else {
mainEventsStream.addData(MainEvent(s"key${key}", startingTime, new Timestamp(startingTime)))
}
startingTime += 1000L
key += 1
}
}
}).start()
}
"min watermark" should "be used when the min policy is configured" in {
val sparkSession: SparkSession = SparkSession.builder()
.appName("Spark Structured Streaming min").config("spark.sql.streaming.multipleWatermarkPolicy", "min")
.master("local[3]").getOrCreate() // 3 executors are required to good execution of this test, at least 4 cores should be available
import sparkSession.implicits._
val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
.withWatermark("mainEventTimeWatermark", "4 seconds")
val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
.withWatermark("joinedEventTimeWatermark", "8 seconds")
val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
mainEventsDataset("mainEventTimeWatermark") >= joinedEventsDataset("joinedEventTimeWatermark"),
"leftOuter")
val query = stream.writeStream.trigger(Trigger.ProcessingTime(3000L)).foreach(RowProcessor).start()
while (!query.isActive) {}
launchDataInjection(mainEventsStream, joinedEventsStream, query)
query.awaitTermination(150000)
// Do not assert on the watermark directly - it can change depending on the execution environment
// We simply check the first known watermark value after the change
FirstWatermark.FirstKnownValue shouldEqual "1970-01-01T00:00:02.000Z"
}
最小水位的使用涉及不太积极的拒绝策略。 在这里,仅早于 1970-01-01T00:00:02.000Z 的消息将被忽略。 以下使用 max 策略的测试用例不是这种情况:
"max watermark" should "be used when the max policy is configured" in {
val sparkSession: SparkSession = SparkSession.builder()
.appName("Spark Structured Streaming min").config("spark.sql.streaming.multipleWatermarkPolicy", "max")
.master("local[3]").getOrCreate() // 3 executors are required to good execution of this test, at least 4 cores should be available
import sparkSession.implicits._
val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
// Max = 06, hence it takes 4 seconds
// Min = 02, hence it takes 8 seconds
val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
.withWatermark("mainEventTimeWatermark", "4 seconds")
val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
.withWatermark("joinedEventTimeWatermark", "8 seconds")
val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
mainEventsDataset("mainEventTimeWatermark") >= joinedEventsDataset("joinedEventTimeWatermark"),
"leftOuter")
val query = stream.writeStream.trigger(Trigger.ProcessingTime(3000L)).foreach(RowProcessor).start()
while (!query.isActive) {}
launchDataInjection(mainEventsStream, joinedEventsStream, query)
query.awaitTermination(150000)
FirstWatermark.FirstKnownValue shouldEqual "1970-01-01T00:00:06.000Z"
}
如您所见,任何早于 1970-01-01T00:00:06.000Z 的消息都将被丢弃。 此值对应于处理中的第一个水位,由最大水位策略解析。
水位配置为基于 Apache Spark 的流管道增加了更多灵活性。 在 2.4.0 发行版之前,引擎始终在联接的流中选择最小水位。 现在由用户来定义联接策略。