Wait the light to fall

使用 Spark Structured Streaming 重构项目

焉知非鱼

现在这个项目唯一的乐趣就是使用我学到的新技术重构之前的项目。今天完成了数据适配, 该项目目录结构如下:

├── EnApplication.scala
├── NaApplication.scala
├── conf
│   ├── HbaseConfiguration.scala
│   ├── KafkaConfiguration.scala
│   ├── MysqlConfiguration.scala
│   └── SparkConfiguration.scala
├── core
│   ├── Adapter.scala
│   └── impl
│       ├── AdapterImpl.scala
│       └── EnAdapterImpl.scala
├── model
│   ├── EnSourceData.scala
│   └── SourceData.scala
├── module
│   ├── EnMainModule.scala
│   └── MainModule.scala
├── pipeline
│   ├── NaSchema.scala
│   ├── NaSparkSession.scala
│   └── SourceDataFrame.scala
└── util
    └── XsUdfs.scala

与之前的项目一样, 依旧使用了依赖注入:

  • Kafka 配置
/**
  * Kafka-related settings read from the `kafka` section of the configuration
  * returned by `ConfigFactory.load()`.
  *
  * Every setting is a `lazy val`, so a key is only looked up the first time
  * the corresponding member is accessed.
  */
class KafkaConfiguration extends Serializable {

  // Root configuration object; loaded when the instance is constructed.
  private val rootConfig: Config = ConfigFactory.load()

  /** The `kafka` sub-tree that all settings below are read from. */
  lazy val kafkaConfig: Config = rootConfig.getConfig("kafka")

  // --- Cluster endpoints ---
  lazy val kafkaZookeeperHost: String    = kafkaConfig.getString("zookeeper.connect")
  lazy val kafkaBootstrapservers: String = kafkaConfig.getString("bootstrap.servers")

  // --- "nation" topic ---
  lazy val kafkaNaTopic: String               = kafkaConfig.getString("nation.topic.name")
  lazy val kafkaNaTopicPartitionNum: Int      = kafkaConfig.getInt("nation.topic.partition.num")
  lazy val kafkaNaTopicReplicationFactor: Int = kafkaConfig.getInt("nation.topic.replication.factor")

  // --- "enterprise" topic ---
  lazy val kafkaEnTopic: String               = kafkaConfig.getString("enterprise.topic.name")
  lazy val kafkaEnTopicPartitionNum: Int      = kafkaConfig.getInt("enterprise.topic.partition.num")
  lazy val kafkaEnTopicReplicationFactor: Int = kafkaConfig.getInt("enterprise.topic.replication.factor")
}

用 lazy val 声明的值是惰性求值的: 只有在第一次被访问时才会计算并赋值(之后复用同一个结果); 如果始终没有用到, 就不会去读取对应的配置项。

  • Spark 配置
/**
  * Spark-related settings read from the `spark` section of the configuration
  * returned by `ConfigFactory.load()`.
  *
  * The "nation" (`...Na`) and "enterprise" (`...En`) settings live side by
  * side; each one is a `lazy val` and is only looked up on first access.
  */
class SparkConfiguration {

  // Root configuration object; loaded when the instance is constructed.
  private val rootConfig: Config = ConfigFactory.load()

  /** The `spark` sub-tree that all settings below are read from. */
  lazy val sparkConf: Config = rootConfig.getConfig("spark")

  // --- "nation" settings ---
  lazy val sparkMasterNa: String              = sparkConf.getString("nation.master")
  lazy val sparkStreamingBatchDurationNa: Int = sparkConf.getInt("nation.streaming.batch.duration")
  lazy val checkPointPathNa: String           = sparkConf.getString("nation.checkpoint.path")
  lazy val ttlNa: String                      = sparkConf.getString("nation.spark.cleaner.ttl")

  // --- "enterprise" settings ---
  lazy val sparkMasterEn: String              = sparkConf.getString("enterprise.master")
  lazy val sparkStreamingBatchDurationEn: Int = sparkConf.getInt("enterprise.streaming.batch.duration")
  lazy val checkPointPathEn: String           = sparkConf.getString("enterprise.checkpoint.path")
  lazy val ttlEn: String                      = sparkConf.getString("enterprise.spark.cleaner.ttl")
}

配置分国标和企标两种配置, 都用 lazy 声明在同一个类中, 用到的时候才被赋值。

  • SourceData
/**
  * Flat record that the raw source messages are adapted into.
  *
  * Every field is a mutable `var` with a default: numeric fields default to
  * -999999 and string fields to "" (`malfunctionLevel` defaults to " ").
  * NOTE(review): -999999 is presumably a "not present" sentinel — confirm with
  * the downstream consumers.
  */
case class SourceData (
                        // Whole-vehicle data
                        var vin                              : String  = "",      // VIN (vehicle identification number)
                        var vintype                          : String  = "",      // vehicle type
                        var ts                               : Long    = -999999, // signal timestamp
                        var veh_st                           : String  = "",      // vehicle state
                        var veh_chargest                     : String  = "",      // charging state
                        var veh_spd                          : Double  = -999999, // vehicle speed
                        var veh_odo                          : Double  = -999999, // accumulated mileage (odometer)
                        var veh_volt                         : Double  = -999999, // total voltage
                        var veh_curr                         : Double  = -999999, // total current
                        var veh_soc                          : Integer = -999999, // SOC
                        var veh_dcdcst                       : String  = "",      // DC-DC state
                        var veh_gear                         : String  = ""  ,    // gear
                        var veh_pedal_deep                   : Integer = -999999, // accelerator pedal state
                        var veh_pedalst                      : Integer = -999999, // brake pedal state

                        // Drive motor data
                        var dm_cnt                           : Integer = -999999, // number of drive motors
                        var dm_seq                           : Integer = -999999, // drive motor sequence number
                        var dm_st                            : String  = "",      // drive motor state
                        var dm_ctl_temp                      : Integer = -999999, // drive motor controller temperature
                        var dm_spd                           : Integer = -999999, // drive motor rotation speed
                        var dm_torq                          : Double  = -999999, // drive motor torque
                        var dm_temp                          : Integer = -999999, // drive motor temperature
                        var dm_ctl_volt                      : Double  = -999999, // motor controller input voltage
                        var dm_ctl_dc_curr                   : Double  = -999999, // motor controller input current

                        // Vehicle location data
                        var loc_st                           : String  = "",      // positioning state
                        var loc_lon84                        : Double  = -999999, // longitude
                        var loc_lat84                        : Double  = -999999, // latitude

                        // Extreme-value data
                        var data_batt_subsys_volt_highest_seq: Integer = -999999, // number of the battery subsystem with the highest voltage
                        var data_batt_sc_volt_highest        : Double  = -999999, // highest battery voltage
                        var data_batt_subsys_volt_lowest_seq : Integer = -999999, // number of the battery subsystem with the lowest voltage
                        var data_batt_sc_volt_highest_seq    : Integer = -999999, // id of the single cell with the highest voltage
                        var data_batt_sc_volt_lowest         : Double  = -999999, // lowest battery voltage
                        var data_batt_sc_volt_lowest_seq     : Integer = -999999, // id of the single cell with the lowest voltage
                        var data_batt_temp_highest           : Integer = -999999, // highest battery temperature
                        var data_batt_temp_probe_highest_seq : Integer = -999999, // number of the probe with the highest temperature
                        var data_batt_temp_lowestest         : Integer = -999999, // lowest battery temperature
                        var data_batt_temp_probe_lowest_seq  : Integer = -999999, // number of the cell with the lowest temperature (NOTE(review): field name says "probe" — confirm)
                        var maxTemp                          : Integer = -999999, // highest temperature value

                        // Rechargeable energy storage device: temperature and voltage data
                        var esd_temp_subsys_seq              : Integer = -999999, // battery probe sequence number (NOTE(review): field name says "subsys" — confirm)
                        var esd_temp_probe_list              : String  = "",      // battery probe temperatures
                        var esd_volt_subsys_seq              : Integer = -999999, // single-cell sequence number (NOTE(review): field name says "subsys" — confirm)
                        var esd_frame_sc_list                : String  = "",      // single-cell voltages
                        var singleBatteryCount               : Integer = -999999, // total number of single cells

                        // Alarm data - 19 faults
                        var alm_common_temp_diff             : Integer = -999999, // temperature difference alarm
                        var alm_common_temp_high             : Integer = -999999, // battery high-temperature alarm
                        var alm_common_esd_high              : Integer = -999999, // on-board energy storage device over-voltage alarm
                        var alm_common_esd_low               : Integer = -999999, // on-board energy storage device under-voltage alarm
                        var alm_common_soc_low               : Integer = -999999, // SOC low alarm
                        var alm_common_sc_high               : Integer = -999999, // single-cell over-voltage alarm
                        var alm_common_sc_low                : Integer = -999999, // single-cell under-voltage alarm
                        var alm_common_soc_high              : Integer = -999999, // SOC too high alarm
                        var alm_common_soc_hop               : Integer = -999999, // SOC jump alarm
                        var alm_common_esd_unmatch           : Integer = -999999, // rechargeable energy storage system mismatch alarm
                        var alm_common_sc_consistency        : Integer = -999999, // poor single-cell consistency alarm
                        var alm_common_insulation            : Integer = -999999, // insulation alarm
                        var alm_common_dcdc_temp             : Integer = -999999, // DC-DC temperature alarm
                        var alm_common_brk                   : Integer = -999999, // brake system alarm
                        var alm_common_dcdc_st               : Integer = -999999, // DC-DC state alarm
                        var alm_common_dmc_temp              : Integer = -999999, // drive motor controller temperature alarm
                        var alm_common_hvil_st               : Integer = -999999, // high-voltage interlock state alarm
                        var alm_common_dm_temp               : Integer = -999999, // drive motor temperature alarm
                        var alm_common_esd_charge_over       : Integer = -999999, // on-board energy storage device overcharge
                        var malfunctionLevel                 : String  = " "      // fault level - no fault, level-1 fault, level-2 fault, level-3 fault, abnormal, invalid
                      )

SourceData 是用来适配源数据的, 后面会看到, 所有的计算逻辑都是基于适配后的数据。

  • UDF 函数
package com.gac.xs6.util

import org.apache.spark.sql.functions._
import com.github.nscala_time.time.Imports._
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.gac.xs6.model.{EnSourceData, SourceData}

/**
  * Spark SQL user-defined functions used to adapt raw messages into model objects.
  */
object XsUdfs {
  /**
    * UDF that adapts one raw message into a [[SourceData]] row.
    *
    * Arguments: `vin` and `vintype` are copied onto the result when non-null;
    * `jsonStr` is the JSON text of the message.
    *
    * Returns `null` (so the caller can filter the row out) when:
    *  - `jsonStr` is null, or contains neither "实时信息上传" nor "补发信息上报";
    *  - the JSON parses to null, the "时间" field is 0, or the header's
    *    `orderFlag` is not one of the two supported values;
    *  - any exception is thrown while adapting (it is printed together with
    *    the offending input).
    *
    * Changes from the previous version:
    *  - a null `jsonStr` no longer throws a NullPointerException out of the UDF;
    *  - re-sent reports ("补发信息上报") are no longer rewritten into
    *    "实时信息上报", a value that matched neither `orderFlag` case and caused
    *    every re-sent report to be dropped;
    *  - the "maxTemp" key now fills both `data_batt_temp_highest` and `maxTemp`
    *    (the second `case "maxTemp"` was unreachable).
    */
  val   getSignals = udf((vin: String, vintype: String, jsonStr: String) => {
    // Only real-time uploads and re-sent reports are adapted. Anything else (including a
    // null input) becomes an empty payload, which yields a null result below.
    val rawStr: String =
      if (jsonStr != null && (jsonStr.contains("实时信息上传") || jsonStr.contains("补发信息上报"))) jsonStr
      else ""

    try {
      val s: SourceData            = SourceData()                        // start from a record holding only defaults

      val signals:   JSONObject    = JSON.parseObject(rawStr)            // parsed message body

      if (signals == null) {
        null
      } else {
        val header:    JSONObject    = signals.getJSONObject("数据头")   // message header
        val orderFlag: String        = header.getString("orderFlag")    // message kind
        // NOTE(review): an older comment gave the 12-digit example 201904261023, but the
        // pattern used below ("yyyyMMddHHmmss") expects 14 digits — confirm the real format.
        val time:      Long          = signals.getLong("时间")           // message time
        var ts:        Long          = 0L

        // The payload array lives under a different key depending on the message kind.
        val realtime_information = orderFlag match {
          case "补发信息上报" => signals.getJSONArray("补发信息")
          case "实时信息上传" => signals.getJSONArray("实时信息")
          case _            => null
        }

        if (realtime_information == null || time == 0L ) { // everything except real-time and re-sent data is discarded
          null
        } else {

          for(info_node <- realtime_information.toArray) {

            // Each node is an object whose first key names the section and whose value holds the section's data.
            val node_data: JSONObject = JSON.parseObject(info_node.toString)
            val part:      String     = node_data.keySet().toArray.head.toString
            val data:      JSONObject = JSON.parseObject(node_data.getString(part))

            part match {
              case "整车数据" => // whole-vehicle data
                ts = DateTime.parse(time.toString, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getMillis // convert to epoch millis

                vin match {
                  case null =>
                  case x    => s.vin = x
                }

                vintype match {
                  case null =>
                  case x    => s.vintype = x
                }

                ts match {
                  case 0    =>
                  case x    => s.ts = x
                }

                data.keySet().toArray.foreach( f => {
                  f.toString match {
                    case "vehicleState"   => s.veh_st         = data.getString("vehicleState")     // vehicle state
                    case "chargeState"    => s.veh_chargest   = data.getString("chargeState")      // charging state
                    case "vehicleSpeed"   => s.veh_spd        = data.getDouble("vehicleSpeed")     // vehicle speed
                    case "odometer"       => s.veh_odo        = data.getDouble("odometer")         // accumulated mileage
                    case "totalVoltage"   => s.veh_volt       = data.getDouble("totalVoltage")     // total voltage
                    case "totalCurrent"   => s.veh_curr       = data.getDouble("totalCurrent")     // total current
                    case "soc"            => s.veh_soc        = data.getInteger("soc")             // SOC
                    case "dcdcState"      => s.veh_dcdcst     = data.getString("dcdcState")        // DC-DC state
                    case "gear"           => s.veh_gear       = data.getString("gear")             // gear
                    case "accPedalStroke" => s.veh_pedal_deep = data.getInteger("accPedalStroke")  // accelerator pedal state
                    case "brkPedalStroke" => s.veh_pedalst    = data.getInteger("brkPedalStroke")  // brake pedal state
                    case _                =>
                  }
                })

              case "车辆位置数据" => // vehicle location data
                data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "locationState" => s.loc_st    = data.getString("locationState") // positioning state
                    case "longitude"     => s.loc_lon84 = data.getDouble("longitude")     // longitude
                    case "latitude"      => s.loc_lat84 = data.getDouble("latitude")      // latitude
                    case _               =>
                  }
                })


              case "极值数据" => // extreme-value data
                data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "maxVolBatSysNum"   => s.data_batt_subsys_volt_highest_seq = data.getInteger("maxVolBatSysNum")   // subsystem with the highest voltage
                    case "maxBatUnitVoltage" => s.data_batt_sc_volt_highest         = data.getDouble("maxBatUnitVoltage")  // highest battery voltage
                    case "minVolBatSysNum"   => s.data_batt_subsys_volt_lowest_seq  = data.getInteger("minVolBatSysNum")   // subsystem with the lowest voltage
                    case "maxVolBatUnitNum"  => s.data_batt_sc_volt_highest_seq     = data.getInteger("maxVolBatUnitNum")  // cell with the highest voltage
                    case "minBatUnitVoltage" => s.data_batt_sc_volt_lowest          = data.getDouble("minBatUnitVoltage")  // lowest battery voltage
                    case "minVolBatUnitNum"  => s.data_batt_sc_volt_lowest_seq      = data.getInteger("minVolBatUnitNum")  // cell with the lowest voltage
                    case "maxTemp"           =>
                      // One key feeds two fields; a second `case "maxTemp"` would never be reached.
                      val maxTempValue = data.getInteger("maxTemp")
                      s.data_batt_temp_highest = maxTempValue                                                              // highest battery temperature
                      s.maxTemp                = maxTempValue                                                              // highest temperature value
                    case "maxTempProbeNum"   => s.data_batt_temp_probe_highest_seq  = data.getInteger("maxTempProbeNum")   // probe with the highest temperature
                    case "minTemp"           => s.data_batt_temp_lowestest          = data.getInteger("minTemp")           // lowest battery temperature
                    case "minTempProbeNum"   => s.data_batt_temp_probe_lowest_seq   = data.getInteger("minTempProbeNum")   // probe with the lowest temperature
                    case _                   =>
                  }
                })


              case "驱动电机数据" => // drive motor data
                // Only the first entry of the driveMotorInfo array is adapted.
                val d = data.getJSONArray("driveMotorInfo").getJSONObject(0).toJSONString
                val dm_data = JSON.parseObject(d)

                data.getInteger("driveMotorNum") match { // number of drive motors
                  case null =>
                  case x    => s.dm_cnt = x
                }

                dm_data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "driveMotorSeq"                    => s.dm_seq         = dm_data.getInteger("driveMotorSeq")                    // drive motor sequence number
                    case "driveMotorState"                  => s.dm_st          = dm_data.getString("driveMotorState")                   // drive motor state
                    case "driveMotorControllerTemp"         => s.dm_ctl_temp    = dm_data.getInteger("driveMotorControllerTemp")         // drive motor controller temperature
                    case "driveMotorRotationRate"           => s.dm_spd         = dm_data.getInteger("driveMotorRotationRate")           // drive motor rotation speed
                    case "driveMotorToque"                  => s.dm_torq        = dm_data.getDouble("driveMotorToque")                   // drive motor torque
                    case "driveMotorTemp"                   => s.dm_temp        = dm_data.getInteger("driveMotorTemp")                   // drive motor temperature
                    case "driveMotorControllerInputVoltage" => s.dm_ctl_volt    = dm_data.getDouble("driveMotorControllerInputVoltage")  // motor controller input voltage
                    case "driveMotorControllerDCbusCurrent" => s.dm_ctl_dc_curr = dm_data.getDouble("driveMotorControllerDCbusCurrent")  // motor controller input current
                    case _                                  =>
                  }
                })

              case "报警数据" => // alarm data
                data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "almCommonTempDiff"      => s.alm_common_temp_diff       = data.getInteger("almCommonTempDiff")       // temperature difference alarm (BMS)
                    case "almCommonTempHigh"      => s.alm_common_temp_high       = data.getInteger("almCommonTempHigh")       // battery high-temperature alarm (BMS)
                    case "almCommonEsdHigh"       => s.alm_common_esd_high        = data.getInteger("almCommonEsdHigh")        // energy storage device over-voltage alarm (BMS)
                    case "almCommonEsdLow"        => s.alm_common_esd_low         = data.getInteger("almCommonEsdLow")         // energy storage device under-voltage alarm (BMS)
                    case "almCommonSocLow"        => s.alm_common_soc_low         = data.getInteger("almCommonSocLow")         // SOC low alarm (BMS)
                    case "almCommonScHigh"        => s.alm_common_sc_high         = data.getInteger("almCommonScHigh")         // single-cell over-voltage alarm (BMS)
                    case "almCommonScLow"         => s.alm_common_sc_low          = data.getInteger("almCommonScLow")          // single-cell under-voltage alarm (BMS)
                    case "almCommonSocHigh"       => s.alm_common_soc_high        = data.getInteger("almCommonSocHigh")        // SOC too high alarm (BMS)
                    case "almCommonSocHop"        => s.alm_common_soc_hop         = data.getInteger("almCommonSocHop")         // SOC jump alarm (BMS)
                    case "almCommonEsdUnmatch"    => s.alm_common_esd_unmatch     = data.getInteger("almCommonEsdUnmatch")     // energy storage system mismatch alarm (BMS)
                    case "almCommonScConsistency" => s.alm_common_sc_consistency  = data.getInteger("almCommonScConsistency")  // poor single-cell consistency alarm (BMS)
                    case "almCommonInsulation"    => s.alm_common_insulation      = data.getInteger("almCommonInsulation")     // insulation alarm (BMS)
                    case "almCommonDcdcTemp"      => s.alm_common_dcdc_temp       = data.getInteger("almCommonDcdcTemp")       // DC-DC temperature alarm (DCDC)
                    case "almCommonBrk"           => s.alm_common_brk             = data.getInteger("almCommonBrk")            // brake system alarm (BCS/VCU)
                    case "almCommonDcdcSt"        => s.alm_common_dcdc_st         = data.getInteger("almCommonDcdcSt")         // DC-DC state alarm (DCDC)
                    case "almCommonDmcTemp"       => s.alm_common_dmc_temp        = data.getInteger("almCommonDmcTemp")        // drive motor controller temperature alarm (DCU)
                    case "almCommonHVILSt"        => s.alm_common_hvil_st         = data.getInteger("almCommonHVILSt")         // high-voltage interlock state alarm (VCU)
                    case "almCommonDmTemp"        => s.alm_common_dm_temp         = data.getInteger("almCommonDmTemp")         // drive motor temperature alarm (DCU)
                    case "almCommonEsdChargeOver" => s.alm_common_esd_charge_over = data.getInteger("almCommonEsdChargeOver")  // energy storage device overcharge (BMS)
                    case "malfunctionLevel"       => s.malfunctionLevel           = data.getString("malfunctionLevel")         // fault level
                    case _                        =>
                  }
                })

              case "电池电压" => // battery voltage
                // Only the first entry of the batteryUnitVoltage array is adapted.
                val esd_volt_data_node = data.getJSONArray("batteryUnitVoltage").getJSONObject(0).toJSONString
                val esd_volt_data = JSON.parseObject(esd_volt_data_node)

                esd_volt_data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "batterySysSeq"        => s.esd_volt_subsys_seq = esd_volt_data.getInteger("batterySysSeq")                        // battery system sequence number
                    case "singleBatteryVoltage" => s.esd_frame_sc_list   = esd_volt_data.getJSONArray("singleBatteryVoltage").toJSONString  // single-cell voltages, kept as JSON text
                    case "singleBatteryCount"   => s.singleBatteryCount  = esd_volt_data.getInteger("singleBatteryCount")                   // total number of single cells
                    case _                      =>
                  }
                })

              case "电池温度" => // battery temperature
                // Only the first entry of the batteryUnitTemperature array is adapted.
                val esd_temp_data_node = data.getJSONArray("batteryUnitTemperature").getJSONObject(0).toJSONString
                val esd_temp_data = JSON.parseObject(esd_temp_data_node)

                esd_temp_data.keySet.toArray.foreach(f => {
                  f.toString match {
                    case "batterySysSeq"     => s.esd_temp_subsys_seq = esd_temp_data.getInteger("batterySysSeq")                    // battery system sequence number
                    case "probeTemperatures" => s.esd_temp_probe_list = esd_temp_data.getJSONArray("probeTemperatures").toJSONString // probe temperatures, kept as JSON text
                    case _                   =>
                  }
                })

              case _ =>
            }
          }

          s
        }
      }
    } catch {
      // Best effort: a malformed message is reported and dropped instead of failing the query.
      case e: Exception => println((e, jsonStr))
        null
    }
  })
}

这个 UDF 函数 getSignals 接收三个参数: vin, vintype 和 jsonStr, 其中 jsonStr 是从 Kafka 消息的 *value* 中解析出来的 signals 字段。该 UDF 函数的作用是解析每一条消息的 signals, 并将其适配为 SourceData 对象; 无法适配的消息返回 null, 之后会被过滤掉。

  • MainModule
package com.gac.xs6.module

import com.gac.xs6.conf.{HbaseConfiguration, KafkaConfiguration, SparkConfiguration}
import com.gac.xs6.NaApplication
import com.gac.xs6.core.Adapter
import com.gac.xs6.core.impl.AdapterImpl
import com.gac.xs6.pipeline.{NaSchema, NaSparkSession, SourceDataFrame}
import com.google.inject.{AbstractModule, Provides, Singleton}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

/**
  * Guice module for the "nation" application: registers the configuration
  * classes and the adapter, and provides the Spark session wrapper and the
  * Kafka-backed source DataFrame.
  */
object MainModule extends AbstractModule {

  /** Declares the bindings that need no custom construction logic. */
  override def configure(): Unit = {
    bind(classOf[KafkaConfiguration]).asEagerSingleton()
    bind(classOf[HbaseConfiguration]).asEagerSingleton()
    bind(classOf[Adapter]).toInstance(AdapterImpl)
    bind(classOf[NaApplication])
  }

  /**
    * Provides a [[NaSparkSession]] whose `session()` builds (or reuses, via
    * `getOrCreate`) a SparkSession using the "nation" master setting.
    */
  @Provides
  @Singleton
  def sparkSession(sparkConf: SparkConfiguration): NaSparkSession =
    new NaSparkSession {
      override def session(): SparkSession =
        SparkSession.builder
          .master(sparkConf.sparkMasterNa)
          .appName("Stateful Structured Streaming")
          .getOrCreate()
    }

  /**
    * Provides a [[SourceDataFrame]] whose `stream()` reads the "nation" Kafka
    * topic from the earliest offset and returns the message key together with
    * the value parsed against the `NaSchema` schema (column `data`).
    */
  @Provides
  @Singleton
  def kafkaDataSource(kafkaConf: KafkaConfiguration, sparkConf: SparkConfiguration): SourceDataFrame =
    new SourceDataFrame {
      override def stream(): DataFrame = {
        val session = sparkSession(sparkConf).session()
        import session.implicits._

        // Raw Kafka records.
        val kafkaRecords: DataFrame =
          session.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaConf.kafkaBootstrapservers)
            .option("subscribe", kafkaConf.kafkaNaTopic)
            .option("startingOffsets", "earliest")
            .load()

        // Key and value are cast to strings before the value is parsed.
        val keyAndValue: Dataset[(String, String)] =
          kafkaRecords
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .as[(String, String)]

        // Parse the JSON value with the expected schema.
        keyAndValue.select($"key", from_json($"value", (new NaSchema).schema) as "data")
      }
    }
}

MainModule 是真正读取 Kafka 数据的地方, 我们提供一个 kafkaDataSource 函数, 这个函数返回 SourceDataFrame trait 类型, 为了返回这个类型, 我们 new 一个 SourceDataFrame:

new SourceDataFrame

实现 stream 函数, stream 函数返回一个 DataFrame.

  • Adapter Impl
package com.gac.xs6.core.impl

import com.gac.xs6.core.Adapter
import com.gac.xs6.model.SourceData
import com.gac.xs6.util.XsUdfs.getSignals
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import scala.language.postfixOps

/**
  * [[Adapter]] implementation that runs the `getSignals` UDF over each row
  * and keeps only the rows it could adapt.
  */
object AdapterImpl extends Adapter {

  /**
    * Applies `getSignals` to `data.vin`, `data.vintype` and `data.signals`,
    * drops rows where the UDF returned null, and flattens the resulting
    * struct into a typed `Dataset[SourceData]`.
    */
  override def extract(spark: SparkSession, df: DataFrame): Dataset[SourceData] = {
    import spark.implicits._

    val adapted  = df.select(getSignals($"data.vin", $"data.vintype", $"data.signals").as("signals"))
    val usable   = adapted.filter($"signals".isNotNull)
    val flatRows = usable.select($"signals.*")

    flatRows.as[SourceData]
  }
}

这个 Adapter 返回 Dataset[SourceData]

  • Adapter trait
/** Contract for components that adapt a source DataFrame into typed `SourceData` records. */
trait Adapter {
  /**
    * Adapts the given DataFrame.
    *
    * @param spark the SparkSession the adaptation runs with
    * @param df    the DataFrame to adapt
    * @return the adapted records as a `Dataset[SourceData]`
    */
  def extract(spark: SparkSession, df: DataFrame): Dataset[SourceData]
}
  • NaApplication
package com.gac.xs6

import com.gac.xs6.NaApplication.Params
import com.gac.xs6.conf.{KafkaConfiguration, SparkConfiguration}
import com.gac.xs6.core.Adapter
import com.gac.xs6.model.SourceData
import com.gac.xs6.module.MainModule
import com.gac.xs6.pipeline.{NaSparkSession, SourceDataFrame}
import com.google.inject.{Guice, Inject, Singleton}
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import scopt.OptionParser

/**
  * Command-line entry point of the "nation" streaming application.
  *
  * Uses an explicit `main` method instead of `scala.App`: Spark's documentation
  * recommends defining `main()` because subclasses of `scala.App` may not work
  * correctly.
  */
object NaApplication {

  /** Command-line parser; the only option is `-c` / `--conf`. */
  val parser = new OptionParser[Params]("NaApplication") {
    head("Spark Structured Streaming from kafka")

    opt[String]('c', "conf")
      .text("config.resource for xs6-gac")
      .action((x, c) => c.copy(conf = x))

    help("help").text("prints this usage text")
  }

  /**
    * Parses the arguments, selects the configuration resource, builds the
    * Guice injector and runs the application. Exits with status 1 when the
    * arguments cannot be parsed.
    *
    * @param args raw command-line arguments
    */
  def main(args: Array[String]): Unit =
    parser.parse(args, Params()) match {
      case Some(params) =>
        // The --conf value used to be parsed but never applied. Expose it as the
        // `config.resource` system property so that ConfigFactory.load() picks it up.
        if (params.conf.nonEmpty) {
          System.setProperty("config.resource", params.conf)
        }
        // Must run BEFORE the injector is created: the eagerly bound configuration
        // classes call ConfigFactory.load() in their constructors.
        ConfigFactory.invalidateCaches()

        val injector = Guice.createInjector(MainModule)
        val runner = injector.getInstance(classOf[NaApplication])
        runner.run(params)
      case _ => sys.exit(1)
    }

  /**
    * Parsed command-line parameters.
    *
    * @param conf name of the configuration resource to load; empty means "use the default"
    */
  case class Params(conf: String = "")
}

/**
  * The application itself: reads the source stream, adapts it to
  * `SourceData` and writes the result to the console sink.
  *
  * All collaborators are supplied through constructor injection.
  */
@Singleton
class NaApplication @Inject() (
  source: SourceDataFrame,
  adapter: Adapter,
  sparkConf: SparkConfiguration,
  kafkaConf: KafkaConfiguration,
  sparkSession: NaSparkSession
) extends Serializable {

  /**
    * Starts one streaming query that prints the adapted records to the
    * console, then waits until any active query terminates.
    */
  private def createNewStreamingQuery(): Unit = {
    val session = sparkSession.session()
    val adapted: Dataset[SourceData] = adapter.extract(session, source.stream())

    adapted.writeStream
      .outputMode(OutputMode.Update())
      .trigger(Trigger.Once()) // alternative kept from the original: Trigger.ProcessingTime("1 seconds")
      .format("console")
      .option("truncate", "false") // do not truncate the displayed columns
      .start()

    // A second console query over `subTripData`, configured exactly like the one above,
    // existed here as commented-out code and is intentionally not started.

    session.streams.awaitAnyTermination()
  }

  /**
    * Runs the application.
    *
    * @param params parsed command-line parameters (not read by this method)
    */
  def run(params: Params): Unit = createNewStreamingQuery()
}

NaApplication 类接收 5 个参数:

source: SourceDataFrame,
adapter: Adapter,
sparkConf: SparkConfiguration,
kafkaConf: KafkaConfiguration,
sparkSession: NaSparkSession

我们又见到了依赖注入的强大之处。其中 source 提供从 Kafka 读取的源 DataFrame, adapter 负责把源 DataFrame 适配为 Dataset[SourceData]。

spark.streams.awaitAnyTermination() 会阻塞当前(主)线程, 直到任意一个 StreamingQuery 终止; 它不会影响各个 StreamingQuery 的执行, 因为所有已经调用 start() 的 StreamingQuery 都在后台并发运行。

代码在 github, 仅供参考。Kafka 中的数据不便给出。