Spark Structured Streaming 官方例子

package ohmysummer

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._


/**
 * Sessionizes words in UTF8 encoded, '\n' delimited text received from the network.
 *
 * Each non-empty, space-separated word on an input line becomes an [[Event]] whose session id
 * is the word itself. Events are grouped by session id and folded into a [[SessionInfo]] with
 * `mapGroupsWithState`. Each trigger emits a [[SessionUpdate]] for every session it touches.
 * A session that receives no data for 10 seconds (processing time) is removed and reported one
 * last time with `expired = true`.
 *
 * Usage: StructuredSessionization <hostname> <port>
 * <hostname> and <port> describe the TCP server that Structured Streaming
 * would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 * `$ nc -lk 9999`
 * and then run the example
 * `$ bin/run-example sql.streaming.StructuredSessionization
 * localhost 9999`
 *
 * If no master is configured (e.g. when launched directly from an IDE), the query runs on
 * `local[2]`. A master supplied by the launcher is respected.
 */
object StructuredSessionization {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StructuredSessionization <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val builder = SparkSession
      .builder
      .appName("StructuredSessionization")
    // Only fall back to a local master when none was configured. Builder options take
    // precedence over launcher settings, so unconditionally setting local[2] would override a
    // master passed on the spark-submit command line.
    if (!sys.props.contains("spark.master")) {
      builder.master("local[2]")
    }
    val spark = builder.getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Split lines into words and treat every word as an event whose sessionId is the word.
    // Empty tokens (blank lines, leading or repeated spaces) are dropped so they do not
    // produce a bogus "" session.
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        line.split(" ")
          .filter(_.nonEmpty)
          .map(word => Event(sessionId = word, timestamp = timestamp))
      }

    // Sessionize the events: track the number of events and the start/end timestamps of each
    // session, and report session updates.
    val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout)(
        updateSessionState _)

    // Start running the query that prints the session updates to the console
    val query = sessionUpdates
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }

  /**
   * State-update function passed to `mapGroupsWithState`, called for one session id at a time.
   *
   * @param sessionId     the session key (a word from the input)
   * @param sessionEvents events received for this session. Assumed non-empty unless
   *                      `state.hasTimedOut` is true (per Spark's `GroupState` contract);
   *                      `min`/`max` below would throw on an empty input.
   * @param state         the stored [[SessionInfo]] for this session, if any
   * @return the [[SessionUpdate]] to emit for this session
   */
  private def updateSessionState(
      sessionId: String,
      sessionEvents: Iterator[Event],
      state: GroupState[SessionInfo]): SessionUpdate = {
    if (state.hasTimedOut) {
      // If timed out, then remove session and send final update
      val finalSession = state.get
      state.remove()
      SessionUpdate(sessionId, finalSession.durationMs, finalSession.numEvents, expired = true)
    } else {
      // Materialize strictly: the iterator can be traversed only once, but we need size/min/max.
      val timestamps = sessionEvents.map(_.timestamp.getTime).toVector
      val updatedSession = if (state.exists) {
        val oldSession = state.get
        SessionInfo(
          oldSession.numEvents + timestamps.size,
          math.min(oldSession.startTimestampMs, timestamps.min),
          math.max(oldSession.endTimestampMs, timestamps.max))
      } else {
        SessionInfo(timestamps.size, timestamps.min, timestamps.max)
      }
      state.update(updatedSession)

      // Set timeout such that the session will be expired if no data received for 10 seconds
      state.setTimeoutDuration("10 seconds")
      SessionUpdate(sessionId, updatedSession.durationMs, updatedSession.numEvents, expired = false)
    }
  }
}
/**
 * A single input event: one word read from the socket stream.
 *
 * @param sessionId the word itself; events are grouped into sessions by this value
 * @param timestamp the timestamp the socket source attached to the line the word came from
 *                  (the source is read with `includeTimestamp` enabled)
 */
case class Event(
    sessionId: String,
    timestamp: Timestamp)

/**
 * Per-session state stored by `mapGroupsWithState`.
 *
 * @param numEvents        total number of events received for the session so far
 * @param startTimestampMs time of the first event received for the session, in milliseconds
 * @param endTimestampMs   time of the latest event received for the session, in milliseconds
 */
case class SessionInfo(
    numEvents: Int,
    startTimestampMs: Long,
    endTimestampMs: Long) {

  /** Milliseconds elapsed between the session's first and latest event. */
  def durationMs: Long = endTimestampMs - startTimestampMs
}

/**
 * Update record returned by the `mapGroupsWithState` function for one session.
 *
 * @param id         the session id
 * @param durationMs time between the session's first and latest event, in milliseconds. This is
 *                   filled from `SessionInfo.durationMs`, so it does NOT include the inactivity
 *                   period that elapses before the session times out.
 * @param numEvents  number of events received for the session while it was active
 * @param expired    `true` for the final update emitted when the session times out,
 *                   `false` while the session is still active
 */
case class SessionUpdate(
    id: String,
    durationMs: Long,
    numEvents: Int,
    expired: Boolean)

输出（节选）:

+-----+----------+---------+-------+
| id|durationMs|numEvents|expired|
+-----+----------+---------+-------+
|hello| 0| 1| false|
|world| 43| 2| true|
+-----+----------+---------+-------+

-------------------------------------------
Batch: 14
-------------------------------------------
+-----+----------+---------+-------+
| id|durationMs|numEvents|expired|
+-----+----------+---------+-------+
|hello| 0| 1| true|
|world| 0| 1| false|
+-----+----------+---------+-------+

-------------------------------------------
Batch: 15
-------------------------------------------
+-----+----------+---------+-------+
| id|durationMs|numEvents|expired|
+-----+----------+---------+-------+
|world| 23| 2| false|
+-----+----------+---------+-------+