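Triggers in Spark Structured Streaming - 焉知非鱼
For the last few weeks I have been focusing on the Apache Beam project. After some reading, I discovered many concepts that Beam and Spark Structured Streaming have in common (or is it the other way around?). One of these similarities is triggers.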
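After a break of a few months, this post presents another Apache Spark feature: triggers. The first part presents triggers in the context of the Apache Spark Structured Streaming project. The second one shows some implementation details. The last part contains some learning tests showing how triggers work.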
The role of triggers #
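Spark triggers play a role similar to triggers in Apache Beam: they determine when the processing of the accumulated data starts. Executing this processing emits new data to the result table. Compared with the previous streaming API of Apache Spark (based on DStream), the trigger concept is similar to the batch interval property.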
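In the described version (2.2.1) Spark has 2 different trigger types. The first type is based on processing time. It executes the streaming query at fixed intervals measured in processing time. The interval can be defined in any time unit (ms, s, min, ...). The second type is called once because it executes the query only once. After the execution, the query terminates and does not start again, even if new data arrives.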
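By default, Apache Spark Structured Streaming executes queries with a processing time-based trigger of 0 ms. It means that Spark launches the next micro-batch as soon as the previous one has finished. A new execution is performed only when new data is available.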
Trigger internals #
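Internally, triggers are grouped in the org.apache.spark.sql.streaming.Trigger class, where each trigger type is represented by one or more factory methods. For processing time, a trigger can be created with ProcessingTime(long intervalMs), ProcessingTime(long interval, TimeUnit timeUnit), ProcessingTime(Duration interval) or ProcessingTime(String interval). Behind the scenes, all of them call the create or apply methods of the org.apache.spark.sql.streaming.ProcessingTime object. The once trigger is represented by Once(), which returns the OneTimeTrigger case object.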
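As an illustration of the factory methods listed above, here is a minimal sketch. It assumes the spark-sql artifact is on the classpath; the wrapping object `TriggerFactories` and the value names are hypothetical and exist only for this example.

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.apache.spark.sql.streaming.Trigger

// Hypothetical holder object, used only to group the examples
object TriggerFactories {
  // Four ways to express the same 5-second processing time trigger
  val fromMillis: Trigger = Trigger.ProcessingTime(5000L)
  val fromTimeUnit: Trigger = Trigger.ProcessingTime(5L, TimeUnit.SECONDS)
  val fromDuration: Trigger = Trigger.ProcessingTime(5.seconds)
  val fromString: Trigger = Trigger.ProcessingTime("5 seconds")

  // Trigger that executes the query a single time
  val once: Trigger = Trigger.Once()
}
```

Any of these values can be passed to the `trigger(...)` method of `writeStream`, as the learning tests of this post do.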
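The trigger instance created this way is later used in the streaming query as one of the attributes of org.apache.spark.sql.execution.streaming.StreamExecution. Inside this instance, the trigger is used to build the right org.apache.spark.sql.execution.streaming.TriggerExecutor implementation: ProcessingTimeExecutor for the processing time-based trigger or OneTimeExecutor for the once trigger.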
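Later, the streaming query is executed by the TriggerExecutor's execute(triggerHandler: () => Boolean) method. Its implementation depends on the trigger type. For the once trigger, the execute method launches the triggerHandler function only once. For ProcessingTimeExecutor, the execute method is a long-running process (a while(true) loop) in which the trigger waits for the interval time before executing the query.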
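The difference between the two executors can be sketched without any Spark dependency. The code below is a simplified model written for this post, not the Spark source: the class names are reused only for readability, the `nextBatchTime` helper and the exact order of "run, then wait for the next interval boundary" are assumptions of the sketch.

```scala
// Simplified, Spark-free model of the two executors described above.
// It illustrates the idea and is not the actual Spark implementation.
trait TriggerExecutor {
  // The handler runs one micro-batch and returns false when the query must stop
  def execute(triggerHandler: () => Boolean): Unit
}

// Once trigger: the handler is called exactly one time
class OneTimeExecutor extends TriggerExecutor {
  override def execute(triggerHandler: () => Boolean): Unit = {
    triggerHandler()
  }
}

// Processing time trigger: loops until the handler asks to stop and
// waits for the next interval boundary between two calls
class ProcessingTimeExecutor(intervalMs: Long) extends TriggerExecutor {
  override def execute(triggerHandler: () => Boolean): Unit = {
    var running = true
    while (running) {
      val triggerTimeMs = System.currentTimeMillis()
      running = triggerHandler()
      if (running && intervalMs > 0) {
        val sleepMs = nextBatchTime(triggerTimeMs) - System.currentTimeMillis()
        // No wait when the batch took longer than the interval
        if (sleepMs > 0) Thread.sleep(sleepMs)
      }
    }
  }

  // Next multiple of the interval strictly after nowMs (nowMs itself for a 0 ms interval)
  def nextBatchTime(nowMs: Long): Long =
    if (intervalMs == 0) nowMs else nowMs / intervalMs * intervalMs + intervalMs
}
```

With this model, a handler that always returns true is still called a single time by `OneTimeExecutor`, while `ProcessingTimeExecutor` keeps calling it until it returns false. A 0 ms interval skips the wait entirely, which matches the default behavior described in the first section.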
Triggers are also related to the statistics defined in the org.apache.spark.sql.execution.streaming.ProgressReporter#finishTrigger(hasNewData: Boolean) method.
Trigger examples #
The learning tests below show some characteristics of triggers:
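// Imports assumed by the tests of this section. NoopForeachWriter, InMemoryLogAppender
// and LogMessage are not part of Spark and are not shown in this post.
import java.time.Instant
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

"once trigger" should "execute the query only once" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  inputStream.addData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  val stream = inputStream.toDS().toDF("number")
  val query = stream.writeStream.outputMode("update").trigger(Trigger.Once())
    .foreach(new NoopForeachWriter[Row]())
    .start()
  // Use a very large timeout here.
  // The test ends long before it, which proves that the query
  // is executed only once, as the trigger defines it
  val queryStartTime = System.currentTimeMillis()
  query.awaitTermination(Long.MaxValue)
  val queryEndTime = System.currentTimeMillis()
  val queryExecutionTime = queryEndTime - queryStartTime
  queryExecutionTime should be < Long.MaxValue
}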
"the information about trigger" should "be available in the last progress object" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1 to 30)
        Thread.sleep(1000)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")
  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("5 seconds"))
    .foreach(new NoopForeachWriter[Row]())
    .start()
  query.awaitTermination(10000)
  val lastProgress = query.lastProgress
  val progressJson = lastProgress.json
  // The result should be similar to:
  // {"id":"41fb220b-5fc7-456b-b104-f49d374f25d8","runId":"4aaf947f-1747-43c3-a422-0f6209c26709","name":null,
  // "timestamp":"2018-02-11T11:21:25.000Z","numInputRows":480,"inputRowsPerSecond":122.88786482334869,
  // "processedRowsPerSecond":1467.8899082568807,"durationMs":{"addBatch":127,"getBatch":135,"getOffset":0,
  // "queryPlanning":36,"triggerExecution":326,"walCommit":18},
  // "stateOperators":[],
  // "sources":[{"description":"MemoryStream[value#1]","startOffset":2,"endOffset":6,"numInputRows":480,
  // "inputRowsPerSecond":122.88786482334869,"processedRowsPerSecond":1467.8899082568807}],
  // "sink":{"description":"org.apache.spark.sql.execution.streaming.ForeachSink@5192026d"}}
  progressJson should include ("\"triggerExecution\":")
}
"processing time-based trigger" should "be executed the time defined in the query" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Starting Trigger Calculation"),
    (loggingEvent: LoggingEvent) => LogMessage(s"${loggingEvent.timeStamp}", ""))
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1)
        Thread.sleep(5000)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")
  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
    .foreach(new NoopForeachWriter[Row]())
    .start()
  query.awaitTermination(15000)
  val triggerExecutionTimes = logAppender.getMessagesText().map(_.toLong)
  // Count how many times each interval (in ms) between 2 consecutive triggers occurred
  val triggersIntervals = mutable.HashMap[Long, Int]()
  for (index <- 0 until triggerExecutionTimes.size - 1) {
    val currentTimestamp = triggerExecutionTimes(index)
    val nextTimestamp = triggerExecutionTimes(index+1)
    val triggerDiff = nextTimestamp - currentTimestamp
    val currentCount = triggersIntervals.getOrElse(triggerDiff, 0)
    val newCount = currentCount + 1
    triggersIntervals.put(triggerDiff, newCount)
  }
  // Output example:
  // triggersIntervals = Map(1001 -> 1, 1434 -> 1, 1000 -> 10, 895 -> 1, 999 -> 1)
  // As you can see, the interval sometimes deviates from 1 second because of the time taken
  // to process the data. It proves that the trigger was executed every ~1 second. But as you
  // can see in the next test, launching the trigger doesn't mean that data is processed
  triggersIntervals should contain key 1000L
}
"trigger lower than data arrival time" should "not process rows every trigger interval" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  val stream = inputStream.toDS().toDF("number")
  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
    .foreach(new ForeachWriter[Row]() {
      override def open(partitionId: Long, version: Long): Boolean = true
      override def process(value: Row): Unit = {
        // Remember when each row was processed
        val currentTime = System.currentTimeMillis()
        Container.processingTimes.append(currentTime)
      }
      override def close(errorOrNull: Throwable): Unit = {}
    })
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      // Wait until the query is started before sending data
      while (!query.isActive) {}
      while (true) {
        inputStream.addData(1)
        Thread.sleep(5000)
      }
    }
  }).start()
  query.awaitTermination(30000)
  // Collect the gaps (in seconds) between consecutive processed rows in a mutable buffer
  val processingTimes = new ListBuffer[Long]()
  for (index <- 0 until Container.processingTimes.size - 1) {
    val currentTimestamp = Container.processingTimes(index)
    val nextTimestamp = Instant.ofEpochMilli(Container.processingTimes(index+1))
    val processingTimeDiffInSec = nextTimestamp.minusMillis(currentTimestamp).getEpochSecond
    processingTimes.append(processingTimeDiffInSec)
  }
  // Even though the trigger was defined to 1 second, the data processing is not launched
  // when there is no new data to process. So logically we shouldn't find any difference
  // of 1 second between subsequent data processing executions
  processingTimes should not contain(1L)
}
object Container {
  var processingTimes = new ListBuffer[Long]()
}
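Triggers in Apache Spark Structured Streaming help to control the pace of micro-batch processing. As presented in the first section, 2 different types of triggers exist: processing time-based and once (which executes the query only once). However, the trigger classes are not the only ones involved in the process. Their logic is executed by TriggerExecutor implementations, which are called for every micro-batch execution.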