sub trip

Use the exa command-line tool instead of ls to list the source tree:

master ~/work/cihon/gac/socket-streaming> exa --tree socket-streaming/src 
socket-streaming/src
└── main
   ├── resources
   │  ├── application.conf
   │  ├── application.prd.conf
   │  ├── application.test.conf
   │  ├── log4j.properties
   │  └── metrics.properties
   └── scala
      └── com
         └── gac
            └── xs6
               └── bigdata
                  ├── BigdataApplication.scala
                  ├── conf
                  │  └── SparkConfiguration.scala
                  ├── core
                  │  ├── Adapter.scala
                  │  └── impl
                  │     ├── AdapterImpl.scala
                  │     └── SubTripImpl.scala
                  ├── model
                  │  ├── SourceData.scala
                  │  └── TripState.scala
                  ├── module
                  │  └── MainModule.scala
                  ├── pipeline
                  │  └── Source.scala
                  └── util
                     └── Json4sHelper.scala

The conf package holds the Spark configuration, and the core package contains the core of the sub-trip logic. Adapter.scala defines the various streams, while AdapterImpl.scala and SubTripImpl.scala in the impl package implement the stream logic declared in Adapter.scala.
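
Adapter.scala and the SubTrip trait themselves are not reproduced in this post. Judging only from how they are used in MainModule and BigdataApplication below, their contracts look roughly like the following sketch; the exact signatures (in particular the return type of aggregate) are my assumption:

package com.gac.xs6.bigdata.core

import com.gac.xs6.bigdata.model.{SourceData, TripState}
import org.apache.spark.streaming.dstream.DStream

// Sketch only, inferred from adapter.extract(...) and trip.aggregate(...) below
trait Adapter {
  // turn raw socket lines into (vin, SourceData) pairs
  def extract(dStream: DStream[String]): DStream[(String, SourceData)]
}

trait SubTrip {
  // fold the keyed source stream into per-vehicle trip segments
  def aggregate(source: DStream[(String, SourceData)]): DStream[(String, TripState)]
}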

  • application.conf
spark {
  master = "local[4]"
  streaming.batch.duration = 5000
  eventLog.enabled = true
  ui.enabled = true
  ui.port = 4040
  metrics.conf = metrics.properties
  spark.cleaner.ttl = 3600
  checkpoint.path = "/tmp/telematics"
  spark.cleaner.referenceTracking.cleanCheckpoints = true
}
  • SparkConfiguration
package com.gac.xs6.bigdata.conf

import com.typesafe.config.ConfigFactory

/**
  * Spark configuration
  */
class SparkConfiguration extends Serializable {
  private val config = ConfigFactory.load()
  lazy val sparkConf = config.getConfig("spark")
  lazy val sparkMaster = sparkConf.getString("master")
  lazy val sparkUIEnabled = sparkConf.getBoolean("ui.enabled")
  lazy val sparkStreamingBatchDuration = sparkConf.getLong("streaming.batch.duration")
  lazy val checkPointPath = sparkConf.getString("checkpoint.path")
  lazy val stopperPort = sparkConf.getInt("stopper.port")
  lazy val ttl = sparkConf.getString("spark.cleaner.ttl")
  lazy val cleanCheckpoints = sparkConf.getString("spark.cleaner.referenceTracking.cleanCheckpoints")
}
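
ConfigFactory.load() honours the standard config.resource / config.file system properties, so the same class can pick up application.prd.conf or application.test.conf without code changes (for example by running with -Dconfig.resource=application.prd.conf). A quick, hypothetical usage sketch against the application.conf shown above:

// values as defined in application.conf above
val conf = new SparkConfiguration()
println(conf.sparkMaster)                  // local[4]
println(conf.sparkStreamingBatchDuration)  // 5000
println(conf.checkPointPath)               // /tmp/telematics

Note that stopperPort reads stopper.port, which does not appear in the application.conf above; since every field is lazy, this only fails if it is actually accessed.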
  • model/SourceData
package com.gac.xs6.bigdata.model

/**
  * Source data
  * @param vin        vehicle identification number (VIN)
  * @param createTime time the signal was generated
  * @param mileage    current mileage
  */
case class SourceData (
  vin: String,
  createTime: Long,
  mileage: Long
)
  • model/TripState
package com.gac.xs6.bigdata.model

/**
  * Trip segmentation result
  * @param vin           vehicle identification number (VIN)
  * @param tripStartTime trip start time
  * @param tripEndTime   trip end time
  * @param startMileage  mileage at trip start
  * @param tripDistance  distance travelled
  * @param tripStatus    trip status
  */
case class TripState (
  vin: String,
  var tripStartTime: Long,
  var tripEndTime: Long,
  var startMileage: Long,
  var tripDistance: Long,
  var tripStatus: String = "0"
)
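
SubTripImpl.scala is not listed in this post, but TripState is clearly meant to be carried as per-vehicle streaming state. Purely as an illustration, a mapWithState-based aggregation over TripState could look like the sketch below; the 30-minute gap threshold and the update rules are my assumptions, not the project's actual logic:

package com.gac.xs6.bigdata.core.impl

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream
import com.gac.xs6.bigdata.model.{SourceData, TripState}

object SubTripSketch {
  val tripGapMs: Long = 30 * 60 * 1000L // assumed idle gap that closes a trip

  def aggregate(source: DStream[(String, SourceData)]): DStream[(String, TripState)] =
    source.mapWithState(StateSpec.function(updateTrip _)).stateSnapshots()

  def updateTrip(vin: String, data: Option[SourceData], state: State[TripState]): Option[(String, TripState)] =
    data.map { d =>
      val trip = state.getOption() match {
        case Some(t) if d.createTime - t.tripEndTime <= tripGapMs =>
          // same trip: extend it and update the accumulated distance
          t.tripEndTime = d.createTime
          t.tripDistance = d.mileage - t.startMileage
          t
        case _ =>
          // no previous state, or the gap is too large: start a new trip
          TripState(vin, d.createTime, d.createTime, d.mileage, 0L)
      }
      state.update(trip)
      (vin, trip)
    }
}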
  • module/MainModule
package com.gac.xs6.bigdata.module

import com.datastax.spark.connector.util.Logging
import com.gac.xs6.bigdata.BigdataApplication
import com.gac.xs6.bigdata.core.impl.{AdapterImpl, SubTripImpl}
import com.google.inject.{AbstractModule, Provides, Singleton}
import com.gac.xs6.bigdata.conf.SparkConfiguration
import com.gac.xs6.bigdata.core.{Adapter, SubTrip}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import com.gac.xs6.bigdata.pipeline.DStreamSource
import com.gac.xs6.bigdata.util.Json4sHelper._

object MainModule extends AbstractModule with Logging {

  override def configure(): Unit = {
    logInfo("starting main injection module")
    bind(classOf[SparkConfiguration]).asEagerSingleton()

    bind(classOf[BigdataApplication])
    bind(classOf[Adapter]).toInstance(AdapterImpl)
    bind(classOf[SubTrip]).toInstance(SubTripImpl)
  }

  @Provides
  @Singleton
  def sparkContext(sparkConf: SparkConfiguration): SparkContext = {
    SparkContext.getOrCreate(new SparkConf(true)
      .setAppName("socketStreaming")
      .setMaster(sparkConf.sparkMaster)
      .set("spark.cleaner.ttl", sparkConf.ttl) // explicitly enable periodic cleanup of old RDDs and metadata
      .set("spark.cleaner.referenceTracking.cleanCheckpoints", sparkConf.cleanCheckpoints) // also clean checkpoint files once they go out of scope
      .set("spark.ui.enabled", sparkConf.sparkUIEnabled.toString)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    )
  }

  @Provides
  @Singleton
  def SocketDataSource: DStreamSource[String] = {
    new DStreamSource[String] {
      def stream(ssc: StreamingContext): DStream[String] = {
        implicit val formats = DefaultFormats + StringToBigDecimal + StringToInt + StringToDouble + StringToInstant + StringToLong

        val stream: DStream[String] = ssc.socketTextStream("localhost", 3333)
        stream
      }
    }
  }
}
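
The stream is bound to localhost:3333, so for local testing a plain TCP server such as nc -lk 3333 is enough to feed it; socketTextStream treats each newline-terminated UTF-8 line as one record. Also note that the implicit formats value built from the Json4sHelper serializers is declared in this provider but not used there, since the lines are handed on unparsed; the JSON decoding presumably happens in AdapterImpl.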
  • pipeline/Source
package com.gac.xs6.bigdata.pipeline

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

trait DStreamSource[M] {
  def stream(ssc: StreamingContext): DStream[M]
}

trait RDDSource[M] {
  def rdd(sc: SparkContext): RDD[M]
}
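
Keeping the source behind these small traits makes the rest of the pipeline independent of where the data comes from. For example, a test could swap the socket provider for a queue-backed source; the class below is a hypothetical sketch, not part of the repository:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import com.gac.xs6.bigdata.pipeline.DStreamSource

// Feeds a fixed set of lines into the pipeline as a single RDD in the first batch
class QueueSource(lines: Seq[String]) extends DStreamSource[String] {
  def stream(ssc: StreamingContext): DStream[String] = {
    val queue = mutable.Queue[RDD[String]](ssc.sparkContext.parallelize(lines))
    ssc.queueStream(queue)
  }
}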
  • util/Json4sHelper
package com.gac.xs6.bigdata.util

import java.util.Locale

import org.joda.time.Instant
import org.joda.time.format.DateTimeFormat
import org.json4s.JsonAST.JString
import org.json4s.{CustomSerializer, DefaultFormats}

import scala.util.Try

/**
  * Created by azhe on 18-4-3.
  */
object Json4sHelper {
  object StringToBigDecimal extends CustomSerializer[BigDecimal](format => (
    {
      case JString(x) => Try {
        BigDecimal(x.toDouble)
      }.recover {
        case e: Exception => BigDecimal(0) // applies to all Decimal-typed fields in OBDData
      }.get
    },
    { case x: BigDecimal => JString(x.toString) }
  ))

  object StringToDouble extends CustomSerializer[Double](format => (
    {
      case JString(x) => Try {
        x.toDouble
      }.recover {
        case e: Exception => "0.0".toDouble
      }.get
    },
    { case x: Double => JString(x.toString) }
  ))

  object StringToInt extends CustomSerializer[Int](format => (
    {
      case JString(x) => Try {
        x.toInt
      }.recover {
        case e: Exception => "0".toInt
      }.get
    },
    { case x: Int => JString(x.toString) }
  ))

  object StringToLong extends CustomSerializer[Long](format => (
    {
      case JString(x) => Try {
        x.toLong
      }.recover {
        case e: Exception => "0".toLong
      }.get
    },
    { case x: Long => JString(x.toString) }
  ))

  val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.CHINA)
  val dateTimeFormatter2 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss SSS").withLocale(Locale.CHINA)

  object StringToInstant extends CustomSerializer[Instant](format => ( {
    case JString(t) => Try {
      Instant.parse(t)
    }.recover {
      case e: IllegalArgumentException => Instant.parse(t, dateTimeFormatter)
    }.recover {
      case e: IllegalArgumentException => Instant.parse(t, dateTimeFormatter2)
    }.recover {
      case e: IllegalArgumentException => Instant.parse("1970-01-01 00:00:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC())
    }.get
  }, {
    case x: Instant => JString(x.toString)
  }))

  val CihonFormats = DefaultFormats + StringToBigDecimal + StringToInt + StringToDouble + StringToInstant + StringToLong
}
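
The point of these custom serializers is to tolerate numeric and timestamp fields that arrive as JSON strings and to fall back to harmless defaults instead of failing the batch. A small usage sketch, assuming the json4s-jackson binding is on the classpath and that incoming records have the same shape as SourceData:

import org.json4s.jackson.JsonMethods.parse
import com.gac.xs6.bigdata.model.SourceData
import com.gac.xs6.bigdata.util.Json4sHelper.CihonFormats

implicit val formats = CihonFormats

// createTime and mileage are quoted, but StringToLong should still extract them as Long
val json = """{"vin": "TESTVIN0000000001", "createTime": "1522742400000", "mileage": "12345"}"""
val data = parse(json).extract[SourceData]
// => SourceData(TESTVIN0000000001, 1522742400000, 12345)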
  • BigdataApplication
package com.gac.xs6.bigdata

import javax.inject.{Inject, Singleton}
import com.datastax.spark.connector.util.Logging
import com.google.inject.Guice
import com.typesafe.config.ConfigFactory
import com.gac.xs6.bigdata.BigdataApplication.Params
import com.gac.xs6.bigdata.conf.SparkConfiguration
import com.gac.xs6.bigdata.core.{Adapter, SubTrip}
import com.gac.xs6.bigdata.model.SourceData
import com.gac.xs6.bigdata.module.MainModule
import com.gac.xs6.bigdata.pipeline.DStreamSource
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scopt.OptionParser
import org.apache.spark.SparkContext

object BigdataApplication extends App with Logging {
  logInfo("socket streaming started")

  val parser = new OptionParser[Params]("BigdataApplication") {
    head("socket streaming")

    opt[String]('c', "com/xs/telematics/conf")
      .text("config.resource for gac")
      .action((x, c) => c.copy(conf = x))

    help("help").text("prints this usage text")
  }

  parser.parse(args, Params()) match {
    case Some(params) =>
      val injector = Guice.createInjector(MainModule)
      val runner = injector.getInstance(classOf[BigdataApplication])
      ConfigFactory.invalidateCaches()
      runner.run(params)
    case _ => sys.exit(1)
  }

  case class Params(conf: String = "application.conf")

}

@Singleton
class BigdataApplication @Inject()(sparkConf: SparkConfiguration,
                                   sparkContext: SparkContext,
                                   source: DStreamSource[String],
                                   adapter: Adapter,
                                   trip: SubTrip
                                  ) extends Serializable with Logging {

  private def createNewStreamingContext: StreamingContext = {
    val ssc = new StreamingContext(sparkContext = sparkContext, Seconds(30))
    ssc.checkpoint(sparkConf.checkPointPath)

    sys.addShutdownHook {
      logInfo("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      logInfo("Application stopped")
    }

    val dStream: DStream[String] = source.stream(ssc)
    val sourceDstream: DStream[(String, SourceData)] = adapter.extract(dStream)

    val subTrip = trip.aggregate(sourceDstream)
    subTrip.print()
    ssc
  }

  def run(params: Params): Unit = {

    val ssc = StreamingContext.getOrCreate(sparkConf.checkPointPath, () => createNewStreamingContext)

    ssc.start()
    ssc.awaitTermination()
  }
}
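
One thing to keep in mind about run: StreamingContext.getOrCreate first looks for a checkpoint under /tmp/telematics and, if one exists, rebuilds the old context from it instead of calling createNewStreamingContext. After changing the streaming logic you normally have to clear that directory, otherwise the previously serialized DAG is what gets restored.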

For the full code, please refer to GitHub. Any suggestions are welcome.