Reading HBase with the Spark HBase Connector

Maven dependency (pom.xml):

<!-- https://mvnrepository.com/artifact/com.hortonworks/shc-core -->
<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.1-2.1-s_2.11</version>
</dependency>
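
Note that shc-core is not published to Maven Central, so the dependency above may fail to resolve on its own. A minimal sketch of the extra repository entry, assuming the Hortonworks public repository (the exact URL may differ; check the SHC project README):

<repositories>
    <repository>
        <id>hortonworks-repo</id>
        <url>http://repo.hortonworks.com/content/groups/public/</url>
    </repository>
</repositories>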

Sample code:

package xxxx.xxxx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Submit command:
 * spark2-submit --name HBASE-CONNECTOR --files /etc/hbase/conf/hbase-site.xml --class xxx.xxxx --master yarn --deploy-mode cluster --driver-memory 2g --driver-cores 2 --executor-memory 2g --executor-cores 1 --num-executors 2 wmanxiety-1.0-SNAPSHOT.jar --day 20180810 --repartition 500 --interval 7
 *
 * Note: --files ships hbase-site.xml with the job so that executors can
 * locate the HBase cluster (ZooKeeper quorum, etc.).
 */
object MileageAnxiety {

  // SHC catalog: maps the HBase table "trip_signal" to a DataFrame schema.
  // The "rowkey" entry must use cf "rowkey" and col "key"; each remaining
  // entry maps a DataFrame field to a column family / qualifier pair.
  def cat: String =
    """{
      |"table":{"namespace":"default", "name":"trip_signal", "tableCoder":"PrimitiveType"},
      |"rowkey":"key",
      |"columns":{
      |"rowkey"        :{"cf":"rowkey", "col":"key",           "type":"string"},
      |"vin"           :{"cf":"info",   "col":"vin",           "type":"string"},
      |"tripStatus"    :{"cf":"info",   "col":"tripStatus",    "type":"string"},
      |"tripStartTime" :{"cf":"info",   "col":"tripStartTime", "type":"string"},
      |"tripEndTime"   :{"cf":"info",   "col":"tripEndTime",   "type":"string"},
      |"tripDistance"  :{"cf":"info",   "col":"tripDistance",  "type":"string"},
      |"startSoc"      :{"cf":"info",   "col":"startSoc",      "type":"string"},
      |"endSoc"        :{"cf":"info",   "col":"endSoc",        "type":"string"},
      |"maxSpeed"      :{"cf":"info",   "col":"maxSpeed",      "type":"string"},
      |"startMileage"  :{"cf":"info",   "col":"startMileage",  "type":"string"},
      |"coordinate"    :{"cf":"info",   "col":"coordinate",    "type":"string"}
      |}
      |}""".stripMargin

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    // Do not hard-code .master() here: the submit command above already
    // passes --master yarn, and a hard-coded "local[2]" would override it.
    val spark = SparkSession.builder()
      .appName(this.getClass.getName)
      .getOrCreate()

    val sqlContext = spark.sqlContext

    // Load the HBase table as a DataFrame through the SHC data source.
    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
    }

    val df = withCatalog(cat)
    df.show()
    spark.stop()
  }
}
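
SHC can also write a DataFrame back to HBase through the same catalog mechanism. Below is a minimal sketch, continuing from the df loaded above; the target table trip_summary, its catalog, and the tripStatus filter value are hypothetical. HBaseTableCatalog.newTable asks SHC to create the table (here with 5 regions) if it does not already exist:

import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.functions.col

// Keep only finished trips (tripStatus value is hypothetical). Filters on
// the rowkey are pushed down by SHC to restrict the scan range; filters on
// ordinary columns such as tripStatus are evaluated while scanning.
val trips = df
  .filter(col("tripStatus") === "1")
  .select("rowkey", "vin", "tripDistance", "startSoc", "endSoc")

// Hypothetical catalog for the target table, same JSON layout as cat above.
val writeCat =
  """{
    |"table":{"namespace":"default", "name":"trip_summary", "tableCoder":"PrimitiveType"},
    |"rowkey":"key",
    |"columns":{
    |"rowkey"       :{"cf":"rowkey", "col":"key",          "type":"string"},
    |"vin"          :{"cf":"info",   "col":"vin",          "type":"string"},
    |"tripDistance" :{"cf":"info",   "col":"tripDistance", "type":"string"},
    |"startSoc"     :{"cf":"info",   "col":"startSoc",     "type":"string"},
    |"endSoc"       :{"cf":"info",   "col":"endSoc",       "type":"string"}
    |}
    |}""".stripMargin

trips.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCat,
    HBaseTableCatalog.newTable     -> "5"  // create with 5 regions if the table is missing
  ))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()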