Wait the light to fall

用 parquet 数据模拟实时数据流

焉知非鱼

用 parquet 数据模拟实时数据流

import ohmysummer.conf.{KafkaConfiguration, UriConfiguration}
import ohmysummer.pipeline.schema.{EnterpriseSchema, NationSchema}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{col, struct, to_json}

/**
  * Reads the enterprise parquet data as a stream and re-writes it as JSON files, so that the
  * JSON lines can afterwards be sent to Kafka from the command line to simulate a real-time
  * stream.
  *
  * Every output record has two fields: `key` (taken from the `vin` column) and `value`
  * (all input columns serialized into a single JSON string).
  */
object WriteEnterprise2Kafka {

  /**
    * Starts the streaming query and blocks until it terminates.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local[2]")
      .appName("Write Enterprise Parquet to Kafka")
      .getOrCreate()

    val parquetSchema = (new EnterpriseSchema).schema
    val parquetUri = (new UriConfiguration).xs6enterprise

    import spark.implicits._

    // Keep only rows whose timestamp is present and strictly inside the accepted window.
    // NOTE(review): the bounds look like epoch milliseconds — confirm against the data.
    val source: DataFrame = spark.readStream
      .schema(parquetSchema)
      .parquet(parquetUri)
      .filter(
        $"timestamp".isNotNull &&
          ($"timestamp" > 956678797000L) &&
          ($"timestamp" < 1924876800000L)
      )

    // Pack every source column into one JSON document and use `vin` as the record key;
    // rows without a key are dropped.
    val allColumns = source.columns.map(col)
    val keyed: DataFrame = source
      .select($"vin" as "key", to_json(struct(allColumns: _*)) as "value")
      .filter($"key".isNotNull)

    // Write the parquet rows out as JSON.
    val query = keyed
      .writeStream
      .format("json")
      .option("path", "/tmp/json/nation")
      .option("checkpointLocation", "/tmp/write-json2hdfs")
      .start()

    query.awaitTermination()
  }
}

再将 JSON 数据逐行发到 Kafka 同一个 topic 的不同分区(partition):

# Stream the JSON file line by line (one record per second), append a random
# "partition" field (1-5) to each record that ends in "}, and pipe the result
# to `kt produce`.
hdfs dfs -cat hdfs://xxxxxx/json/test.json | while IFS= read -r LINE; do printf '%s\n' "$LINE" | sed "s/\"}$/\",\"partition\":$(( ( RANDOM % 5 )  + 1 ))}/"; sleep 1; done | kt produce -topic xs6-nation-test -brokers "dn03,nn01,nn02"