读取 Parquet 并写回 Kafka

读取 parquet 文件发到 Kafka

定义 schema

根据下面的代码读取 Parquet 文件并打印 schema:

val df = spark.read.parquet("xxxx/part-*")
df.printSchema

得到的 schema 如下:

root
|-- alm_dm_list: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- alm_esd_list: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- alm_others_list: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- data_batt_sc_volt_lowest: double (nullable = true)
|-- veh_dcdcst: integer (nullable = true)
|-- alm_common_dcdc_st: integer (nullable = true)
|-- dm_cnt: integer (nullable = true)
|-- alm_common_temp_diff: integer (nullable = true)
|-- data_batt_temp_probe_highest_seq: integer (nullable = true)
|-- alm_common_dm_temp: integer (nullable = true)
|-- alm_common_soc_high: integer (nullable = true)
|-- alm_eng_cnt: integer (nullable = true)
|-- alm_common_esd_high: integer (nullable = true)
|-- alm_common_esd_unmatch: integer (nullable = true)
|-- alm_common_soc_low: integer (nullable = true)
|-- alm_esd_cnt: integer (nullable = true)
|-- veh_curr: double (nullable = true)
|-- esd_temp_data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- esd_temp_probe_cnt: integer (nullable = true)
| | |-- esd_temp_probe_list: array (nullable = true)
| | | |-- element: integer (containsNull = true)
| | |-- esd_temp_subsys_seq: integer (nullable = true)
|-- esd_temp_subsys_cnt: integer (nullable = true)
|-- veh_pedal_deep: integer (nullable = true)
|-- esd_volt_data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- esd_curr: double (nullable = true)
| | |-- esd_frame_sc_cnt: integer (nullable = true)
| | |-- esd_frame_sc_list: array (nullable = true)
| | | |-- element: double (containsNull = true)
| | |-- esd_frame_start: integer (nullable = true)
| | |-- esd_sc_cnt: integer (nullable = true)
| | |-- esd_volt: double (nullable = true)
| | |-- esd_volt_subsys_seq: integer (nullable = true)
|-- alm_others_cnt: integer (nullable = true)
|-- data_batt_sc_volt_highest: double (nullable = true)
|-- veh_chargest: integer (nullable = true)
|-- alm_common_esd_charge_over: integer (nullable = true)
|-- ts: long (nullable = true)
|-- alm_common_sc_consistency: integer (nullable = true)
|-- eng_consumption: double (nullable = true)
|-- veh_pedalst: integer (nullable = true)
|-- data_batt_subsys_temp_lowest_seq: integer (nullable = true)
|-- loc_lon84: double (nullable = true)
|-- esd_volt_subsys_cnt: integer (nullable = true)
|-- data_batt_temp_lowestest: integer (nullable = true)
|-- data_batt_subsys_volt_lowest_seq: integer (nullable = true)
|-- alm_common_brk: integer (nullable = true)
|-- alm_common_esd_low: integer (nullable = true)
|-- veh_insulation: integer (nullable = true)
|-- veh_volt: double (nullable = true)
|-- alm_common_temp_high: integer (nullable = true)
|-- alm_dm_cnt: integer (nullable = true)
|-- alm_lvl_higest: integer (nullable = true)
|-- vin: string (nullable = true)
|-- veh_odo: double (nullable = true)
|-- data_batt_subsys_temp_highest_seq: integer (nullable = true)
|-- data_batt_sc_volt_lowest_seq: integer (nullable = true)
|-- loc_lat84: double (nullable = true)
|-- eng_st: integer (nullable = true)
|-- veh_soc: integer (nullable = true)
|-- data_batt_subsys_volt_highest_seq: integer (nullable = true)
|-- eng_spd: integer (nullable = true)
|-- alm_common_insulation: integer (nullable = true)
|-- alm_common_soc_hop: integer (nullable = true)
|-- alm_common_flag: string (nullable = true)
|-- alm_common_dcdc_temp: integer (nullable = true)
|-- data_batt_temp_probe_lowest_seq: integer (nullable = true)
|-- veh_runmode: integer (nullable = true)
|-- veh_st: integer (nullable = true)
|-- veh_spd: double (nullable = true)
|-- data_batt_temp_highest: integer (nullable = true)
|-- loc_st: integer (nullable = true)
|-- alm_common_sc_low: integer (nullable = true)
|-- veh_gear: string (nullable = true)
|-- alm_common_dmc_temp: integer (nullable = true)
|-- alm_common_sc_high: integer (nullable = true)
|-- data_batt_sc_volt_highest_seq: integer (nullable = true)
|-- alm_common_hvil_st: integer (nullable = true)
|-- dm_data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dm_ctl_dc_curr: double (nullable = true)
| | |-- dm_ctl_temp: integer (nullable = true)
| | |-- dm_ctl_volt: double (nullable = true)
| | |-- dm_seq: integer (nullable = true)
| | |-- dm_spd: integer (nullable = true)
| | |-- dm_st: integer (nullable = true)
| | |-- dm_temp: integer (nullable = true)
| | |-- dm_torq: integer (nullable = true)

从而构造出我们在 JSON 转换时用到的 schema:

package ohmysummer.pipeline.schema

import org.apache.spark.sql.types._

// Schema for the decoded vehicle-telemetry Parquet records; it mirrors the
// structure printed by df.printSchema in the section above. Structured
// Streaming cannot infer a schema from a Parquet source, so this must be
// declared explicitly and passed to readStream.schema(...).
class ParquetSchema {
// Each .add(name, dataType, nullable = true) corresponds to one field of the
// printed schema; nested "array of struct" fields are built from inner
// StructTypes wrapped in ArrayType.
val schema = new StructType()
// Alarm index lists (integer codes per alarm category).
.add("alm_dm_list", ArrayType(IntegerType, true), true)
.add("alm_esd_list", ArrayType(IntegerType, true), true)
.add("alm_others_list", ArrayType(IntegerType, true), true)
.add("data_batt_sc_volt_lowest", DoubleType, true)
.add("veh_dcdcst", IntegerType, true)
.add("alm_common_dcdc_st", IntegerType, true)
.add("dm_cnt", IntegerType, true)
.add("alm_common_temp_diff", IntegerType, true)
.add("data_batt_temp_probe_highest_seq", IntegerType, true)
.add("alm_common_dm_temp", IntegerType, true)
.add("alm_common_soc_high", IntegerType, true)
.add("alm_eng_cnt", IntegerType, true)
.add("alm_common_esd_high", IntegerType, true)
.add("alm_common_esd_unmatch", IntegerType, true)
.add("alm_common_soc_low", IntegerType, true)
.add("alm_esd_cnt", IntegerType, true)
.add("veh_curr",DoubleType, true)
// Per-subsystem battery temperature probe readings (array of structs).
.add("esd_temp_data", ArrayType(new StructType()
.add("esd_temp_probe_cnt", IntegerType, true)
.add("esd_temp_probe_list", ArrayType(IntegerType, true), true)
.add("esd_temp_subsys_seq", IntegerType, true)
), true)
.add("esd_temp_subsys_cnt", IntegerType, true)
.add("veh_pedal_deep", IntegerType, true)
// Per-subsystem battery voltage frames (array of structs).
.add("esd_volt_data", ArrayType(new StructType()
.add("esd_curr", DoubleType, true)
.add("esd_frame_sc_cnt", IntegerType, true)
.add("esd_frame_sc_list", ArrayType(DoubleType, true), true)
.add("esd_frame_start", IntegerType, true)
.add("esd_sc_cnt", IntegerType, true)
.add("esd_volt", DoubleType, true)
.add("esd_volt_subsys_seq", IntegerType, true)
), true)
.add("alm_others_cnt", IntegerType, true)
.add("data_batt_sc_volt_highest", DoubleType, true)
.add("veh_chargest", IntegerType, true)
.add("alm_common_esd_charge_over", IntegerType, true)
// Event timestamp (epoch, long).
.add("ts", LongType, true)
.add("alm_common_sc_consistency", IntegerType, true)
.add("eng_consumption", DoubleType, true)
.add("veh_pedalst", IntegerType, true)
.add("data_batt_subsys_temp_lowest_seq", IntegerType, true)
.add("loc_lon84", DoubleType, true)
.add("esd_volt_subsys_cnt", IntegerType, true)
.add("data_batt_temp_lowestest", IntegerType, true)
.add("data_batt_subsys_volt_lowest_seq", IntegerType, true)
.add("alm_common_brk", IntegerType, true)
.add("alm_common_esd_low", IntegerType, true)
.add("veh_insulation", IntegerType, true)
.add("veh_volt", DoubleType, true)
.add("alm_common_temp_high", IntegerType, true)
.add("alm_dm_cnt", IntegerType, true)
.add("alm_lvl_higest", IntegerType, true)
// Vehicle identification number; later used as the Kafka message key.
.add("vin", StringType, true)
.add("veh_odo", DoubleType, true)
.add("data_batt_subsys_temp_highest_seq", IntegerType, true)
.add("data_batt_sc_volt_lowest_seq", IntegerType, true)
.add("loc_lat84", DoubleType, true)
.add("eng_st", IntegerType, true)
.add("veh_soc", IntegerType, true)
.add("data_batt_subsys_volt_highest_seq", IntegerType, true)
.add("eng_spd", IntegerType, true)
.add("alm_common_insulation", IntegerType, true)
.add("alm_common_soc_hop", IntegerType, true)
.add("alm_common_flag", StringType, true)
.add("alm_common_dcdc_temp", IntegerType, true)
.add("data_batt_temp_probe_lowest_seq", IntegerType, true)
.add("veh_runmode", IntegerType, true)
.add("veh_st", IntegerType, true)
.add("veh_spd", DoubleType, true)
.add("data_batt_temp_highest", IntegerType, true)
.add("loc_st", IntegerType, true)
.add("alm_common_sc_low", IntegerType, true)
.add("veh_gear", StringType, true)
.add("alm_common_dmc_temp", IntegerType, true)
.add("alm_common_sc_high", IntegerType, true)
.add("data_batt_sc_volt_highest_seq", IntegerType, true)
.add("alm_common_hvil_st", IntegerType, true)
// Per-drive-motor status records (array of structs).
.add("dm_data", ArrayType(new StructType()
.add("dm_ctl_dc_curr", DoubleType, true)
.add("dm_ctl_temp", IntegerType, true)
.add("dm_ctl_volt", DoubleType, true)
.add("dm_seq", IntegerType, true)
.add("dm_spd", IntegerType, true)
.add("dm_st", IntegerType, true)
.add("dm_temp", IntegerType, true)
.add("dm_torq", IntegerType, true)
), true)
}

// Companion object of ParquetSchema. Currently empty; kept as a placeholder
// for future shared constants or factory methods.
object ParquetSchema {

}

将 JSON 写回 Kafka

package ohmysummer.utils

import ohmysummer.pipeline.schema.ParquetSchema
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.functions.{to_json, col, struct}

object WriteParquet2Kafka {

  /**
   * Streams decoded Parquet records to a Kafka topic as JSON messages using
   * Spark Structured Streaming.
   *
   * All previously hard-coded settings are now overridable through optional
   * positional CLI arguments (the signature and the default behavior are
   * unchanged):
   *   args(0) — input Parquet directory
   *   args(1) — target Kafka topic
   *   args(2) — Kafka bootstrap servers
   *   args(3) — streaming checkpoint directory
   */
  def main(args: Array[String]): Unit = {
    // args.lift(i) yields None when the argument is absent, so each setting
    // falls back to the original hard-coded value.
    val inputPath  = args.lift(0).getOrElse("/Users/ohmycloud/work/cihon/gac/data/decoded/vintype=A5HEVAVNT/d=20180825")
    val topic      = args.lift(1).getOrElse("gac")
    val bootstrap  = args.lift(2).getOrElse("localhost:9092")
    val checkpoint = args.lift(3).getOrElse("/tmp/gac")

    val spark: SparkSession = SparkSession
      .builder
      .master("local[2]")
      .appName("Write Parquet to Kafka using Structured Streaming")
      .config("spark.driver.host", "localhost")
      .getOrCreate()

    // Structured Streaming cannot infer a schema from a Parquet source,
    // so the explicit schema must be supplied.
    val parquetSchema = (new ParquetSchema).schema

    val ds: DataFrame = spark.readStream
      .schema(parquetSchema)
      .parquet(inputPath)

    import spark.implicits._

    // The Kafka sink expects "key" and "value" columns: use the VIN as the
    // message key and serialize the entire row to a JSON string as the value.
    // https://stackoverflow.com/questions/47907488/convert-a-spark-structured-streaming-dataframe-into-json
    val df: DataFrame = ds.select(
      $"vin" as "key",
      to_json(struct(ds.columns.map(col(_)): _*)) as "value"
    )

    val query = df
      .writeStream
      .format("kafka")
      .option("topic", topic)
      .option("kafka.bootstrap.servers", bootstrap)
      .option("checkpointLocation", checkpoint)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Block until the streaming query terminates (or fails).
    query.awaitTermination()
  }
}