用 parquet 数据模拟实时数据流

import ohmysummer.conf.{KafkaConfiguration, UriConfiguration}
import ohmysummer.pipeline.schema.{EnterpriseSchema, NationSchema}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{col, struct, to_json}

/**
* 读取 parquet 文件转为 JSON 后写到 HDFS, 在用命令行将 JSON 数据逐行发到 Kakfa 模拟实时流
*/
/**
 * Reads enterprise parquet files as a stream, serializes each row to JSON and
 * writes the result to HDFS; a shell command then replays the JSON lines into
 * Kafka one by one to simulate a real-time stream.
 */
object WriteEnterprise2Kafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local[2]")
      .appName("Write Enterprise Parquet to Kafka")
      .getOrCreate()

    val parquetSchema = (new EnterpriseSchema).schema
    val parquetUri    = (new UriConfiguration).xs6enterprise

    // NOTE(review): topics / bootstrap_servers are unused here — Kafka publishing
    // is done externally via `kt`. Kept in case KafkaConfiguration construction
    // has required side effects; confirm before removing.
    val kafkaConf         = new KafkaConfiguration
    val topics            = kafkaConf.topics
    val bootstrap_servers = kafkaConf.bootstrap_servers

    import spark.implicits._
    // Keep only rows whose epoch-millis timestamp is non-null and plausible
    // (between 2000-04-25 and 2030-12-31, both bounds in milliseconds).
    val ds: DataFrame = spark.readStream
      .schema(parquetSchema)
      .parquet(parquetUri)
      .filter($"timestamp".isNotNull && ($"timestamp" > 956678797000L) && ($"timestamp" < 1924876800000L))

    // Kafka-style layout: key = vin, value = the whole row serialized as JSON.
    val df: DataFrame = ds
      .select($"vin" as "key", to_json(struct(ds.columns.map(col(_)): _*)) as "value")
      .filter($"key".isNotNull)

    // Write the JSON lines to HDFS; `kt` later replays them into Kafka.
    // NOTE(review): output path says "nation" although this job reads enterprise
    // data — confirm the intended path.
    val jdf = df
      .writeStream
      .format("json")
      .option("path", "/tmp/json/nation")
      .option("checkpointLocation", "/tmp/write-json2hdfs")
      .start()

    jdf.awaitTermination()
  }
}

再将 JSON 数据逐行发到 Kafka 的不同 topic:

# Replay each JSON line into Kafka at ~1 line/second, injecting a random
# "partition" field (1-5) into each record before producing with kt.
# Fixes: removed the stray trailing `; sleep 0.1; done` (an unmatched `done`
# made the pipeline a shell syntax error) and quoted "$LINE" so whitespace
# inside the JSON is preserved by echo.
hdfs dfs -cat hdfs://xxxxxx/json/test.json | while read -r LINE; do echo "$LINE" | sed "s/\"}$/\",\"partition\":$(( ( RANDOM % 5 ) + 1 ))}/"; sleep 1; done | kt produce -topic xs6-nation-test -brokers "dn03,nn01,nn02"