使用 Flink 重写项目

使用 Flink 重写一下之前用 Spark Streaming 做的项目, 发现 Flink 在实时处理上比 Spark Streaming 友好。

结构

目录结构跟之前基本一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
src
├── main
│   ├── java
│   ├── resources
│   │   └── application.conf
│   └── scala
│   ├── dataset
│   └── datastream
│   ├── com
│   │   └── gac
│   │   └── xs6
│   │   ├── BigdataApplication.scala
│   │   ├── conf
│   │   │   └── KafkaConfiguration.scala
│   │   ├── core
│   │   │   ├── Adapter.scala
│   │   │   └── impl
│   │   │   ├── AdapterImpl.scala
│   │   │   └── SubTripImpl.scala
│   │   ├── model
│   │   │   ├── SourceData.scala
│   │   │   └── TripResult.scala
│   │   ├── module
│   │   │   └── MainModule.scala
│   │   └── pipeline
│   │   └── GacSubTrip.scala

Flink 程序里面依旧使用了依赖注入。MainModule object 里面的 configure 方法用于绑定接口到实现。kafkaDataSource 用于读取 Kafka 数据并返回一个 Datastream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object MainModule extends AbstractModule  {

override def configure(): Unit = {
bind(classOf[KafkaConfiguration]).asEagerSingleton()
bind(classOf[HbaseConfiguration]).asEagerSingleton()
bind(classOf[Adapter]).toInstance(AdapterImpl)
bind(classOf[SubTrip]).toInstance(SubTripImpl)
bind(classOf[BigdataApplication])
}

@Provides
@Singleton
def kafkaDataSource(kafkaConf: KafkaConfiguration): DataStream[String] = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", kafkaConf.bootstrapServers)

val myConsumer = new FlinkKafkaConsumer[String](kafkaConf.topic, new SimpleStringSchema(), properties)
// myConsumer.setStartFromEarliest() // 从开头开始消费

val stream = env.addSource(myConsumer)

stream
}
}

Adapter.scala 里面定义了两个 trait,其中 trait Adapter 用于把 Kafka 中的 JSON 字符串转为 DataStream[SourceData], 而 trait SubTrip 用于把 SourceData 流转为最终的 *TripResult 流:*

1
2
3
4
5
6
7
trait Adapter {
def extract(stream: DataStream[String]): DataStream[SourceData]
}

trait SubTrip {
def extract(stream: DataStream[SourceData]): DataStream[TripResult]
}

从 Kafka 接收数据之后, 在 Adapter里面做数据适配,返回 SourceData 流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
object AdapterImpl extends Adapter {
override def extract(stream: DataStream[String]): DataStream[SourceData] = {
synchronized {
stream.map { result =>
if (null != result) {
try {
val data = JSON.parseObject(result)
SourceData(
data.getString("vin"),
data.getLong("createTime"),
data.getLong("mileage")
)
} catch {
case e: Exception => println(e)
null
}
} else {
null
}
}
}
}
}

接着进行行程划分,这个就简单了:

1
2
3
4
5
6
7
8
object SubTripImpl extends SubTrip {
override def extract(stream: DataStream[SourceData]): DataStream[TripResult] = {
val keyedSourceData: KeyedStream[SourceData, String] = stream.keyBy(_.vin)
val subTrip: DataStream[TripResult] = keyedSourceData
.flatMap(new GacSubTrip)
subTrip
}
}

使用 KeyedStream 对 vin进行分组, 行程划分的逻辑写到单独的 GacSubTrip 类中(这点比 Spark 友好):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class GacSubTrip extends RichFlatMapFunction[SourceData, TripResult] {

// state 句柄对象
private var lastTripState: ValueState[TripResult] = _

override def open(parameters: Configuration): Unit = {
// 创建 state 描述符
val lastTripDescriptor = new ValueStateDescriptor[TripResult]("lastTrip", classOf[TripResult])

// 获取 state 句柄
lastTripState = getRuntimeContext.getState[TripResult](lastTripDescriptor)
}

override def flatMap(in: SourceData, out: Collector[TripResult]): Unit = {

var lastState = getLastState(in, lastTripState)

// 从 state 中读取上一次行程
val lastTripEndTime = lastState.tripEndTime

// 检测是否需要进行行程划分
if ( (in.createTime - lastTripEndTime) > 10 ) {

lastState.tripStatus = "1"
val endMileage: Long = in.mileage
lastState.tripDistance = endMileage - lastState.startMileage

initState(in, lastTripState)

if (lastState.tripDistance >= 0.5) {
out.collect(lastState)
} else {
None
}
} else {
lastState = getUpdatedState(in, lastState)
this.lastTripState.update(lastState)
// out.collect(lastTripState.value())
None
}
}

def getUpdatedState(in: SourceData, lastState: TripResult): TripResult = {
lastState.tripEndTime = in.createTime // 行程正在进行, 将上一个行程的行程结束时间置为新的源数据的 createTime
lastState.tripStatus = "0" // 行程正在进行, 将这种类型的行程的状态置为 0
val endMileage: Long = in.mileage
lastState.tripDistance = endMileage - lastState.startMileage // 行程正在进行, 更新当前行程的行驶里程数
lastState
}


def getLastState(in: SourceData, state: ValueState[TripResult]): TripResult = {
if (state.value() != null) {
state.value()
} else {
TripResult(
vin = in.vin,
tripStartTime = in.createTime, // 将新的源数据中的 createTime 作为行程开始时间
tripEndTime = in.createTime, // 将新的源数据中的 createTime 作为行程结束时间
startMileage = in.mileage, // 将新的源数据中的 mileage 作为行程的当前里程数
tripDistance = 0, // 行程刚开始, 将行程的行驶距离置为 0
tripStatus = "0" // 这种类型的行程是刚开始的行程, 将行程状态设置为 0
)
}
}

def initState(in: SourceData, state: ValueState[TripResult]): Unit = {
val initTripState = TripResult(
vin = in.vin,
tripStartTime = in.createTime, // 用新的源数据中的 createTime 初始化行程开始时间
tripEndTime = in.createTime, // 用新的源数据中的 createTime 初始化行程结束时间
startMileage = in.mileage, // 用新的源数据中的 mileage 初始化行程中的行程开始里程
tripDistance = 0, // 初始行驶距离置为 0
tripStatus = "0" // 这种类型的行程是刚开始的行程, 将行程状态设置为 0
)
this.lastTripState.update(initTripState)
}
}

主程序组装好各种流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class BigdataApplication @Inject() (
source: DataStream[String],
kafkaConf: KafkaConfiguration,
adapter: Adapter,
trip: SubTrip
) extends Serializable {
private def createNewStreamingContext: StreamExecutionEnvironment = {
val sourceStream: DataStream[SourceData] = adapter.extract(source)
val tripStream: DataStream[TripResult] = trip.extract(sourceStream)
tripStream.print()
tripStream.executionEnvironment
}

def run(params: Params): Unit = {
val env: StreamExecutionEnvironment = createNewStreamingContext
env.execute("Start new streaming environment")
}
}

运行

1
flink run -c datastream.com.gac.xs6.BigdataApplication  ./target/common-flink-1.0-SNAPSHOT.jar

但是 application.conf 这种配置文件还不知道怎么传递给 Flink。可能要使用 ParameterToolGithub 地址。