
Trip Segmentation with Spark Structured Streaming

焉知非鱼


We have a car whose sensors emit one JSON record every second. Each record contains 3 fields:

field        description                       type    unit
vin          unique vehicle identifier (VIN)   String
createTime   time the signal was generated     Long    seconds (Unix epoch)
mileage      current mileage                   Long    km

Here is a sample of the data:

{'vin':'LSJA0000000000091','createTime':1546667565,'mileage':0}
{'vin':'LSJA0000000000091','createTime':1546667566,'mileage':1}
{'vin':'LSJA0000000000091','createTime':1546667567,'mileage':2}
{'vin':'LSJA0000000000091','createTime':1546667568,'mileage':3}
{'vin':'LSJA0000000000091','createTime':1546667569,'mileage':4}
{'vin':'LSJA0000000000091','createTime':1546667570,'mileage':5}
{'vin':'LSJA0000000000091','createTime':1546667571,'mileage':6}
{'vin':'LSJA0000000000091','createTime':1546667572,'mileage':7}
{'vin':'LSJA0000000000091','createTime':1546667573,'mileage':8}
{'vin':'LSJA0000000000091','createTime':1546667574,'mileage':9}
{'vin':'LSJA0000000000091','createTime':1546667575,'mileage':10}
{'vin':'LSJA0000000000091','createTime':1546667576,'mileage':11}
{'vin':'LSJA0000000000091','createTime':1546667577,'mileage':12}
{'vin':'LSJA0000000000091','createTime':1546667578,'mileage':13}
{'vin':'LSJA0000000000091','createTime':1546667579,'mileage':14}
{'vin':'LSJA0000000000091','createTime':1546667580,'mileage':15}
{'vin':'LSJA0000000000091','createTime':1546667581,'mileage':16}
{'vin':'LSJA0000000000091','createTime':1546667582,'mileage':17}
{'vin':'LSJA0000000000091','createTime':1546667583,'mileage':18}
{'vin':'LSJA0000000000091','createTime':1546667584,'mileage':19}
{'vin':'LSJA0000000000091','createTime':1546667585,'mileage':20}
{'vin':'LSJA0000000000091','createTime':1546667586,'mileage':21}
{'vin':'LSJA0000000000091','createTime':1546667587,'mileage':22}
{'vin':'LSJA0000000000091','createTime':1546667588,'mileage':23}
{'vin':'LSJA0000000000091','createTime':1546667589,'mileage':24}
{'vin':'LSJA0000000000091','createTime':1546667590,'mileage':25}
{'vin':'LSJA0000000000091','createTime':1546667591,'mileage':26}
{'vin':'LSJA0000000000091','createTime':1546667592,'mileage':27}
{'vin':'LSJA0000000000091','createTime':1546667593,'mileage':28}
{'vin':'LSJA0000000000091','createTime':1546667594,'mileage':29}
{'vin':'LSJA0000000000091','createTime':1546667595,'mileage':30}
{'vin':'LSJA0000000000091','createTime':1546667596,'mileage':31}
{'vin':'LSJA0000000000091','createTime':1546667597,'mileage':32}
{'vin':'LSJA0000000000091','createTime':1546667598,'mileage':33}
{'vin':'LSJA0000000000091','createTime':1546667599,'mileage':34}
{'vin':'LSJA0000000000091','createTime':1546667600,'mileage':35}
{'vin':'LSJA0000000000091','createTime':1546667601,'mileage':36}
{'vin':'LSJA0000000000091','createTime':1546667602,'mileage':37}
{'vin':'LSJA0000000000091','createTime':1546667603,'mileage':38}
{'vin':'LSJA0000000000091','createTime':1546667604,'mileage':39}
{'vin':'LSJA0000000000091','createTime':1546667605,'mileage':40}
{'vin':'LSJA0000000000091','createTime':1546667606,'mileage':41}
{'vin':'LSJA0000000000091','createTime':1546667607,'mileage':42}
{'vin':'LSJA0000000000091','createTime':1546667608,'mileage':43}
{'vin':'LSJA0000000000091','createTime':1546667609,'mileage':44}
{'vin':'LSJA0000000000091','createTime':1546667610,'mileage':45}
{'vin':'LSJA0000000000091','createTime':1546667611,'mileage':46}
{'vin':'LSJA0000000000091','createTime':1546667612,'mileage':47}
{'vin':'LSJA0000000000091','createTime':1546667613,'mileage':48}
{'vin':'LSJA0000000000091','createTime':1546667614,'mileage':49}
{'vin':'LSJA0000000000091','createTime':1546667615,'mileage':50}
{'vin':'LSJA0000000000091','createTime':1546667616,'mileage':51}
{'vin':'LSJA0000000000091','createTime':1546667617,'mileage':52}
{'vin':'LSJA0000000000091','createTime':1546667618,'mileage':53}
{'vin':'LSJA0000000000091','createTime':1546667619,'mileage':54}
{'vin':'LSJA0000000000091','createTime':1546667620,'mileage':55}
{'vin':'LSJA0000000000091','createTime':1546667621,'mileage':56}
{'vin':'LSJA0000000000091','createTime':1546667622,'mileage':57}
{'vin':'LSJA0000000000091','createTime':1546667623,'mileage':58}
{'vin':'LSJA0000000000091','createTime':1546667624,'mileage':59}



{'vin':'LSJA0000000000091','createTime':1546667745,'mileage':60}
{'vin':'LSJA0000000000091','createTime':1546667746,'mileage':61}
{'vin':'LSJA0000000000091','createTime':1546667747,'mileage':62}
{'vin':'LSJA0000000000091','createTime':1546667748,'mileage':63}
{'vin':'LSJA0000000000091','createTime':1546667749,'mileage':64}
{'vin':'LSJA0000000000091','createTime':1546667750,'mileage':65}
{'vin':'LSJA0000000000091','createTime':1546667751,'mileage':66}
{'vin':'LSJA0000000000091','createTime':1546667752,'mileage':67}
{'vin':'LSJA0000000000091','createTime':1546667753,'mileage':68}
{'vin':'LSJA0000000000091','createTime':1546667754,'mileage':69}
{'vin':'LSJA0000000000091','createTime':1546667755,'mileage':70}
{'vin':'LSJA0000000000091','createTime':1546667756,'mileage':71}
{'vin':'LSJA0000000000091','createTime':1546667757,'mileage':72}
{'vin':'LSJA0000000000091','createTime':1546667758,'mileage':73}
{'vin':'LSJA0000000000091','createTime':1546667759,'mileage':74}
{'vin':'LSJA0000000000091','createTime':1546667760,'mileage':75}
{'vin':'LSJA0000000000091','createTime':1546667761,'mileage':76}
{'vin':'LSJA0000000000091','createTime':1546667762,'mileage':77}
{'vin':'LSJA0000000000091','createTime':1546667763,'mileage':78}
{'vin':'LSJA0000000000091','createTime':1546667764,'mileage':79}
{'vin':'LSJA0000000000091','createTime':1546667765,'mileage':80}
{'vin':'LSJA0000000000091','createTime':1546667766,'mileage':81}
{'vin':'LSJA0000000000091','createTime':1546667767,'mileage':82}
{'vin':'LSJA0000000000091','createTime':1546667768,'mileage':83}
{'vin':'LSJA0000000000091','createTime':1546667769,'mileage':84}
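Note that the records use single quotes rather than strict JSON (the job below parses them with fastjson), and that createTime is a Unix timestamp in seconds. As a quick sanity check, here is a minimal sketch that parses one sample line with a regular expression and converts createTime to a UTC instant. `SampleRecordSketch` and its regex are hypothetical illustrations, not part of the original program.

```scala
import java.time.Instant

// Hypothetical illustration only: the real job parses records with fastjson, not a regex.
object SampleRecordSketch {
  final case class SourceData(vin: String, createTime: Long, mileage: Long)

  // Matches the single-quoted records produced by the simulator
  private val Record = """\{'vin':'([^']*)','createTime':(\d+),'mileage':(\d+)\}""".r

  def parse(line: String): Option[SourceData] = line.trim match {
    case Record(vin, time, mileage) => Some(SourceData(vin, time.toLong, mileage.toLong))
    case _                          => None
  }

  def main(args: Array[String]): Unit = {
    val record = parse("{'vin':'LSJA0000000000091','createTime':1546667565,'mileage':0}")
    // createTime is in epoch seconds, so use ofEpochSecond (not ofEpochMilli)
    record.foreach(r => println(Instant.ofEpochSecond(r.createTime))) // 2019-01-05T05:52:45Z
  }
}
```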

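When you drive from one place to another, your driving breaks down into many trips.

Trips come in segments. Say you deliver takeout: after each stop, it takes you 5 minutes to hand the order to the customer. If you deliver 3 orders in a morning, you have made 3 trips, and consecutive trips are at least 5 minutes apart. If you still haven't delivered an order after 15 minutes, you have to call the customer and say: "Sorry, I'm downstairs; I'll mark it as 'Delivered' for now."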

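Today's task is to segment trips from the sensor data. In essence, we group the records so that records belonging to the same trip end up in the same group. Only 3 cases need to be considered:

  • (1) The trip times out
  • (2) The trip ends normally
  • (3) The trip is in progress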

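Case 1, timeout: a trip has started, but the condition for ending it is never met. When the time limit is reached and the trip still hasn't ended, the trip is returned as it is.
Case 2, normal end: the createTime difference between two consecutive signals is greater than 5 minutes, i.e., no new data has arrived for 5 minutes since the previous signal. The previous trip ends and the next one starts.
Case 3, in progress: the createTime difference between two consecutive signals is less than 5 minutes, so the trip is considered ongoing and only its state needs to be updated.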
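The three cases can be sketched as pure functions, independent of Spark. This is an illustration under stated assumptions, not the author's implementation: `TripCasesSketch`, `classify` and `splitTrips` are hypothetical names, timestamps are epoch seconds, and the thresholds (5-minute gap, 15-minute timeout) come from the prose above. Case 1 needs a clock (`now`) because it fires when no data arrives at all; in the Spark job that role is played by the GroupState processing-time timeout.

```scala
// Hypothetical sketch of the three cases, not the original implementation. Times are epoch seconds.
object TripCasesSketch {
  sealed trait TripCase
  case object TimedOut   extends TripCase // case 1: no new data before the timeout expired
  case object EndedByGap extends TripCase // case 2: the next signal came after a gap longer than gapSeconds
  case object InProgress extends TripCase // case 3: the next signal came within gapSeconds

  /** `next` is the next signal's createTime, or None if nothing has arrived by `now`.
    * Returns None while still waiting and neither threshold has been crossed. */
  def classify(lastSignal: Long, next: Option[Long], now: Long,
               gapSeconds: Long, timeoutSeconds: Long): Option[TripCase] = next match {
    case Some(t) if t - lastSignal > gapSeconds    => Some(EndedByGap)
    case Some(_)                                   => Some(InProgress)
    case None if now - lastSignal > timeoutSeconds => Some(TimedOut)
    case None                                      => None
  }

  /** Groups signal times into trips: a gap longer than gapSeconds starts a new trip. */
  def splitTrips(times: Seq[Long], gapSeconds: Long): List[List[Long]] =
    times.sorted.foldLeft(List.empty[List[Long]]) {
      case (Nil, t)                                              => List(List(t))
      case (current :: done, t) if t - current.head > gapSeconds => List(t) :: current :: done
      case (current :: done, t)                                  => (t :: current) :: done
    }.map(_.reverse).reverse // trips and their signals were accumulated newest-first

  def main(args: Array[String]): Unit = {
    val gap     = 5 * 60L  // 5 minutes
    val timeout = 15 * 60L // 15 minutes
    println(classify(1000L, Some(1100L), 1100L, gap, timeout)) // Some(InProgress)
    println(classify(1000L, Some(1400L), 1400L, gap, timeout)) // Some(EndedByGap)
    println(classify(1000L, None, 2000L, gap, timeout))        // Some(TimedOut)
    println(splitTrips(Seq(0L, 60L, 120L, 600L, 660L), gap))   // List(List(0, 60, 120), List(600, 660))
  }
}
```

Folding over sorted timestamps keeps the split a single pass; the streaming job has to do the same thing incrementally, one micro-batch at a time, which is why it keeps the current trip in state.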

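The sample above was simulated with the following Raku script. It sends one record per second for about a minute, then "sleeps" for about two minutes, during which nothing is sent: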

# send-data-to-socket.pl6
sub MAIN(Str :$host = '0.0.0.0', Int :$port = 3333) {

    my $vin = 'LSJA0000000000091';
    my $last_meter = 0;

    react {
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                my Bool:D $ignore = True;

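                # rotor(1, 1 => 1) passes ticks 0, 1, 3, 4, 6, ... of this 60 s interval, so $ignore
                # flips at 0 s, 60 s, 180 s, 240 s, ...: about 1 minute of data, then a 2-minute pause
                whenever Supply.interval(60).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }

                # Emit one record per second while not paused
                whenever Supply.interval(1) {
                    next if $ignore;
                    my $line = sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                    print $line;        # echo locally
                    $conn.print: $line; # send to the socket the Spark job reads from
                }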

                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            }
        }
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}
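Why does each burst become exactly one trip? Reading `rotor(1, 1 => 1)` as "keep one tick, then keep one and skip one", the flag flips at 0 s, 60 s, 180 s, 240 s, and so on. The sketch below models that schedule; it is an approximation inferred from the script and the sample data, and `SenderScheduleSketch` / `isSending` are hypothetical names.

```scala
// Hypothetical model of the Raku script's timing, inferred from rotor(1, 1 => 1) and the sample data.
object SenderScheduleSketch {
  // The flag flips at 0, 60, 180, 240, 360, 420, ... seconds, so records are sent during
  // [0, 60), [180, 240), [360, 420), ...: one minute on, two minutes off.
  def isSending(secondsSinceStart: Long): Boolean = (secondsSinceStart / 60) % 3 == 0

  def main(args: Array[String]): Unit = {
    val sent = (0L until 400L).filter(isSending)
    // Gaps between consecutive records that are longer than the normal 1-second spacing
    val pauses = sent.zip(sent.tail).map { case (a, b) => b - a }.filter(_ > 1)
    println(s"records sent: ${sent.size}, pauses: ${pauses.mkString(", ")}")
  }
}
```

The model predicts 121 seconds between the last record of one burst (second 59) and the first record of the next (second 180), which matches the jump from mileage 59 to mileage 60 in the sample data and is well above the 1-minute gap used in the test configuration.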

Below is the trip segmentation implemented with Spark Structured Streaming:

package ohmysummer

import com.alibaba.fastjson.JSON
import ohmysummer.conf.SocketConfiguration
import ohmysummer.model.SourceData
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.streaming.Duration


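/**
  * Trip segmentation with Structured Streaming.
  * Question: can one vin have two or more trips within a single batch of sourceData?
  * The timeout is 15 minutes; on timeout a TripUpdate is returned with tripStatus set to 0.
  * If the createTime gap between two records exceeds 5 minutes, the previous trip ends and a new one starts.
  * With one micro-batch every 5 seconds, a vin cannot have more than two trips within one micro-batch.
  * Status codes: 0 = timed out (also used for a newly started trip), 1 = ended normally, 2 = in progress.
  *
  * For testing we don't want to wait that long, so the ProcessingTime trigger (the micro-batch interval) is 1 s,
  * a createTime gap of more than 1 minute ends the previous trip and starts a new one,
  * and the timeout is set to 3 minutes.
  */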
object SubTrip {

  def main(args: Array[String]): Unit = {

    val socketConf = new SocketConfiguration
    val host = socketConf.host
    val port = socketConf.port

    val spark: SparkSession = SparkSession.builder
      .master("local[2]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    val events = ds.map { line =>

      val data       = JSON.parseObject(line)
      val vin        = data.getString("vin")
      val createTime = data.getLong("createTime") // epoch seconds
      val mileage    = data.getLong("mileage")

      // Convert each line into a SourceData record
      SourceData(vin, createTime, mileage)
    }

    val tripIdleTimeout: Duration = Duration(3 * 60 * 1000)  // state idle timeout: 3 minutes
    val tripDuration: Duration    = Duration(1 * 60 * 1000)  // gap that separates two trips: 1 minute

    val finish = events
      .groupByKey(event => event.vin)
      .mapGroupsWithState[TripInfo, TripUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

      case (vin: String, source: Iterator[SourceData], state: GroupState[TripInfo]) =>
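        if (state.hasTimedOut && state.exists) { // case 1: the trip timed out
          val timedOutTrip = state.get
          // Remove the expired state; otherwise it would stay in the store with no timeout set
          state.remove()
          TripUpdate(
            vin,
            timedOutTrip.tripStartTime,
            timedOutTrip.tripEndTime,
            timedOutTrip.startMileage,
            timedOutTrip.endMileage,
            timedOutTrip.tripDuration,
            timedOutTrip.tripDistance,
            tripStatus = 0 // trip ended by timeout
          )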
        } else {
          // An Iterator can be consumed only once, and `source` is used several times below,
          // so materialize it first; otherwise the second consumer would get no data and fail
          val data = source.toList
          val lastTrip = getLastState(data, state) // the vehicle's previous trip state

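          // Update the trip
          if (state.exists) {
            state.setTimeoutDuration(tripIdleTimeout.milliseconds)
            val lastTripInfo = state.get // state of the previous trip

            // createTime is in epoch seconds, hence * 1000L before comparing with milliseconds.
            // Debug output: how far each record lies beyond the 1-minute gap (positive = new trip)
            data.map { _.createTime * 1000L - lastTrip.tripEndTime * 1000L - tripDuration.milliseconds }.foreach(println(_))
            // Find the first record that arrived more than 1 minute after the previous trip's last signal
            val guard: Option[SourceData] = data.find(_.createTime * 1000L - lastTrip.tripEndTime * 1000L - tripDuration.milliseconds > 0)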

            guard match {
              case Some(_) => { // case 2: gap exceeded, close the previous trip
                println("Splitting off a new trip: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
                // Start the next trip; this assumes every record in the batch belongs to it
                initState(data, state)
                println("After initializing the new trip: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)

                println("lastTripInfo: ", lastTripInfo.tripStartTime, lastTripInfo.tripEndTime)

                TripUpdate(
                  vin,
                  tripStartTime = lastTripInfo.tripStartTime,
                  tripEndTime   = lastTripInfo.tripEndTime,
                  startMileage  = lastTripInfo.startMileage,
                  endMileage    = lastTripInfo.endMileage,
                  tripDuration  = lastTripInfo.tripDuration,
                  tripDistance  = lastTripInfo.tripDistance,
                  tripStatus    = 1 // trip ended normally
                )

              }

              case _ => {
                val updatedTripInfo = getUpdateState(data, state)
                state.update(updatedTripInfo)
                TripUpdate(
                  vin,
                  tripStartTime = state.get.tripStartTime,
                  tripEndTime   = state.get.tripEndTime,
                  startMileage  = state.get.startMileage,
                  endMileage    = state.get.endMileage,
                  tripDuration  = state.get.tripDuration,
                  tripDistance  = state.get.tripDistance,
                  tripStatus    = 2 // trip in progress
                )
              }
            }

          } else { // No state yet: this is the vehicle's first data, so start its first trip
            initState(data, state)
            println("initState: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)

            // initState sets end == start; extend the trip to the latest record in this batch
            state.update(getUpdateState(data, state))
            // The timeout is reset on every call, so it has to be set here as well;
            // without it this state would never time out
            state.setTimeoutDuration(tripIdleTimeout.milliseconds)
            println("current State: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)

            TripUpdate(
              vin,
              tripStartTime = state.get.tripStartTime,
              tripEndTime   = state.get.tripEndTime,
              startMileage  = state.get.startMileage,
              endMileage    = state.get.endMileage,
              tripDuration  = state.get.tripDuration,
              tripDistance  = state.get.tripDistance,
              tripStatus    = 0 // newly started trip
            )
          }
        }
    }

    finish.writeStream
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }

  /**
    * Extends the current trip with a new batch of records.
    * @param sourceData new records
    * @param state      state of the current trip
    * @return trip info keeping the original start, with end time and end mileage set to the maxima of sourceData
    */
  def getUpdateState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): TripInfo = {
    val current = state.get
    TripInfo(
      tripStartTime = current.tripStartTime,
      tripEndTime   = sourceData.map(_.createTime).max, // update tripEndTime
      startMileage  = current.startMileage,
      endMileage    = sourceData.map(_.mileage).max     // update endMileage
    )
  }

  /**
    * Returns the state of the previous trip.
    * @param sourceData newly ingested records
    * @param state      in-memory state of the previous trip
    * @return the stored trip info, or a fresh one built from sourceData if no state exists yet
    */
  def getLastState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): TripInfo = {
    if (state.exists) {
      state.get
    } else { // No state yet: sourceData is the vehicle's first data

      val tripStartTime = sourceData.map(_.createTime).min
      val startMileage  = sourceData.map(_.mileage).min

      TripInfo(
        tripStartTime,
        tripEndTime = tripStartTime, // the earliest createTime is both the start and (for now) the end time
        startMileage,
        endMileage = startMileage    // the smallest mileage is both the start and (for now) the end mileage
      )
    }
  }

  /**
    * Initializes the next trip from a batch of records.
    * @param sourceData new records
    * @param state      state to overwrite with the new trip
    */
  def initState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): Unit = {

    val tripStartTime = sourceData.map(_.createTime).min
    val startMileage  = sourceData.map(_.mileage).min

    val initTripInfo = TripInfo(
      tripStartTime,
      tripEndTime = tripStartTime,
      startMileage,
      endMileage = startMileage
    )
    state.update(initTripInfo)
  }
}


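/**
  * Trip information kept as state.
  * @param tripStartTime trip start time (epoch seconds)
  * @param tripEndTime   trip end time (epoch seconds)
  * @param startMileage  mileage at the start of the trip (km)
  * @param endMileage    mileage at the end of the trip (km)
  */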
case class TripInfo(
    var tripStartTime: Long,
    var tripEndTime:   Long,
    var startMileage:  Long,
    var endMileage:    Long
) {
  def tripDuration: Long = tripEndTime - tripStartTime
  def tripDistance: Long = endMileage  - startMileage
}

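/**
  * Trip row emitted for each update.
  * @param vin           vehicle identification number (VIN)
  * @param tripStartTime trip start time
  * @param tripEndTime   trip end time
  * @param startMileage  start mileage
  * @param endMileage    end mileage
  * @param tripDuration  trip duration (seconds)
  * @param tripDistance  distance driven (km)
  * @param tripStatus    trip status: 0 = timed out or newly started, 1 = ended normally, 2 = in progress
  */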
case class TripUpdate(
    vin:               String,
    var tripStartTime: Long,
    var tripEndTime:   Long,
    var startMileage:  Long,
    var endMileage:    Long,
    var tripDuration:  Long,
    var tripDistance:  Long,
    var tripStatus:    Int
)

SourceData is a case class:

/**
  * Source record.
  * @param vin        vehicle identification number (VIN)
  * @param createTime time the signal was generated (epoch seconds)
  * @param mileage    current mileage (km)
  */
case class SourceData (
   vin:        String,
   createTime: Long,
   mileage:    Long
)
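Because the per-vehicle logic inside mapGroupsWithState is a function of (previous state, new records, timed out or not), it can be modeled and unit-tested without Spark. The following is a simplified, hypothetical model (`TripStepSketch`, `step` and `open` are not part of the original code): it keeps the article's status codes, removes the state on timeout, and, unlike initState, opens a new trip spanning the whole batch. Its `main` replays the trip boundary from the log at the end of this post.

```scala
// Hypothetical, Spark-free model of one mapGroupsWithState call for a single vehicle.
object TripStepSketch {
  final case class SourceData(vin: String, createTime: Long, mileage: Long)

  final case class TripInfo(tripStartTime: Long, tripEndTime: Long, startMileage: Long, endMileage: Long) {
    def tripDuration: Long = tripEndTime - tripStartTime
    def tripDistance: Long = endMileage - startMileage
  }

  // Simplification: a new trip spans the whole batch (earliest to latest record).
  private def open(batch: Seq[SourceData]): TripInfo =
    TripInfo(batch.map(_.createTime).min, batch.map(_.createTime).max,
             batch.map(_.mileage).min, batch.map(_.mileage).max)

  /** Returns (state to keep, None meaning "remove"; trip to emit; tripStatus).
    * Status codes follow the article: 0 = timed out or newly started, 1 = ended normally, 2 = in progress. */
  def step(state: Option[TripInfo], batch: Seq[SourceData], timedOut: Boolean,
           gapSeconds: Long): (Option[TripInfo], TripInfo, Int) = {
    // Spark calls the function either with new data or, for an existing state, on timeout
    require(batch.nonEmpty || (timedOut && state.isDefined))
    state match {
      case Some(trip) if timedOut =>
        (None, trip, 0)                                 // case 1: emit and drop the state
      case Some(trip) if batch.exists(_.createTime - trip.tripEndTime > gapSeconds) =>
        (Some(open(batch)), trip, 1)                    // case 2: close the old trip, open a new one
      case Some(trip) =>
        val extended = trip.copy(tripEndTime = batch.map(_.createTime).max,
                                 endMileage  = batch.map(_.mileage).max)
        (Some(extended), extended, 2)                   // case 3: trip still in progress
      case None =>
        val fresh = open(batch)
        (Some(fresh), fresh, 0)                         // first data for this vehicle
    }
  }

  def main(args: Array[String]): Unit = {
    // Trip boundary from the log: last trip 1546668643..1546668702, next signal at 1546668823
    val current = Some(TripInfo(1546668643L, 1546668702L, 179L, 238L))
    val batch   = Seq(SourceData("LSJA0000000000091", 1546668823L, 239L))
    val (next, emitted, status) = step(current, batch, timedOut = false, gapSeconds = 60L)
    println(s"status=$status duration=${emitted.tripDuration} distance=${emitted.tripDistance} next=$next")
  }
}
```

Replaying that boundary emits the closed trip with status 1 and opens a new trip at 1546668823 / 239 km, the same values as the second debug line of the log.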

As before, first run:

raku send-data-to-socket.pl6

Then launch the Structured Streaming job:

#!/bin/sh

spark-submit \
  --class ohmysummer.SubTrip \
  --master "local[2]" \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  target/socket-structured-streaming-1.0-SNAPSHOT.jar

The printed log looks like this:

2019-01-05 14:11:45 WARN  ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1676 milliseconds
61000
(Splitting off a new trip: ,1546668643,179,1546668702,238)
(After initializing the new trip: ,1546668823,239,1546668823,239)
(lastTripInfo: ,1546668643,1546668702)
-------------------------------------------
Batch: 112
-------------------------------------------
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
|vin              |tripStartTime|tripEndTime|startMileage|endMileage|tripDuration|tripDistance|tripStatus|
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
|LSJA0000000000091|1546668643   |1546668702 |179         |238       |59          |59          |1         |
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
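The numbers in this log can be checked by hand. The 61000 printed first comes from the job's debug line, createTime * 1000 - tripEndTime * 1000 - 60000, evaluated for the first record after the pause; a positive value means a new trip starts. The sketch below repeats that arithmetic with the log's own values; `gapBeyondThresholdMs` is a hypothetical helper mirroring the expression in the job.

```scala
object LogArithmeticSketch {
  // Same expression as the job's debug println: positive means "start a new trip"
  def gapBeyondThresholdMs(createTime: Long, tripEndTime: Long, thresholdMs: Long): Long =
    createTime * 1000L - tripEndTime * 1000L - thresholdMs

  def main(args: Array[String]): Unit = {
    println(gapBeyondThresholdMs(1546668823L, 1546668702L, 60 * 1000L)) // 61000
    println(1546668702L - 1546668643L) // tripDuration in seconds: 59
    println(238L - 179L)               // tripDistance in km: 59
  }
}
```

The 121-second pause minus the 60-second threshold leaves 61 seconds, and the closed trip's 59 s and 59 km match the tripDuration and tripDistance columns.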

The source code is here.

See you next time, and Happy New Year!