sub-trip-with-structured-spark-streaming

Splitting Trips with Spark Structured Streaming

We have a car whose sensor emits one JSON record every second. Each record contains 3 fields:

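Field       Description                  Type    Unit
vin         Unique vehicle identifier    String  -
createTime  Time the signal was emitted  Long    seconds (Unix epoch, as in the sample data)
mileage     Current odometer reading     Int     kilometres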
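
A record with these three fields maps naturally onto a small case class. The following is only a minimal sketch, not the parsing code used later in this post: SignalParsing, Signal and parseSignal are hypothetical names, mileage is widened to Long for convenience, and the regex assumes the exact single-quoted layout of the sample lines.

```scala
object SignalParsing {
  // Hypothetical record type mirroring the three fields described above.
  final case class Signal(vin: String, createTime: Long, mileage: Long)

  // Matches only the exact single-quoted layout used by the sample data.
  private val SignalLine =
    """\{'vin':'([^']+)','createTime':(\d+),'mileage':(\d+)\}""".r

  // Returns None for any line that does not match the expected layout.
  def parseSignal(line: String): Option[Signal] = line.trim match {
    case SignalLine(vin, ts, km) => Some(Signal(vin, ts.toLong, km.toLong))
    case _                       => None
  }
}
```

Returning an Option keeps malformed lines from throwing inside a streaming job; whether to drop or log them is left open here.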

Here is a sample of the data:

{'vin':'LSJA0000000000091','createTime':1546667565,'mileage':0}
{'vin':'LSJA0000000000091','createTime':1546667566,'mileage':1}
{'vin':'LSJA0000000000091','createTime':1546667567,'mileage':2}
{'vin':'LSJA0000000000091','createTime':1546667568,'mileage':3}
{'vin':'LSJA0000000000091','createTime':1546667569,'mileage':4}
{'vin':'LSJA0000000000091','createTime':1546667570,'mileage':5}
{'vin':'LSJA0000000000091','createTime':1546667571,'mileage':6}
{'vin':'LSJA0000000000091','createTime':1546667572,'mileage':7}
{'vin':'LSJA0000000000091','createTime':1546667573,'mileage':8}
{'vin':'LSJA0000000000091','createTime':1546667574,'mileage':9}
{'vin':'LSJA0000000000091','createTime':1546667575,'mileage':10}
{'vin':'LSJA0000000000091','createTime':1546667576,'mileage':11}
{'vin':'LSJA0000000000091','createTime':1546667577,'mileage':12}
{'vin':'LSJA0000000000091','createTime':1546667578,'mileage':13}
{'vin':'LSJA0000000000091','createTime':1546667579,'mileage':14}
{'vin':'LSJA0000000000091','createTime':1546667580,'mileage':15}
{'vin':'LSJA0000000000091','createTime':1546667581,'mileage':16}
{'vin':'LSJA0000000000091','createTime':1546667582,'mileage':17}
{'vin':'LSJA0000000000091','createTime':1546667583,'mileage':18}
{'vin':'LSJA0000000000091','createTime':1546667584,'mileage':19}
{'vin':'LSJA0000000000091','createTime':1546667585,'mileage':20}
{'vin':'LSJA0000000000091','createTime':1546667586,'mileage':21}
{'vin':'LSJA0000000000091','createTime':1546667587,'mileage':22}
{'vin':'LSJA0000000000091','createTime':1546667588,'mileage':23}
{'vin':'LSJA0000000000091','createTime':1546667589,'mileage':24}
{'vin':'LSJA0000000000091','createTime':1546667590,'mileage':25}
{'vin':'LSJA0000000000091','createTime':1546667591,'mileage':26}
{'vin':'LSJA0000000000091','createTime':1546667592,'mileage':27}
{'vin':'LSJA0000000000091','createTime':1546667593,'mileage':28}
{'vin':'LSJA0000000000091','createTime':1546667594,'mileage':29}
{'vin':'LSJA0000000000091','createTime':1546667595,'mileage':30}
{'vin':'LSJA0000000000091','createTime':1546667596,'mileage':31}
{'vin':'LSJA0000000000091','createTime':1546667597,'mileage':32}
{'vin':'LSJA0000000000091','createTime':1546667598,'mileage':33}
{'vin':'LSJA0000000000091','createTime':1546667599,'mileage':34}
{'vin':'LSJA0000000000091','createTime':1546667600,'mileage':35}
{'vin':'LSJA0000000000091','createTime':1546667601,'mileage':36}
{'vin':'LSJA0000000000091','createTime':1546667602,'mileage':37}
{'vin':'LSJA0000000000091','createTime':1546667603,'mileage':38}
{'vin':'LSJA0000000000091','createTime':1546667604,'mileage':39}
{'vin':'LSJA0000000000091','createTime':1546667605,'mileage':40}
{'vin':'LSJA0000000000091','createTime':1546667606,'mileage':41}
{'vin':'LSJA0000000000091','createTime':1546667607,'mileage':42}
{'vin':'LSJA0000000000091','createTime':1546667608,'mileage':43}
{'vin':'LSJA0000000000091','createTime':1546667609,'mileage':44}
{'vin':'LSJA0000000000091','createTime':1546667610,'mileage':45}
{'vin':'LSJA0000000000091','createTime':1546667611,'mileage':46}
{'vin':'LSJA0000000000091','createTime':1546667612,'mileage':47}
{'vin':'LSJA0000000000091','createTime':1546667613,'mileage':48}
{'vin':'LSJA0000000000091','createTime':1546667614,'mileage':49}
{'vin':'LSJA0000000000091','createTime':1546667615,'mileage':50}
{'vin':'LSJA0000000000091','createTime':1546667616,'mileage':51}
{'vin':'LSJA0000000000091','createTime':1546667617,'mileage':52}
{'vin':'LSJA0000000000091','createTime':1546667618,'mileage':53}
{'vin':'LSJA0000000000091','createTime':1546667619,'mileage':54}
{'vin':'LSJA0000000000091','createTime':1546667620,'mileage':55}
{'vin':'LSJA0000000000091','createTime':1546667621,'mileage':56}
{'vin':'LSJA0000000000091','createTime':1546667622,'mileage':57}
{'vin':'LSJA0000000000091','createTime':1546667623,'mileage':58}
{'vin':'LSJA0000000000091','createTime':1546667624,'mileage':59}



{'vin':'LSJA0000000000091','createTime':1546667745,'mileage':60}
{'vin':'LSJA0000000000091','createTime':1546667746,'mileage':61}
{'vin':'LSJA0000000000091','createTime':1546667747,'mileage':62}
{'vin':'LSJA0000000000091','createTime':1546667748,'mileage':63}
{'vin':'LSJA0000000000091','createTime':1546667749,'mileage':64}
{'vin':'LSJA0000000000091','createTime':1546667750,'mileage':65}
{'vin':'LSJA0000000000091','createTime':1546667751,'mileage':66}
{'vin':'LSJA0000000000091','createTime':1546667752,'mileage':67}
{'vin':'LSJA0000000000091','createTime':1546667753,'mileage':68}
{'vin':'LSJA0000000000091','createTime':1546667754,'mileage':69}
{'vin':'LSJA0000000000091','createTime':1546667755,'mileage':70}
{'vin':'LSJA0000000000091','createTime':1546667756,'mileage':71}
{'vin':'LSJA0000000000091','createTime':1546667757,'mileage':72}
{'vin':'LSJA0000000000091','createTime':1546667758,'mileage':73}
{'vin':'LSJA0000000000091','createTime':1546667759,'mileage':74}
{'vin':'LSJA0000000000091','createTime':1546667760,'mileage':75}
{'vin':'LSJA0000000000091','createTime':1546667761,'mileage':76}
{'vin':'LSJA0000000000091','createTime':1546667762,'mileage':77}
{'vin':'LSJA0000000000091','createTime':1546667763,'mileage':78}
{'vin':'LSJA0000000000091','createTime':1546667764,'mileage':79}
{'vin':'LSJA0000000000091','createTime':1546667765,'mileage':80}
{'vin':'LSJA0000000000091','createTime':1546667766,'mileage':81}
{'vin':'LSJA0000000000091','createTime':1546667767,'mileage':82}
{'vin':'LSJA0000000000091','createTime':1546667768,'mileage':83}
{'vin':'LSJA0000000000091','createTime':1546667769,'mileage':84}

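When you drive from place to place, you end up making many trips.

Trips come in segments. Say you deliver takeout by car: after each stop you spend 5 minutes bringing the order to the customer. If you deliver 3 orders in one morning, that makes 3 trips, and consecutive trips are separated by at least 5 minutes. If 15 minutes pass and you still have not delivered, you have to call the customer and say: "Sorry, I'm downstairs at your building, I'll tap 'Delivered' first, okay?"

Today's task is to split the sensor data into trips. In effect, we group the records so that all records from the same trip land in the same group. Only the following 3 cases need to be considered: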

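  • (1) The trip timed out
  • (2) The trip ended normally
  • (3) The trip is in progress

Case 1, the trip timed out: a trip has started, but the condition for ending it is never met. When the timeout expires and the trip still has not ended, we simply emit the trip as it stands.
Case 2, the trip ended normally: the createTime values of two consecutive signals differ by more than 5 minutes. In other words, no new data has arrived for 5 minutes since the previous signal, so we close the previous trip and start the next one.
Case 3, the trip is in progress: the createTime values of two consecutive signals differ by less than 5 minutes, so the trip is considered ongoing and we only need to update its state.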
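
Leaving streaming aside for a moment, cases 2 and 3 amount to cutting a time-ordered sequence wherever the gap between neighbours exceeds a threshold. The following is a rough offline sketch of that idea, assuming all signals belong to one vin; TripSplitSketch, Signal and splitTrips are hypothetical names, and case 1 (timeout) has no counterpart here because it only arises when data stops arriving in a live stream.

```scala
object TripSplitSketch {
  // Hypothetical record type with the same three fields as the sensor data.
  final case class Signal(vin: String, createTime: Long, mileage: Long)

  // Groups signals into trips: a new trip starts whenever the gap to the
  // previous signal exceeds maxGapSeconds. Assumes a single vin.
  def splitTrips(signals: Seq[Signal], maxGapSeconds: Long): Vector[Vector[Signal]] =
    signals.sortBy(_.createTime).foldLeft(Vector.empty[Vector[Signal]]) { (trips, s) =>
      trips.lastOption match {
        case Some(current) if s.createTime - current.last.createTime <= maxGapSeconds =>
          trips.init :+ (current :+ s) // case 3: the current trip continues
        case _ =>
          trips :+ Vector(s)           // case 2, or the very first signal: start a new trip
      }
    }
}
```

With the 5-minute rule from the text, maxGapSeconds would be 300.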

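The data above was simulated with Perl 6. The script sends one record per second, and after each minute of sending it "sleeps" for a while (about two minutes in the sample above), during which no data is sent: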

# send-data-to-socket.pl6
sub MAIN(Str :$host = '0.0.0.0', Int :$port = 3333) {

    my $vin = 'LSJA0000000000091';
    my $last_meter = 0;

    react {
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                # Sending starts out paused and is toggled by the timer below.
                my Bool:D $ignore = True;

                whenever Supply.interval(60).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }

                # Once per second: echo the record locally, then send it to the client.
                whenever Supply.interval(1) {
                    next if $ignore;
                    print sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
                    $conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                }

                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            }
        }
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}
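
To reproduce the same on/off pattern without a socket, for example in a unit test, the schedule can be approximated in Scala. This is a sketch under assumptions: the 60-second send window and the 120-second pause are inferred from the sample timestamps above rather than stated anywhere, and SimulatorSketch, Signal and simulate are hypothetical names.

```scala
object SimulatorSketch {
  // Hypothetical record type with the same three fields as the sensor data.
  final case class Signal(vin: String, createTime: Long, mileage: Long)

  // Emits `count` signals, one per second while "on", skipping the seconds
  // that fall into the "off" part of each on/off period.
  def simulate(vin: String, start: Long, onSeconds: Int, offSeconds: Int, count: Int): Vector[Signal] = {
    val period = onSeconds + offSeconds
    Iterator.from(0)
      .filter(t => t % period < onSeconds) // keep only seconds inside a send window
      .take(count)
      .zipWithIndex                        // the index doubles as the running mileage
      .map { case (t, i) => Signal(vin, start + t, i.toLong) }
      .toVector
  }
}
```

Calling simulate("LSJA0000000000091", 1546667565L, 60, 120, 85) yields the same createTime and mileage pairs as the sample listing, including the jump from 1546667624 to 1546667745.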

Below is the trip splitting implemented with Spark Structured Streaming:

package ohmysummer

import com.alibaba.fastjson.JSON
import ohmysummer.conf.SocketConfiguration
import ohmysummer.model.SourceData
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.streaming.Duration


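/**
 * Trip splitting with Structured Streaming.
 * Question: within one batch of source data, can a given vin have two or more trips?
 * In production the timeout is 15 minutes; on timeout we return a TripUpdate
 * (the code below sets tripStatus to 0 for a timed-out trip).
 * If the createTime values of two records differ by more than 5 minutes,
 * we end the previous trip and start the next one.
 * With one micro-batch every 5 seconds, a given vin cannot have more than two trips
 * within a single micro-batch.
 *
 * For testing we do not want to wait that long, so ProcessingTime (the micro-batch interval) is set to 1s.
 * If the createTime values of two records differ by more than 1 minute,
 * we end the previous trip and start a new one.
 * The timeout is set to 3 minutes.
 */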
object SubTrip {

  def main(args: Array[String]): Unit = {

    val socketConf = new SocketConfiguration
    val host = socketConf.host
    val port = socketConf.port

    val spark: SparkSession = SparkSession.builder
      .master("local[2]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    val events = ds.map { line =>

      val data = JSON.parseObject(line)
      val vin = data.getString("vin")
      val createTime = data.getLong("createTime")
      val mileage = data.getLong("mileage")

      // Convert each line into a SourceData record
      SourceData(vin, createTime, mileage)
    }

    val tripIdleTimeout: Duration = Duration(3 * 60 * 1000) // how long the state may sit idle before it times out: 3 minutes
    val tripDuration: Duration = Duration(1 * 60 * 1000)    // gap that separates two trips: 1 minute

    val finish = events
      .groupByKey(event => event.vin)
      .mapGroupsWithState[TripInfo, TripUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (vin: String, source: Iterator[SourceData], state: GroupState[TripInfo]) =>
          if (state.hasTimedOut && state.exists) { // timed out
            val timedOut = state.get
            // Spark does not drop timed-out state on its own; remove it so that
            // the next signal for this vin starts a fresh trip.
            state.remove()
            TripUpdate(
              vin,
              timedOut.tripStartTime,
              timedOut.tripEndTime,
              timedOut.startMileage,
              timedOut.endMileage,
              timedOut.tripDuration,
              timedOut.tripDistance,
              tripStatus = 0 // trip ended by timeout
            )
          } else {
            // An Iterator can be consumed only once. The source data is used several times below,
            // so convert it to a Seq; otherwise the second function that reads it would
            // receive empty data and fail.
            val data = source.toSeq
            val lastTrip = getLastState(data, state) // previous trip state for this vehicle

            // Update the trip information
            if (state.exists) {
              state.setTimeoutDuration(tripIdleTimeout.milliseconds)
              val lastTripInfo = state.get // previous trip state

              // Find the first record that arrived after more than 1 minute without data
              data.map { _.createTime * 1000L - lastTrip.tripEndTime * 1000L - tripDuration.milliseconds }.foreach(println(_))
              val guard: Option[SourceData] = data.find(_.createTime * 1000L - lastTrip.tripEndTime * 1000L - tripDuration.milliseconds > 0)

              guard match {
                case Some(_) => { // split off a new trip
                  // label: "about to split off a new trip"
                  println("开始划分新的行程了: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
                  initState(data, state) // initialize a new trip
                  // label: "after initializing the new trip"
                  println("初始化新的行程后: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)

                  println("lastTripInfo: ", lastTripInfo.tripStartTime, lastTripInfo.tripEndTime)

                  TripUpdate(
                    vin,
                    tripStartTime = lastTripInfo.tripStartTime,
                    tripEndTime = lastTripInfo.tripEndTime,
                    startMileage = lastTripInfo.startMileage,
                    endMileage = lastTripInfo.endMileage,
                    tripDuration = lastTripInfo.tripDuration,
                    tripDistance = lastTripInfo.tripDistance,
                    tripStatus = 1 // trip that ended normally
                  )
                }

                case _ => {
                  val updatedTripInfo = getUpdateState(data, state)
                  state.update(updatedTripInfo)
                  TripUpdate(
                    vin,
                    tripStartTime = state.get.tripStartTime,
                    tripEndTime = state.get.tripEndTime,
                    startMileage = state.get.startMileage,
                    endMileage = state.get.endMileage,
                    tripDuration = state.get.tripDuration,
                    tripDistance = state.get.tripDistance,
                    tripStatus = 2 // trip still in progress
                  )
                }
              }

            } else { // no state yet: this is the first data for this vin, so create an initial state
              initState(data, state)
              println("initState: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
              println("current State: ", state.get.tripStartTime, state.get.startMileage, state.get.tripEndTime, state.get.endMileage)
              val updatedTripInfo = TripInfo(
                state.get.tripStartTime,
                state.get.tripEndTime,
                state.get.startMileage,
                state.get.endMileage
              )

              state.update(updatedTripInfo)

              TripUpdate(
                vin,
                tripStartTime = state.get.tripStartTime,
                tripEndTime = state.get.tripEndTime,
                startMileage = state.get.startMileage,
                endMileage = state.get.endMileage,
                tripDuration = state.get.tripDuration,
                tripDistance = state.get.tripDistance,
                tripStatus = 0 // newly initialized trip
              )
            }
          }
      }

    finish.writeStream
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }

  /**
   * Builds the updated trip state from a new batch of records.
   * @param sourceData the new batch of source records
   * @param state the existing state
   * @return the trip info with tripEndTime and endMileage advanced
   */
  def getUpdateState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): TripInfo = {
    val tripStartTime = state.get.tripStartTime
    val startMileage = state.get.startMileage
    val tripEndTime = sourceData.map(_.createTime).max // update tripEndTime
    val endMileage = sourceData.map(_.mileage).max     // update endMileage

    TripInfo(
      tripStartTime,
      tripEndTime,
      startMileage,
      endMileage
    )
  }

  /**
   * Gets the state of the previous trip.
   * @param sourceData the newly ingested batch of source records
   * @param state the in-memory state of the previous trip
   * @return the refreshed trip info
   */
  def getLastState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): TripInfo = {
    if (state.exists) {
      state.get
    } else { // no state yet, so sourceData is brand-new data

      val tripStartTime = sourceData.map(_.createTime).min
      val startMileage = sourceData.map(_.mileage).min

      TripInfo(
        tripStartTime,
        tripEndTime = tripStartTime, // the smallest createTime in the batch serves as the trip start time
        startMileage,
        endMileage = startMileage    // the smallest mileage in the batch serves as the trip start mileage
      )
    }
  }

  /**
   * Initializes the next trip.
   * @param sourceData the new batch of source records
   * @param state the old in-memory state
   */
  def initState(sourceData: Seq[SourceData], state: GroupState[TripInfo]): Unit = {

    val tripStartTime = sourceData.map(_.createTime).min
    val startMileage = sourceData.map(_.mileage).min

    val initTripInfo = TripInfo(
      tripStartTime,
      tripEndTime = tripStartTime,
      startMileage,
      endMileage = startMileage
    )
    state.update(initTripInfo)
  }
}


/**
 * Trip information.
 * @param tripStartTime trip start time
 * @param tripEndTime trip end time
 * @param startMileage mileage at the start of the trip
 * @param endMileage mileage at the end of the trip
 */
case class TripInfo(
  var tripStartTime: Long,
  var tripEndTime: Long,
  var startMileage: Long,
  var endMileage: Long
) {
  def tripDuration: Long = tripEndTime - tripStartTime
  def tripDistance: Long = endMileage - startMileage
}

/**
 * An updated trip.
 * @param vin vehicle identification number
 * @param tripStartTime trip start time
 * @param tripEndTime trip end time
 * @param startMileage mileage at the start of the trip
 * @param endMileage mileage at the end of the trip
 * @param tripDuration time spent driving
 * @param tripDistance distance driven
 * @param tripStatus trip status
 */
case class TripUpdate(
  vin: String,
  var tripStartTime: Long,
  var tripEndTime: Long,
  var startMileage: Long,
  var endMileage: Long,
  var tripDuration: Long,
  var tripDistance: Long,
  var tripStatus: Int
)

SourceData is a case class:

/**
 * Source data.
 * @param vin vehicle identification number
 * @param createTime time the signal was emitted
 * @param mileage current mileage
 */
case class SourceData(
  vin: String,
  createTime: Long,
  mileage: Long
)
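
The branching inside the mapGroupsWithState function above is easier to reason about, and to unit-test, when it is pulled out into a pure function with no Spark types. The following is one possible sketch of that idea, not the author's code: TripStepSketch, Signal, Trip, Outcome and step are hypothetical names, and the timeout branch is left out because it depends on Spark's own clock.

```scala
object TripStepSketch {
  // Hypothetical stand-ins for SourceData and TripInfo, so the sketch is self-contained.
  final case class Signal(vin: String, createTime: Long, mileage: Long)
  final case class Trip(startTime: Long, endTime: Long, startMileage: Long, endMileage: Long)

  sealed trait Outcome
  final case class Started(trip: Trip) extends Outcome                // first data for a vin
  final case class InProgress(trip: Trip) extends Outcome             // trip extended by this batch
  final case class Finished(closed: Trip, next: Trip) extends Outcome // gap found: close and reopen

  // Opens a trip from a batch, using the minimum createTime and mileage as the start.
  private def open(batch: Seq[Signal]): Trip = {
    val t = batch.map(_.createTime).min
    val m = batch.map(_.mileage).min
    Trip(t, t, m, m)
  }

  // One state transition: previous trip (if any) plus a non-empty batch gives an outcome.
  def step(previous: Option[Trip], batch: Seq[Signal], maxGapSeconds: Long): Outcome = {
    require(batch.nonEmpty, "batch must contain at least one signal")
    previous match {
      case None =>
        Started(open(batch))
      case Some(trip) if batch.exists(_.createTime - trip.endTime > maxGapSeconds) =>
        Finished(closed = trip, next = open(batch))
      case Some(trip) =>
        InProgress(trip.copy(
          endTime = batch.map(_.createTime).max,
          endMileage = batch.map(_.mileage).max))
    }
  }
}
```

Inside the streaming job, the returned Outcome would still have to be translated into state.update calls and a TripUpdate row; that wiring is deliberately omitted here.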

As before, first run:

perl6 send-data-to-socket.pl6

Then start the Structured Streaming program:

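#!/bin/sh

# Note: with a local[2] master everything runs inside the driver JVM,
# so the executor-related flags below have no effect.
spark-submit \
  --class ohmysummer.SubTrip \
  --master 'local[2]' \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  target/socket-structured-streaming-1.0-SNAPSHOT.jar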

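The printed log looks like this (the Chinese labels are the println strings from the code: 开始划分新的行程了 means "about to split off a new trip", and 初始化新的行程后 means "after initializing the new trip"):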

2019-01-05 14:11:45 WARN  ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1676 milliseconds
61000
(开始划分新的行程了: ,1546668643,179,1546668702,238)
(初始化新的行程后: ,1546668823,239,1546668823,239)
(lastTripInfo: ,1546668643,1546668702)
-------------------------------------------
Batch: 112
-------------------------------------------
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
|vin |tripStartTime|tripEndTime|startMileage|endMileage|tripDuration|tripDistance|tripStatus|
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
|LSJA0000000000091|1546668643 |1546668702 |179 |238 |59 |59 |1 |
+-----------------+-------------+-----------+------------+----------+------------+------------+----------+
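
The numbers in this log can be checked by hand. The short sketch below only redoes the arithmetic with the values printed above (LogCheck and its field names are hypothetical): the bare 61000 line is the debug output of the gap test, that is, the gap in milliseconds minus the 1-minute threshold.

```scala
object LogCheck {
  // Values copied from the log output above.
  val tripStart = 1546668643L
  val tripEnd   = 1546668702L
  val nextStart = 1546668823L
  val startMileage = 179L
  val endMileage   = 238L

  val gapThresholdMs = 1 * 60 * 1000L // the 1-minute test setting

  val tripDuration = tripEnd - tripStart         // seconds between first and last signal
  val tripDistance = endMileage - startMileage   // kilometres driven
  val gapSeconds   = nextStart - tripEnd         // silence before the next trip
  val overshootMs  = nextStart * 1000L - tripEnd * 1000L - gapThresholdMs // value printed by the debug line
}
```

Because overshootMs is positive, the guard in the code matches and the previous trip is emitted with tripStatus 1.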

The source code is available here.

See you next time, and Happy New Year!