使用 Flink 重写项目
— 焉知非鱼使用 Flink 重写一下之前用 Spark Streaming 做的项目, 发现 Flink 在实时处理上比 Spark Streaming 友好。
结构 #
目录结构跟之前基本一样:
src
├── main
│ ├── java
│ ├── resources
│ │ └── application.conf
│ └── scala
│ ├── dataset
│ └── datastream
│ ├── com
│ │ └── gac
│ │ └── xs6
│ │ ├── BigdataApplication.scala
│ │ ├── conf
│ │ │ └── KafkaConfiguration.scala
│ │ ├── core
│ │ │ ├── Adapter.scala
│ │ │ └── impl
│ │ │ ├── AdapterImpl.scala
│ │ │ └── SubTripImpl.scala
│ │ ├── model
│ │ │ ├── SourceData.scala
│ │ │ └── TripResult.scala
│ │ ├── module
│ │ │ └── MainModule.scala
│ │ └── pipeline
│ │ └── GacSubTrip.scala
Flink 程序里面依旧使用了依赖注入。MainModule
object 里面的 configure
方法用于绑定接口到实现。kafkaDataSource
用于读取 Kafka 数据并返回一个 Datastream:
object MainModule extends AbstractModule {
override def configure(): Unit = {
bind(classOf[KafkaConfiguration]).asEagerSingleton()
bind(classOf[HbaseConfiguration]).asEagerSingleton()
bind(classOf[Adapter]).toInstance(AdapterImpl)
bind(classOf[SubTrip]).toInstance(SubTripImpl)
bind(classOf[BigdataApplication])
}
@Provides
@Singleton
def kafkaDataSource(kafkaConf: KafkaConfiguration): DataStream[String] = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", kafkaConf.bootstrapServers)
val myConsumer = new FlinkKafkaConsumer[String](kafkaConf.topic, new SimpleStringSchema(), properties)
// myConsumer.setStartFromEarliest() // 从开头开始消费
val stream = env.addSource(myConsumer)
stream
}
}
Adapter.scala
里面定义了两个 trait,其中 trait Adapter
用于把 Kafka 中的 JSON 字符串转为 DataStream[SourceData], 而 trait SubTrip
用于把 SourceData 流转为最终的 TripResult 流:
trait Adapter {
def extract(stream: DataStream[String]): DataStream[SourceData]
}
trait SubTrip {
def extract(stream: DataStream[SourceData]): DataStream[TripResult]
}
从 Kafka 接收数据之后, 在 Adapter里面做数据适配,返回 SourceData 流:
object AdapterImpl extends Adapter {
override def extract(stream: DataStream[String]): DataStream[SourceData] = {
synchronized {
stream.map { result =>
if (null != result) {
try {
val data = JSON.parseObject(result)
SourceData(
data.getString("vin"),
data.getLong("createTime"),
data.getLong("mileage")
)
} catch {
case e: Exception => println(e)
null
}
} else {
null
}
}
}
}
}
接着进行行程划分,这个就简单了:
object SubTripImpl extends SubTrip {
override def extract(stream: DataStream[SourceData]): DataStream[TripResult] = {
val keyedSourceData: KeyedStream[SourceData, String] = stream.keyBy(_.vin)
val subTrip: DataStream[TripResult] = keyedSourceData
.flatMap(new GacSubTrip)
subTrip
}
}
使用 KeyedStream
对 vin进行分组, 行程划分的逻辑写到单独的 GacSubTrip
类中(这点比 Spark 友好):
class GacSubTrip extends RichFlatMapFunction[SourceData, TripResult] {
// state 句柄对象
private var lastTripState: ValueState[TripResult] = _
override def open(parameters: Configuration): Unit = {
// 创建 state 描述符
val lastTripDescriptor = new ValueStateDescriptor[TripResult]("lastTrip", classOf[TripResult])
// 获取 state 句柄
lastTripState = getRuntimeContext.getState[TripResult](lastTripDescriptor)
}
override def flatMap(in: SourceData, out: Collector[TripResult]): Unit = {
var lastState = getLastState(in, lastTripState)
// 从 state 中读取上一次行程
val lastTripEndTime = lastState.tripEndTime
// 检测是否需要进行行程划分
if ( (in.createTime - lastTripEndTime) > 10 ) {
lastState.tripStatus = "1"
val endMileage: Long = in.mileage
lastState.tripDistance = endMileage - lastState.startMileage
initState(in, lastTripState)
if (lastState.tripDistance >= 0.5) {
out.collect(lastState)
} else {
None
}
} else {
lastState = getUpdatedState(in, lastState)
this.lastTripState.update(lastState)
// out.collect(lastTripState.value())
None
}
}
def getUpdatedState(in: SourceData, lastState: TripResult): TripResult = {
lastState.tripEndTime = in.createTime // 行程正在进行, 将上一个行程的行程结束时间置为新的源数据的 createTime
lastState.tripStatus = "0" // 行程正在进行, 将这种类型的行程的状态置为 0
val endMileage: Long = in.mileage
lastState.tripDistance = endMileage - lastState.startMileage // 行程正在进行, 更新当前行程的行驶里程数
lastState
}
def getLastState(in: SourceData, state: ValueState[TripResult]): TripResult = {
if (state.value() != null) {
state.value()
} else {
TripResult(
vin = in.vin,
tripStartTime = in.createTime, // 将新的源数据中的 createTime 作为行程开始时间
tripEndTime = in.createTime, // 将新的源数据中的 createTime 作为行程结束时间
startMileage = in.mileage, // 将新的源数据中的 mileage 作为行程的当前里程数
tripDistance = 0, // 行程刚开始, 将行程的行驶距离置为 0
tripStatus = "0" // 这种类型的行程是刚开始的行程, 将行程状态设置为 0
)
}
}
def initState(in: SourceData, state: ValueState[TripResult]): Unit = {
val initTripState = TripResult(
vin = in.vin,
tripStartTime = in.createTime, // 用新的源数据中的 createTime 初始化行程开始时间
tripEndTime = in.createTime, // 用新的源数据中的 createTime 初始化行程结束时间
startMileage = in.mileage, // 用新的源数据中的 mileage 初始化行程中的行程开始里程
tripDistance = 0, // 初始行驶距离置为 0
tripStatus = "0" // 这种类型的行程是刚开始的行程, 将行程状态设置为 0
)
this.lastTripState.update(initTripState)
}
}
主程序组装好各种流:
class BigdataApplication @Inject() (
source: DataStream[String],
kafkaConf: KafkaConfiguration,
adapter: Adapter,
trip: SubTrip
) extends Serializable {
private def createNewStreamingContext: StreamExecutionEnvironment = {
val sourceStream: DataStream[SourceData] = adapter.extract(source)
val tripStream: DataStream[TripResult] = trip.extract(sourceStream)
tripStream.print()
tripStream.executionEnvironment
}
def run(params: Params): Unit = {
val env: StreamExecutionEnvironment = createNewStreamingContext
env.execute("Start new streaming environment")
}
}
运行 #
flink run -c datastream.com.gac.xs6.BigdataApplication ./target/common-flink-1.0-SNAPSHOT.jar
但是 application.conf
这种配置文件还不知道怎么传递给 Flink。可能要使用 ParameterTool。Github 地址。