Trip Segmentation with Spark Structured Streaming
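We have a car whose sensors emit one JSON record per second. Each record contains three fields:
| field | description | type | unit |
|---|---|---|---|
| vin | unique vehicle identifier | String | - |
| createTime | time the signal was generated (epoch time) | Long | seconds |
| mileage | current mileage | Int | km |
Here is a sample of the data: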
{'vin':'LSJA0000000000091','createTime':1546667565,'mileage':0}
{'vin':'LSJA0000000000091','createTime':1546667566,'mileage':1}
{'vin':'LSJA0000000000091','createTime':1546667567,'mileage':2}
{'vin':'LSJA0000000000091','createTime':1546667568,'mileage':3}
{'vin':'LSJA0000000000091','createTime':1546667569,'mileage':4}
{'vin':'LSJA0000000000091','createTime':1546667570,'mileage':5}
{'vin':'LSJA0000000000091','createTime':1546667571,'mileage':6}
{'vin':'LSJA0000000000091','createTime':1546667572,'mileage':7}
{'vin':'LSJA0000000000091','createTime':1546667573,'mileage':8}
{'vin':'LSJA0000000000091','createTime':1546667574,'mileage':9}
{'vin':'LSJA0000000000091','createTime':1546667575,'mileage':10}
{'vin':'LSJA0000000000091','createTime':1546667576,'mileage':11}
{'vin':'LSJA0000000000091','createTime':1546667577,'mileage':12}
{'vin':'LSJA0000000000091','createTime':1546667578,'mileage':13}
{'vin':'LSJA0000000000091','createTime':1546667579,'mileage':14}
{'vin':'LSJA0000000000091','createTime':1546667580,'mileage':15}
{'vin':'LSJA0000000000091','createTime':1546667581,'mileage':16}
{'vin':'LSJA0000000000091','createTime':1546667582,'mileage':17}
{'vin':'LSJA0000000000091','createTime':1546667583,'mileage':18}
{'vin':'LSJA0000000000091','createTime':1546667584,'mileage':19}
{'vin':'LSJA0000000000091','createTime':1546667585,'mileage':20}
{'vin':'LSJA0000000000091','createTime':1546667586,'mileage':21}
{'vin':'LSJA0000000000091','createTime':1546667587,'mileage':22}
{'vin':'LSJA0000000000091','createTime':1546667588,'mileage':23}
{'vin':'LSJA0000000000091','createTime':1546667589,'mileage':24}
{'vin':'LSJA0000000000091','createTime':1546667590,'mileage':25}
{'vin':'LSJA0000000000091','createTime':1546667591,'mileage':26}
{'vin':'LSJA0000000000091','createTime':1546667592,'mileage':27}
{'vin':'LSJA0000000000091','createTime':1546667593,'mileage':28}
{'vin':'LSJA0000000000091','createTime':1546667594,'mileage':29}
{'vin':'LSJA0000000000091','createTime':1546667595,'mileage':30}
{'vin':'LSJA0000000000091','createTime':1546667596,'mileage':31}
{'vin':'LSJA0000000000091','createTime':1546667597,'mileage':32}
{'vin':'LSJA0000000000091','createTime':1546667598,'mileage':33}
{'vin':'LSJA0000000000091','createTime':1546667599,'mileage':34}
{'vin':'LSJA0000000000091','createTime':1546667600,'mileage':35}
{'vin':'LSJA0000000000091','createTime':1546667601,'mileage':36}
{'vin':'LSJA0000000000091','createTime':1546667602,'mileage':37}
{'vin':'LSJA0000000000091','createTime':1546667603,'mileage':38}
{'vin':'LSJA0000000000091','createTime':1546667604,'mileage':39}
{'vin':'LSJA0000000000091','createTime':1546667605,'mileage':40}
{'vin':'LSJA0000000000091','createTime':1546667606,'mileage':41}
{'vin':'LSJA0000000000091','createTime':1546667607,'mileage':42}
{'vin':'LSJA0000000000091','createTime':1546667608,'mileage':43}
{'vin':'LSJA0000000000091','createTime':1546667609,'mileage':44}
{'vin':'LSJA0000000000091','createTime':1546667610,'mileage':45}
{'vin':'LSJA0000000000091','createTime':1546667611,'mileage':46}
{'vin':'LSJA0000000000091','createTime':1546667612,'mileage':47}
{'vin':'LSJA0000000000091','createTime':1546667613,'mileage':48}
{'vin':'LSJA0000000000091','createTime':1546667614,'mileage':49}
{'vin':'LSJA0000000000091','createTime':1546667615,'mileage':50}
{'vin':'LSJA0000000000091','createTime':1546667616,'mileage':51}
{'vin':'LSJA0000000000091','createTime':1546667617,'mileage':52}
{'vin':'LSJA0000000000091','createTime':1546667618,'mileage':53}
{'vin':'LSJA0000000000091','createTime':1546667619,'mileage':54}
{'vin':'LSJA0000000000091','createTime':1546667620,'mileage':55}
{'vin':'LSJA0000000000091','createTime':1546667621,'mileage':56}
{'vin':'LSJA0000000000091','createTime':1546667622,'mileage':57}
{'vin':'LSJA0000000000091','createTime':1546667623,'mileage':58}
{'vin':'LSJA0000000000091','createTime':1546667624,'mileage':59}
{'vin':'LSJA0000000000091','createTime':1546667745,'mileage':60}
{'vin':'LSJA0000000000091','createTime':1546667746,'mileage':61}
{'vin':'LSJA0000000000091','createTime':1546667747,'mileage':62}
{'vin':'LSJA0000000000091','createTime':1546667748,'mileage':63}
{'vin':'LSJA0000000000091','createTime':1546667749,'mileage':64}
{'vin':'LSJA0000000000091','createTime':1546667750,'mileage':65}
{'vin':'LSJA0000000000091','createTime':1546667751,'mileage':66}
{'vin':'LSJA0000000000091','createTime':1546667752,'mileage':67}
{'vin':'LSJA0000000000091','createTime':1546667753,'mileage':68}
{'vin':'LSJA0000000000091','createTime':1546667754,'mileage':69}
{'vin':'LSJA0000000000091','createTime':1546667755,'mileage':70}
{'vin':'LSJA0000000000091','createTime':1546667756,'mileage':71}
{'vin':'LSJA0000000000091','createTime':1546667757,'mileage':72}
{'vin':'LSJA0000000000091','createTime':1546667758,'mileage':73}
{'vin':'LSJA0000000000091','createTime':1546667759,'mileage':74}
{'vin':'LSJA0000000000091','createTime':1546667760,'mileage':75}
{'vin':'LSJA0000000000091','createTime':1546667761,'mileage':76}
{'vin':'LSJA0000000000091','createTime':1546667762,'mileage':77}
{'vin':'LSJA0000000000091','createTime':1546667763,'mileage':78}
{'vin':'LSJA0000000000091','createTime':1546667764,'mileage':79}
{'vin':'LSJA0000000000091','createTime':1546667765,'mileage':80}
{'vin':'LSJA0000000000091','createTime':1546667766,'mileage':81}
{'vin':'LSJA0000000000091','createTime':1546667767,'mileage':82}
{'vin':'LSJA0000000000091','createTime':1546667768,'mileage':83}
{'vin':'LSJA0000000000091','createTime':1546667769,'mileage':84}
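One thing worth noticing in the sample: the timestamps advance one second at a time, except for a single jump between the records with mileage 59 and 60. Here is a minimal sketch for spotting such jumps, assuming the `createTime` values are epoch seconds as in the sample. `GapFinder` and `findGaps` are hypothetical names that do not appear in the original program.

```scala
object GapFinder {
  // Hypothetical helper, not part of the original program.
  // Returns (previous, next, gap) for every pair of consecutive timestamps
  // whose difference exceeds thresholdSeconds.
  def findGaps(createTimes: Seq[Long], thresholdSeconds: Long): List[(Long, Long, Long)] =
    createTimes.sorted
      .sliding(2)
      .collect { case Seq(prev, next) if next - prev > thresholdSeconds => (prev, next, next - prev) }
      .toList

  def main(args: Array[String]): Unit = {
    // createTime values around the pause in the sample (mileage 58 to 61).
    val sample = Seq(1546667623L, 1546667624L, 1546667745L, 1546667746L)
    findGaps(sample, thresholdSeconds = 60).foreach { case (prev, next, gap) =>
      println(s"gap of $gap s between $prev and $next") // gap of 121 s between 1546667624 and 1546667745
    }
  }
}
```

A pause like this one, longer than the configured gap threshold, is what separates one trip from the next.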
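When you drive from one place to another, you end up making many trips.
Trips come in segments. Say you deliver takeout by car: after each stop you spend 5 minutes bringing the order to the customer. If you deliver three orders in a morning, you have made three trips. Consecutive trips are at least 5 minutes apart. If 15 minutes pass and you still have not delivered, you have to call the customer and say: "Sorry, I'm downstairs at your building, I'm going to mark the order as delivered for now."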
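Today's task is to split the data sent by the sensors into trips. In effect we are grouping the records, so that records belonging to the same trip land in the same group. Only three cases need to be considered:
- (1) the trip times out
- (2) the trip ends normally
- (3) the trip is still in progress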
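Case one, the trip times out: a trip has started, but the condition for ending it is never met. When the timeout expires and the trip still has not ended, we simply return the trip as it stands.
Case two, the trip ends normally: the `createTime` values of two signals differ by more than 5 minutes. In other words, no new data has arrived for 5 minutes since the previous signal, so we end the previous trip and start the next one.
Case three, the trip is still in progress: the `createTime` values of two signals differ by less than 5 minutes, so we consider the trip ongoing and only update its state.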
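The second and third cases boil down to one comparison against the end of the current trip. Here is a rough, Spark-free sketch of that rule applied to a complete list of signals. `Signal`, `Trip`, and `splitTrips` are illustrative names, not the types used in the streaming job. The first case (timeout) is left out because it depends on the wall clock rather than on the data.

```scala
object TripSplitSketch {
  // Simplified stand-ins for illustration only; createTime is in epoch seconds.
  final case class Signal(vin: String, createTime: Long, mileage: Long)
  final case class Trip(startTime: Long, endTime: Long, startMileage: Long, endMileage: Long)

  // Folds the signals, ordered by time, into trips (newest trip at the head while folding).
  // A gap of at most gapSeconds extends the current trip (case 3);
  // a larger gap, or the very first signal, opens a new trip (case 2).
  def splitTrips(signals: Seq[Signal], gapSeconds: Long): List[Trip] =
    signals.sortBy(_.createTime).foldLeft(List.empty[Trip]) {
      case (current :: finished, s) if s.createTime - current.endTime <= gapSeconds =>
        current.copy(endTime = s.createTime, endMileage = s.mileage) :: finished
      case (trips, s) =>
        Trip(s.createTime, s.createTime, s.mileage, s.mileage) :: trips
    }.reverse

  def main(args: Array[String]): Unit = {
    val vin = "LSJA0000000000091"
    val signals = Seq(
      Signal(vin, 1546667623L, 58), Signal(vin, 1546667624L, 59),
      Signal(vin, 1546667745L, 60), Signal(vin, 1546667746L, 61))
    // With a 60-second threshold the 121-second pause yields two trips.
    splitTrips(signals, gapSeconds = 60).foreach(println)
  }
}
```

The streaming version cannot see all signals at once, which is why it has to carry the current trip along as state between micro-batches.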
The data above was simulated with Raku: the script sends one record per second and "sleeps" once a minute, sending nothing while it sleeps:
# send-data-to-socket.pl6
sub MAIN(Str :$host = '0.0.0.0', Int :$port = 3333) {
    my $vin = 'LSJA0000000000091';
    my $last_meter = 0;

    react {
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                my Bool:D $ignore = True;

                whenever Supply.interval(60).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }

                whenever Supply.interval(1) {
                    next if $ignore;
                    print sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
                    $conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                }

                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            }
        }
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}
Below is the trip segmentation implemented with Spark Structured Streaming:
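package ohmysummer

import com.alibaba.fastjson.JSON
import ohmysummer.conf.SocketConfiguration
import ohmysummer.model.SourceData
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.streaming.Duration

/**
 * Trip segmentation with Structured Streaming.
 * Question: can a single sourceData batch contain two or more trips for one vin?
 * The timeout is 15 minutes; on timeout a TripUpdate is returned
 * (the timeout branch in the code emits it with tripStatus = 0).
 * If the createTime values of two records differ by more than 5 minutes,
 * the previous trip ends and the next one starts.
 * With one micro-batch every 5 seconds, a single micro-batch cannot contain
 * more than two trips for one vin.
 *
 * For testing we of course don't want to wait that long, so the ProcessingTime
 * trigger (the micro-batch interval) is set to 1 s.
 * If the createTime values of two records differ by more than 1 minute,
 * the previous trip ends and a new one starts.
 * The timeout is set to 3 minutes.
 */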
object SubTrip {
  def main(args: Array[String]): Unit = {
    val socketConf = new SocketConfiguration
    val host = socketConf.host
    val port = socketConf.port

    val spark: SparkSession = SparkSession.builder
      .master("local[2]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    val events = ds.map { line =>
      val data = JSON.parseObject(line)
      val vin = data.getString("vin")
      val createTime = data.getLong("createTime")
      val mileage = data.getLong("mileage")
      // Convert each line into a SourceData record
      SourceData(vin, createTime, mileage)
    }

    val tripIdleTimeout: Duration = Duration(3 * 60 * 1000) // how long the state may sit idle: the timeout, 3 minutes
    val tripDuration: Duration = Duration(1 * 60 * 1000) // gap that separates two trips, 1 minute
    val finish = events
      .groupByKey(event => event.vin)
      .mapGroupsWithState[TripInfo, TripUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
        case (vin: String, source: Iterator[SourceData], state: GroupState[TripInfo]) =>
          if (state.hasTimedOut && state.exists) { // timed out
            TripUpdate(
              vin,
              state.get.tripStartTime,
              state.get.tripEndTime,
              state.get.startMileage,
              state.get.endMileage,
              state.get.tripDuration,
              state.get.tripDistance,
              tripStatus = 0 // trip ended by timeout
            )
          } else {
            // An Iterator can be consumed only once, and source is used several times below,
            // so convert it to a Seq. Otherwise the second function that reads source
            // would receive empty data and fail.
            val data = source.toSeq
            val lastTrip = getLastState(data, state) // the vehicle's previous trip state
            // Update the trip information
            if (state.exists) {
              state.setTimeoutDuration(tripIdleTimeout.milliseconds)
              val lastTripInfo = state.get // the state of the previous trip
              // Find the first point after a pause of more than 1 minute without data
              data.map{ _.createTime * 1000L - lastTrip.tripEndTime * 1000L - tripDuration.milliseconds }.foreach(println(_))
              val guard: Option[SourceData] = data.find(_.createTime * 1000L - lastTrip.tripEndTime * 1000L - tripDuration.milliseconds > 0)
              guard match {
                case Some(d) => { // split the trip
                  // Debug label: "starting to split off a new trip"
                  println("开始划分新的行程了: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
                  // Start the new trip from the first record after the gap, so that records
                  // from before the gap in the same batch do not leak into it
                  initState(data.filter(_.createTime >= d.createTime), state)
                  // Debug label: "after initializing the new trip"
                  println("初始化新的行程后: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
                  println("lastTripInfo: ", lastTripInfo.tripStartTime, lastTripInfo.tripEndTime)
                  TripUpdate(
                    vin,
                    tripStartTime = lastTripInfo.tripStartTime,
                    tripEndTime = lastTripInfo.tripEndTime,
                    startMileage = lastTripInfo.startMileage,
                    endMileage = lastTripInfo.endMileage,
                    tripDuration = lastTripInfo.tripDuration,
                    tripDistance = lastTripInfo.tripDistance,
                    tripStatus = 1 // trip that ended normally
                  )
                }
                case _ => {
                  val updatedTripInfo = getUpdateState(data, state)
                  state.update(updatedTripInfo)
                  TripUpdate(
                    vin,
                    tripStartTime = state.get.tripStartTime,
                    tripEndTime = state.get.tripEndTime,
                    startMileage = state.get.startMileage,
                    endMileage = state.get.endMileage,
                    tripDuration = state.get.tripDuration,
                    tripDistance = state.get.tripDistance,
                    tripStatus = 2 // trip in progress
                  )
                }
              }
            } else { // no state yet, so this is the first data for this vin: set up an initial state
              initState(data, state)
              println("initState: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
              println("current State: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
              val updatedTripInfo = TripInfo(
                state.get.tripStartTime,
                state.get.tripEndTime,
                state.get.startMileage,
                state.get.endMileage
              )
              state.update(updatedTripInfo)
              TripUpdate(
                vin,
                tripStartTime = state.get.tripStartTime,
                tripEndTime = state.get.tripEndTime,
                startMileage = state.get.startMileage,
                endMileage = state.get.endMileage,
                tripDuration = state.get.tripDuration,
                tripDistance = state.get.tripDistance,
                tripStatus = 0 // initial trip
              )
            }
          }
      }

    finish.writeStream
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }

  /**
   * @param sourceData the new batch of source records
   * @param state      the existing state
   * @return the trip info with its end time and end mileage refreshed
   */
  def getUpdateState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): TripInfo = {
    val previous = state.get
    TripInfo(
      tripStartTime = previous.tripStartTime,
      tripEndTime = sourceData.map(_.createTime).max, // refresh tripEndTime
      startMileage = previous.startMileage,
      endMileage = sourceData.map(_.mileage).max // refresh endMileage
    )
  }

  /**
   * Get the state of the previous trip.
   * @param sourceData the newly ingested batch of source records
   * @param state      the in-memory state of the previous trip
   * @return the refreshed trip info
   */
  def getLastState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): TripInfo = {
    if (state.exists) {
      state.get
    } else { // no state yet, so sourceData is brand-new data
      val tripStartTime = sourceData.map(_.createTime).min
      val startMileage = sourceData.map(_.mileage).min
      TripInfo(
        tripStartTime,
        tripEndTime = tripStartTime, // the smallest createTime in the new batch is the trip start time
        startMileage,
        endMileage = startMileage // the smallest mileage in the new batch is the trip start mileage
      )
    }
  }

  /**
   * Initialize the next trip.
   * @param sourceData the new batch of source records
   * @param state      the old in-memory state
   */
  def initState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): Unit = {
    val tripStartTime = sourceData.map(_.createTime).min
    val startMileage = sourceData.map(_.mileage).min
    val initTripInfo = TripInfo(
      tripStartTime,
      tripEndTime = tripStartTime,
      startMileage,
      endMileage = startMileage
    )
    state.update(initTripInfo)
  }
}

/**
 * Trip information.
 * @param tripStartTime trip start time
 * @param tripEndTime   trip end time
 * @param startMileage  mileage at the start of the trip
 * @param endMileage    mileage at the end of the trip
 */
case class TripInfo(
  var tripStartTime: Long,
  var tripEndTime: Long,
  var startMileage: Long,
  var endMileage: Long
) {
  def tripDuration: Long = tripEndTime - tripStartTime
  def tripDistance: Long = endMileage - startMileage
}

/**
 * An updated trip.
 * @param vin           vehicle identification number
 * @param tripStartTime trip start time
 * @param tripEndTime   trip end time
 * @param startMileage  mileage at the start of the trip
 * @param endMileage    mileage at the end of the trip
 * @param tripDuration  driving time
 * @param tripDistance  distance driven
 * @param tripStatus    trip status
 */
case class TripUpdate(
  vin: String,
  var tripStartTime: Long,
  var tripEndTime: Long,
  var startMileage: Long,
  var endMileage: Long,
  var tripDuration: Long,
  var tripDistance: Long,
  var tripStatus: Int
)
SourceData is a case class:
/**
 * Source record.
 * @param vin        vehicle identification number
 * @param createTime time the signal was generated
 * @param mileage    current mileage
 */
case class SourceData (
  vin: String,
  createTime: Long,
  mileage: Long
)
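To make the state handling easier to reason about outside Spark, the per-batch logic can be approximated as a pure function over `Option[TripInfo]`. This is only a sketch under assumptions. `TripStateSketch` and `step` are hypothetical names, the case classes are simplified local copies, and timeouts are not modeled. At most one split per batch is assumed, in line with the comment at the top of `SubTrip`.

```scala
object TripStateSketch {
  // Simplified local copies of the post's case classes; createTime is in epoch seconds.
  final case class SourceData(vin: String, createTime: Long, mileage: Long)
  final case class TripInfo(tripStartTime: Long, tripEndTime: Long, startMileage: Long, endMileage: Long)

  // Processes one micro-batch for one vin.
  // Returns (new state, trip closed by this batch if any).
  def step(
      state: Option[TripInfo],
      batch: Seq[SourceData],
      gapSeconds: Long): (Option[TripInfo], Option[TripInfo]) =
    batch.sortBy(_.createTime).foldLeft((state, Option.empty[TripInfo])) {
      // Record is close enough to the open trip: extend it.
      case ((Some(t), closed), d) if d.createTime - t.tripEndTime <= gapSeconds =>
        (Some(t.copy(tripEndTime = d.createTime, endMileage = d.mileage)), closed)
      // No open trip, or the gap is too large: open a new trip and report the old one.
      case ((open, closed), d) =>
        (Some(TripInfo(d.createTime, d.createTime, d.mileage, d.mileage)), open.orElse(closed))
    }

  def main(args: Array[String]): Unit = {
    val vin = "LSJA0000000000091"
    val (s1, closed1) = step(None, Seq(SourceData(vin, 100, 0), SourceData(vin, 101, 1)), gapSeconds = 60)
    println((s1, closed1))
    // The second batch has one record before the gap and one after it.
    val (s2, closed2) = step(s1, Seq(SourceData(vin, 102, 2), SourceData(vin, 223, 3)), gapSeconds = 60)
    println((s2, closed2))
  }
}
```

In this variant a record that arrives before the gap still extends the old trip, and only the records after the gap seed the new one.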
As before, first run:
raku send-data-to-socket.pl6
Then start the Spark Structured Streaming application:
#!/bin/sh
spark-submit \
  --class ohmysummer.SubTrip \
  --master local[2] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  target/socket-structured-streaming-1.0-SNAPSHOT.jar
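The log output looks like this (the first two tuples carry the Chinese debug labels printed by the program, meaning "starting to split off a new trip" and "after initializing the new trip"):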
2019-01-05 14:11:45 WARN ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1676 milliseconds
61000
(开始划分新的行程了: ,1546668643,179,1546668702,238)
(初始化新的行程后: ,1546668823,239,1546668823,239)
(lastTripInfo: ,1546668643,1546668702)
-------------------------------------------
Batch: 112
-------------------------------------------
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
|vin |tripStartTime|tripEndTime|startMileage|endMileage|tripDuration|tripDistance|tripStatus|
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
|LSJA0000000000091|1546668643 |1546668702 |179 |238 |59 |59 |1 |
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
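The time columns in this row are epoch seconds, so the numbers can be checked by hand. A small sketch with the values copied from the log above (`TripRowSketch` is just a scratch object for illustration):

```scala
import java.time.Instant

object TripRowSketch {
  // Values copied from the console output above.
  val tripStartTime = 1546668643L
  val tripEndTime = 1546668702L
  val nextTripStart = 1546668823L // start of the newly initialized trip

  def main(args: Array[String]): Unit = {
    println(Instant.ofEpochSecond(tripStartTime)) // 2019-01-05T06:10:43Z
    println(Instant.ofEpochSecond(tripEndTime))   // 2019-01-05T06:11:42Z
    println(tripEndTime - tripStartTime)          // 59, the tripDuration column
    println(nextTripStart - tripEndTime)          // 121 seconds without data
    // The same check the job prints, in milliseconds: pause minus the 1-minute threshold.
    println((nextTripStart - tripEndTime) * 1000L - 60 * 1000L) // 61000
  }
}
```

The 121-second pause exceeds the 1-minute threshold by 61000 ms, which matches the `61000` printed just before the split.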
The source code is here.
See you next time, and Happy New Year!