Wait the light to fall

Custom Session Windows

焉知非鱼

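Spark Structured Streaming lets us perform aggregations over sliding event-time windows in a straightforward way.

Spark Structured Streaming provides window operations on event time, together with watermarks, out of the box.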
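To make "sliding event-time window" concrete, here is a small plain-Scala sketch (no Spark needed) of how one event time is assigned to overlapping windows. It assumes windows start at multiples of the slide with no start-time offset; SlidingWindows and windowsFor are illustrative names, not Spark API.

```scala
// Plain-Scala sketch (no Spark) of sliding event-time window assignment.
// Assumption: windows start at multiples of `slide` with no start-time offset.
object SlidingWindows {
  final case class Window(start: Long, end: Long) // half-open interval [start, end)

  // All windows of length windowDuration, sliding every `slide`, that contain time t.
  def windowsFor(t: Long, windowDuration: Long, slide: Long): List[Window] = {
    require(windowDuration > 0 && slide > 0, "durations must be positive")
    // latest window start that is <= t (floorMod also handles negative t)
    val lastStart = t - java.lang.Math.floorMod(t, slide)
    Iterator
      .iterate(lastStart)(_ - slide)                  // walk back one slide at a time
      .takeWhile(start => start + windowDuration > t) // stop once a window ends at or before t
      .map(start => Window(start, start + windowDuration))
      .toList
      .reverse                                        // earliest window first
  }

  def main(args: Array[String]): Unit = {
    // 10-minute windows sliding every 5 minutes; an event at minute 12 (720 s)
    println(windowsFor(720L, 600L, 300L)) // List(Window(300,900), Window(600,1200))
  }
}
```

With the slide equal to the window duration this reduces to tumbling windows, so each event lands in exactly one window.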

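Use case

Sum a user's winnings for the duration of a session. When a deposit event arrives in the stream, deposit the current balance and close the session.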
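Before bringing Spark in, the intended behaviour can be pinned down with a plain-Scala model. This is a sketch under simplifying assumptions: events are processed one at a time in arrival order, and the Event type and process function are hypothetical names introduced only for illustration.

```scala
// Plain-Scala model of the use case (no Spark): winnings are summed per session, and a
// deposit event deposits the running total and closes the session.
// Event and process are illustrative names, not part of the original code.
object SessionUseCase {
  final case class Event(sessionId: String, winAmount: Double, deposit: Boolean)

  // Returns (deposits made, balances of sessions that are still open).
  def process(events: Seq[Event]): (List[(String, Double)], Map[String, Double]) =
    events.foldLeft((List.empty[(String, Double)], Map.empty[String, Double])) {
      case ((deposits, open), e) =>
        val balance = open.getOrElse(e.sessionId, 0.0) + e.winAmount
        if (e.deposit) (deposits :+ (e.sessionId -> balance), open - e.sessionId) // close session
        else (deposits, open.updated(e.sessionId, balance))                        // keep accumulating
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("s1", 10.0, deposit = false),
      Event("s2", 3.0, deposit = false),
      Event("s1", 5.0, deposit = true),
      Event("s2", 4.0, deposit = false)
    )
    println(process(events)) // (List((s1,15.0)),Map(s2 -> 7.0))
  }
}
```

Session s1 deposits 15.0 and disappears from the open sessions, while s2 keeps accumulating.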

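Windows and watermarks

Out of the box, windows and watermarks are time-based: a window is closed once the watermark has moved past its end. That does not work for our use case, because we want to close the session as soon as we receive the signal to deposit the current balance, regardless of how much time has passed.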
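A simplified model shows why: in time-based windowing only time decides when a window is finalized. The sketch assumes the watermark is the maximum event time seen minus an allowed delay, and that a window ending at T is finalized once the watermark moves past T; WatermarkModel is an illustrative name, not Spark code.

```scala
// Simplified model (no Spark) of how a watermark finalizes a time-based window.
// Assumption: watermark = max event time seen - allowed delay; a window ending at T
// is finalized once the watermark is greater than T.
object WatermarkModel {
  final case class Window(start: Long, end: Long)

  def watermark(eventTimes: Seq[Long], delay: Long): Long = eventTimes.max - delay

  def isFinalized(w: Window, currentWatermark: Long): Boolean = currentWatermark > w.end

  def main(args: Array[String]): Unit = {
    val wm = watermark(Seq(100L, 250L, 180L), delay = 50L)
    // Nothing here looks at event contents: a deposit inside a window cannot close it early.
    println(wm)                                  // 200
    println(isFinalized(Window(100L, 200L), wm)) // false
    println(isFinalized(Window(0L, 150L), wm))   // true
  }
}
```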

Solution

We can solve this by implementing a custom session window on top of the mapGroupsWithState function.

Define the custom session classes

case class Transaction(sessionId: String, winAmount: Double, deposit: Boolean)
case class SessionTrackingValue(totalSum: Double)
case class SessionUpdate(sessionId: String, currentBalance: Double, depositCurrentBalance: Boolean)

Create a local SparkSession

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession
  .builder
  .master("local[*]") // run locally, using all available cores
  .appName(getClass.getName)
  .getOrCreate()

Create a socket stream

val socketStream: DataFrame = spark.readStream
  // socket as stream input
  .format("socket")
  // connect to socket port localhost:9999 waiting for incoming stream
  .option("host", "localhost")
  .option("port", 9999)
  .load()

Map the input to the Transaction case class

import spark.implicits._
import scala.util.Try

val transactions = socketStream
  .as[String]
  .map { inputLine =>
    // expected input format: sessionId,winAmount[,deposit]
    val fields = inputLine.split(",")
    Transaction(fields(0), fields(1).toDouble, Try(fields(2).toBoolean).getOrElse(false))
  }

If the input contains the deposit field, its value is stored in the Transaction. If the field is missing (or is not a valid Boolean), Try(...).getOrElse(false) falls back to false.
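The parsing rule from the map step can be tried without a socket by pulling it into a plain function. This is a sketch: parseTransaction is a hypothetical helper name, and the logic is the same expression used in the map above.

```scala
import scala.util.Try

// The parsing rule from the map step, extracted so it can run without Spark.
// parseTransaction is a hypothetical helper name.
object ParseExample {
  final case class Transaction(sessionId: String, winAmount: Double, deposit: Boolean)

  def parseTransaction(inputLine: String): Transaction = {
    val fields = inputLine.split(",")
    // A missing third field (index out of bounds) or a non-boolean value both
    // make the Try fail, so deposit falls back to false.
    Transaction(fields(0), fields(1).toDouble, Try(fields(2).toBoolean).getOrElse(false))
  }

  def main(args: Array[String]): Unit = {
    println(parseTransaction("s1,10.5"))      // Transaction(s1,10.5,false)
    println(parseTransaction("s1,2.0,true"))  // Transaction(s1,2.0,true)
    println(parseTransaction("s1,2.0,maybe")) // Transaction(s1,2.0,false)
  }
}
```

Note that only the deposit field is guarded: a malformed amount such as "s1,abc" still throws a NumberFormatException from toDouble.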

Create a KeyValueGroupedDataset of (sessionId, Transaction) pairs

import org.apache.spark.sql.KeyValueGroupedDataset

val idSessionKv: KeyValueGroupedDataset[String, Transaction] = transactions.groupByKey(_.sessionId)
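As a rough mental model, groupByKey behaves like groupBy on an ordinary Scala collection: each session id maps to the transactions that share it. This sketch uses plain collections only (bySession is an illustrative name); in Spark the grouping is lazy and distributed, and the order of values within a group should not be relied on.

```scala
// Plain-collections analogue of groupByKey (no Spark): sessionId -> its transactions.
object GroupByKeyAnalogue {
  final case class Transaction(sessionId: String, winAmount: Double, deposit: Boolean)

  def bySession(ts: List[Transaction]): Map[String, List[Transaction]] =
    ts.groupBy(_.sessionId)

  def main(args: Array[String]): Unit = {
    val transactions = List(
      Transaction("s1", 10.0, deposit = false),
      Transaction("s2", 3.0, deposit = false),
      Transaction("s1", 5.0, deposit = true)
    )
    println(bySession(transactions)("s1").map(_.winAmount)) // List(10.0, 5.0)
  }
}
```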

Use .mapGroupsWithState to check whether state exists and act on it

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

val sessionUpdates: Dataset[SessionUpdate] =
  idSessionKv.mapGroupsWithState[SessionTrackingValue, SessionUpdate](GroupStateTimeout.NoTimeout()) {
    // mapGroupsWithState: key: K, it: Iterator[V], s: GroupState[S]
    (sessionId: String, eventsIter: Iterator[Transaction], state: GroupState[SessionTrackingValue]) =>
      // the iterator can only be traversed once, so materialize it
      val events = eventsIter.toList

      // add this batch's winnings to the existing total, or start a new total
      val updatedSession =
        if (state.exists) SessionTrackingValue(state.get.totalSum + events.map(_.winAmount).sum)
        else SessionTrackingValue(events.map(_.winAmount).sum)

      state.update(updatedSession)

      val toCloseSession = events.exists(_.deposit)

      // when there is a deposit in the events, close the session by removing the state
      if (toCloseSession) {
        // here we could perform a specific action when we receive the end-of-session signal (store, send, update other state)
        // in this case we would just deposit the current balance to a data store
        // TODO: persisting the balance is not implemented in this example
        state.remove()
        SessionUpdate(sessionId, updatedSession.totalSum, depositCurrentBalance = true)
      } else {
        SessionUpdate(sessionId, updatedSession.totalSum, depositCurrentBalance = false)
      }
  }
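The update function passed to mapGroupsWithState can also be exercised without a cluster. The harness below replays the same logic over a few micro-batches; SimpleState is a hypothetical stand-in that mimics only the GroupState methods used above (exists, get, update, remove) and is not Spark's GroupState.

```scala
import scala.collection.mutable

// Plain-Scala harness (no Spark) that replays the session logic over micro-batches.
// SimpleState is a hypothetical stand-in for the few GroupState methods used above.
object MicroBatchSimulation {
  final case class Transaction(sessionId: String, winAmount: Double, deposit: Boolean)
  final case class SessionTrackingValue(totalSum: Double)
  final case class SessionUpdate(sessionId: String, currentBalance: Double, depositCurrentBalance: Boolean)

  final class SimpleState[S](private var value: Option[S]) {
    def exists: Boolean = value.isDefined
    def get: S = value.get
    def update(s: S): Unit = value = Some(s)
    def remove(): Unit = value = None
  }

  // Same logic as the mapGroupsWithState function, written against SimpleState.
  def updateSession(sessionId: String, eventsIter: Iterator[Transaction],
                    state: SimpleState[SessionTrackingValue]): SessionUpdate = {
    val events = eventsIter.toList
    val previous = if (state.exists) state.get.totalSum else 0.0
    val updated = SessionTrackingValue(previous + events.map(_.winAmount).sum)
    state.update(updated)
    val close = events.exists(_.deposit)
    if (close) state.remove() // deposit received: forget the session
    SessionUpdate(sessionId, updated.totalSum, depositCurrentBalance = close)
  }

  // For each micro-batch: group by session and call updateSession once per key.
  def run(batches: List[List[Transaction]]): List[List[SessionUpdate]] = {
    val store = mutable.Map.empty[String, SimpleState[SessionTrackingValue]]
    batches.map { batch =>
      batch.groupBy(_.sessionId).toList.sortBy(_._1).map { case (id, events) =>
        val st = store.getOrElseUpdate(id, new SimpleState[SessionTrackingValue](None))
        updateSession(id, events.iterator, st)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val batch1 = List(Transaction("s1", 10.0, deposit = false), Transaction("s1", 2.5, deposit = false))
    val batch2 = List(Transaction("s1", 0.0, deposit = true), Transaction("s2", 4.0, deposit = false))
    run(List(batch1, batch2)).foreach(println)
  }
}
```

After the deposit in the second batch, a later event for s1 starts again from zero, which is exactly what removing the state is meant to achieve.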

Run the program

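import org.apache.spark.sql.streaming.StreamingQuery

val query: StreamingQuery = sessionUpdates
  .writeStream
  .outputMode("update") // queries using mapGroupsWithState only support the update output mode
  .format("console")
  .start()

query.awaitTermination()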