The --files Option in Spark and the ConfigFactory Factory Method

Scala object

I once worked on a big-data project that computed mini-program statistics: it read Parquet files from HDFS, ran the aggregation, and wrote the results into a MySQL database. The first idea was to put the MySQL settings directly in the code:

val jdbcUrl  = "jdbc:mysql://127.0.0.1:6606/test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false"
val user = "root"
val password = "averyloooooongword"
val driver = "com.mysql.jdbc.Driver"
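
For illustration, this is roughly how those values end up being used to write the result back to MySQL; a sketch only, in which the resultDF DataFrame and the mini_program_stats table name are assumed:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: append the aggregated statistics into MySQL using the
// hard-coded settings above. `resultDF` and the table name are assumed.
def writeToMysql(resultDF: DataFrame): Unit = {
  val props = new Properties()
  props.setProperty("user", user)
  props.setProperty("password", password)
  props.setProperty("driver", driver)

  resultDF.write
    .mode(SaveMode.Append)
    .jdbc(jdbcUrl, "mini_program_stats", props)
}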

properties files

With separate test and production environments, the code above would have to be copied once per environment, which is a maintenance headache. Later I learned that configuration can live under the resources directory, with a separate properties file for the local, test, and production environments:

conf.properties  
conf_product.properties
env.properties
local.properties

For example, conf.properties looks like this:

# Test environment configuration

## Database configuration
jdbc.url=jdbc:mysql://10.0.0.11:3306/ald_xinen_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false
jdbc.user=aldwx
jdbc.pwd=123456
jdbc.driver=com.mysql.jdbc.Driver

# Parquet file directory
tongji.parquet=hdfs://10.0.0.212:9000/ald_log_parquet

The configuration is then read from the resources file in code:

/**
 * Look up the value for a key in the properties file.
 * @param key the key on the left-hand side of the equals sign
 * @return the value on the right-hand side of the equals sign
 */
public static String getProperty(String key) {
    Properties properties = new Properties();
    InputStream in = ConfigurationUtil.class.getClassLoader().getResourceAsStream(getEnvProperty("env.conf"));
    try {
        properties.load(in);
        in.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return (String) properties.get(key);
}
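
A call site then simply reads the values by key; a small usage sketch (ConfigurationUtil is the class holding the method above):

// Read the JDBC settings defined in conf.properties
val jdbcUrl  = ConfigurationUtil.getProperty("jdbc.url")
val user     = ConfigurationUtil.getProperty("jdbc.user")
val password = ConfigurationUtil.getProperty("jdbc.pwd")
val driver   = ConfigurationUtil.getProperty("jdbc.driver")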

This solves the problem of differing configuration across environments: just keep several copies of the properties file and edit each one as needed. It is still not the best approach, though, because the configuration is flat rather than structured; related settings are only separated by comments.

conf files

The resources directory now looks like this:

application.conf              
application.production.conf
application.local.conf
log4j.properties
metrics.properties

The ConfigFactory factory methods read a file named application.conf from the resources directory by default:

# Spark-related configuration
spark {
  master = "local[2]"
  streaming.batch.duration = 5001 // Would normally be `ms` in config but Spark just wants the Long
  eventLog.enabled = true
  ui.enabled = true
  ui.port = 4040
  metrics.conf = metrics.properties
  checkpoint.path = "/tmp/checkpoint/telematics-local"
  stopper.port = 12345
  spark.cleaner.ttl = 3600
  spark.cleaner.referenceTracking.cleanCheckpoints = true
}

# Kafka-related configuration
kafka {

  metadata.broker.list = "localhost:9092"
  zookeeper.connect = "localhost:2181"

  topic.dtcdata {
    name = "dc-diagnostic-report"
    partition.num = 1
    replication.factor = 1
  }

  group.id = "group-rds"
  timeOut = "3000"
  bufferSize = "100"
  clientId = "telematics"
  key.serializer.class = "kafka.serializer.StringEncoder"
  serializer.class = "com.wm.dtc.pipeline.kafka.SourceDataSerializer"
  // serializer.class = "kafka.serializer.DefaultEncoder"
}

# MySQL configuration
mysql {
  dataSource.maxLifetime = 800000
  dataSource.idleTimeout = 600000
  dataSource.maximumPoolSize = 10
  dataSource.cachePrepStmts = true
  dataSource.prepStmtCacheSize = 250
  dataSource.prepStmtCacheSqlLimit = 204800
  dataSource.useServerPrepStmts = true
  dataSource.useLocalSessionState = true
  dataSource.rewriteBatchedStatements = true
  dataSource.cacheResultSetMetadata = true
  dataSource.cacheServerConfiguration = true
  dataSource.elideSetAutoCommits = true
  dataSource.maintainTimeStats = false

  jdbcUrl="jdbc:mysql://127.0.0.1:6606/wmdtc?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false"
  jdbcDriver="com.mysql.jdbc.Driver"
  dataSource.user="root"
  dataSource.password="123456"
}

To verify this, I created an object:

package allinone

import com.typesafe.config.ConfigFactory
import scopt.OptionParser

object SparkFilesArgs extends App {
  val config = ConfigFactory.load()
  val sparkConf = config.getConfig("spark")
  val sparkMaster = sparkConf.getString("master")
  val sparkDuration = sparkConf.getLong("streaming.batch.duration")
  println(sparkMaster, sparkDuration)
}

Running it directly prints:

(local[2],5001)

This is indeed the Spark configuration from application.conf.
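
Going one step further, the values from the spark block can be fed straight into a SparkConf; a small sketch, in which the application name is only a placeholder:

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf

// Sketch: build a SparkConf from the `spark` section of application.conf
val sparkSection = ConfigFactory.load().getConfig("spark")
val sparkConf = new SparkConf()
  .setMaster(sparkSection.getString("master"))
  .setAppName("mini-program-stats") // placeholder app name
  .set("spark.eventLog.enabled", sparkSection.getBoolean("eventLog.enabled").toString)
  .set("spark.ui.port", sparkSection.getInt("ui.port").toString)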

For production, however, we plan to use a different configuration file, application.production.conf:

spark {
  master = "yarn"
  streaming.batch.duration = 5002
  eventLog.enabled=true
  ui.enabled = true
  ui.port = 4040
  metrics.conf = metrics.properties
  checkpoint.path = "/tmp/telematics"
  stopper.port = 12345
  spark.cleaner.ttl = 3600
  spark.cleaner.referenceTracking.cleanCheckpoints = true

  trajectory.path = "hdfs://CRRCNameservice/road_matching/output/road_match_result"
  city.path = "hdfs://CRRCNameservice/user/root/telematics/data/city.csv"
}

## Cassandra-related configuration
cassandra {
  keyspace = wmdtc
  cardata.name = can_signal
  trip.name = trip
  latest.name = latest
  latest.interval = 15000

  connection.host = "WMBigdata2,WMBigdata3,WMBigdata4,WMBigdata5,WMBigdata6"
  write.consistency_level = LOCAL_ONE
  read.consistency_level = LOCAL_ONE
  concurrent.writes = 24
  batch.size.bytes = 65536
  batch.grouping.buffer.size = 1000
  connection.keep_alive_ms = 300000
  auth.username = cihon
  auth.password = cihon
}

kafka {
  metadata.broker.list = "WMBigdata2:9092,WMBigdata3:9092,WMBigdata4:9092,WMBigdata5:9092,WMBigdata6:9092"
  zookeeper.connect = "WMBigdata2:2181,WMBigdata3:2181,WMBigdata4:2181"

  topic.obddata {
    name = "wmdtc"
  }

  group.id = "can_signal"
  timeOut = "3000"
  bufferSize = "100"
  clientId = "telematics"

  key.serializer.class = "kafka.serializer.StringEncoder"
  serializer.class = "com.wm.telematics.pipeline.kafka.SourceDataSerializer"

}

akka {
  loglevel = INFO
  stdout-loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

## geoService endpoint URL
webservice {
  url = "http://101.201.108.155:8088/map/roadmessage"
}

## geoService-related configuration
geoservice {
  timeout = 3
  useRealData = false
}

Since the ConfigFactory methods read application.conf by default,

val config = ConfigFactory.load()

is equivalent to:

val config = ConfigFactory.load("application.conf")

The load method also accepts a resourceBasename argument, though:

val config = ConfigFactory.load("application.production") // load the production configuration
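
A common pattern is to derive the basename from an environment variable rather than hard-coding it; a sketch, in which the APP_ENV variable name is only an example:

import com.typesafe.config.ConfigFactory

// Pick the config resource from an APP_ENV variable (example name),
// falling back to the default application.conf when it is not set.
val config = sys.env.get("APP_ENV") match {
  case Some(env) => ConfigFactory.load(s"application.$env") // e.g. application.production.conf
  case None      => ConfigFactory.load()                    // application.conf
}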

Loading a different file in code like this makes it possible to switch between local, test, and production deployments, but choosing the configuration inside the code is still not elegant. That is what Spark's --files command-line option is for. As the official documentation describes, --files takes a comma-separated list of files to ship with the application; include the .conf file in that list, point config.resource at it, and the load method will pick up the configuration passed via --files:

#!/bin/sh

CONF_DIR=/root/telematics/resources
APP_CONF=application.production.conf
EXECUTOR_JMX_PORT=23339
DRIVER_JMX_PORT=2340

spark-submit \
--name WM_telematics \
--class allinone.SparkFilesArgs \
--master local[*] \
--deploy-mode client \
--driver-memory 2g \
--driver-cores 2 \
--executor-memory 1g \
--executor-cores 3 \
--num-executors 3 \
--conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf spark.executor.memoryOverhead=4096 \
--conf spark.driver.memoryOverhead=2048 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.network.timeout=1800s \
--conf spark.scheduler.executorTaskBlacklistTime=30000 \
--conf spark.core.connection.ack.wait.timeout=300s \
--files $CONF_DIR/$APP_CONF,$CONF_DIR/log4j.properties,$CONF_DIR/metrics.properties \
/Users/ohmycloud/work/cihon/sxw/all-in-one/target/allinone-1.0-SNAPSHOT.jar

It prints:

(local[*],5002)

because on the command line I set the master to local[*] and the configuration file to application.production.conf.
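
This works because ConfigFactory.load() honors the standard system properties config.resource, config.file, and config.url before falling back to application.conf, and spark.driver.extraJavaOptions / spark.executor.extraJavaOptions set exactly such a property on the driver and executor JVMs. The script above therefore achieves something like:

import com.typesafe.config.ConfigFactory

// What -Dconfig.resource=application.production.conf achieves, shown
// programmatically for illustration only:
System.setProperty("config.resource", "application.production.conf")
ConfigFactory.invalidateCaches()  // discard any cached default config
val config = ConfigFactory.load() // now resolves application.production.conf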

resource not found on classpath: application.conf

Local mode (localhost)

I deleted application.conf from the jar and passed the file to spark-submit with --files instead, but then the job failed with the error above: application.conf could not be found on the classpath.

cat spark-submit.sh:

#!/bin/sh

CONF_DIR=/Users/ohmycloud/work/cihon/gac/sources
APP_CONF=application.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark-submit \
--class $1 \
--master local[2] \
--deploy-mode client \
--driver-memory 2g \
--driver-cores 2 \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 4 \
--conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf spark.yarn.executor.memoryOverhead=1024 \
--conf spark.yarn.driver.memoryOverhead=1024 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.submit.waitAppCompletion=false \
--files $CONF_DIR/$APP_CONF \
/Users/ohmycloud/demo/Spark/WriteParquet2Kafka/target/socket-structured-streaming-1.0-SNAPSHOT.jar

The reason is that the directory containing application.conf, /Users/ohmycloud/work/cihon/gac/sources, is not on the classpath: config.resource is resolved through the classloader, and in local/client mode the files shipped with --files do not land on the driver's classpath.

Use

--driver-class-path /Users/ohmycloud/work/cihon/gac/sources

rather than

--driver-class-path /Users/ohmycloud/work/cihon/gac/sources/application.conf

to add the directory to the classpath:

#!/bin/sh

CONF_DIR=/Users/ohmycloud/work/cihon/gac/sources
APP_CONF=application.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark-submit \
--class $1 \
--master local[2] \
--deploy-mode client \
--driver-memory 2g \
--driver-cores 2 \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 4 \
--conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf spark.yarn.executor.memoryOverhead=1024 \
--conf spark.yarn.driver.memoryOverhead=1024 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.submit.waitAppCompletion=false \
--driver-class-path /Users/ohmycloud/work/cihon/gac/sources \
--files $CONF_DIR/$APP_CONF \
/Users/ohmycloud/demo/Spark/WriteParquet2Kafka/target/socket-structured-streaming-1.0-SNAPSHOT.jar

YARN mode

In YARN cluster mode, driver-class-path is no longer needed, because the files shipped with --files are localized into each container's working directory, which is on the classpath:

#!/bin/sh

CONF_DIR=/root/resources
APP_CONF=application.test.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark2-submit \
--class $1 \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--driver-cores 2 \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 4 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
--conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.driver.memoryOverhead=1024 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.submit.waitAppCompletion=false \
--files $CONF_DIR/$APP_CONF,$CONF_DIR/log4j.properties,$CONF_DIR/metrics.properties \
target/socket-structured-streaming-1.0-SNAPSHOT.jar

In practice, though, I later found that this sometimes still fails, so it is safest to add driver-class-path anyway.
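
When it is unclear which file was actually picked up (the application.conf in the jar, the copy shipped with --files, or the one named by -Dconfig.resource), printing the origin of a value is a quick check:

import com.typesafe.config.ConfigFactory

// Debugging sketch: show which file and line a key was loaded from
val config = ConfigFactory.load()
println(config.getValue("spark.master").origin().description())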

Attention

In one batch job I gave the configuration file a name that did not start with application, and the program started complaining that a key could not be found; once I renamed the file back to application.conf, everything worked.
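
If a custom file name is really wanted, it has to be selected explicitly, either with -Dconfig.resource=<name>.conf as in the submit scripts above, or by passing the basename to load(); otherwise load() only sees reference.conf plus whatever application.conf is on the classpath, and the keys appear to be missing. For example (my-job is a made-up name):

import com.typesafe.config.ConfigFactory

// Explicitly load a non-default config file from the classpath
val config = ConfigFactory.load("my-job") // resolves my-job.conf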
