Wait the light to fall

使用 Flink 重写项目

焉知非鱼

使用 Flink 重写一下之前用 Spark Streaming 做的项目, 发现 Flink 在实时处理上比 Spark Streaming 友好。

结构 #

目录结构跟之前基本一样:

src
├── main
│   ├── java
│   ├── resources
│   │   └── application.conf
│   └── scala
│       ├── dataset
│       └── datastream
│           ├── com
│           │   └── gac
│           │       └── xs6
│           │           ├── BigdataApplication.scala
│           │           ├── conf
│           │           │   └── KafkaConfiguration.scala
│           │           ├── core
│           │           │   ├── Adapter.scala
│           │           │   └── impl
│           │           │       ├── AdapterImpl.scala
│           │           │       └── SubTripImpl.scala
│           │           ├── model
│           │           │   ├── SourceData.scala
│           │           │   └── TripResult.scala
│           │           ├── module
│           │           │   └── MainModule.scala
│           │           └── pipeline
│           │               └── GacSubTrip.scala

Flink 程序里面依旧使用了依赖注入。MainModule object 里面的 configure 方法用于绑定接口到实现。kafkaDataSource 用于读取 Kafka 数据并返回一个 Datastream:

object MainModule extends AbstractModule  {

  override def configure(): Unit = {
    bind(classOf[KafkaConfiguration]).asEagerSingleton()
    bind(classOf[HbaseConfiguration]).asEagerSingleton()
    bind(classOf[Adapter]).toInstance(AdapterImpl)
    bind(classOf[SubTrip]).toInstance(SubTripImpl)
    bind(classOf[BigdataApplication])
  }

  @Provides
  @Singleton
  def kafkaDataSource(kafkaConf: KafkaConfiguration): DataStream[String] = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaConf.bootstrapServers)

    val myConsumer = new FlinkKafkaConsumer[String](kafkaConf.topic, new SimpleStringSchema(), properties)
   // myConsumer.setStartFromEarliest() // 从开头开始消费

    val stream = env.addSource(myConsumer)

    stream
  }
}

Adapter.scala 里面定义了两个 trait,其中 trait Adapter 用于把 Kafka 中的 JSON 字符串转为 DataStream[SourceData], 而 trait SubTrip 用于把 SourceData 流转为最终的 TripResult 流:

trait Adapter {
  def extract(stream: DataStream[String]): DataStream[SourceData]
}

trait SubTrip {
  def extract(stream: DataStream[SourceData]): DataStream[TripResult]
}

从 Kafka 接收数据之后, 在 Adapter里面做数据适配,返回 SourceData 流:

object AdapterImpl extends Adapter {
  override def extract(stream: DataStream[String]): DataStream[SourceData] = {
    synchronized {
      stream.map { result =>
        if (null != result) {
          try {
            val data = JSON.parseObject(result)
            SourceData(
              data.getString("vin"),
              data.getLong("createTime"),
              data.getLong("mileage")
            )
          } catch {
            case e: Exception => println(e)
            null
          }
        } else {
          null
        }
      }
    }
  }
}

接着进行行程划分,这个就简单了:

object SubTripImpl extends SubTrip {
  override def extract(stream: DataStream[SourceData]): DataStream[TripResult] = {
    val keyedSourceData: KeyedStream[SourceData, String] = stream.keyBy(_.vin)
    val subTrip: DataStream[TripResult] = keyedSourceData
      .flatMap(new GacSubTrip)
    subTrip
  }
}

使用 KeyedStream 对 vin进行分组, 行程划分的逻辑写到单独的 GacSubTrip 类中(这点比 Spark 友好):

class GacSubTrip extends RichFlatMapFunction[SourceData, TripResult] {

  // state 句柄对象
  private var lastTripState: ValueState[TripResult] = _

  override def open(parameters: Configuration): Unit = {
    // 创建 state 描述符
    val lastTripDescriptor = new ValueStateDescriptor[TripResult]("lastTrip", classOf[TripResult])

    // 获取 state 句柄
    lastTripState = getRuntimeContext.getState[TripResult](lastTripDescriptor)
  }

  override def flatMap(in: SourceData, out: Collector[TripResult]): Unit = {

    var lastState = getLastState(in, lastTripState)

    // 从 state 中读取上一次行程
    val lastTripEndTime = lastState.tripEndTime

    // 检测是否需要进行行程划分
    if ( (in.createTime - lastTripEndTime)  > 10 ) {

      lastState.tripStatus = "1"
      val endMileage: Long = in.mileage
      lastState.tripDistance = endMileage - lastState.startMileage

      initState(in, lastTripState)

      if (lastState.tripDistance >= 0.5) {
        out.collect(lastState)
      } else {
        None
      }
    } else {
      lastState = getUpdatedState(in, lastState)
      this.lastTripState.update(lastState)
      // out.collect(lastTripState.value())
      None
    }
  }

  def getUpdatedState(in: SourceData, lastState: TripResult): TripResult = {
    lastState.tripEndTime  = in.createTime // 行程正在进行, 将上一个行程的行程结束时间置为新的源数据的 createTime
    lastState.tripStatus   = "0"           // 行程正在进行, 将这种类型的行程的状态置为 0
    val endMileage: Long   = in.mileage
    lastState.tripDistance = endMileage - lastState.startMileage // 行程正在进行, 更新当前行程的行驶里程数
    lastState
  }


  def getLastState(in: SourceData, state: ValueState[TripResult]): TripResult = {
    if (state.value() != null) {
      state.value()
    } else {
      TripResult(
        vin           = in.vin,
        tripStartTime = in.createTime, // 将新的源数据中的 createTime 作为行程开始时间
        tripEndTime   = in.createTime, // 将新的源数据中的 createTime 作为行程结束时间
        startMileage  = in.mileage,    // 将新的源数据中的 mileage 作为行程的当前里程数
        tripDistance  = 0,  // 行程刚开始, 将行程的行驶距离置为 0
        tripStatus    = "0" // 这种类型的行程是刚开始的行程, 将行程状态设置为 0
      )
    }
  }

  def initState(in: SourceData, state: ValueState[TripResult]): Unit = {
    val initTripState = TripResult(
      vin           = in.vin,
      tripStartTime = in.createTime, // 用新的源数据中的 createTime 初始化行程开始时间
      tripEndTime   = in.createTime, // 用新的源数据中的 createTime 初始化行程结束时间
      startMileage  = in.mileage,    // 用新的源数据中的 mileage 初始化行程中的行程开始里程
      tripDistance  = 0,  // 初始行驶距离置为 0
      tripStatus    = "0" // 这种类型的行程是刚开始的行程, 将行程状态设置为 0
    )
    this.lastTripState.update(initTripState)
  }
}

主程序组装好各种流:

class BigdataApplication @Inject() (
                                   source: DataStream[String],
                                   kafkaConf: KafkaConfiguration,
                                   adapter: Adapter,
                                   trip: SubTrip
                                   ) extends Serializable {
  private def createNewStreamingContext: StreamExecutionEnvironment = {
    val sourceStream: DataStream[SourceData] = adapter.extract(source)
    val tripStream: DataStream[TripResult] = trip.extract(sourceStream)
    tripStream.print()
    tripStream.executionEnvironment
  }

  def run(params: Params): Unit = {
    val env: StreamExecutionEnvironment = createNewStreamingContext
    env.execute("Start new streaming environment")
  }
}

运行 #

flink run -c datastream.com.gac.xs6.BigdataApplication  ./target/common-flink-1.0-SNAPSHOT.jar

但是 application.conf 这种配置文件还不知道怎么传递给 Flink。可能要使用 ParameterToolGithub 地址。