Output Modes in Spark Structured Streaming
By 焉知非鱼

Compared to DStream-based streaming, Structured Streaming introduced many new concepts. One of them is the output mode.
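This post presents the output modes, introduced in Spark 2.0.0 to control how streaming results are written out. The first part introduces them through a short theoretical section. The second part presents their API. The last part shows how they behave in some learning tests.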
Output modes definition
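The output mode specifies how the rows of the result table are written to the output sink. Among the available output modes we can distinguish:
- Append: only the new rows are written to the output sink. This mode is mainly meant for processing without aggregation and is well suited for immutable results. An important point is its relationship with the watermark. If a watermark is defined on an aggregation, it controls when the appended data is emitted: emission happens only once the "intermediate" state (= the state that can still change during processing) is final, that is, only when the new watermark value has passed the most recent entry of the given group. For example, suppose the watermark is at 19:00:00 and the most recent record of group "A" is from 19:00:05. When the watermark moves to 19:00:06, the result of group "A" is sent to the output sink. The engine considers that no new events will arrive for that group, so it can safely emit the result.
- Complete: in this mode, all rows of the result table are written to the output sink every time. A write happens each time the stream receives some updates. This mode is dedicated to processing with aggregations.
- Update: similar to the previous one, except that only the updated rows are written to the output sink.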
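A toy model of a single trigger of a streaming `groupBy(...).count()` makes the difference between complete and update concrete. This is a minimal sketch, not Spark's implementation. `OutputModeModel`, `trigger` and `TriggerResult` are hypothetical names and the state is a plain immutable `Map`. Watermarks are left out, and with them append mode for aggregations.

```scala
// Hypothetical, simplified model of one micro-batch of a streaming count.
// It only shows which rows each output mode would hand to the sink.
object OutputModeModel {
  // Rows written by each mode in one trigger, plus the state kept for the next trigger.
  final case class TriggerResult(
      state: Map[String, Long],
      complete: Map[String, Long],
      update: Map[String, Long])

  def trigger(previous: Map[String, Long], batch: Seq[String]): TriggerResult = {
    // Count the keys of the new micro-batch.
    val batchCounts: Map[String, Long] =
      batch.groupBy(identity).map { case (key, rows) => key -> rows.size.toLong }
    // Merge the batch counts into the running aggregates (the result table).
    val state = batchCounts.foldLeft(previous) { case (acc, (key, count)) =>
      acc.updated(key, acc.getOrElse(key, 0L) + count)
    }
    // Update mode keeps only the keys touched by this batch.
    val updated = state.filter { case (key, _) => batchCounts.contains(key) }
    // Complete mode writes the whole result table.
    TriggerResult(state = state, complete = state, update = updated)
  }
}
```

The complete and update tests further down feed `test1`, `test2` and `test3` three times, then one more `test1`. Replaying that data through the model gives 3 + 3 rows in complete mode and 3 + 1 rows in update mode, the same row counts those tests assert.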
Output modes API
The output mode is defined with the DataStreamWriter#outputMode(outputMode: String) method. The passed name is then converted into the corresponding case object of the InternalOutputModes object.
The resolved instance is mainly used in the DataStreamWriter class. From there it is passed to StreamingQueryManager#startQuery(userSpecifiedName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: DataFrame, sink: Sink, outputMode: OutputMode, useTempCheckpointLocation: Boolean = false, recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()). As its name indicates, this method starts the execution of the streaming query.
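The step that turns the name into an object can be pictured with a small sketch. It only illustrates the idea of mapping a string to one of three case objects. `ModeParser` and its members are hypothetical and do not reproduce Spark's `InternalOutputModes` code or its error message. The case-insensitive match is an assumption of the sketch.

```scala
import java.util.Locale

object ModeParser {
  // Hypothetical stand-ins for the case objects of InternalOutputModes.
  sealed trait Mode
  case object Append extends Mode
  case object Complete extends Mode
  case object Update extends Mode

  // Resolves the string given to outputMode(...) into a case object, ignoring case (an assumption).
  def parse(name: String): Mode = name.toLowerCase(Locale.ROOT) match {
    case "append"   => Append
    case "complete" => Complete
    case "update"   => Update
    case _ =>
      throw new IllegalArgumentException(
        s"Unknown output mode '$name', expected 'append', 'complete' or 'update'")
  }
}
```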
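On the physical execution side, traces of the output mode can be found in StateStoreSaveExec, where the intermediate state results are stored. That class also contains many references to the watermark, which helps to evict outdated results. If you want to learn more, see the post about StateStore in Apache Spark Structured Streaming.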
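The role of the watermark in append mode, as in the 19:00:05 example of the definition section, can be modeled in a few lines. This is a hypothetical simplification, not the code of StateStoreSaveExec. `AppendModeModel` is an invented name, event times are plain seconds and the aggregation is a count grouped by event time. The boundary comparisons are also choices of the sketch: late rows are those strictly older than the watermark, and a group is final once the watermark is strictly past it.

```scala
// Hypothetical, simplified model of append mode for a count grouped by event time (seconds).
object AppendModeModel {
  // State after the trigger, watermark for the next trigger, and rows written to the sink.
  final case class Step(state: Map[Long, Long], watermark: Long, emitted: Map[Long, Long])

  def trigger(state: Map[Long, Long], watermark: Long, delaySeconds: Long,
              eventTimes: Seq[Long]): Step = {
    // Rows strictly older than the current watermark are late and ignored.
    val onTime = eventTimes.filter(_ >= watermark)
    // Count the on-time rows per event time.
    val merged = onTime.foldLeft(state) { (acc, time) =>
      acc.updated(time, acc.getOrElse(time, 0L) + 1L)
    }
    // Groups the watermark has passed can't change anymore: emit them once and drop them.
    val (finished, open) = merged.partition { case (time, _) => time < watermark }
    // Next watermark: max event time seen minus the delay; it never goes backwards.
    val nextWatermark = (watermark +: eventTimes.map(_ - delaySeconds)).max
    Step(open, nextWatermark, finished)
  }
}
```

Count seconds from 19:00:00 and allow a 1-second delay. The group at 5 seconds is not emitted while the watermark is at 4 seconds. It leaves the state in the first trigger that runs with the watermark at 6 seconds, and in that trigger a late row at 3 seconds is ignored. In this sketch, emission happens in the trigger after the one that advanced the watermark.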
Output modes examples
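The following lists summarize the modes available for given types of processing. Each list is followed by some tests showing the supported and unsupported cases: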
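- Aggregations with watermark
  - Append mode: supported, but the results are emitted only after the watermark has passed them (= the output is final). If the aggregation is not done on the watermarked column, the query is rejected.
  - Complete mode: supported. Unlike update mode, the watermark is not used.
  - Update mode: supported. The watermark is used to drop aggregations that are too old.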
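The different treatment of the watermark in update and complete modes can be sketched with the same kind of toy model. Everything here is hypothetical: `UpdateVsCompleteModel` and its functions are invented for illustration and event times are plain seconds. In the model, update mode drops rows older than the watermark and cleans up the groups the watermark has passed. Complete mode ignores the watermark and keeps every group.

```scala
// Hypothetical model contrasting update and complete modes for a count grouped by event time (seconds).
object UpdateVsCompleteModel {
  // Update mode: returns (new state, next watermark, rows written to the sink).
  def updateTrigger(state: Map[Long, Long], watermark: Long, delaySeconds: Long,
                    eventTimes: Seq[Long]): (Map[Long, Long], Long, Map[Long, Long]) = {
    // Rows older than the watermark are dropped.
    val onTime = eventTimes.filter(_ >= watermark)
    val merged = increment(state, onTime)
    // Only the groups touched by this batch are written.
    val output = merged.filter { case (time, _) => onTime.contains(time) }
    // Groups the watermark has passed can no longer change, so their state is cleaned up.
    val cleaned = merged.filter { case (time, _) => time >= watermark }
    val nextWatermark = (watermark +: eventTimes.map(_ - delaySeconds)).max
    (cleaned, nextWatermark, output)
  }

  // Complete mode: the watermark plays no role, every row counts and the whole table is written.
  def completeTrigger(state: Map[Long, Long],
                      eventTimes: Seq[Long]): (Map[Long, Long], Map[Long, Long]) = {
    val merged = increment(state, eventTimes)
    (merged, merged)
  }

  private def increment(state: Map[Long, Long], eventTimes: Seq[Long]): Map[Long, Long] =
    eventTimes.foldLeft(state)((acc, time) => acc.updated(time, acc.getOrElse(time, 0L) + 1L))
}
```

The update-mode and complete-mode tests below use four rows at 10 seconds, then rows at 4, 5 and 11 seconds. Fed with those timestamps, the model writes `10 -> 4` and then `11 -> 1` in update mode. In complete mode it writes the whole table, including the late 4- and 5-second groups. This assumes, as the expected values of those tests suggest, that the last two additions land in the same trigger.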
Tests:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream

// The tests run inside a ScalaTest FlatSpec with Matchers and need `import sparkSession.implicits._`
// for the encoders. sparkSession, InMemoryStoreWriter, InMemoryKeyedStore and NoopForeachWriter are
// helpers of the original test project, not part of Spark.
// Timestamp.toString uses the JVM default time zone: the expected strings assume UTC+1.

"the count on watermark column" should "be correctly computed in append mode" in {
  val testKey = "append-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .count()
  val query = aggregatedStream.writeStream.outputMode("append")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Timestamp]("created")} -> ${processedRow.getAs[Long]("count")}"))
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      // send the first batch - max event time = 10 seconds, so the watermark will be 9 seconds
      inputStream.addData((new Timestamp(now + 5000), 1), (new Timestamp(now + 5000), 2),
        (new Timestamp(now + 4000), 3), (new Timestamp(now + 5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5), (new Timestamp(now + 4000), 6))
      inputStream.addData((new Timestamp(11000), 7))
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(11000), 8))
    }
  }).start()
  query.awaitTermination(45000)
  val readValues = InMemoryKeyedStore.getValues(testKey)
  // Only the groups the watermark has already reached (9 and 10 seconds) were emitted; the 11-second
  // group is still open. Append mode emits a group once a new watermark is computed and the group
  // falls behind it.
  readValues should have size 2
  readValues should contain allOf("1970-01-01 01:00:09.0 -> 2", "1970-01-01 01:00:10.0 -> 3")
}

"the count on non-watermark column" should "fail in append mode" in {
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("number")
    .count()
  val exception = intercept[AnalysisException] {
    aggregatedStream.writeStream.outputMode("append")
      .foreach(new NoopForeachWriter[Row]()).start()
  }
  exception.message should include("Append output mode not supported when there are streaming aggregations on " +
    "streaming DataFrames/DataSets without watermark")
}

"the count on watermark column" should "be correctly computed in update mode" in {
  val testKey = "update-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .count()
  val query = aggregatedStream.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Timestamp]("created")} -> ${processedRow.getAs[Long]("count")}"))
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now + 5000), 1), (new Timestamp(now + 5000), 2),
        (new Timestamp(now + 5000), 3), (new Timestamp(now + 5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      // the rows at 4 and 5 seconds are behind the 9-second watermark and are dropped
      inputStream.addData((new Timestamp(4000L), 5))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
  query.awaitTermination(45000)
  val readValues = InMemoryKeyedStore.getValues(testKey)
  readValues should have size 2
  readValues should contain allOf("1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1")
}

"the count on non-watermark column" should "be correctly computed in update mode" in {
  val testKey = "update-output-mode-without-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("number")
    .count()
  val query = aggregatedStream.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Int]("number")} -> ${processedRow.getAs[Long]("count")}"))
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now + 5000), 1), (new Timestamp(now + 5000), 2),
        (new Timestamp(now + 5000), 3), (new Timestamp(now + 5000), 3))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 6))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
  query.awaitTermination(45000)
  val readValues = InMemoryKeyedStore.getValues(testKey)
  readValues should have size 5
  readValues should contain allOf("1 -> 1", "3 -> 2", "2 -> 1", "6 -> 2", "7 -> 1")
}

"the count on watermark column" should "be correctly computed in complete mode" in {
  val testKey = "complete-output-mode-with-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .count()
  val query = aggregatedStream.writeStream.outputMode("complete")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Timestamp]("created")} -> ${processedRow.getAs[Long]("count")}"))
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now + 5000), 1), (new Timestamp(now + 5000), 2),
        (new Timestamp(now + 5000), 3), (new Timestamp(now + 5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
  query.awaitTermination(45000)
  val readValues = InMemoryKeyedStore.getValues(testKey)
  // complete mode writes the whole table at every trigger, the rows at 4 and 5 seconds included
  readValues should have size 5
  readValues.sorted should equal (Seq("1970-01-01 01:00:04.0 -> 1", "1970-01-01 01:00:05.0 -> 1",
    "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1"))
}

"the count on non-watermark column" should "be correctly computed in complete mode" in {
  val testKey = "complete-output-mode-without-watermark-aggregation"
  val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
  val now = 5000L
  val aggregatedStream = inputStream.toDS().toDF("created", "number")
    .withWatermark("created", "1 second")
    .groupBy("number")
    .count()
  val query = aggregatedStream.writeStream.outputMode("complete")
    .foreach(new InMemoryStoreWriter[Row](testKey,
      (processedRow) => s"${processedRow.getAs[Int]("number")} -> ${processedRow.getAs[Long]("count")}"))
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      inputStream.addData((new Timestamp(now + 5000), 1), (new Timestamp(now + 5000), 2),
        (new Timestamp(now + 5000), 3), (new Timestamp(now + 5000), 4))
      while (!query.isActive) {}
      Thread.sleep(10000)
      inputStream.addData((new Timestamp(4000L), 5))
      inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
    }
  }).start()
  query.awaitTermination(45000)
  val readValues = InMemoryKeyedStore.getValues(testKey)
  readValues should have size 11
  readValues.sorted should equal (Seq("1 -> 1", "1 -> 1", "2 -> 1", "2 -> 1",
    "3 -> 1", "3 -> 1", "4 -> 1", "4 -> 1", "5 -> 1", "6 -> 1", "7 -> 1"))
}
```
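- Other aggregations (without watermark)
  - Append mode: not supported. Aggregations can be updated at any moment, which goes against the principle of this mode.
  - Complete mode: supported.
  - Update mode: supported, but unlike the previous case, old aggregations are never cleaned up: the engine doesn't know when an aggregation is too old.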
Tests:

```scala
import org.apache.spark.sql.{AnalysisException, ForeachWriter, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count

"append mode with aggregation" should "be disallowed in structured streaming without watermark" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val counter = inputStream.toDS().toDF("id", "name").agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    counter.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]()).start()
  }
  exception.message should include ("Append output mode not supported when there are streaming " +
    "aggregations on streaming DataFrames/DataSets without watermark")
}

"complete mode with aggregation" should "be allowed in structured streaming without watermark" in {
  val testKey = "no-aggregation-complete-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val counter = inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val query = counter.writeStream.outputMode("complete").foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true
    override def process(row: Row): Unit = {
      println(s"processing ${row}")
      InMemoryKeyedStore.addValue(testKey, s"${row.getAs[String]("name")}=${row.getAs[Long]("count(1)")}")
    }
    override def close(errorOrNull: Throwable): Unit = {}
  }).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      Thread.sleep(10000)
      // Add only 1 row to show that all entries are sent to the result table every time
      inputStream.addData((1L, "test1"))
    }
  }).start()
  query.awaitTermination(30000)
  val processedRows = InMemoryKeyedStore.getValues(testKey)
  processedRows should have size 6
  val rowsTest1 = processedRows.filter(_.startsWith("test1="))
  rowsTest1 should have size 2
  rowsTest1 should contain allOf("test1=3", "test1=4")
  val rowsTest2 = processedRows.filter(_.startsWith("test2="))
  rowsTest2 should have size 2
  rowsTest2 should contain only("test2=3")
  val rowsTest3 = processedRows.filter(_.startsWith("test3="))
  rowsTest3 should have size 2
  rowsTest3 should contain only("test3=3")
}

"update mode with aggregation" should "be allowed in structured streaming without watermark" in {
  val testKey = "no-aggregation-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val counter = inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val query = counter.writeStream.outputMode("update").foreach(new InMemoryStoreWriter[Row](testKey,
    (processedRow) => s"${processedRow.getAs[String]("name")} -> ${processedRow.getAs[Long]("count(1)")}")).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      Thread.sleep(10000)
      // Add only 1 row to show that only the updated row is sent to the result table
      inputStream.addData((1L, "test1"))
    }
  }).start()
  query.awaitTermination(30000)
  val processedRows = InMemoryKeyedStore.getValues(testKey)
  processedRows should have size 4
  processedRows should contain allOf("test1 -> 3", "test1 -> 4", "test2 -> 3", "test3 -> 3")
}
```
- mapGroupsWithState processing
  - Append mode: not supported
  - Complete mode: not supported
  - Update mode: supported
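It helps to look at what one trigger of mapGroupsWithState produces: exactly one value per key that received data. That value may change again in a later trigger, which fits update mode and neither append nor complete. The sketch below is a hypothetical, pure-Scala model of that behavior, with no timeouts and no Spark types. Its per-key logic mirrors the MappingFunction of the tests that follow.

```scala
// Hypothetical model of one mapGroupsWithState trigger with a Seq[String] state per key.
object MapGroupsWithStateModel {
  // Returns the new state and the values written to the sink.
  def trigger(state: Map[Long, Seq[String]],
              batch: Seq[(Long, String)]): (Map[Long, Seq[String]], Seq[String]) = {
    // Group the new rows by key; groupBy keeps the arrival order inside each group.
    val grouped: Map[Long, Seq[String]] =
      batch.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2) }
    // Same logic as MappingFunction: append the new names to the stored ones.
    val updates: Map[Long, Seq[String]] =
      grouped.map { case (id, names) => id -> (state.getOrElse(id, Seq.empty[String]) ++ names) }
    // One output value per key present in the batch; keys without new data emit nothing.
    (state ++ updates, updates.values.map(_.mkString(",")).toSeq)
  }
}
```

With the data of the update-mode test, the two triggers produce 3 and then 2 values. Key 3 gets no new data in the second trigger, so it emits nothing there. Together these are the 5 values that test asserts.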
Tests:

```scala
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Accumulates the names received for a key and returns the whole list as the new value.
private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Seq[String] =
  (key, values, state) => {
    val stateNames = state.getOption.getOrElse(Seq.empty)
    val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
    state.update(stateNewNames)
    stateNewNames
  }

"append mode" should "not work for mapGroupsWithState" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    mappedValues.writeStream.outputMode("append").foreach(new NoopForeachWriter[Seq[String]]()).start()
  }
  exception.message should include("mapGroupsWithState is not supported with Append output mode on a " +
    "streaming DataFrame/Dataset")
}

"complete mode" should "not work for mapGroupsWithState" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    mappedValues.writeStream.outputMode("complete").foreach(new NoopForeachWriter[Seq[String]]()).start()
  }
  exception.message should include("Complete output mode not supported when there are no streaming " +
    "aggregations on streaming DataFrames/Datasets")
}

"update mode" should "work for mapGroupsWithState" in {
  val testKey = "mapGroupWithState-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"), (3L, "test30"))
  val query = mappedValues.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Seq[String]](testKey, (stateSeq) => stateSeq.mkString(","))).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"), (1L, "test13"), (2L, "test21"))
    }
  }).start()
  query.awaitTermination(30000)
  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 5
  savedValues should contain allOf("test30", "test10,test11", "test20", "test10,test11,test12,test13",
    "test20,test21")
}
```
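- flatMapGroupsWithState
  - Append mode: supported. In addition, aggregations are allowed after flatMapGroupsWithState. For that to work, one condition must be met: a watermark must be defined before the aggregation that follows the flatMapGroupsWithState transformation.
  - Complete mode: not supported
  - Update mode: supported, but no aggregation is allowed after flatMapGroupsWithState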
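The constraints of this list, together with the errors checked by the tests that follow, can be condensed into a small rule function. This is a hypothetical encoding for reading purposes, not Spark's actual analysis code. `FlatMapRulesModel`, `check` and the returned messages are invented. Combinations the tests don't exercise simply fall through to `None`, which is a simplification.

```scala
// Hypothetical encoding of the flatMapGroupsWithState rules listed above.
object FlatMapRulesModel {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  // Returns None if the combination is accepted, or the reason it is rejected.
  def check(operatorMode: Mode, sinkMode: Mode,
            aggregationAfter: Boolean, watermarkBeforeAggregation: Boolean): Option[String] =
    if (operatorMode == Complete)
      Some("the flatMapGroupsWithState output mode must be append or update")
    else if (sinkMode == Complete)
      Some("complete output mode is not supported")
    else if (!aggregationAfter && operatorMode != sinkMode)
      Some("without aggregation, the operator and sink output modes must be the same")
    else if (aggregationAfter && operatorMode == Update)
      Some("no aggregation is allowed after flatMapGroupsWithState in update mode")
    else if (aggregationAfter && sinkMode == Append && !watermarkBeforeAggregation)
      Some("append mode needs a watermark defined before the aggregation")
    else None
}
```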
Tests:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Accumulates the names received for a key and emits them as one comma-separated string.
private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Iterator[String] =
  (key, values, state) => {
    val stateNames = state.getOption.getOrElse(Seq.empty)
    val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
    state.update(stateNewNames)
    Iterator(stateNewNames.mkString(","))
  }

"multiple flatMapGroupsWithState" should "fail because they're not all in append mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
    .groupByKey(key => key)
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
      timeoutConf = GroupStateTimeout.NoTimeout())((key, values, state: GroupState[String]) => Iterator(""))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }
  exception.message should include("Multiple flatMapGroupsWithStates are not supported when they are not " +
    "all in append mode or the output mode is not append on a streaming DataFrames/Datasets")
}

"flatMapGroupsWithState" should "fail for complete mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val exception = intercept[IllegalArgumentException] {
    inputStream.toDS().toDF("id", "name")
      .groupByKey(row => row.getAs[Long]("id"))
      .flatMapGroupsWithState(outputMode = OutputMode.Complete(),
        timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
  }
  exception.getMessage should include("The output mode of function should be append or update")
}

"flatMapGroupsWithState" should "fail for append mode in flatMap and update mode in sink" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[String]()).start()
  }
  exception.message should include("flatMapGroupsWithState in append mode is not supported with Update output " +
    "mode on a streaming DataFrame/Dataset")
}

"flatMapGroupsWithState" should "work for append mode in flatMap and sink" in {
  val testKey = "flatMapGroupsWithState-append-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val query = flattenResults.writeStream.outputMode("append").foreach(
    new InMemoryStoreWriter[String](testKey, (state) => state)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(45000)
  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 3
  savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
}

"flatMapGroupsWithState" should "fail for append mode in flatMap and complete mode in sink" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }
  exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
    "on streaming DataFrames/Datasets")
}

"flatMapGroupsWithState with aggregation after mapping" should "fail for append because of missing watermark" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
    .agg(count("*").as("count"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  }
  exception.getMessage() should include("Append output mode not supported when there are streaming " +
    "aggregations on streaming DataFrames/DataSets without watermark")
}

"flatMapGroupsWithState with aggregation after mapping" should "succeed for append when watermark is defined" in {
  val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
  val now = 5000
  val flattenResults = inputStream.toDS().toDF("created", "id", "name")
    // "id" is a Long column, so the grouping key is read as a Long
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = GroupStateTimeout.NoTimeout())(
      (key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
    .toDF("created", "name")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .agg(count("*"))
  inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
    (new Timestamp(now), 2L, "test20"))
  val query = flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  // The query was accepted; stop it so that it doesn't keep running after the test.
  query.stop()
}

"flatMapGroupsWithState with aggregation after mapping" should "fail for append when watermark is not defined" in {
  val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
  val now = 5000
  val flattenResults = inputStream.toDS().toDF("created", "id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = GroupStateTimeout.NoTimeout())(
      (key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
    .toDF("created", "name")
    .groupBy("created")
    .agg(count("*"))
  inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
    (new Timestamp(now), 2L, "test20"))
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  }
  exception.getMessage() should include("Append output mode not supported when there are streaming aggregations on " +
    "streaming DataFrames/DataSets without watermark")
}

"flatMapGroupsWithState with aggregation after" should "fail for update mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
    .agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[Row]()).start()
  }
  exception.message should include("flatMapGroupsWithState in update mode is not supported with " +
    "aggregation on a streaming DataFrame/Dataset")
}

"flatMapGroupsWithState without aggregation" should "be correctly executed in update mode" in {
  val testKey = "flatMapGroupsWithState-no-aggregation"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
      timeoutConf = GroupStateTimeout.NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val query = flattenResults.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[String](testKey, (state) => state)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(40000)
  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 3
  savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
}
```
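- Other processing types
  - Append mode: supported
  - Complete mode: not supported. Keeping all the data in the result table would be very hard.
  - Update mode: supported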
Tests:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.streaming.MemoryStream

"append" should "work for map transform" in {
  val testKey = "other-processing-append-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedResult = inputStream.toDS().toDF("id", "name")
    .map(row => row.getAs[String]("name"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val query = mappedResult.writeStream.outputMode("append")
    .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(30000)
  val processedValues = InMemoryKeyedStore.getValues(testKey)
  processedValues should have size 5
  processedValues should contain allOf("test10", "test11", "test12", "test13", "test20")
}

"update" should "work for map transform" in {
  val testKey = "other-processing-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedResult = inputStream.toDS().toDF("id", "name")
    .map(row => row.getAs[String]("name"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val query = mappedResult.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()
  query.awaitTermination(30000)
  val processedValues = InMemoryKeyedStore.getValues(testKey)
  processedValues should have size 5
  processedValues should contain allOf("test10", "test11", "test12", "test13", "test20")
}

"complete mode" should "not be accepted in mapping transform" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedResults = inputStream.toDS().toDF("id", "name")
    .map(row => row.getAs[String]("name"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
  val exception = intercept[AnalysisException] {
    mappedResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }
  exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
    "on streaming DataFrames/Datasets")
}
```
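The five support lists of this section can be condensed into a lookup table. The sketch below is only a reading aid. `SupportMatrix` and the category labels are hypothetical, and the table ignores the finer conditions stated in the lists. For example, append mode with a watermark also requires the aggregation to use the watermarked column.

```scala
// Hypothetical lookup table condensing the support lists of this section.
object SupportMatrix {
  // Output modes accepted for each type of processing.
  val supported: Map[String, Set[String]] = Map(
    "aggregation with watermark"    -> Set("append", "complete", "update"),
    "aggregation without watermark" -> Set("complete", "update"),
    "mapGroupsWithState"            -> Set("update"),
    "flatMapGroupsWithState"        -> Set("append", "update"),
    "other processing"              -> Set("append", "update"))

  // Unknown processing types are reported as unsupported.
  def isSupported(processing: String, mode: String): Boolean =
    supported.get(processing).exists(_.contains(mode))
}
```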
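Output modes in Apache Spark determine how the output is generated. Among the 3 strategies, one always returns the complete result. The other two either append the results that shouldn't receive any more data or update the already computed results. All these main behaviors were shown in the tests of the third part.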