使用 Spark 读取 HBase

package wmstat.trip

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scopt.OptionParser
import wmutils.WmTimeUtil._
import scala.collection.mutable.ArrayBuffer

/**
 * Pulls trip records from the HBase `trip_signal` table for the last N days, aggregates
 * nationwide mileage / driving time / trip count / vehicle count, and hands the one-row
 * result to `TripDao.trip2db`.
 *
 * How to submit:
 * spark2-submit --name HBASE-CONNECTOR --files /etc/hbase/conf/hbase-site.xml --class wmstat.trip.MileageAnxiety --master yarn --deploy-mode client --driver-memory 2g --driver-cores 2 --executor-memory 2g --executor-cores 1 --num-executors 2 wmanxiety-1.0-SNAPSHOT.jar --day 20180810 --repartition 500 --interval 7
 */

object MileageAnxiety {

  /** SHC catalog mapping `default:trip_signal` to a DataFrame; every column is read as a string. */
  def cat: String = s"""{
    |"table":{"namespace":"default", "name":"trip_signal", "tableCoder":"PrimitiveType"},
    |"rowkey":"key",
    |"columns":{
    |"rowkey" :{"cf":"rowkey", "col":"key", "type":"string"},
    |"vin" :{"cf":"info", "col":"vin", "type":"string"},
    |"tripStatus" :{"cf":"info", "col":"tripStatus", "type":"string"},
    |"tripStartTime":{"cf":"info", "col":"tripStartTime", "type":"string"},
    |"tripEndTime" :{"cf":"info", "col":"tripEndTime", "type":"string"},
    |"tripDistance" :{"cf":"info", "col":"tripDistance", "type":"string"},
    |"startSoc" :{"cf":"info", "col":"startSoc", "type":"string"},
    |"endSoc" :{"cf":"info", "col":"endSoc", "type":"string"},
    |"maxSpeed" :{"cf":"info", "col":"maxSpeed", "type":"string"},
    |"startMileage" :{"cf":"info", "col":"startMileage", "type":"string"},
    |"coordinate" :{"cf":"info", "col":"coordinate", "type":"string"}
    |}
    |}""".stripMargin

  /**
   * Command-line options (defaults for day, partition count and interval).
   *
   * @param day         reference day, yyyyMMdd; empty by default (presumably `lastestNdays`
   *                    then falls back to yesterday — TODO confirm)
   * @param repartition number of partitions for the filtered trip data
   * @param interval    window length in days; 7, 30 and 90 map to dtype 0, 1 and 2
   */
  final case class Params(day: String = "", repartition: Int = 200, interval: Int = 7)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    // OptionParser parses the named command-line arguments and generates the help text.
    val parser = new OptionParser[Params]("MileageAnxiety") {
      head("MileageAnxiety: pull data from HBase")

      opt[String]('d', "day")
        .text("fill data on demand day, yyyyMMdd format")
        .action((x, c) => c.copy(day = x))

      opt[Int]('r', "repartition")
        .text("the partition of dataframe")
        .validate(n => if (n > 0) success else failure("repartition must be a positive number"))
        .action((x, c) => c.copy(repartition = x))

      opt[Int]('i', "interval")
        .text("the interval of days, 7,30 or 90")
        .action((x, c) => c.copy(interval = x))

      help("help").text("prints this usage text")
    }

    // Parse before building the SparkSession so invalid arguments never start a Spark application.
    parser.parse(args, Params()) match {
      case Some(params) => run(params)
      case None         => sys.exit(1)
    }
  }

  /** Runs the job for `params` and always stops the session, even when processing fails. */
  private def run(params: Params): Unit = {
    println(params)
    val spark = buildSession()
    try process(spark, params)
    finally spark.stop()
  }

  /**
   * Builds the SparkSession. A master set on the builder takes precedence over spark-submit's
   * `--master`, so `local[2]` is used only when no master was supplied (e.g. running from an IDE).
   */
  private def buildSession(): SparkSession = {
    val builder = SparkSession.builder().appName(this.getClass.getName)
    // spark-submit passes its settings (including --master) to the driver as spark.* system properties.
    if (!sys.props.contains("spark.master")) builder.master("local[2]")
    builder.getOrCreate()
  }

  /** Maps the window length to the stored dtype: 0 = last week, 1 = last month, 2 = last 3 months, 3 = other. */
  private def intervalType(interval: Int): Int = interval match {
    case 7  => 0
    case 30 => 1
    case 90 => 2
    case _  => 3
  }

  /** Reads HBase, keeps trips whose rowkey day falls in the window, aggregates, and writes via TripDao. */
  private def process(spark: SparkSession, params: Params): Unit = {
    import spark.implicits._

    val df: DataFrame = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
    df.show()

    val dtype = intervalType(params.interval)

    val days: ArrayBuffer[String] = lastestNdays(params.day, params.interval)
    require(days.nonEmpty, s"no days in window for day='${params.day}', interval=${params.interval}")
    val daySet = days.toSet

    // Rowkey shape inferred from the parsing: "<x>_<yyyyMMdd...>". Null or malformed keys are
    // excluded instead of throwing and failing the whole job.
    val inWindow = udf { (rowkey: String) =>
      val parts = if (rowkey == null) Array.empty[String] else rowkey.split("_")
      parts.length > 1 && parts(1).length >= 8 && daySet.contains(parts(1).substring(0, 8))
    }

    // Filter first so only rows inside the window are shuffled by the repartition.
    val dfWindow = df.filter(inWindow(col("rowkey"))).repartition(params.repartition)

    val dfTrip: DataFrame = dfWindow
      .select("rowkey", "vin", "tripDistance", "tripStartTime", "tripEndTime")
      // NOTE(review): these are string columns (see `cat`); Spark casts them to numbers for the
      // subtraction, so this is a duration only if the stored values are numeric timestamps — confirm.
      .withColumn("driverTime", $"tripEndTime" - $"tripStartTime")
      .groupBy("vin")
      .agg(
        sum("tripDistance") as "mileage",   // mileage per vehicle
        sum($"driverTime") as "driverTime", // driving time per vehicle
        count($"rowkey") as "tripCount"     // number of trips per vehicle
      )
      // Global roll-up of the per-vehicle rows into a single nationwide row.
      .agg(
        sum("mileage") as "totalMileage",
        sum("driverTime") as "totalDriverTime",
        sum("tripCount") as "totalTripCount",
        countDistinct("vin") as "totalVin"
      )
      .withColumn("dtype", lit(dtype))
      // First day returned by lastestNdays: yesterday or the day given on the command line.
      .withColumn("dtime", lit(days.head))

    val dfDao: TripAnalysis = new TripDao()
    dfDao.trip2db(dfTrip)
  }
}