
Triggers in Spark Structured Streaming

焉知非鱼

Over the last few weeks I have been focusing on the Apache Beam project. After some reading I discovered a lot of concepts shared between Beam and Spark Structured Streaming (or the other way around?). One of these similarities is triggers.

After a break of several months, this post covers another Apache Spark feature: triggers. The first part introduces triggers in the context of the Apache Spark Structured Streaming project. The second one shows some implementation details. The last part contains learning tests showing how triggers work.

The role of triggers #

Spark triggers play a role similar to the triggers in Apache Beam, i.e. they determine when to start processing the accumulated data. Executing this processing obviously emits new data to the result table. Compared to the previous streaming version of Apache Spark (based on DStream), the concept of a trigger is similar to the batch interval property.
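
For comparison, in the DStream API the micro-batch pace was fixed once, as the batch interval passed at StreamingContext creation time. A minimal sketch, only to show where the interval lives (the application name and master are illustrative); the Structured Streaming counterpart appears in the snippets below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// DStream API: the micro-batch pace is the batch interval given at context creation
val conf = new SparkConf().setAppName("Batch interval example").setMaster("local[*]")
val streamingContext = new StreamingContext(conf, Seconds(5))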

In the version described here (2.2.1), Spark has 2 different types of triggers. The first type is based on processing time: it executes the streaming query at fixed intervals of processing time, and the interval can be defined in any unit of time (ms, s, min, ...). The second type is called once, because it executes the query only once. After that execution the query terminates and, even if new data arrives, the query will not start again.

By default, Apache Spark Structured Streaming executes queries with a processing-time trigger of 0 ms. This means Spark launches a new micro-batch as soon as the previous one has been processed. A new execution happens only when new data is available.
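
As a quick illustration, both trigger types are set through writeStream.trigger(...). The sketch below uses the built-in rate source and the console sink just to have a runnable query; the names and options are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession.builder().appName("Trigger types").master("local[*]").getOrCreate()
val stream = sparkSession.readStream.format("rate").load()

// processing time-based trigger: a micro-batch is attempted every 5 seconds
val timeBasedQuery = stream.writeStream.outputMode("update").format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// once trigger: a single micro-batch, then the query terminates
val onceQuery = stream.writeStream.outputMode("update").format("console")
  .trigger(Trigger.Once())
  .start()

// with no .trigger(...) call at all, the default is a 0 ms processing-time trigger,
// i.e. "as soon as possible"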

Trigger internals #

Internally, the triggers are grouped in the org.apache.spark.sql.streaming.Trigger class, where each trigger type is represented by one or more factory methods. In the case of processing time we can create the trigger with: ProcessingTime(long intervalMs), ProcessingTime(long interval, TimeUnit timeUnit), ProcessingTime(Duration interval) or ProcessingTime(String interval). All of them call, under the hood, the create or apply methods of the org.apache.spark.sql.streaming.ProcessingTime object. The once trigger is represented by Once(), which returns the OneTimeTrigger case object.
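
A short sketch of these factory methods; all four ProcessingTime variants below should describe the same 10-second interval:

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// the 4 ProcessingTime factory methods, all describing a 10-second interval
val fromMillis = Trigger.ProcessingTime(10000L)
val fromTimeUnit = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
val fromDuration = Trigger.ProcessingTime(10.seconds)
val fromString = Trigger.ProcessingTime("10 seconds")

// the once trigger, backed by the OneTimeTrigger case object
val once = Trigger.Once()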

The trigger instance created this way is later used in the streaming query as part of the org.apache.spark.sql.execution.streaming.StreamExecution attributes. There it is used to build the correct instance of the org.apache.spark.sql.execution.streaming.TriggerExecutor implementation: ProcessingTimeExecutor for a time-based trigger, or OneTimeExecutor for the once trigger.

Later, the streaming query is executed by TriggerExecutor's execute(triggerHandler: () => Boolean) method. The implementation of this method depends on the trigger type. For the once trigger, execute launches the triggerHandler function only once. For ProcessingTimeExecutor, execute is a long-running process (a while(true) loop) where the trigger waits for the interval time before executing the query.
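
To make the difference concrete, here is a strongly simplified, self-contained model of the two executors. It is not Spark's actual code (which also aligns batch times on a clock and reports metrics), only an illustration of the execute contract described above:

import java.util.concurrent.TimeUnit

// illustration only: a simplified view of the TriggerExecutor contract
trait SimpleTriggerExecutor {
  def execute(triggerHandler: () => Boolean): Unit
}

// like OneTimeExecutor: the handler is launched exactly once
object SimpleOneTimeExecutor extends SimpleTriggerExecutor {
  override def execute(triggerHandler: () => Boolean): Unit = triggerHandler()
}

// like ProcessingTimeExecutor: a while(true) loop that waits for the remaining
// part of the interval before starting the next execution
class SimpleProcessingTimeExecutor(intervalMs: Long) extends SimpleTriggerExecutor {
  override def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val batchStartTimeMs = System.currentTimeMillis()
      val continueExecution = triggerHandler()
      if (!continueExecution && intervalMs == 0) return
      val waitTimeMs = intervalMs - (System.currentTimeMillis() - batchStartTimeMs)
      if (waitTimeMs > 0) TimeUnit.MILLISECONDS.sleep(waitTimeMs)
    }
  }
}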

Triggers are also related to the statistics defined in the org.apache.spark.sql.execution.streaming.ProgressReporter#finishTrigger(hasNewData: Boolean) method.
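
Those statistics end up in the progress objects exposed by the query. A minimal sketch, assuming a running streaming query named query:

// durationMs is filled by ProgressReporter and contains, among others, the
// "triggerExecution" timing (in milliseconds) of the last completed trigger
val progress = query.lastProgress
if (progress != null) {
  println(s"Last trigger execution took ${progress.durationMs.get("triggerExecution")} ms")
}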

Trigger examples #

The learning tests below show some of these trigger properties:

"once trigger" should "execute the query only once" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  inputStream.addData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  val stream = inputStream.toDS().toDF("number")
 
  val query = stream.writeStream.outputMode("update").trigger(Trigger.Once())
    .foreach(new NoopForeachWriter[Row]())
    .start()
 
  // Put here a very big timeout
  // The test will end long before this timeout, which proves that
  // the query is executed only once, as the trigger defines
  val queryStartTime = System.currentTimeMillis()
  query.awaitTermination(Long.MaxValue)
  val queryEndTime = System.currentTimeMillis()
 
  val queryExecutionTime = queryEndTime - queryStartTime
  queryExecutionTime should be < Long.MaxValue
}
 
"the information about trigger" should "be available in the last progress object" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1 to 30)
        Thread.sleep(1000)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")
 
  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("5 seconds"))
    .foreach(new NoopForeachWriter[Row]())
    .start()
 
  query.awaitTermination(10000)
 
  val lastProgress = query.lastProgress
  val progressJson = lastProgress.json
  // The result should be similar to:
  // {"id":"41fb220b-5fc7-456b-b104-f49d374f25d8","runId":"4aaf947f-1747-43c3-a422-0f6209c26709","name":null,
  // "timestamp":"2018-02-11T11:21:25.000Z","numInputRows":480,"inputRowsPerSecond":122.88786482334869,
  // "processedRowsPerSecond":1467.8899082568807,"durationMs":{"addBatch":127,"getBatch":135,"getOffset":0,
  // "queryPlanning":36,"triggerExecution":326,"walCommit":18},
  // "stateOperators":[],
  // "sources":[{"description":"MemoryStream[value#1]","startOffset":2,"endOffset":6,"numInputRows":480,
  // "inputRowsPerSecond":122.88786482334869,"processedRowsPerSecond":1467.8899082568807}],
  // "sink":{"description":"org.apache.spark.sql.execution.streaming.ForeachSink@5192026d"}}
  progressJson should include ("\"triggerExecution\":")
}
 
"processing time-based trigger" should "be executed the time defined in the query" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Starting Trigger Calculation"),
    (loggingEvent: LoggingEvent) => LogMessage(s"${loggingEvent.timeStamp}", ""))
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1)
        Thread.sleep(5000)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")
 
  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
    .foreach(new NoopForeachWriter[Row]())
    .start()
 
  query.awaitTermination(15000)
 
  val triggerExecutionTimes = logAppender.getMessagesText().map(_.toLong)
  val triggersIntervals = mutable.HashMap[Long, Int]()
  for (index <- 0 until triggerExecutionTimes.size - 1) {
    val currentTimestamp = triggerExecutionTimes(index)
    val nextTimestamp = triggerExecutionTimes(index+1)
 
    val triggerDiff = nextTimestamp - currentTimestamp
 
    val currentCount = triggersIntervals.getOrElse(triggerDiff, 0)
    val newCount = currentCount + 1
    triggersIntervals.put(triggerDiff, newCount)
  }
  // Output example:
  // triggersIntervals = Map(1001 -> 1, 1434 -> 1, 1000 -> 10, 895 -> 1, 999 -> 1)
  // As you can see, sometimes the difference is +/- 1 sec because of the time taken to process the data
  // It proves that the trigger was executed every ~1 second. But as you can note in the
  // next test, a trigger launch doesn't necessarily mean data processing
  triggersIntervals should contain key 1000L
}
 
"trigger lower than data arrival time" should "not process rows every trigger interval" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  val stream = inputStream.toDS().toDF("number")
 
  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
    .foreach(new ForeachWriter[Row]() {
      override def open(partitionId: Long, version: Long): Boolean = true
 
      override def process(value: Row): Unit = {
        val currentTime = System.currentTimeMillis()
        Container.processingTimes.append(currentTime)
      }
 
      override def close(errorOrNull: Throwable): Unit = {}
    })
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      while (true) {
        inputStream.addData(1)
        Thread.sleep(5000)
      }
    }
  }).start()
 
  query.awaitTermination(30000)
 
  val processingTimes = new ListBuffer[Long]()
  for (index <- 0 until Container.processingTimes.size - 1) {
    val currentTimestamp = Container.processingTimes(index)
    val nextTimestamp = Instant.ofEpochMilli(Container.processingTimes(index + 1))

    // difference, in seconds, between 2 consecutively processed rows
    val processingTimeDiffInSec = nextTimestamp.minusMillis(currentTimestamp).getEpochSecond
    processingTimes.append(processingTimeDiffInSec)
  }
  // Even though the trigger was defined to 1 second, we can see that the data processing is not launched
  // when there is no new data to process. So logically we shouldn't find any difference
  // of 1 second between subsequent trigger executions for data processing
  processingTimes should not contain(1L)
}
 
object Container {
  var processingTimes = new ListBuffer[Long]()
}

Triggers in Apache Spark Structured Streaming help to control the pace of micro-batch processing. As described in the first section, two different types of triggers exist: time-based triggers and a once trigger that executes the query only one time. However, the trigger classes are not the only classes involved in the process. Their logic is executed by a TriggerExecutor implementation, called at every micro-batch execution.