
Output modes in Spark Structured Streaming

焉知非鱼

Structured Streaming introduces many new concepts compared to DStream-based streaming. One of them is the output mode.

This post covers the output modes introduced in Spark 2.0.0 for handling the output of streaming data. The first part presents them in a short theoretical section. The second part describes their API. The last part shows how they work through some learning tests.

Output modes definition #

The output mode specifies how data is written to the result table. Among the available output modes we can distinguish:

  • append - only the new rows are written to the output sink. This mode is reserved for processing without any aggregation and fits immutable results very well (a short sketch follows this list). An important point to note is its relationship with the watermark. If a watermark is defined on the aggregation, it controls when the appended data is sent to the result table. This happens only once the "intermediate" state (= the state that changes during processing) is finalized, i.e. only when the new watermark value passes beyond the latest entry of the given group. For instance, suppose the watermark is set to 19:00:00 and the latest record of group "A" comes from 19:00:05. Now, when the watermark moves to 19:00:06, the results of group "A" are sent to the result table. The engine considers that no new events will arrive for this group, so its results can safely be sent to the output table.
  • complete - in this mode all rows are written to the output sink every time. The write happens whenever the stream has some updates. This mode is dedicated to processing with aggregations.
  • update - similar to the previous one, except that only the updated rows are written to the output sink.
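
To make these definitions more concrete, below is a minimal sketch of a watermarked event-time aggregation written with one of the three modes. It is not taken from the original tests: the socket source on localhost:9999, the 5-second window and the console sink are only illustrative assumptions.

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object OutputModesSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("output-modes-sketch").getOrCreate()
  import spark.implicits._

  // lines such as "2018-03-10 19:00:05,A" (event time, group)
  val events = spark.readStream
    .format("socket").option("host", "localhost").option("port", 9999)
    .load()
    .as[String]
    .map { line =>
      val Array(eventTime, group) = line.split(",")
      (Timestamp.valueOf(eventTime), group)
    }
    .toDF("created", "group")

  val aggregated = events
    .withWatermark("created", "1 second")                 // watermark defined on the event-time column
    .groupBy(window($"created", "5 seconds"), $"group")
    .count()

  // "append"   - a group is emitted only once the watermark passes beyond it (finalized results)
  // "complete" - the whole result table is rewritten at every trigger
  // "update"   - only the rows changed since the previous trigger are emitted
  val query = aggregated.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
}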

Output modes API #

The output mode is defined through the DataStreamWriter#outputMode(outputMode: String) method. The passed name is later resolved by the InternalOutputModes object into the corresponding case object.
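
As a small assumed illustration (it is not part of the original code), the mode can be given either as a String name or directly as an OutputMode case object; the memory sink and the query names below are arbitrary:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// both declarations below are equivalent for an aggregated streaming DataFrame
def startWithStringMode(aggregatedStream: DataFrame): StreamingQuery =
  aggregatedStream.writeStream
    .outputMode("update")              // the String name, resolved into the matching case object
    .format("memory").queryName("counts_by_string_mode")
    .start()

def startWithCaseObjectMode(aggregatedStream: DataFrame): StreamingQuery =
  aggregatedStream.writeStream
    .outputMode(OutputMode.Update())   // the same mode passed directly as an OutputMode instance
    .format("memory").queryName("counts_by_case_object_mode")
    .start()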

The resolved instance is mainly used in the DataStreamWriter class. From there it is passed to StreamingQueryManager#startQuery(userSpecifiedName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: DataFrame, sink: Sink, outputMode: OutputMode, useTempCheckpointLocation: Boolean = false, recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()). As its name indicates, this method starts the execution of the streaming query.

On the physical execution side, we can find traces of the output mode in StateStoreSaveExec, where the intermediate state results are stored. Incidentally, we can also find there many references to the watermark, which helps to discard results that are too old. If you want to learn more about it, see the post about the StateStore in Apache Spark Structured Streaming.

Output modes examples #

The list below summarizes the modes available for a given type of processing. After each of them, some tests are written to show the working and failing cases:

  • Aggregation with a watermark
  • append mode: supported - but the results are emitted only once the watermark has been passed (= the output is finalized). The query will not work if the aggregation is not applied on the watermarked column.
  • complete mode: supported - unlike update, the watermark is not used
  • update mode: supported - the watermark is used to remove aggregations that are too old

The tests:

"the count on watermark column" should "be correctly computed in append mode" in {
  val testKey = "append-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .count()
 
  val query = aggregatedStream.writeStream.outputMode("append")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}"))
    .start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      // send the first batch - max time = 10 seconds, so the watermark will be 9 seconds
      inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+4000), 3),
        (new Timestamp(now+5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5), (new Timestamp(now+4000), 6))
      inputStream.addData((new Timestamp(11000), 7))
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(11000), 8))
    }
  }).start()
 
  query.awaitTermination(45000)
 
  val readValues = InMemoryKeyedStore.getValues(testKey)
  // As you can see, only the results for now+4000 and now+5000 were emitted. The append output
  // mode emits the accumulated results once a new watermark is defined and those results fall
  // below the new threshold; the rows at 11000 are still above the watermark and remain pending
  readValues should have size 2
  readValues should contain allOf("1970-01-01 01:00:09.0 -> 2", "1970-01-01 01:00:10.0 -> 3")
}
 
"the count on non-watermark column" should "fail in append mode" in {
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("number")
    .count()
 
  val exception = intercept[AnalysisException]{
    aggregatedStream.writeStream.outputMode("append")
      .foreach(new NoopForeachWriter[Row]()).start()
  }
 
  exception.message should include("Append output mode not supported when there are streaming aggregations on " +
    "streaming DataFrames/DataSets without watermark")
}
 
"the count on watermark column" should "be correctly computed in update mode" in {
  val testKey = "update-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .count()
 
  val query = aggregatedStream.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
        (new Timestamp(now+5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
 
  query.awaitTermination(45000)
 
  val readValues = InMemoryKeyedStore.getValues(testKey)
  readValues should have size 2
  readValues should contain allOf("1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1")
}
 
"the count on non-watermark column" should "be correctly computed in update mode" in {
  val testKey = "update-output-mode-without-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("number")
    .count()
 
  val query = aggregatedStream.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Long]("number")} -> ${processedRow.getAs[Long]("count")}")).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
        (new Timestamp(now+5000), 3))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 6))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
 
  query.awaitTermination(45000)
 
  val readValues = InMemoryKeyedStore.getValues(testKey)
  readValues should have size 5
  readValues should contain allOf("1 -> 1", "3 -> 2", "2 -> 1", "6 -> 2", "7 -> 1")
}
 
 
"the count on watermark column" should "be correctly computed in complete mode" in {
  val testKey = "update-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .count()
 
  val query = aggregatedStream.writeStream.outputMode("complete")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
        (new Timestamp(now+5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
 
  query.awaitTermination(45000)
 
  val readValues = InMemoryKeyedStore.getValues(testKey)
  println(s"${readValues}")
  readValues should have size 5
  readValues.sorted should equal (Seq("1970-01-01 01:00:04.0 -> 1", "1970-01-01 01:00:05.0 -> 1",
    "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1"))
}
 
"the count on non-watermark column" should "be correctly computed in complete mode" in {
  val testKey = "update-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("number")
    .count()
 
  val query = aggregatedStream.writeStream.outputMode("complete")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Long]("number")} -> ${processedRow.getAs[Long]("count")}")).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
        (new Timestamp(now+5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
 
  query.awaitTermination(45000)
 
  val readValues = InMemoryKeyedStore.getValues(testKey)
  println(s"${readValues}")
  readValues should have size 11
  readValues.sorted should equal (Seq("1 -> 1", "1 -> 1", "2 -> 1", "2 -> 1",
    "3 -> 1", "3 -> 1", "4 -> 1", "4 -> 1", "5 -> 1", "6 -> 1", "7 -> 1"))
}
  • Other aggregations (without a watermark)
  • append mode: not supported - an aggregation can be updated at any moment, which violates the principle of this mode
  • complete mode: supported
  • update mode: supported - but unlike the previous point, old aggregations are not cleaned up. The engine has no way to know when an aggregation becomes too old.

The tests:

"append mode without aggregation" should "be disallow in structured streaming without watermark" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val counter = inputStream.toDS().toDF("id", "name").agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val exception = intercept[AnalysisException] {
    counter.writeStream.outputMode("append").foreach(new NoopForeachWriter()).start()
  }
  exception.message should include ("Append output mode not supported when there are streaming " +
    "aggregations on streaming DataFrames/DataSets without watermark")
}
 
"complete mode without aggregation" should "be allowed in structured streaming without watermark" in {
  val testKey = "no-aggregation-complete-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val counter = inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val query = counter.writeStream.outputMode("complete").foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true
 
    override def process(row: Row): Unit = {
      println(s"processing ${row}")
      InMemoryKeyedStore.addValue(testKey, s"${row.getAs[String]("name")}=${row.getAs[Long]("count(1)")}")
    }
 
    override def close(errorOrNull: Throwable): Unit = {}
  }).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      Thread.sleep(10000)
      // Add only 1  row to show that all entries are sent to the result table every time
      inputStream.addData((1L, "test1"))
    }
  }).start()
 
  query.awaitTermination(30000)
 
  val processedRows = InMemoryKeyedStore.getValues(testKey)
  processedRows should have size 6
  val rowsTest1 = processedRows.filter(_.startsWith("test1="))
  rowsTest1 should have size 2
  rowsTest1 should contain allOf("test1=3", "test1=4")
  val rowsTest2 = processedRows.filter(_.startsWith("test2="))
  rowsTest2 should have size 2
  rowsTest2 should contain only("test2=3")
  val rowsTest3 = processedRows.filter(_.startsWith("test3="))
  rowsTest3 should have size 2
  rowsTest3 should contain only("test3=3")
}
 
"update mode without aggregation" should "be allowed in structured streaming without watermark" in {
  val testKey = "no-aggregation-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val counter = inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val query = counter.writeStream.outputMode("update").foreach(new InMemoryStoreWriter[Row](testKey,
    (processedRow) => s"${processedRow.getAs[String]("name")} -> ${processedRow.getAs[Long]("count(1)")}")).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      Thread.sleep(10000)
      // Add only 1  row to show that only updated row is sent to the result table
      inputStream.addData((1L, "test1"))
    }
  }).start()
 
  query.awaitTermination(30000)
 
  val processedRows = InMemoryKeyedStore.getValues(testKey)
  processedRows should have size 4
  processedRows should contain allOf("test1 -> 3", "test1 -> 4", "test2 -> 3", "test3 -> 3")
}
  • mapGroupsWithState processing
  • append mode: not supported
  • complete mode: not supported
  • update mode: supported

The tests:

private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Seq[String] = (key, values, state) => {
  val stateNames = state.getOption.getOrElse(Seq.empty)
  val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
  state.update(stateNewNames)
  stateNewNames
}
 
"append mode" should "not work for mapGroupWithState" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3") )
 
  val exception = intercept[AnalysisException] {
    mappedValues.writeStream.outputMode("append").foreach(new NoopForeachWriter[Seq[String]]()).start()
  }
 
  exception.message should include("mapGroupsWithState is not supported with Append output mode on a " +
    "streaming DataFrame/Dataset")
}
 
"complete mode" should "not work for mapGroupWithState" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3") )
 
  val exception = intercept[AnalysisException] {
    mappedValues.writeStream.outputMode("complete").foreach(new NoopForeachWriter[Seq[String]]()).start()
  }
 
  exception.message should include("Complete output mode not supported when there are no streaming " +
    "aggregations on streaming DataFrames/Datasets")
}
 
"update mode" should "work for mapGroupWithState" in {
  val testKey = "mapGroupWithState-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"), (3L, "test30"))
 
  val query = mappedValues.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Seq[String]](testKey, (stateSeq) => stateSeq.mkString(","))).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"), (1L, "test13"), (2L, "test21"))
    }
  }).start()
 
  query.awaitTermination(30000)
 
  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 5
  savedValues should contain allOf("test30", "test10,test11", "test20", "test10,test11,test12,test13",
    "test20,test21")
}
  • flatMapGroupsWithState
  • append mode: supported. Moreover, an aggregation after flatMapGroupsWithState is allowed. For it to work, one condition must be met: a watermark must be defined before the aggregation declared after the flatMapGroupsWithState transformation.
  • complete mode: not supported
  • update mode: supported, but an aggregation after flatMapGroupsWithState is not allowed

The tests:

private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Iterator[String] = (key, values, state) => {
  val stateNames = state.getOption.getOrElse(Seq.empty)
  //state.setTimeoutDuration(1000L)
  val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
  state.update(stateNewNames)
  Iterator(stateNewNames.mkString(","))
}
 
"multiple flatMapGroupsWithState" should "fail because they're not all in append mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
    .groupByKey(key => key)
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
      timeoutConf = NoTimeout())((key, values, state: GroupState[String]) => Iterator(""))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }
 
  exception.message should include("Multiple flatMapGroupsWithStates are not supported when they are not " +
    "all in append mode or the output mode is not append on a streaming DataFrames/Datasets")
}
 
"flatMapGroupsWithState" should "fail for complete mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
 
  val exception = intercept[IllegalArgumentException] {
    inputStream.toDS().toDF("id", "name")
      .groupByKey(row => row.getAs[Long]("id"))
      .flatMapGroupsWithState(outputMode = OutputMode.Complete(),
        timeoutConf = NoTimeout())(MappingFunction)
  }
 
  exception.getMessage should include("The output mode of function should be append or update")
}
 
"flatMapGroupsWithState" should "fail for append mode in flatMap and update mode in sink" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[String]()).start()
  }
 
  exception.message should include("flatMapGroupsWithState in append mode is not supported with Update output " +
    "mode on a streaming DataFrame/Dataset")
}
 
"flatMapGroupsWithState" should "work for append mode in flatMap and sink" in {
  val testKey = "flatMapGroupsWithState-append-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
 
  val query = flattenResults.writeStream.outputMode("append").foreach(
    new InMemoryStoreWriter[String](testKey, (state) => state)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
 
  query.awaitTermination(45000)
  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 3
  savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
}
 
"flatMapGroupsWithState" should "fail for append mode in flatMap and complete mode in sink" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }
 
  exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
    "on streaming DataFrames/Datasets")
}
 
"flatMapGroupsWithState with aggregation after mapping" should "fail for append because of missing watermark" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())(MappingFunction)
    .agg(count("*").as("count"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
 
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  }
 
  exception.getMessage() should include("Append output mode not supported when there are streaming " +
    "aggregations on streaming DataFrames/DataSets without watermark")
}
 
"flatMapGroupsWithState with aggregation after mapping" should "succeed for append when watermark is defined" in {
  val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
  val now = 5000
  val flattenResults = inputStream.toDS().toDF("created", "id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())((key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
    .toDF("created", "name")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .agg(count("*"))
  inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
    (new Timestamp(now), 2L, "test20"))
 
  flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
}
 
"flatMapGroupsWithState with aggregation after mapping" should "fail for append when watermark is not defined" in {
  val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
  val now = 5000
  val flattenResults = inputStream.toDS().toDF("created", "id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())((key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
    .toDF("created", "name")
    .groupBy("created")
    .agg(count("*"))
  inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
    (new Timestamp(now), 2L, "test20"))
 
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  }
 
  exception.getMessage() should include("Append output mode not supported when there are streaming aggregations on " +
    "streaming DataFrames/DataSets without watermark")
}
 
"flatMapGroupsWithState with aggregation after" should "fail for update mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
    timeoutConf = NoTimeout())(MappingFunction)
    .agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[Row]()).start()
  }
 
  exception.message should include("flatMapGroupsWithState in update mode is not supported with " +
    "aggregation on a streaming DataFrame/Dataset")
}
 
"flatMapGroupsWithState without aggregation" should "be correctly executed in update mode" in {
  val testKey = "flatMapGroupsWithState-no-aggregation"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
      timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
 
  val query = flattenResults.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[String](testKey, (state) => state)).start()
 
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(40000)
 
  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 3
  savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
}
  • Remaining processing types
  • append mode: supported
  • complete mode: not supported - keeping all the data in the result table would be very difficult
  • update mode: supported

The tests:

"append" should "work for map transform" in {
  val testKey = "other-processing-append-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedResult = inputStream.toDS().toDF("id", "name")
    .map(row => row.getAs[String]("name"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
 
  val query = mappedResult.writeStream.outputMode("append")
    .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(30000)
 
  val processedValues = InMemoryKeyedStore.getValues(testKey)
  processedValues should have size 5
  processedValues should contain allOf("test10", "test11", "test12", "test13", "test20")
 
}
 
"update" should "work for map transform" in {
  val testKey = "other-processing-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedResult = inputStream.toDS().toDF("id", "name")
    .map(row => row.getAs[String]("name"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
 
  val query = mappedResult.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(30000)
 
  val processedValues = InMemoryKeyedStore.getValues(testKey)
  processedValues should have size 5
  processedValues should contain allOf("test10", "test11", "test12", "test13", "test20")
}
 
"complete mode" should "not be accepted in mapping transform" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedResults = inputStream.toDS().toDF("id", "name")
    .map(row => row.getAs[String]("name"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
 
  val exception = intercept[AnalysisException] {
    mappedResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }
 
  exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
    "on streaming DataFrames/Datasets")
}

The output modes in Apache Spark determine how the output is generated. Among the 3 different strategies, one always returns the complete results, while the two others either append the results that are not supposed to receive new data anymore, or update the results already computed. All these main behaviors were shown in the tests defined in the third part.