自定义会话窗口
— 焉知非鱼Spark Structured Streaming 允许我们以一种直接了当的方式在滑动事件时间窗口上执行聚合。
Spark Structured Streaming 提供开箱即用的事件时间上的窗口操作和水位。
用例 #
在会话持续时间内求用户收益的总和,当流中有存款事件时,存入当前余额并关闭会话。
窗口和水位 #
默认情况下,窗口和水位与基于时间的窗口一起使用。在我们的用例中,这不起作用,因为当我们收到一个存放当前余额的信号时,我们想关闭会话。
结果 #
我们可以通过基于 mapGroupsWithState 函数实现自定义会话窗口来实现.
定义自定义会话类 #
case class Transaction(sessionId: String, winAmount: Double, deposit: Boolean)
case class SessionTrackingValue(totalSum: Double)
case class SessionUpdate(sessionId: String, currentBalance: Double, depositCurrentBalance: Boolean)
创建本地 SparkSession #
val spark: SparkSession = SparkSession
.builder
.master("local[*]")
.appName(getClass.getName)
.getOrCreate()
创建 socket 流 #
val socketStream: DataFrame = spark.readStream
// socket as stream input
.format("socket")
// connect to socket port localhost:9999 waiting for incoming stream
.option("host", "localhost")
.option("port", 9999)
.load()
将输入映射为 Transaction case 类 #
import spark.implicits._
val transactions = socketStream
.as[String]
.map(inputLine => {
val fields = inputLine.split(",")
Transaction(fields(0), fields(1).toDouble, Try(fields(2).toBoolean).getOrElse(false))
})
如果输入中 包含 deposit
字段, 那它会被填充到 transaction 中。如果它没有被包含在 输入中, 我们使用 .getOrElse
来指定 false
时的值。
创建 KeyValueGroupedDataset 键值对儿(sessionId, Transaction) #
val idSessionKv: KeyValueGroupedDataset[String, Transaction] = transactions.groupByKey(x => x.sessionId)
使用 .mapGroupsWithState
检查状态是否存在并在状态上执行动作 #
val sessionUpdates: Dataset[SessionUpdate] = idSessionKv.mapGroupsWithState[SessionTrackingValue, SessionUpdate](GroupStateTimeout.NoTimeout()) {
// mapGroupsWithState: key: K, it: Iterator[V], s: GroupState[S]
case (sessionId: String, eventsIter: Iterator[Transaction], state: GroupState[SessionTrackingValue]) => {
val events = eventsIter.toSeq
val updatedSession =
if (state.exists) {
val existingState = state.get
val updatedEvents = SessionTrackingValue(existingState.totalSum + events.map(_.winAmount).sum)
updatedEvents
}
else {
SessionTrackingValue(events.map(event => event.winAmount).sum)
}
state.update(updatedSession)
val toCloseSession = events.exists(_.deposit)
// when there is an deposit in the event, close the session by removing the state
if (toCloseSession) {
// here we could perform a specific action when we receive the end of the session signal (store, send, update other state)
// in this case we would just deposit the current balance to a data store
// state.save() .. TODO unimplemented for this example
state.remove()
SessionUpdate(sessionId, updatedSession.totalSum, depositCurrentBalance = true)
}
else {
SessionUpdate(sessionId, updatedSession.totalSum, depositCurrentBalance = false)
}
}
}
运行程序 #
val query: StreamingQuery = sessionUpdates
.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()