使用 Flink 进行行程划分
— 焉知非鱼
准备 #
① 启动 Flink 本地模式
/usr/local/Cellar/apache-flink/1.7.0/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host summer.
Starting taskexecutor daemon on host summer.
② 模拟实时流数据
raku fake-streaming.pl6
该脚本每隔一秒打印一行数据, 每隔 15 秒暂停打印, 然后再继续每隔一秒打印一行, 然后再 sleep 15 秒, 以此模拟行程间隔。
# Fake streaming source: listens on $host:$port and, for every client that
# connects, periodically sends one JSON-like line describing a vehicle reading
# ('vin', 'createTime', 'mileage'). The same line is echoed to stdout.
#
# Named arguments:
#   :$host - address to listen on (default '0.0.0.0')
#   :$port - TCP port to listen on (default 3333)
sub MAIN(Str :$host = '0.0.0.0', Int :$port = 3333) {
# Fixed vehicle identifier placed in every emitted record.
my $vin = 'LSJA0000000000091';
# Mileage counter; declared outside the connection handler, so it is shared by
# all connections and keeps increasing across them.
my $last_meter = 0;
react {
# One inner react block is started per accepted connection.
whenever IO::Socket::Async.listen($host, $port) -> $conn {
react {
# While True, the one-second ticks below are skipped (no output).
my Bool:D $ignore = True;
# Flip $ignore each time this supply emits. The base supply ticks every
# 15 seconds; rotor(1, 1 => 1) regroups those ticks.
# NOTE(review): confirm the exact emit/pause cadence produced by this rotor spec.
whenever Supply.interval(15).rotor(1, 1 => 1) {
$ignore = !$ignore;
}
# Once per second, unless paused, emit one record.
whenever Supply.interval(1) {
next if $ignore;
# Echo to stdout first; $last_meter is only incremented by the second
# sprintf (post-increment), so both lines carry the same mileage.
print sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
$conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
}
# On Ctrl-C, report and finish the enclosing react block.
whenever signal(SIGINT) {
say "Done.";
done;
}
}
}
# Report any exception raised in the outer react block: its type name, its
# message, and the source line of this handler.
CATCH {
default {
say .^name, ': ', .Str;
say "handled in $?LINE";
}
}
}
}
③ 运行 Flink 程序
flink run -c ohmysummer.YetAnotherSubTrip ./target/sub-trip-1.0-SNAPSHOT.jar
④ 查看输出
tail -f /usr/local/Cellar/apache-flink/1.7.0/libexec/log/flink-ohmycloud-taskexecutor-0-summer.out
有个坑就是, 当我把 flink-ohmycloud-taskexecutor-0-summer.out 清空后, 再次运行 Flink 程序, 结果什么也不打印。可能是因为日志文件被外部修改后, 正在运行的 Flink 进程感知不到。重启一下 Flink cluster 就可以了(先用下面的命令停止集群, 再用 ① 中的 start-cluster.sh 重新启动):
/usr/local/Cellar/apache-flink/1.7.0/libexec/bin/stop-cluster.sh
行程划分程序 #
⑤ SourceData
package ohmysummer.model
/**
 * Raw input record, one per reading received from the source stream.
 *
 * @param vin        vehicle identification number
 * @param createTime time at which the signal was generated
 * @param mileage    current mileage reading
 */
case class SourceData(vin: String, createTime: Long, mileage: Long)

/** Explicit (empty) companion of [[SourceData]]. */
object SourceData
⑥ YetAnotherSubTrip
package ohmysummer
import com.alibaba.fastjson.JSON
import ohmysummer.model.{SourceData, TripResult}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/**
 * Entry point of the trip-splitting job.
 *
 * Reads newline-delimited JSON records from a socket, parses each one into a
 * [[SourceData]], keys the stream by `vin` and runs [[SubTripFunction]] on it,
 * printing every emitted [[TripResult]] to standard out.
 *
 * An explicit `main` is used instead of `extends App`: `App` is built on
 * `DelayedInit`, so the object's fields are not initialised until `main` runs,
 * which is a well-known source of surprises for non-trivial entry points.
 */
object YetAnotherSubTrip {
  /** Port of the socket the job reads from. */
  val port: Int = 3333

  /** Host of the socket the job reads from. */
  val host: String = "localhost"

  /**
   * Builds the streaming pipeline and submits it for execution.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    // Get the execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Get input data by connecting to the socket; records are separated by '\n'.
    val text = env.socketTextStream(host, port, '\n')

    // Parse each line into a SourceData record.
    val stream = text.map { l =>
      val data = JSON.parseObject(l)
      val vin = data.getString("vin")
      val createTime = data.getLong("createTime")
      val mileage = data.getLong("mileage")
      SourceData(vin, createTime, mileage)
    }

    // Partition by vehicle so that each vehicle gets its own trip state.
    val keyedSourceData: KeyedStream[SourceData, String] = stream.keyBy(_.vin)

    val subTrip: DataStream[TripResult] = keyedSourceData
      .flatMap(new SubTripFunction)

    // Print the result stream to standard out.
    subTrip.print()

    env.execute("Sub Trip")
  }
}
/**
 * Splits a keyed stream of [[SourceData]] readings into trips.
 *
 * The trip currently in progress is kept in keyed `ValueState`. For every
 * incoming reading the gap between its `createTime` and the stored trip's
 * `tripEndTime` is checked:
 *
 *  - gap greater than `maxGap`: the stored trip is closed (status "1"), a new
 *    trip is started from the incoming reading, and the closed trip is emitted
 *    if its distance is at least `minTripDistance`;
 *  - otherwise the stored trip is extended with the incoming reading
 *    (status "0") and nothing is emitted.
 *
 * Because the check only runs when a reading arrives, a trip is closed no
 * earlier than the arrival of the first reading after the gap.
 *
 * @param maxGap          largest `createTime` difference that still belongs to
 *                        the same trip, in the unit of `createTime`
 *                        (default 10)
 * @param minTripDistance smallest distance a closed trip must have to be
 *                        emitted, in the unit of `mileage` (default 0.5)
 */
class SubTripFunction(maxGap: Long = 10, minTripDistance: Double = 0.5)
  extends RichFlatMapFunction[SourceData, TripResult] {

  // Handle to the keyed state holding the trip in progress.
  private var lastTripState: ValueState[TripResult] = _

  /** Registers the "lastTrip" state descriptor and obtains the state handle. */
  override def open(parameters: Configuration): Unit = {
    // Create the state descriptor.
    val lastTripDescriptor = new ValueStateDescriptor[TripResult]("lastTrip", classOf[TripResult])
    // Obtain the state handle.
    lastTripState = getRuntimeContext.getState[TripResult](lastTripDescriptor)
  }

  /**
   * Processes one reading: either closes the stored trip and starts a new one,
   * or extends the stored trip.
   *
   * @param in  incoming reading
   * @param out collector receiving closed trips that are long enough
   */
  override def flatMap(in: SourceData, out: Collector[TripResult]): Unit = {
    // Read the trip in progress (or a fresh one if there is none yet).
    val lastState = getLastState(in, lastTripState)

    // Decide whether the gap is large enough to split the trip.
    if ((in.createTime - lastState.tripEndTime) > maxGap) {
      lastState.tripStatus = "1"
      lastState.tripDistance = in.mileage - lastState.startMileage
      // Start the next trip from the incoming reading before emitting.
      initState(in, lastTripState)
      if (lastState.tripDistance >= minTripDistance) {
        out.collect(lastState)
      }
    } else {
      lastTripState.update(getUpdatedState(in, lastState))
    }
  }

  /**
   * Extends an ongoing trip with a new reading. Mutates and returns `lastState`.
   *
   * @param in        incoming reading
   * @param lastState trip in progress
   * @return `lastState` with end time, status and distance refreshed
   */
  def getUpdatedState(in: SourceData, lastState: TripResult): TripResult = {
    lastState.tripEndTime = in.createTime // trip still running: end time follows the newest reading
    lastState.tripStatus = "0" // "0" marks a trip that is still in progress
    lastState.tripDistance = in.mileage - lastState.startMileage // distance travelled so far
    lastState
  }

  /**
   * Returns the trip stored in `state`, or a fresh trip built from `in` when
   * the state is empty. The fresh trip is NOT written to `state`.
   *
   * @param in    incoming reading, used only when the state is empty
   * @param state state handle to read from
   */
  def getLastState(in: SourceData, state: ValueState[TripResult]): TripResult =
    // Read the state once; Option(...) maps an empty (null) state to None.
    Option(state.value()).getOrElse(newTrip(in))

  /**
   * Overwrites `state` with a fresh trip starting at `in`.
   *
   * @param in    reading that starts the new trip
   * @param state state handle to write to
   */
  def initState(in: SourceData, state: ValueState[TripResult]): Unit =
    state.update(newTrip(in))

  // Builds a trip that starts and ends at the given reading, with zero distance.
  private def newTrip(in: SourceData): TripResult =
    TripResult(
      vin = in.vin,
      tripStartTime = in.createTime, // the reading's createTime is the trip start time
      tripEndTime = in.createTime, // ...and, for now, also the trip end time
      startMileage = in.mileage, // the reading's mileage is the trip's starting mileage
      tripDistance = 0, // the trip has just begun, so no distance yet
      tripStatus = "0" // "0" marks a trip that is still in progress
    )
}
完整项目在 GitHub, 欢迎 Star。