使用 Flink 进行行程划分

准备

① 启动 Flink 本地模式

1
/usr/local/Cellar/apache-flink/1.7.0/libexec/bin/start-cluster.sh
1
2
3
Starting cluster.
Starting standalonesession daemon on host summer.
Starting taskexecutor daemon on host summer.

② 模拟实时流数据

1
perl6 fake-streaming.pl6

该脚本每隔一秒打印一行数据, 每隔 15 秒暂停打印, 然后再继续每隔一秒打印一行, 然后再 sleep 15 秒, 以此模拟行程间隔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Fake streaming source: listens on $host:$port and, for every client that
# connects, sends one JSON-like record per second, with the sending switched
# on and off periodically to simulate gaps between trips.
#
# Named arguments:
#   :$host  address to listen on (default '0.0.0.0')
#   :$port  TCP port to listen on (default 3333)
sub MAIN(Str :$host = '0.0.0.0', Int :$port = 3333) {

    my $vin = 'LSJA0000000000091';
    # Mileage counter; declared outside the per-connection block, so it is
    # shared by every connection and incremented once per record sent.
    my $last_meter = 0;

    react {
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                # While True, the one-second ticks below are skipped.
                my Bool:D $ignore = True;

                # Flip the pause flag each time the regrouped 15-second
                # interval supply emits.
                # NOTE(review): the exact on/off cadence depends on how
                # rotor(1, 1 => 1) batches and skips ticks -- confirm it
                # matches the intended "15 s on / 15 s off" pattern.
                whenever Supply.interval(15).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }

                whenever Supply.interval(1) {
                    next if $ignore;
                    # Build the record once so the console echo and the line
                    # sent to the client are guaranteed to be identical
                    # (same timestamp, same mileage).
                    my $line = sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                    print $line;
                    $conn.print: $line;
                }

                # Ctrl-C: report and finish this inner react block.
                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            }
        }
        # Report any exception raised inside the outer react block
        # (type name and message) instead of dying with a backtrace.
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}

③ 运行 Flink 程序

1
flink run -c ohmysummer.YetAnotherSubTrip  ./target/sub-trip-1.0-SNAPSHOT.jar

④ 查看输出

1
tail -f /usr/local/Cellar/apache-flink/1.7.0/libexec/log/flink-ohmycloud-taskexecutor-0-summer.out

有个坑需要注意: 当我把 flink-ohmycloud-taskexecutor-0-summer.out 清空后, 再次运行 Flink 程序, 结果什么也不打印。原因可能是日志文件被外部更改后, 正在运行的 Flink 进程并不知道。重启一下 Flink cluster (先执行下面的 stop-cluster.sh, 再重新执行 start-cluster.sh) 就可以了:

1
/usr/local/Cellar/apache-flink/1.7.0/libexec/bin/stop-cluster.sh

行程划分程序

⑤ SourceData

1
2
3
4
5
6
7
8
9
10
11
12
package ohmysummer.model

/**
 * Raw source record as received from the input stream.
 *
 * @param vin        vehicle identification number (frame number)
 * @param createTime time at which the signal occurred
 * @param mileage    current mileage reading
 */
case class SourceData(vin: String, createTime: Long, mileage: Long)

/** Companion of [[SourceData]]; currently declares no members. */
object SourceData {}

⑥ YetAnotherSubTrip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package ohmysummer
import com.alibaba.fastjson.JSON
import ohmysummer.model.{SourceData, TripResult}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 * Entry point of the trip-segmentation job.
 *
 * Reads newline-delimited JSON records from a socket, parses each one into a
 * [[SourceData]], keys the stream by `vin` and feeds it through
 * [[SubTripFunction]]; the resulting [[TripResult]] stream is printed to
 * standard out.
 *
 * An explicit `main` is used instead of the `App` trait: `App` is built on
 * `DelayedInit`, so the object's fields are only initialized when the delayed
 * body runs, which is an initialization-order pitfall for a non-trivial
 * entry point.
 */
object YetAnotherSubTrip {

  /** TCP port of the socket source. */
  val port: Int = 3333

  /** Host name of the socket source. */
  val host: String = "localhost"

  /**
   * Builds the streaming pipeline and submits it under the job name
   * "Sub Trip".
   *
   * @param args command-line arguments; currently unused
   */
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket; one record per line
    val text = env.socketTextStream(host, port, '\n')

    // parse the data
    // NOTE(review): a line that is not valid JSON, or that lacks
    // "createTime"/"mileage", will make this map throw -- confirm whether
    // malformed input should instead be skipped.
    val stream = text.map { line =>
      val data = JSON.parseObject(line)
      val vin = data.getString("vin")
      val createTime = data.getLong("createTime")
      val mileage = data.getLong("mileage")

      SourceData(vin, createTime, mileage)
    }

    // all records of one vehicle go to the same SubTripFunction state
    val keyedSourceData: KeyedStream[SourceData, String] = stream.keyBy(_.vin)
    val subTrip: DataStream[TripResult] = keyedSourceData
      .flatMap(new SubTripFunction)

    // print result stream to standard out
    subTrip.print()
    env.execute("Sub Trip")
  }
}

/**
 * Splits a stream of [[SourceData]] records into trips.
 *
 * The trip currently in progress is kept in a `ValueState[TripResult]`.
 * For every incoming record the gap between its `createTime` and the stored
 * trip's `tripEndTime` is checked:
 *
 *  - gap greater than `maxGap`: the stored trip is closed (status "1"), a
 *    fresh trip starting at the incoming record is written to state, and the
 *    closed trip is emitted if its distance is at least `minTripDistance`;
 *  - otherwise the stored trip is extended to the incoming record and written
 *    back to state; nothing is emitted.
 *
 * @param maxGap          largest gap between consecutive records that still
 *                        belongs to the same trip, in the units of
 *                        `SourceData.createTime` (presumably seconds --
 *                        confirm against the data producer); defaults to 10
 * @param minTripDistance smallest distance a closed trip must have to be
 *                        emitted, in the units of `SourceData.mileage`;
 *                        defaults to 0.5
 */
class SubTripFunction(maxGap: Long = 10L, minTripDistance: Double = 0.5)
  extends RichFlatMapFunction[SourceData, TripResult] {

  // State handle; assigned in open() because the runtime context is not
  // available at construction time.
  private var lastTripState: ValueState[TripResult] = _

  /** Registers the "lastTrip" state descriptor and obtains the state handle. */
  override def open(parameters: Configuration): Unit = {
    // create the state descriptor
    val lastTripDescriptor = new ValueStateDescriptor[TripResult]("lastTrip", classOf[TripResult])

    // obtain the state handle
    lastTripState = getRuntimeContext.getState[TripResult](lastTripDescriptor)
  }

  /**
   * Processes one record: either extends the trip in progress or closes it
   * and starts a new one.
   *
   * @param in  the incoming source record
   * @param out collector that receives closed trips
   */
  override def flatMap(in: SourceData, out: Collector[TripResult]): Unit = {
    // the trip in progress, or a brand-new one if no state exists yet
    val current = getLastState(in, lastTripState)

    // does the gap since the last record require a trip split?
    if (in.createTime - current.tripEndTime > maxGap) {
      current.tripStatus = "1"
      // NOTE(review): the closing distance is computed from the mileage of
      // the first record AFTER the gap, while tripEndTime stays at the last
      // record before it -- confirm this is intended.
      current.tripDistance = in.mileage - current.startMileage

      // the incoming record becomes the start of the next trip
      initState(in, lastTripState)

      if (current.tripDistance >= minTripDistance) {
        out.collect(current)
      }
    } else {
      // trip still in progress: extend it and store it back, emit nothing
      lastTripState.update(getUpdatedState(in, current))
    }
  }

  /**
   * Extends an in-progress trip to the given record.
   *
   * Mutates and returns `lastState`: its end time becomes `in.createTime`,
   * its status becomes "0" (in progress) and its distance is recomputed from
   * `in.mileage`.
   *
   * @param in        the incoming source record
   * @param lastState the trip to extend
   * @return the same `lastState` instance, updated
   */
  def getUpdatedState(in: SourceData, lastState: TripResult): TripResult = {
    lastState.tripEndTime = in.createTime
    lastState.tripStatus = "0"
    lastState.tripDistance = in.mileage - lastState.startMileage
    lastState
  }

  /**
   * Returns the trip stored in `state`, or a new trip starting at `in` when
   * the state holds no value. The state itself is not modified.
   *
   * @param in    the incoming source record, used to seed a new trip
   * @param state the state to read from
   */
  def getLastState(in: SourceData, state: ValueState[TripResult]): TripResult =
    Option(state.value()).getOrElse(newTrip(in))

  /**
   * Overwrites `state` with a new trip starting at `in`.
   *
   * @param in    the record that starts the new trip
   * @param state the state to overwrite
   */
  def initState(in: SourceData, state: ValueState[TripResult]): Unit =
    state.update(newTrip(in))

  // Builds a trip that starts and ends at `in`, with zero distance and
  // status "0" (in progress).
  private def newTrip(in: SourceData): TripResult =
    TripResult(
      vin = in.vin,
      tripStartTime = in.createTime,
      tripEndTime = in.createTime,
      startMileage = in.mileage,
      tripDistance = 0,
      tripStatus = "0"
    )
}

完整项目在 GitHub, 欢迎 Star。