Wait the light to fall

使用 Spark 读取 HBase

焉知非鱼
package wmstat.trip

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scopt.OptionParser
import wmutils.WmTimeUtil._
import scala.collection.mutable.ArrayBuffer

/**
  * 提交方式
  * spark2-submit  --name HBASE-CONNECTOR --files /etc/hbase/conf/hbase-site.xml --class wmhbase.MileageAnxiety  --master yarn    --deploy-mode client  --driver-memory 2g    --driver-cores 2    --executor-memory 2g    --executor-cores 1    --num-executors 2 wmanxiety-1.0-SNAPSHOT.jar --day 20180810 --repartition 500 --interval 7
  */

object MileageAnxiety {
  def cat = s"""{
               |"table":{"namespace":"default", "name":"trip_signal", "tableCoder":"PrimitiveType"},
               |"rowkey":"key",
               |"columns":{
               |"rowkey"       :{"cf":"rowkey", "col":"key",            "type":"string"},
               |"vin"          :{"cf":"info",   "col":"vin",            "type":"string"},
               |"tripStatus"   :{"cf":"info",   "col":"tripStatus",     "type":"string"},
               |"tripStartTime":{"cf":"info",   "col":"tripStartTime",  "type":"string"},
               |"tripEndTime"  :{"cf":"info",   "col":"tripEndTime",    "type":"string"},
               |"tripDistance" :{"cf":"info",   "col":"tripDistance",   "type":"string"},
               |"startSoc"     :{"cf":"info",   "col":"startSoc",       "type":"string"},
               |"endSoc"       :{"cf":"info",   "col":"endSoc",         "type":"string"},
               |"maxSpeed"     :{"cf":"info",   "col":"maxSpeed",       "type":"string"},
               |"startMileage" :{"cf":"info",   "col":"startMileage",   "type":"string"},
               |"coordinate"   :{"cf":"info",   "col":"coordinate",     "type":"string"}
               |}
               |}""".stripMargin

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()

    case class Params(day: String = "", repartition: Int = 200, interval: Int = 7) // 设置日期和分区和间隔的默认值

    // 定义一个命令行解析器
    val parser = new OptionParser[Params]("MileageAnxiety") { // OptionParser 用于解析命名行参数, 生成 help 信息
      head("MileageAnxiety: pull data from HBase") // 打印用途信息

      opt[String]('d', "day")
        .text("fill data on demand day, yyyymmss format")
        .action((x, c) => c.copy(day = x))

      opt[Int]('r', "repartition")
        .text("the partition of dataframe")
        .action((x, c) => c.copy(repartition = x))

      opt[Int]('i', "interval")
        .text("the interval of days, 7,30 or 90")
        .action((x, c) => c.copy(interval = x))

      help("help").text("prints this usage text")
    }

    // 解析给定的 args
    parser.parse(args, Params()) match { // 解析命令行参数
      case Some(params) => run(params)   // 如果传参, 将参数传递给 run 方法
      case _ => sys.exit(1)              // 没有传就退出
    }

    def run(params: Params): Unit = {
      println(params)

    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._

    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
    }

    val df = withCatalog(cat)
      df.show()

    var dtype: Int = 0
    params.interval match {
      case 7  => dtype = 0
      case 30 => dtype = 1
      case 90 => dtype = 2
      case _  => dtype = 3
    }

    val arrayWeek: ArrayBuffer[String] =  lastestNdays(params.day, params.interval)
    val function: (String => Boolean) = (col: String) => arrayWeek.contains( col.split("_")(1).substring(0,8))
    val udfFiltering = udf(function)

    // 筛选 Row Key
    val dfWeek = df.repartition(params.repartition).filter(udfFiltering(col("ROWKEY")))

    // 里程值
    val dfTrip: DataFrame = dfWeek.select("rowkey", "vin", "tripDistance", "tripStartTime", "tripEndTime")
      .withColumn("driverTime", $"tripEndTime" - $"tripStartTime")
      .groupBy("vin")
      .agg(
        sum("tripDistance") as "mileage", // 每辆车的里程
        sum($"driverTime") as "driverTime",            // 每辆车的行驶时长
        count($"rowkey") as "tripCount"                // 每辆车的行程次数
      )
      .agg(
        sum("mileage") as "totalMileage",        // 全国所有车辆周里程总和
        sum("driverTime") as "totalDriverTime",  // 全国所有车辆行驶时长总和
        sum("tripCount") as "totalTripCount",   // 全国所有车辆行程次数总和
        countDistinct("vin") as "totalVin"      // 全国所有车辆总和
      ).withColumn("dtype", lit(dtype))            // 0-最近1周, 1-最近一个月, 2-最近3个月
       .withColumn("dtime", lit(arrayWeek(0)))     // 昨天的日期或命令行指定的日期

     val dfDao:TripAnalysis = new TripDao()
     dfDao.trip2db(dfTrip)
     spark.stop()
  }
  }
}