Wait the light to fall

读取 Parquet 并写回 Kafka

焉知非鱼

读取 parquet 文件发到 Kafka

定义 schema

先用下面的代码读取 Parquet 文件并打印它的 schema:

// Read the Parquet part files once in batch mode so Spark infers their schema.
val df = spark.read.parquet("xxxx/part-*")
// printSchema() is a side-effecting empty-paren method: call it with parentheses
// (auto-application is deprecated in Scala 2.13 and rejected by Scala 3).
df.printSchema()

得到的 schema 如下:

root
 |-- alm_dm_list: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- alm_esd_list: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- alm_others_list: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- data_batt_sc_volt_lowest: double (nullable = true)
 |-- veh_dcdcst: integer (nullable = true)
 |-- alm_common_dcdc_st: integer (nullable = true)
 |-- dm_cnt: integer (nullable = true)
 |-- alm_common_temp_diff: integer (nullable = true)
 |-- data_batt_temp_probe_highest_seq: integer (nullable = true)
 |-- alm_common_dm_temp: integer (nullable = true)
 |-- alm_common_soc_high: integer (nullable = true)
 |-- alm_eng_cnt: integer (nullable = true)
 |-- alm_common_esd_high: integer (nullable = true)
 |-- alm_common_esd_unmatch: integer (nullable = true)
 |-- alm_common_soc_low: integer (nullable = true)
 |-- alm_esd_cnt: integer (nullable = true)
 |-- veh_curr: double (nullable = true)
 |-- esd_temp_data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- esd_temp_probe_cnt: integer (nullable = true)
 |    |    |-- esd_temp_probe_list: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- esd_temp_subsys_seq: integer (nullable = true)
 |-- esd_temp_subsys_cnt: integer (nullable = true)
 |-- veh_pedal_deep: integer (nullable = true)
 |-- esd_volt_data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- esd_curr: double (nullable = true)
 |    |    |-- esd_frame_sc_cnt: integer (nullable = true)
 |    |    |-- esd_frame_sc_list: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- esd_frame_start: integer (nullable = true)
 |    |    |-- esd_sc_cnt: integer (nullable = true)
 |    |    |-- esd_volt: double (nullable = true)
 |    |    |-- esd_volt_subsys_seq: integer (nullable = true)
 |-- alm_others_cnt: integer (nullable = true)
 |-- data_batt_sc_volt_highest: double (nullable = true)
 |-- veh_chargest: integer (nullable = true)
 |-- alm_common_esd_charge_over: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- alm_common_sc_consistency: integer (nullable = true)
 |-- eng_consumption: double (nullable = true)
 |-- veh_pedalst: integer (nullable = true)
 |-- data_batt_subsys_temp_lowest_seq: integer (nullable = true)
 |-- loc_lon84: double (nullable = true)
 |-- esd_volt_subsys_cnt: integer (nullable = true)
 |-- data_batt_temp_lowestest: integer (nullable = true)
 |-- data_batt_subsys_volt_lowest_seq: integer (nullable = true)
 |-- alm_common_brk: integer (nullable = true)
 |-- alm_common_esd_low: integer (nullable = true)
 |-- veh_insulation: integer (nullable = true)
 |-- veh_volt: double (nullable = true)
 |-- alm_common_temp_high: integer (nullable = true)
 |-- alm_dm_cnt: integer (nullable = true)
 |-- alm_lvl_higest: integer (nullable = true)
 |-- vin: string (nullable = true)
 |-- veh_odo: double (nullable = true)
 |-- data_batt_subsys_temp_highest_seq: integer (nullable = true)
 |-- data_batt_sc_volt_lowest_seq: integer (nullable = true)
 |-- loc_lat84: double (nullable = true)
 |-- eng_st: integer (nullable = true)
 |-- veh_soc: integer (nullable = true)
 |-- data_batt_subsys_volt_highest_seq: integer (nullable = true)
 |-- eng_spd: integer (nullable = true)
 |-- alm_common_insulation: integer (nullable = true)
 |-- alm_common_soc_hop: integer (nullable = true)
 |-- alm_common_flag: string (nullable = true)
 |-- alm_common_dcdc_temp: integer (nullable = true)
 |-- data_batt_temp_probe_lowest_seq: integer (nullable = true)
 |-- veh_runmode: integer (nullable = true)
 |-- veh_st: integer (nullable = true)
 |-- veh_spd: double (nullable = true)
 |-- data_batt_temp_highest: integer (nullable = true)
 |-- loc_st: integer (nullable = true)
 |-- alm_common_sc_low: integer (nullable = true)
 |-- veh_gear: string (nullable = true)
 |-- alm_common_dmc_temp: integer (nullable = true)
 |-- alm_common_sc_high: integer (nullable = true)
 |-- data_batt_sc_volt_highest_seq: integer (nullable = true)
 |-- alm_common_hvil_st: integer (nullable = true)
 |-- dm_data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dm_ctl_dc_curr: double (nullable = true)
 |    |    |-- dm_ctl_temp: integer (nullable = true)
 |    |    |-- dm_ctl_volt: double (nullable = true)
 |    |    |-- dm_seq: integer (nullable = true)
 |    |    |-- dm_spd: integer (nullable = true)
 |    |    |-- dm_st: integer (nullable = true)
 |    |    |-- dm_temp: integer (nullable = true)
 |    |    |-- dm_torq: integer (nullable = true)

据此构造出流式读取 Parquet 时使用的 schema（Structured Streaming 的文件源默认不会推断 schema，必须显式指定；读入的数据随后再转换为 JSON）:

package ohmysummer.pipeline.schema

import org.apache.spark.sql.types._

/**
 * Spark SQL schema of the Parquet input files.
 *
 * It mirrors, field for field and in the same order, the output of `df.printSchema()`
 * for those files. It is spelled out by hand because a Structured Streaming file source
 * needs an explicit schema instead of inferring one.
 *
 * The class is kept for backward compatibility: `(new ParquetSchema).schema` still works,
 * but now returns the single shared instance built by the companion object.
 */
class ParquetSchema {
  val schema: StructType = ParquetSchema.schema
}

object ParquetSchema {

  // The nested element types are declared before `schema`: object vals initialise top to
  // bottom, so referencing a later val here would observe `null`.

  /** Element type of the `esd_temp_data` array. */
  private val EsdTempDataElement: StructType = new StructType()
    .add("esd_temp_probe_cnt", IntegerType, true)
    .add("esd_temp_probe_list", ArrayType(IntegerType, true), true)
    .add("esd_temp_subsys_seq", IntegerType, true)

  /** Element type of the `esd_volt_data` array. */
  private val EsdVoltDataElement: StructType = new StructType()
    .add("esd_curr", DoubleType, true)
    .add("esd_frame_sc_cnt", IntegerType, true)
    .add("esd_frame_sc_list", ArrayType(DoubleType, true), true)
    .add("esd_frame_start", IntegerType, true)
    .add("esd_sc_cnt", IntegerType, true)
    .add("esd_volt", DoubleType, true)
    .add("esd_volt_subsys_seq", IntegerType, true)

  /** Element type of the `dm_data` array. */
  private val DmDataElement: StructType = new StructType()
    .add("dm_ctl_dc_curr", DoubleType, true)
    .add("dm_ctl_temp", IntegerType, true)
    .add("dm_ctl_volt", DoubleType, true)
    .add("dm_seq", IntegerType, true)
    .add("dm_spd", IntegerType, true)
    .add("dm_st", IntegerType, true)
    .add("dm_temp", IntegerType, true)
    .add("dm_torq", IntegerType, true)

  /**
   * The full schema, built once.
   *
   * Every field, and every array element, is nullable, matching what `printSchema`
   * reports. Column names, including their original spellings, must match the files
   * exactly.
   */
  val schema: StructType = new StructType()
    .add("alm_dm_list", ArrayType(IntegerType, true), true)
    .add("alm_esd_list", ArrayType(IntegerType, true), true)
    .add("alm_others_list", ArrayType(IntegerType, true), true)
    .add("data_batt_sc_volt_lowest", DoubleType, true)
    .add("veh_dcdcst", IntegerType, true)
    .add("alm_common_dcdc_st", IntegerType, true)
    .add("dm_cnt", IntegerType, true)
    .add("alm_common_temp_diff", IntegerType, true)
    .add("data_batt_temp_probe_highest_seq", IntegerType, true)
    .add("alm_common_dm_temp", IntegerType, true)
    .add("alm_common_soc_high", IntegerType, true)
    .add("alm_eng_cnt", IntegerType, true)
    .add("alm_common_esd_high", IntegerType, true)
    .add("alm_common_esd_unmatch", IntegerType, true)
    .add("alm_common_soc_low", IntegerType, true)
    .add("alm_esd_cnt", IntegerType, true)
    .add("veh_curr", DoubleType, true)
    .add("esd_temp_data", ArrayType(EsdTempDataElement), true)
    .add("esd_temp_subsys_cnt", IntegerType, true)
    .add("veh_pedal_deep", IntegerType, true)
    .add("esd_volt_data", ArrayType(EsdVoltDataElement), true)
    .add("alm_others_cnt", IntegerType, true)
    .add("data_batt_sc_volt_highest", DoubleType, true)
    .add("veh_chargest", IntegerType, true)
    .add("alm_common_esd_charge_over", IntegerType, true)
    .add("ts", LongType, true)
    .add("alm_common_sc_consistency", IntegerType, true)
    .add("eng_consumption", DoubleType, true)
    .add("veh_pedalst", IntegerType, true)
    .add("data_batt_subsys_temp_lowest_seq", IntegerType, true)
    .add("loc_lon84", DoubleType, true)
    .add("esd_volt_subsys_cnt", IntegerType, true)
    .add("data_batt_temp_lowestest", IntegerType, true)
    .add("data_batt_subsys_volt_lowest_seq", IntegerType, true)
    .add("alm_common_brk", IntegerType, true)
    .add("alm_common_esd_low", IntegerType, true)
    .add("veh_insulation", IntegerType, true)
    .add("veh_volt", DoubleType, true)
    .add("alm_common_temp_high", IntegerType, true)
    .add("alm_dm_cnt", IntegerType, true)
    .add("alm_lvl_higest", IntegerType, true)
    .add("vin", StringType, true)
    .add("veh_odo", DoubleType, true)
    .add("data_batt_subsys_temp_highest_seq", IntegerType, true)
    .add("data_batt_sc_volt_lowest_seq", IntegerType, true)
    .add("loc_lat84", DoubleType, true)
    .add("eng_st", IntegerType, true)
    .add("veh_soc", IntegerType, true)
    .add("data_batt_subsys_volt_highest_seq", IntegerType, true)
    .add("eng_spd", IntegerType, true)
    .add("alm_common_insulation", IntegerType, true)
    .add("alm_common_soc_hop", IntegerType, true)
    .add("alm_common_flag", StringType, true)
    .add("alm_common_dcdc_temp", IntegerType, true)
    .add("data_batt_temp_probe_lowest_seq", IntegerType, true)
    .add("veh_runmode", IntegerType, true)
    .add("veh_st", IntegerType, true)
    .add("veh_spd", DoubleType, true)
    .add("data_batt_temp_highest", IntegerType, true)
    .add("loc_st", IntegerType, true)
    .add("alm_common_sc_low", IntegerType, true)
    .add("veh_gear", StringType, true)
    .add("alm_common_dmc_temp", IntegerType, true)
    .add("alm_common_sc_high", IntegerType, true)
    .add("data_batt_sc_volt_highest_seq", IntegerType, true)
    .add("alm_common_hvil_st", IntegerType, true)
    .add("dm_data", ArrayType(DmDataElement), true)
}

将 JSON 写回 Kafka

package ohmysummer.utils

import ohmysummer.pipeline.schema.ParquetSchema
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.functions.{to_json, col, struct}

/**
 * Streams Parquet files from a directory into a Kafka topic as JSON records.
 *
 * Each input row becomes one Kafka record:
 *   - key: the row's `vin` column;
 *   - value: the whole row serialised with `to_json`.
 *
 * Usage (all arguments are optional; the defaults reproduce the original setup):
 * {{{
 *   WriteParquet2Kafka [inputPath] [topic] [bootstrapServers] [checkpointLocation]
 * }}}
 */
object WriteParquet2Kafka {

  private val DefaultInputPath =
    "/Users/ohmycloud/work/cihon/gac/data/decoded/vintype=A5HEVAVNT/d=20180825"
  private val DefaultTopic = "gac"
  private val DefaultBootstrapServers = "localhost:9092"
  private val DefaultCheckpointLocation = "/tmp/gac"

  def main(args: Array[String]): Unit = {
    val inputPath = args.lift(0).getOrElse(DefaultInputPath)
    val topic = args.lift(1).getOrElse(DefaultTopic)
    val bootstrapServers = args.lift(2).getOrElse(DefaultBootstrapServers)
    val checkpointLocation = args.lift(3).getOrElse(DefaultCheckpointLocation)

    val spark: SparkSession = SparkSession
      .builder
      // A master set in code overrides spark-submit's --master. Only fall back to a local
      // run when no `spark.master` system property was provided.
      .master(sys.props.getOrElse("spark.master", "local[2]"))
      .appName("Write Parquet to Kafka using Structured Streaming")
      .config("spark.driver.host", "localhost")
      .getOrCreate()

    val records: DataFrame = toKafkaRecords(readParquetStream(spark, inputPath))

    // For debugging, swap the Kafka sink below for the console sink, e.g.
    //   records.writeStream.format("console").option("truncate", false)
    //     .outputMode(OutputMode.Append()).start().awaitTermination()
    // (the console sink takes no path, so don't pass the Kafka checkpoint dir to start()).

    val query = records
      .writeStream
      .format("kafka")
      .option("topic", topic)
      .option("kafka.bootstrap.servers", bootstrapServers)
      // The Kafka sink needs a checkpoint directory to track progress across restarts.
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }

  /** Opens a streaming Parquet source over `path` using the hand-written schema. */
  private def readParquetStream(spark: SparkSession, path: String): DataFrame =
    spark.readStream
      // File sources in Structured Streaming need an explicit schema.
      .schema((new ParquetSchema).schema)
      .parquet(path)

  /**
   * Reshapes rows into the `key` / `value` columns expected by the Kafka sink.
   * The value is the JSON serialisation of every column of the row.
   * See https://stackoverflow.com/questions/47907488/convert-a-spark-structured-streaming-dataframe-into-json
   */
  private def toKafkaRecords(rows: DataFrame): DataFrame = {
    val allColumns = rows.columns.map(col(_))
    rows.select(col("vin").as("key"), to_json(struct(allColumns: _*)).as("value"))
  }
}