Wait the light to fall

使用 Flink 进行行程划分

焉知非鱼

准备 #

① 启动 Flink 本地模式

/usr/local/Cellar/apache-flink/1.7.0/libexec/bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host summer.
Starting taskexecutor daemon on host summer.

② 模拟实时流数据

raku fake-streaming.pl6 

该脚本每隔一秒打印一行数据, 每隔 15 秒暂停打印, 然后再继续每隔一秒打印一行, 然后再 sleep 15 秒, 以此模拟行程间隔。

# Fake streaming source used to feed the trip-splitting job.
# Listens on $host:$port and, for every client that connects, writes one
# JSON-like line per second while sending is enabled. Sending is switched
# on and off by a second, 15-second-based timer to simulate gaps between trips.
sub MAIN(Str :$host = '0.0.0.0', Int :$port = 3333) {

    my $vin = 'LSJA0000000000091';
    # Mileage counter; declared outside the connection handler, so it is
    # shared by every connection served by this process.
    my $last_meter = 0;

    react {
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                # While True, the one-second ticks below are skipped.
                my Bool:D $ignore = True;

                # Flip between "sending" and "paused" each time the
                # rotor'd 15-second interval emits a value.
                whenever Supply.interval(15).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }

                whenever Supply.interval(1) {
                    next if $ignore;
                    # Format the record once so the line echoed locally is
                    # exactly the line sent to the client (same timestamp,
                    # same mileage); the counter advances once per record.
                    my $line = sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                    print $line;
                    $conn.print: $line;
                }

                # Ctrl-C ends this connection's react block.
                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            }
        }
        # Report any exception raised in this react block instead of dying silently.
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}

③ 运行 Flink 程序

flink run -c ohmysummer.YetAnotherSubTrip  ./target/sub-trip-1.0-SNAPSHOT.jar

④ 查看输出

tail -f /usr/local/Cellar/apache-flink/1.7.0/libexec/log/flink-ohmycloud-taskexecutor-0-summer.out

有个坑就是, 当我把 flink-ohmycloud-taskexecutor-0-summer.out 清空后, 再次运行 Flink 程序, 结果什么也不打印。因为更改了日志文件, Flink 可能不知道。重启一下 Flink cluster 就可以了:

/usr/local/Cellar/apache-flink/1.7.0/libexec/bin/stop-cluster.sh 

行程划分程序 #

⑤ SourceData

package ohmysummer.model

/**
 * Raw signal record as received from the input stream.
 *
 * @param vin        vehicle identification number (VIN)
 * @param createTime time at which the signal was produced
 * @param mileage    odometer reading at that time
 */
final case class SourceData (
   vin:        String,
   createTime: Long,
   mileage:    Long
)

object SourceData {

}

⑥ YetAnotherSubTrip

package ohmysummer
import com.alibaba.fastjson.JSON
import ohmysummer.model.{SourceData, TripResult}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 * Entry point of the trip-splitting job.
 *
 * Reads newline-delimited JSON records from a socket, parses each into a
 * [[SourceData]], keys the stream by `vin`, and runs [[SubTripFunction]]
 * over every key. Resulting trips are printed to standard out.
 *
 * An explicit `main` is used instead of `extends App`: `App` is built on
 * `DelayedInit`, so the object's fields are only initialised when its
 * inherited `main` runs, which is an initialisation-order hazard for code
 * that is submitted to a cluster.
 */
object YetAnotherSubTrip {
  /** Port of the socket source. */
  val port: Int = 3333

  /** Host of the socket source. */
  val host: String = "localhost"

  def main(args: Array[String]): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket; one record per line
    val text = env.socketTextStream(host, port, '\n')

    // parse each line into a SourceData record
    val stream = text.map { line =>
      val data = JSON.parseObject(line)
      val vin = data.getString("vin")
      val createTime = data.getLong("createTime")
      val mileage = data.getLong("mileage")

      SourceData(vin, createTime, mileage)
    }

    // trips are tracked independently per vehicle
    val keyedSourceData: KeyedStream[SourceData, String] = stream.keyBy(_.vin)
    val subTrip: DataStream[TripResult] = keyedSourceData
      .flatMap(new SubTripFunction)

    // print result stream to standard out
    subTrip.print()
    env.execute("Sub Trip")
  }
}

/**
 * Splits the records of one key into trips.
 *
 * The trip currently in progress is kept in keyed `ValueState`. For each
 * incoming record, if its `createTime` is more than `tripGapThreshold` later
 * than the stored trip's `tripEndTime`, the stored trip is closed (status
 * `"1"`), a new trip is started from the incoming record, and the closed trip
 * is emitted when its distance is at least `minTripDistance`. Otherwise the
 * stored trip is extended with the incoming record and nothing is emitted.
 *
 * @param tripGapThreshold gap between two consecutive records above which a
 *                         new trip starts; expressed in the same unit as
 *                         `SourceData.createTime` (default 10)
 * @param minTripDistance  minimum distance a closed trip must have to be
 *                         emitted (default 0.5)
 */
class SubTripFunction(
    tripGapThreshold: Long = 10L,
    minTripDistance: Double = 0.5
) extends RichFlatMapFunction[SourceData, TripResult] {

  // Handle to the per-key state holding the trip in progress.
  private var lastTripState: ValueState[TripResult] = _

  /** Registers the `"lastTrip"` state descriptor and obtains the state handle. */
  override def open(parameters: Configuration): Unit = {
    val lastTripDescriptor = new ValueStateDescriptor[TripResult]("lastTrip", classOf[TripResult])
    lastTripState = getRuntimeContext.getState[TripResult](lastTripDescriptor)
  }

  /**
   * Processes one record: either closes the stored trip and starts a new one,
   * or extends the stored trip.
   *
   * @param in  incoming record
   * @param out collector that receives closed trips
   */
  override def flatMap(in: SourceData, out: Collector[TripResult]): Unit = {
    // Trip stored for this key, or a fresh one seeded from `in` on first sight.
    val lastState = getLastState(in, lastTripState)

    // Does the gap since the last record of the stored trip call for a split?
    if ((in.createTime - lastState.tripEndTime) > tripGapThreshold) {
      lastState.tripStatus = "1"
      // NOTE(review): the closed trip's distance is computed from the mileage
      // of the record that starts the NEXT trip, while tripEndTime stays at
      // the previous record's time. Kept as-is; confirm this is intended.
      lastState.tripDistance = in.mileage - lastState.startMileage

      // Replace the stored trip with a new one before emitting the closed trip.
      initState(in, lastTripState)

      if (lastState.tripDistance >= minTripDistance) {
        out.collect(lastState)
      }
    } else {
      // Trip still in progress: extend it and store it back.
      lastTripState.update(getUpdatedState(in, lastState))
    }
  }

  /**
   * Extends an in-progress trip with a new record (mutates and returns `lastState`).
   *
   * @param in        incoming record
   * @param lastState trip in progress
   * @return the same `lastState` instance with end time, status and distance updated
   */
  def getUpdatedState(in: SourceData, lastState: TripResult): TripResult = {
    lastState.tripEndTime  = in.createTime                        // trip now ends at the newest record
    lastState.tripStatus   = "0"                                  // "0" marks a trip that is in progress
    lastState.tripDistance = in.mileage - lastState.startMileage  // distance travelled so far
    lastState
  }

  /**
   * Returns the trip stored in `state`, or a new trip seeded from `in` when
   * nothing is stored yet. The new trip is NOT written to `state`.
   */
  def getLastState(in: SourceData, state: ValueState[TripResult]): TripResult =
    Option(state.value()).getOrElse(newTrip(in))

  /** Overwrites `state` with a new trip seeded from `in`. */
  def initState(in: SourceData, state: ValueState[TripResult]): Unit =
    state.update(newTrip(in))

  // Builds a just-started trip from a single record: start and end time are
  // the record's createTime, start mileage is its mileage, distance is 0 and
  // status is "0" (in progress).
  private def newTrip(in: SourceData): TripResult =
    TripResult(
      vin           = in.vin,
      tripStartTime = in.createTime,
      tripEndTime   = in.createTime,
      startMileage  = in.mileage,
      tripDistance  = 0,
      tripStatus    = "0"
    )
}

完整项目在 GitHub, 欢迎 Star。