Refactoring the Project with Spark Structured Streaming

The one real pleasure in this project right now is refactoring the previous project with the new techniques I have learned. Today I finished the data adaptation; the project directory structure looks like this:

├── EnApplication.scala
├── NaApplication.scala
├── conf
│   ├── HbaseConfiguration.scala
│   ├── KafkaConfiguration.scala
│   ├── MysqlConfiguration.scala
│   └── SparkConfiguration.scala
├── core
│   ├── Adapter.scala
│   └── impl
│       ├── AdapterImpl.scala
│       └── EnAdapterImpl.scala
├── model
│   ├── EnSourceData.scala
│   └── SourceData.scala
├── module
│   ├── EnMainModule.scala
│   └── MainModule.scala
├── pipeline
│   ├── NaSchema.scala
│   ├── NaSparkSession.scala
│   └── SourceDataFrame.scala
└── util
    └── XsUdfs.scala

As in the previous project, dependency injection is still used:

  • Kafka configuration
package com.gac.xs6.conf

import com.typesafe.config.{Config, ConfigFactory}

class KafkaConfiguration extends Serializable {
  private val config: Config = ConfigFactory.load()
  lazy val kafkaConfig: Config = config.getConfig("kafka")

  lazy val kafkaZookeeperHost: String = kafkaConfig.getString("zookeeper.connect")
  lazy val kafkaBootstrapservers: String = kafkaConfig.getString("bootstrap.servers")

  lazy val kafkaNaTopic: String = kafkaConfig.getString("nation.topic.name")
  lazy val kafkaNaTopicPartitionNum: Int = kafkaConfig.getInt("nation.topic.partition.num")
  lazy val kafkaNaTopicReplicationFactor: Int = kafkaConfig.getInt("nation.topic.replication.factor")

  lazy val kafkaEnTopic: String = kafkaConfig.getString("enterprise.topic.name")
  lazy val kafkaEnTopicPartitionNum: Int = kafkaConfig.getInt("enterprise.topic.partition.num")
  lazy val kafkaEnTopicReplicationFactor: Int = kafkaConfig.getInt("enterprise.topic.replication.factor")
}

A variable declared with lazy val is only evaluated the first time it is accessed; if it is never used, it is never initialized, because lazy declarations are evaluated lazily.
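
A minimal sketch of this behaviour; the object and value names below are just for illustration:

object LazyDemo extends App {
  lazy val expensive: Int = {      // the initializer only runs on first access
    println("initializing expensive")
    42
  }

  println("before first access")
  println(expensive)               // prints "initializing expensive", then 42
  println(expensive)               // value is cached; the initializer does not run again
}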

  • Spark configuration
package com.gac.xs6.conf

import com.typesafe.config.{Config, ConfigFactory}

class SparkConfiguration {
  private val config: Config = ConfigFactory.load()
  lazy val sparkConf: Config = config.getConfig("spark")

  lazy val sparkMasterNa: String = sparkConf.getString("nation.master")
  lazy val sparkStreamingBatchDurationNa: Int = sparkConf.getInt("nation.streaming.batch.duration")
  lazy val checkPointPathNa: String = sparkConf.getString("nation.checkpoint.path")
  lazy val ttlNa: String = sparkConf.getString("nation.spark.cleaner.ttl")

  lazy val sparkMasterEn: String = sparkConf.getString("enterprise.master")
  lazy val sparkStreamingBatchDurationEn: Int = sparkConf.getInt("enterprise.streaming.batch.duration")
  lazy val checkPointPathEn: String = sparkConf.getString("enterprise.checkpoint.path")
  lazy val ttlEn: String = sparkConf.getString("enterprise.spark.cleaner.ttl")
}

The configuration covers both the national-standard (GB) and the enterprise-standard variants; both sets are declared with lazy in the same class and are only evaluated when actually used.
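
For reference, here is a minimal application.conf sketch with the keys these two classes read; the hosts, paths, and values are placeholders, not the project's real settings:

kafka {
  zookeeper.connect = "zk-host:2181"
  bootstrap.servers = "kafka-host:9092"

  nation.topic.name = "na-topic"
  nation.topic.partition.num = 3
  nation.topic.replication.factor = 1

  enterprise.topic.name = "en-topic"
  enterprise.topic.partition.num = 3
  enterprise.topic.replication.factor = 1
}

spark {
  nation.master = "local[*]"
  nation.streaming.batch.duration = 10
  nation.checkpoint.path = "/tmp/na-checkpoint"
  nation.spark.cleaner.ttl = "3600"

  enterprise.master = "local[*]"
  enterprise.streaming.batch.duration = 10
  enterprise.checkpoint.path = "/tmp/en-checkpoint"
  enterprise.spark.cleaner.ttl = "3600"
}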

  • SourceData
case class SourceData (
  // Vehicle data
  var vin : String = "",                    // VIN (vehicle identification number)
  var vintype : String = "",                // vehicle type
  var ts : Long = -999999,                  // signal timestamp
  var veh_st : String = "",                 // vehicle state
  var veh_chargest : String = "",           // charging state
  var veh_spd : Double = -999999,           // vehicle speed
  var veh_odo : Double = -999999,           // cumulative mileage
  var veh_volt : Double = -999999,          // total voltage
  var veh_curr : Double = -999999,          // total current
  var veh_soc : Integer = -999999,          // SOC
  var veh_dcdcst : String = "",             // DC-DC state
  var veh_gear : String = "",               // gear
  var veh_pedal_deep : Integer = -999999,   // accelerator pedal position
  var veh_pedalst : Integer = -999999,      // brake pedal state

  // Drive motor data
  var dm_cnt : Integer = -999999,           // number of drive motors
  var dm_seq : Integer = -999999,           // drive motor sequence number
  var dm_st : String = "",                  // drive motor state
  var dm_ctl_temp : Integer = -999999,      // drive motor controller temperature
  var dm_spd : Integer = -999999,           // drive motor rotation speed
  var dm_torq : Double = -999999,           // drive motor torque
  var dm_temp : Integer = -999999,          // drive motor temperature
  var dm_ctl_volt : Double = -999999,       // motor controller input voltage
  var dm_ctl_dc_curr : Double = -999999,    // motor controller input current

  // Vehicle location data
  var loc_st : String = "",                 // positioning state
  var loc_lon84 : Double = -999999,         // longitude
  var loc_lat84 : Double = -999999,         // latitude

  // Extremum data
  var data_batt_subsys_volt_highest_seq: Integer = -999999,  // battery subsystem number with the highest voltage
  var data_batt_sc_volt_highest : Double = -999999,          // highest cell voltage
  var data_batt_subsys_volt_lowest_seq : Integer = -999999,  // battery subsystem number with the lowest voltage
  var data_batt_sc_volt_highest_seq : Integer = -999999,     // cell number with the highest voltage
  var data_batt_sc_volt_lowest : Double = -999999,           // lowest cell voltage
  var data_batt_sc_volt_lowest_seq : Integer = -999999,      // cell number with the lowest voltage
  var data_batt_temp_highest : Integer = -999999,            // highest battery temperature
  var data_batt_temp_probe_highest_seq : Integer = -999999,  // probe number with the highest temperature
  var data_batt_temp_lowestest : Integer = -999999,          // lowest battery temperature
  var data_batt_temp_probe_lowest_seq : Integer = -999999,   // probe number with the lowest temperature
  var maxTemp : Integer = -999999,                           // maximum temperature value

  // Rechargeable energy storage device temperature and voltage data
  var esd_temp_subsys_seq : Integer = -999999,  // battery probe subsystem sequence number
  var esd_temp_probe_list : String = "",        // battery probe temperatures
  var esd_volt_subsys_seq : Integer = -999999,  // battery cell subsystem sequence number
  var esd_frame_sc_list : String = "",          // single-cell voltages
  var singleBatteryCount : Integer = -999999,   // total number of single cells

  // Alarm data: 19 fault flags
  var alm_common_temp_diff : Integer = -999999,       // temperature difference alarm
  var alm_common_temp_high : Integer = -999999,       // battery high-temperature alarm
  var alm_common_esd_high : Integer = -999999,        // onboard energy storage device overvoltage alarm
  var alm_common_esd_low : Integer = -999999,         // onboard energy storage device undervoltage alarm
  var alm_common_soc_low : Integer = -999999,         // low SOC alarm
  var alm_common_sc_high : Integer = -999999,         // single-cell overvoltage alarm
  var alm_common_sc_low : Integer = -999999,          // single-cell undervoltage alarm
  var alm_common_soc_high : Integer = -999999,        // high SOC alarm
  var alm_common_soc_hop : Integer = -999999,         // SOC jump alarm
  var alm_common_esd_unmatch : Integer = -999999,     // rechargeable energy storage system mismatch alarm
  var alm_common_sc_consistency : Integer = -999999,  // poor cell consistency alarm
  var alm_common_insulation : Integer = -999999,      // insulation alarm
  var alm_common_dcdc_temp : Integer = -999999,       // DC-DC temperature alarm
  var alm_common_brk : Integer = -999999,             // braking system alarm
  var alm_common_dcdc_st : Integer = -999999,         // DC-DC status alarm
  var alm_common_dmc_temp : Integer = -999999,        // drive motor controller temperature alarm
  var alm_common_hvil_st : Integer = -999999,         // high-voltage interlock status alarm
  var alm_common_dm_temp : Integer = -999999,         // drive motor temperature alarm
  var alm_common_esd_charge_over : Integer = -999999, // onboard energy storage device overcharge alarm
  var malfunctionLevel : String = " "                 // fault level: no fault, level 1, level 2, level 3, abnormal, invalid
)

SourceData is the model that the source data is adapted into; as we will see later, all of the computation logic operates on this adapted data.
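
A small illustration of how the defaults behave: any field a record does not carry keeps its sentinel value ("" for strings, -999999 for numbers). The VIN and values below are made up.

val s = SourceData(vin = "TESTVIN0000000001", ts = 1556245380000L, veh_spd = 35.5)

println(s.veh_spd) // 35.5: explicitly set
println(s.veh_soc) // -999999: not present in this record
println(s.veh_st)  // "": not present in this record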

  • UDF function
package com.gac.xs6.util

import org.apache.spark.sql.functions._
import com.github.nscala_time.time.Imports._
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.gac.xs6.model.{EnSourceData, SourceData}

object XsUdfs {
  val getSignals = udf((vin: String, vintype: String, jsonStr: String) => {
    var raw_str = ""
    if (jsonStr.contains("实时信息上传")) {
      raw_str = jsonStr
    } else if (jsonStr.contains("补发信息上报")) {
      raw_str = jsonStr.replaceAll("补发信息", "实时信息")
    } else {
      ""
    }

    try {
      val s: SourceData = SourceData() // initialise an empty national-standard record

      val signals: JSONObject = JSON.parseObject(raw_str) // parse the signal payload into a JSON object

      if (signals == null) {
        null
      } else {
        val header: JSONObject = signals.getJSONObject("数据头")  // message header
        val orderFlag: String = header.getString("orderFlag")     // read the orderFlag value
        val time: Long = signals.getLong("时间")                   // read the time, e.g. 201904261023
        var ts: Long = 0L

        val realtime_information = orderFlag match {
          case "补发信息上报" => signals.getJSONArray("补发信息")
          case "实时信息上传" => signals.getJSONArray("实时信息")
          case _ => null
        }

        if (realtime_information == null || time == 0L) { // discard everything except real-time and supplementary messages
          null
        } else {

          for (info_node <- realtime_information.toArray) {

            val node_data: JSONObject = JSON.parseObject(info_node.toString)
            val part: String = node_data.keySet().toArray.head.toString
            val data: JSONObject = JSON.parseObject(node_data.getString(part))

            part match {
              case "整车数据" => // vehicle data
                ts = DateTime.parse(time.toString, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getMillis // convert to epoch millis

                vin match {
                  case null =>
                  case x => s.vin = x
                }

                vintype match {
                  case null =>
                  case x => s.vintype = x
                }

                ts match {
                  case 0 =>
                  case x => s.ts = x
                }

                data.keySet().toArray.foreach(f => {
                  f.toString match {
                    case "vehicleState" => s.veh_st = data.getString("vehicleState")              // vehicle state
                    case "chargeState" => s.veh_chargest = data.getString("chargeState")          // charging state
                    case "vehicleSpeed" => s.veh_spd = data.getDouble("vehicleSpeed")             // vehicle speed
                    case "odometer" => s.veh_odo = data.getDouble("odometer")                     // cumulative mileage
                    case "totalVoltage" => s.veh_volt = data.getDouble("totalVoltage")            // total voltage
                    case "totalCurrent" => s.veh_curr = data.getDouble("totalCurrent")            // total current
                    case "soc" => s.veh_soc = data.getInteger("soc")                              // SOC
                    case "dcdcState" => s.veh_dcdcst = data.getString("dcdcState")                // DC-DC state
                    case "gear" => s.veh_gear = data.getString("gear")                            // gear
                    case "accPedalStroke" => s.veh_pedal_deep = data.getInteger("accPedalStroke") // accelerator pedal position
                    case "brkPedalStroke" => s.veh_pedalst = data.getInteger("brkPedalStroke")    // brake pedal state
                    case _ =>
                  }
                })

              case "车辆位置数据" => // vehicle location data
                data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "locationState" => s.loc_st = data.getString("locationState") // positioning state
                    case "longitude" => s.loc_lon84 = data.getDouble("longitude")      // longitude
                    case "latitude" => s.loc_lat84 = data.getDouble("latitude")        // latitude
                    case _ =>
                  }
                })

              case "极值数据" => // extremum data
                data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "maxVolBatSysNum" => s.data_batt_subsys_volt_highest_seq = data.getInteger("maxVolBatSysNum") // battery subsystem number with the highest voltage
                    case "maxBatUnitVoltage" => s.data_batt_sc_volt_highest = data.getDouble("maxBatUnitVoltage")      // highest cell voltage
                    case "minVolBatSysNum" => s.data_batt_subsys_volt_lowest_seq = data.getInteger("minVolBatSysNum")  // battery subsystem number with the lowest voltage
                    case "maxVolBatUnitNum" => s.data_batt_sc_volt_highest_seq = data.getInteger("maxVolBatUnitNum")   // cell number with the highest voltage
                    case "minBatUnitVoltage" => s.data_batt_sc_volt_lowest = data.getDouble("minBatUnitVoltage")       // lowest cell voltage
                    case "minVolBatUnitNum" => s.data_batt_sc_volt_lowest_seq = data.getInteger("minVolBatUnitNum")    // cell number with the lowest voltage
                    case "maxTemp" =>                                                                                  // highest battery temperature (also kept in maxTemp)
                      s.data_batt_temp_highest = data.getInteger("maxTemp")
                      s.maxTemp = data.getInteger("maxTemp")
                    case "maxTempProbeNum" => s.data_batt_temp_probe_highest_seq = data.getInteger("maxTempProbeNum")  // probe number with the highest temperature
                    case "minTemp" => s.data_batt_temp_lowestest = data.getInteger("minTemp")                          // lowest battery temperature
                    case "minTempProbeNum" => s.data_batt_temp_probe_lowest_seq = data.getInteger("minTempProbeNum")   // probe number with the lowest temperature
                    case _ =>
                  }
                })

              case "驱动电机数据" => // drive motor data
                val d = data.getJSONArray("driveMotorInfo").getJSONObject(0).toJSONString
                val dm_data = JSON.parseObject(d)

                data.getInteger("driveMotorNum") match { // number of drive motors
                  case null =>
                  case x => s.dm_cnt = x
                }

                dm_data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "driveMotorSeq" => s.dm_seq = dm_data.getInteger("driveMotorSeq")                             // drive motor sequence number
                    case "driveMotorState" => s.dm_st = dm_data.getString("driveMotorState")                           // drive motor state
                    case "driveMotorControllerTemp" => s.dm_ctl_temp = dm_data.getInteger("driveMotorControllerTemp")  // drive motor controller temperature
                    case "driveMotorRotationRate" => s.dm_spd = dm_data.getInteger("driveMotorRotationRate")           // drive motor rotation speed
                    case "driveMotorToque" => s.dm_torq = dm_data.getDouble("driveMotorToque")                         // drive motor torque
                    case "driveMotorTemp" => s.dm_temp = dm_data.getInteger("driveMotorTemp")                          // drive motor temperature
                    case "driveMotorControllerInputVoltage" => s.dm_ctl_volt = dm_data.getDouble("driveMotorControllerInputVoltage")    // motor controller input voltage
                    case "driveMotorControllerDCbusCurrent" => s.dm_ctl_dc_curr = dm_data.getDouble("driveMotorControllerDCbusCurrent") // motor controller DC bus current
                    case _ =>
                  }
                })

              case "报警数据" => // alarm data
                data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "almCommonTempDiff" => s.alm_common_temp_diff = data.getInteger("almCommonTempDiff")                 // temperature difference alarm (BMS)
                    case "almCommonTempHigh" => s.alm_common_temp_high = data.getInteger("almCommonTempHigh")                 // battery high-temperature alarm (BMS)
                    case "almCommonEsdHigh" => s.alm_common_esd_high = data.getInteger("almCommonEsdHigh")                    // energy storage device overvoltage alarm (BMS)
                    case "almCommonEsdLow" => s.alm_common_esd_low = data.getInteger("almCommonEsdLow")                       // energy storage device undervoltage alarm (BMS)
                    case "almCommonSocLow" => s.alm_common_soc_low = data.getInteger("almCommonSocLow")                       // low SOC alarm (BMS)
                    case "almCommonScHigh" => s.alm_common_sc_high = data.getInteger("almCommonScHigh")                       // single-cell overvoltage alarm (BMS)
                    case "almCommonScLow" => s.alm_common_sc_low = data.getInteger("almCommonScLow")                          // single-cell undervoltage alarm (BMS)
                    case "almCommonSocHigh" => s.alm_common_soc_high = data.getInteger("almCommonSocHigh")                    // high SOC alarm (BMS)
                    case "almCommonSocHop" => s.alm_common_soc_hop = data.getInteger("almCommonSocHop")                       // SOC jump alarm (BMS)
                    case "almCommonEsdUnmatch" => s.alm_common_esd_unmatch = data.getInteger("almCommonEsdUnmatch")           // rechargeable energy storage system mismatch alarm (BMS)
                    case "almCommonScConsistency" => s.alm_common_sc_consistency = data.getInteger("almCommonScConsistency")  // poor cell consistency alarm (BMS)
                    case "almCommonInsulation" => s.alm_common_insulation = data.getInteger("almCommonInsulation")            // insulation alarm (BMS)
                    case "almCommonDcdcTemp" => s.alm_common_dcdc_temp = data.getInteger("almCommonDcdcTemp")                 // DC-DC temperature alarm (DCDC)
                    case "almCommonBrk" => s.alm_common_brk = data.getInteger("almCommonBrk")                                 // braking system alarm (BCS/VCU)
                    case "almCommonDcdcSt" => s.alm_common_dcdc_st = data.getInteger("almCommonDcdcSt")                       // DC-DC status alarm (DCDC)
                    case "almCommonDmcTemp" => s.alm_common_dmc_temp = data.getInteger("almCommonDmcTemp")                    // drive motor controller temperature alarm (DCU)
                    case "almCommonHVILSt" => s.alm_common_hvil_st = data.getInteger("almCommonHVILSt")                       // high-voltage interlock status alarm (VCU)
                    case "almCommonDmTemp" => s.alm_common_dm_temp = data.getInteger("almCommonDmTemp")                       // drive motor temperature alarm (DCU)
                    case "almCommonEsdChargeOver" => s.alm_common_esd_charge_over = data.getInteger("almCommonEsdChargeOver") // energy storage device overcharge alarm (BMS)
                    case "malfunctionLevel" => s.malfunctionLevel = data.getString("malfunctionLevel")                        // fault level
                    case _ =>
                  }
                })

              case "电池电压" => // battery voltage
                val esd_volt_data_node = data.getJSONArray("batteryUnitVoltage").getJSONObject(0).toJSONString
                val esd_volt_data = JSON.parseObject(esd_volt_data_node)

                esd_volt_data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "batterySysSeq" => s.esd_volt_subsys_seq = esd_volt_data.getInteger("batterySysSeq")                            // battery cell subsystem sequence number
                    case "singleBatteryVoltage" => s.esd_frame_sc_list = esd_volt_data.getJSONArray("singleBatteryVoltage").toJSONString // single-cell voltages
                    case "singleBatteryCount" => s.singleBatteryCount = esd_volt_data.getInteger("singleBatteryCount")                   // total number of single cells
                    case _ =>
                  }
                })

              case "电池温度" => // battery temperature
                val esd_temp_data_node = data.getJSONArray("batteryUnitTemperature").getJSONObject(0).toJSONString
                val esd_temp_data = JSON.parseObject(esd_temp_data_node)

                esd_temp_data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "batterySysSeq" => s.esd_temp_subsys_seq = esd_temp_data.getInteger("batterySysSeq")                        // battery probe subsystem sequence number
                    case "probeTemperatures" => s.esd_temp_probe_list = esd_temp_data.getJSONArray("probeTemperatures").toJSONString // battery probe temperatures
                    case _ =>
                  }
                })

              case _ =>
            }
          }

          s
        }
      }
    } catch {
      case e: Exception =>
        println((e, jsonStr))
        null
    }
  })
}

The UDF getSignals takes three parameters: vin, vintype, and jsonStr, where jsonStr is the value of the Kafka record. The UDF parses each Kafka value and adapts it into a SourceData object.
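
To make the parsing logic easier to follow, here is a hypothetical sketch of the message shape the UDF expects, reconstructed from the keys it reads; the field values are invented and the timestamp is assumed to follow the yyyyMMddHHmmss pattern used above:

// Hypothetical sample value, for illustration only.
val sampleJson: String =
  """{
    |  "数据头": { "orderFlag": "实时信息上传" },
    |  "时间": 20190426102300,
    |  "实时信息": [
    |    { "整车数据": { "vehicleState": "1", "vehicleSpeed": 35.5, "soc": 80 } },
    |    { "车辆位置数据": { "locationState": "0", "longitude": 113.3, "latitude": 23.1 } }
    |  ]
    |}""".stripMargin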

  • MainModule
package com.gac.xs6.module

import com.gac.xs6.conf.{HbaseConfiguration, KafkaConfiguration, SparkConfiguration}
import com.gac.xs6.NaApplication
import com.gac.xs6.core.Adapter
import com.gac.xs6.core.impl.AdapterImpl
import com.gac.xs6.pipeline.{NaSchema, NaSparkSession, SourceDataFrame}
import com.google.inject.{AbstractModule, Provides, Singleton}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

object MainModule extends AbstractModule {

  override def configure(): Unit = {
    bind(classOf[KafkaConfiguration]).asEagerSingleton()
    bind(classOf[HbaseConfiguration]).asEagerSingleton()
    bind(classOf[Adapter]).toInstance(AdapterImpl)
    bind(classOf[NaApplication])
  }

  @Provides
  @Singleton
  def sparkSession(sparkConf: SparkConfiguration): NaSparkSession = {
    new NaSparkSession {
      override def session(): SparkSession = {
        SparkSession.builder
          .master(sparkConf.sparkMasterNa)
          .appName("Stateful Structured Streaming")
          .getOrCreate()
      }
    }
  }

  @Provides
  @Singleton
  def kafkaDataSource(kafkaConf: KafkaConfiguration, sparkConf: SparkConfiguration): SourceDataFrame = {
    new SourceDataFrame {
      override def stream(): DataFrame = {

        val spark = sparkSession(sparkConf).session()
        val df: DataFrame = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaConf.kafkaBootstrapservers)
          .option("subscribe", kafkaConf.kafkaNaTopic)
          .option("startingOffsets", "earliest")
          .load()

        import spark.implicits._
        val source: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]

        val naSchema = (new NaSchema).schema
        val parsed: DataFrame = source.select($"key", from_json($"value", naSchema) as "data")
        parsed
      }
    }
  }
}

MainModule is where the Kafka data is actually read. We provide a kafkaDataSource function that returns the SourceDataFrame trait type; to return that type, we instantiate an anonymous SourceDataFrame:

new SourceDataFrame

and implement its stream function, which returns a DataFrame.
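
The pipeline classes themselves are not listed in this post; judging from how they are used above, they presumably look roughly like the following reconstruction (not the actual source):

package com.gac.xs6.pipeline

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

trait NaSparkSession {
  def session(): SparkSession   // provides the shared SparkSession
}

trait SourceDataFrame {
  def stream(): DataFrame       // provides the raw streaming DataFrame read from Kafka
}

// The Kafka value is assumed to be a JSON object carrying the VIN, the vehicle type,
// and the raw signal payload as a string, which from_json unpacks into the "data" column.
class NaSchema {
  val schema: StructType = StructType(Seq(
    StructField("vin", StringType),
    StructField("vintype", StringType),
    StructField("signals", StringType)
  ))
}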

  • AdapterImpl
package com.gac.xs6.core.impl

import com.gac.xs6.core.Adapter
import com.gac.xs6.model.SourceData
import com.gac.xs6.util.XsUdfs.getSignals
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import scala.language.postfixOps

object AdapterImpl extends Adapter {
  override def extract(spark: SparkSession, df: DataFrame): Dataset[SourceData] = {
    import spark.implicits._
    df.select(getSignals($"data.vin", $"data.vintype", $"data.signals") as "signals")
      .filter($"signals" isNotNull)
      .select($"signals.*")
      .as[SourceData]
  }
}

This adapter implementation returns a Dataset[SourceData].

  • Adapter trait
import com.gac.xs6.model.SourceData
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

trait Adapter {
  def extract(spark: SparkSession, df: DataFrame): Dataset[SourceData]
}
  • NaApplication
package com.gac.xs6

import com.gac.xs6.NaApplication.Params
import com.gac.xs6.conf.{KafkaConfiguration, SparkConfiguration}
import com.gac.xs6.core.Adapter
import com.gac.xs6.model.SourceData
import com.gac.xs6.module.MainModule
import com.gac.xs6.pipeline.{NaSparkSession, SourceDataFrame}
import com.google.inject.{Guice, Inject, Singleton}
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import scopt.OptionParser

object NaApplication extends App {
  val parser = new OptionParser[Params]("NaApplication") {
    head("Spark Structured Streaming from kafka")

    opt[String]('c', "conf")
      .text("config.resource for xs6-gac")
      .action((x, c) => c.copy(conf = x))

    help("help").text("prints this usage text")
  }

  parser.parse(args, Params()) match {
    case Some(params) =>
      val injector = Guice.createInjector(MainModule)
      val runner = injector.getInstance(classOf[NaApplication])
      ConfigFactory.invalidateCaches()
      runner.run(params)
    case _ => sys.exit(1)
  }

  // case class Params(conf: String = "application.conf")
  case class Params(conf: String = "")
}

@Singleton
class NaApplication @Inject() (
  source: SourceDataFrame,
  adapter: Adapter,
  sparkConf: SparkConfiguration,
  kafkaConf: KafkaConfiguration,
  sparkSession: NaSparkSession
) extends Serializable {

  private def createNewStreamingQuery(): Unit = {

    val spark = sparkSession.session()
    val sourceDf: DataFrame = source.stream()

    val adapterData: Dataset[SourceData] = adapter.extract(spark, sourceDf)

    val adapterDf: StreamingQuery =
      adapterData.writeStream
        .outputMode(OutputMode.Update())
        .trigger(Trigger.Once)
        //.trigger(Trigger.ProcessingTime("1 seconds"))
        .format("console")
        .option("truncate", "false") // do not truncate the console output
        .start()

    /*
    val query: StreamingQuery =
      subTripData.writeStream
        .outputMode(OutputMode.Update())
        .trigger(Trigger.Once)
        //.trigger(Trigger.ProcessingTime("1 seconds"))
        .format("console")
        .option("truncate", "false") // do not truncate the console output
        .start()
    */

    spark.streams.awaitAnyTermination()
  }

  def run(params: Params): Unit = {
    createNewStreamingQuery()
  }
}

The NaApplication class takes five constructor parameters:

source: SourceDataFrame,
adapter: Adapter,
sparkConf: SparkConfiguration,
kafkaConf: KafkaConfiguration,
sparkSession: NaSparkSession

Here we see the power of dependency injection again: source supplies the source DataFrame, and adapter turns that DataFrame into the adapted Dataset[SourceData].

spark.streams.awaitAnyTermination() does not block the other StreamingQuery instances from running: every started query runs on its own thread, and this call simply blocks the driver until any one of them terminates.
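
A self-contained sketch of that behaviour using the built-in rate source; the names and trigger intervals are arbitrary:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object AwaitAnyDemo extends App {
  val spark = SparkSession.builder
    .master("local[2]")
    .appName("awaitAnyTermination demo")
    .getOrCreate()

  // Two independent queries; each runs on its own thread once started.
  val q1 = spark.readStream.format("rate").load()
    .writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds")).start()
  val q2 = spark.readStream.format("rate").load()
    .writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds")).start()

  // Blocks the driver until either q1 or q2 terminates;
  // it does not stop or block the other query from executing.
  spark.streams.awaitAnyTermination()
}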

The code is on GitHub, for reference only. The data in Kafka cannot be shared here.