sink-to-hbase-using-structured-streaming

Compiling SHC

As of now, the sinks in Structured Streaming still do not support HBase. A solution on Stack Overflow uses the third-party shc library to build a custom HBase sink. Someone commented that the second answer there doesn't work; I followed it myself and they're right! Scrolling further down, I found that someone on GitHub had made a few small changes, so I copied them over:

package org.apache.spark.sql.execution.datasources.hbase

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.getOrElse("hbasecat", "")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema

    // The DataFrame handed to a Sink wraps internal rows; convert them back
    // to external Row objects before building a writable DataFrame.
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    val df = data.sparkSession.createDataFrame(res, schema)

    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

Name the code above HBaseSinkProvider.scala and place it in the shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase directory (it must live in this package to access Spark's internal Logging trait):


SHC itself is also a big pitfall:

git clone https://github.com/hortonworks-spark/shc.git

After downloading it and opening it in IDEA, red squiggles are everywhere and the pom file shows a red exclamation mark, yet it still compiles:

  • Compile
mvn package -DskipTests

You can even run the tests:

  • Run the tests and examples
mvn clean package test

Following the docs worked, although compiling and testing took 8 minutes. I changed only one thing in the pom file: the Spark version. My local Spark is 2.3.2, while shc had bumped it to the latest 2.4.0, and it seems the build fails without the change.

Following that Stack Overflow answer, combined with the supplementary answer, I recompiled shc.
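The single pom change mentioned above, pinning Spark back to the local release, would look roughly like this (a sketch, assuming shc keeps the version in a `spark.version` property in its parent pom, which was the layout at the time):

```xml
<!-- shc/pom.xml: match the locally installed Spark instead of 2.4.0 -->
<properties>
  <spark.version>2.3.2</spark.version>
</properties>
```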

Configuring HBase

  • Version

Note that when we compiled SHC above, the HBase version in its pom file was 2.0.2, so download that same version. Do other versions work? I don't know; I only know that 1.2.0 and 1.2.8 both gave me problems: some class could not be found, some class was undefined, and so on. To avoid trouble, use 2.0.2 and stay consistent with SHC.

  • Configuration

Export JAVA_HOME in hbase-env.sh:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home

Add/modify the following configuration in hbase-site.xml:

<configuration>
  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>/Users/ohmycloud/opt/tmp/myhbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/Users/ohmycloud/opt/tmp/myhbase/zookeeper</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
  • ZooKeeper

When writing to HBase, I kept hitting this error:

The node /hbase is not in ZooKeeper. It should have been written by the master. Check the value configured in 'zookeeper.znode.parent'.

The cause is that there is no hbase node in ZooKeeper. Since my ZooKeeper runs inside Docker, checking whether the node exists takes a bit more effort:

docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
561180d16b55 wurstmeister/kafka "start-kafka.sh" About an hour ago Up About an hour 0.0.0.0:9092->9092/tcp docker_kafka_1
61a36418ea7f wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" About an hour ago Up About an hour 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp docker_zookeeper_1

The ZooKeeper container id is 61a36418ea7f, so we jump straight into that container:

docker exec -it 61a36418ea7f bash 
root@61a36418ea7f:/opt/zookeeper-3.4.9#

See? The shell prompt changed; we are now inside the ZooKeeper container.

Start the ZooKeeper CLI:

./bin/zkCli.sh

Run ls to list the znodes:

[log_dir_event_notification, isr_change_notification, zookeeper, admin, consumers, cluster, config, latest_producer_id_block, controller, brokers, controller_epoch]

No hbase there! So we create one by hand, still in the ZooKeeper CLI:

create /hbase ohmyhbase

Then restart HBase:

sudo ./stop-hbase.sh
sudo ./start-hbase.sh

Then create a simple test table in the HBase shell:

create 'ohmytest', {NAME => 'info'}

The Structured Streaming program

This program is just for testing: it reads local Parquet files, selects a few columns, and writes them to HBase:

package ohmysummer.utils

import ohmysummer.pipeline.schema.ParquetSchema
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

/**
 * Read Parquet files and write them to HBase.
 */
object WriteParquet2Hbase {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local[2]")
      .appName("Write Parquet to HBase using Structured Streaming")
      .config("spark.driver.host", "localhost")
      .getOrCreate()

    val parquetSchema = (new ParquetSchema).schema

    val ds: DataFrame = spark.readStream
      .schema(parquetSchema)
      .parquet("/Users/ohmycloud/work/cihon/gac/data/decoded/vintype=A5HEVAVNT/d=20180825")

    import spark.implicits._

    val df2: DataFrame = ds.select(
      $"vin",
      $"veh_spd",
      $"veh_soc",
      $"loc_lon84",
      $"loc_lat84"
    )

    // SHC catalog: vin is the rowkey, the rest go into the "info" column family.
    def catalog = s"""{
      |"table":{"namespace":"default", "name":"ohmytest"},
      |"rowkey":"vin",
      |"columns":{
      |"vin":{"cf":"rowkey", "col":"vin", "type":"string"},
      |"veh_spd":{"cf":"info", "col":"veh_spd", "type":"string"},
      |"veh_soc":{"cf":"info", "col":"veh_soc", "type":"string"},
      |"loc_lon84":{"cf":"info", "col":"loc_lon84", "type":"string"},
      |"loc_lat84":{"cf":"info", "col":"loc_lat84", "type":"string"}
      |}
      |}""".stripMargin

    val ddf = df2.writeStream
      .queryName("hbase writer")
      .format("org.apache.spark.sql.execution.datasources.hbase.HBaseSinkProvider")
      .option("checkpointLocation", "/tmp/gac")
      .option("hbasecat", catalog)
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()

    ddf.awaitTermination()
  }
}

The submit command:

spark-submit \
--class ohmysummer.utils.WriteParquet2Hbase \
--jars /Users/ohmycloud/Downloads/shc/core/target/shc-core-1.1.3-2.3-s_2.11.jar \
--master local[2] \
--driver-memory 2g \
--driver-cores 2 \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 2 \
--files /Users/ohmycloud/opt/hbase-2.0.2/conf/hbase-site.xml \
target/socket-structured-streaming-1.0-SNAPSHOT.jar

I ran this little job dozens of times and it failed every time; just half an hour ago I finally got it fixed and it can write to HBase. When it printed the following, I knew things were on track:

2018-12-11 17:19:30 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "eb3f8490-eecb-407e-879f-c1e2404e9590",
  "runId" : "b81a766e-3c8a-44eb-979d-9988c8af6f1a",
  "name" : "hbase writer",
  "timestamp" : "2018-12-11T09:19:30.002Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 6,
    "triggerExecution" : 7
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/ohmycloud/work/cihon/gac/data/decoded/vintype=A5HEVAVNT/d=20180825]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.datasources.hbase.HBaseSink@5ab0cadb"
  }
}

A scan in the hbase shell shows the results:

hbase(main):016:0> scan 'ohmytest'
ROW COLUMN+CELL
LMWHP1S88J1S00013 column=info:loc_lat84, timestamp=1544517115189, value=@7\x1A\xF5U\x82\x12\x94
LMWHP1S88J1S00013 column=info:loc_lon84, timestamp=1544517115189, value=@\x5C\x5C\x87L\x8F\xFB\x8B
LMWHP1S88J1S00013 column=info:veh_soc, timestamp=1544517115189, value=\x00\x00\x00U
LMWHP1S88J1S00013 column=info:veh_spd, timestamp=1544517115189, value=\x00\x00\x00\x00\x00\x00\x00\x00
hbase(main):017:0>

The pom file

Mainly the HBase dependencies:


<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>${hbase.version}</version>
  <type>pom</type>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>${hbase.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>${hbase.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>${hbase.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-mapreduce</artifactId>
  <version>${hbase.version}</version>
</dependency>
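
All of these entries reference `${hbase.version}`. Assuming the usual Maven convention of declaring it once in the pom's `<properties>` block, it should match the 2.0.2 release noted above:

```xml
<!-- project pom: keep the HBase version consistent with SHC -->
<properties>
  <hbase.version>2.0.2</hbase.version>
</properties>
```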