Five Spark SQL Helper Utility Functions for Extracting and Exploring Complex Data Types

https://docs.databricks.com/_static/notebooks/complex-nested-structured.html

While the in-depth blog post linked above lays out the concepts of and motivations for processing and working with complex data types and formats, this notebook example examines, through a few concrete examples, how you can apply them to data-type cases you are likely to encounter in practice. This short notebook tutorial shows ways in which you can explore and employ a number of the new helper Spark SQL utility functions and APIs in the org.apache.spark.sql.functions package. In particular, they come in handy while performing Streaming ETL, in which the data are JSON objects with complex and nested structures: Maps and Structs embedded as JSON:

  • get_json_object()
  • from_json()
  • to_json()
  • explode()
  • selectExpr()

At the heart of this short tutorial are the myriad ways in which you can slice and dice nested JSON structures with Spark SQL utility functions.

Let's create a simple JSON schema with attributes and values, without any nested structures.

import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.functions._ // include the Spark helper functions

val jsonSchema = new StructType()
.add("battery_level", LongType)
.add("c02_level", LongType)
.add("cca3",StringType)
.add("cn", StringType)
.add("device_id", LongType)
.add("device_type", StringType)
.add("signal", LongType)
.add("ip", StringType)
.add("temp", LongType)
.add("timestamp", TimestampType)

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(battery_level,LongType,true), StructField(c02_level,LongType,true), StructField(cca3,StringType,true), StructField(cn,StringType,true), StructField(device_id,LongType,true), StructField(device_type,StringType,true), StructField(signal,LongType,true), StructField(ip,StringType,true), StructField(temp,LongType,true), StructField(timestamp,TimestampType,true))

Using the schema above, create a Dataset, represented as a Scala case class, and generate some JSON data associated with it. In all likelihood, this JSON might as well be a stream of device events read from a Kafka topic. Note that the case class has two fields: an integer (as a device id) and a string (as the JSON string representing device events).
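In a streaming setting, the same jsonSchema would typically be applied to events arriving from Kafka rather than to a hand-built Seq. A minimal sketch of that variant follows; the broker address and topic name are hypothetical, not part of this notebook:

```scala
// Hypothetical streaming source: "your_host_name:9092" and "iot-devices"
// are illustrative placeholders, not part of this notebook.
val kafkaDevicesDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "your_host_name:9092")
  .option("subscribe", "iot-devices")
  .load()
  .selectExpr("CAST(value AS STRING) AS device")      // Kafka payload arrives as bytes
  .select(from_json($"device", jsonSchema) as "devices") // decode with the schema above
  .select($"devices.*")
```

For this tutorial, though, we stick with a static, hand-built Dataset.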

Create a Dataset from the schema above.

// define a case class
case class DeviceData (id: Int, device: String)
// create some sample data
val eventsDS = Seq (
(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
(1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
(2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
(3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
(4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
(5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
(6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
(7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
(8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
(9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }"""),
(10,"""{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }"""),
(11,"""{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }"""),
(12, """{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }"""),
(13, """{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }"""),
(14, """{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }"""),
(15, """{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }"""),
(16, """{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }"""),
(17, """{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }"""),
(18, """{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }"""),
(19, """{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }""")).toDF("id", "device").as[DeviceData]
defined class DeviceData
eventsDS: org.apache.spark.sql.Dataset[DeviceData] = [id: int, device: string]

Our Dataset is a collection of Scala case classes and, when displayed as a DataFrame, it has two columns (id: int, device: string).

display(eventsDS)
id device
0 {“device_id”: 0, “device_type”: “sensor-ipad”, “ip”: “68.161.225.1”, “cca3”: “USA”, “cn”: “United States”, “temp”: 25, “signal”: 23, “battery_level”: 8, “c02_level”: 917, “timestamp” :1475600496 }
1 {“device_id”: 1, “device_type”: “sensor-igauge”, “ip”: “213.161.254.1”, “cca3”: “NOR”, “cn”: “Norway”, “temp”: 30, “signal”: 18, “battery_level”: 6, “c02_level”: 1413, “timestamp” :1475600498 }
2 {“device_id”: 2, “device_type”: “sensor-ipad”, “ip”: “88.36.5.1”, “cca3”: “ITA”, “cn”: “Italy”, “temp”: 18, “signal”: 25, “battery_level”: 5, “c02_level”: 1372, “timestamp” :1475600500 }
3 {“device_id”: 3, “device_type”: “sensor-inest”, “ip”: “66.39.173.154”, “cca3”: “USA”, “cn”: “United States”, “temp”: 47, “signal”: 12, “battery_level”: 1, “c02_level”: 1447, “timestamp” :1475600502 }
4 {“device_id”: 4, “device_type”: “sensor-ipad”, “ip”: “203.82.41.9”, “cca3”: “PHL”, “cn”: “Philippines”, “temp”: 29, “signal”: 11, “battery_level”: 0, “c02_level”: 983, “timestamp” :1475600504 }
5 {“device_id”: 5, “device_type”: “sensor-istick”, “ip”: “204.116.105.67”, “cca3”: “USA”, “cn”: “United States”, “temp”: 50, “signal”: 16, “battery_level”: 8, “c02_level”: 1574, “timestamp” :1475600506 }
6 {“device_id”: 6, “device_type”: “sensor-ipad”, “ip”: “220.173.179.1”, “cca3”: “CHN”, “cn”: “China”, “temp”: 21, “signal”: 18, “battery_level”: 9, “c02_level”: 1249, “timestamp” :1475600508 }
7 {“device_id”: 7, “device_type”: “sensor-ipad”, “ip”: “118.23.68.227”, “cca3”: “JPN”, “cn”: “Japan”, “temp”: 27, “signal”: 15, “battery_level”: 0, “c02_level”: 1531, “timestamp” :1475600512 }
8 {“device_id”: 8, “device_type”: “sensor-inest”, “ip”: “208.109.163.218”, “cca3”: “USA”, “cn”: “United States”, “temp”: 40, “signal”: 16, “battery_level”: 9, “c02_level”: 1208, “timestamp” :1475600514 }
9 {“device_id”: 9, “device_type”: “sensor-ipad”, “ip”: “88.213.191.34”, “cca3”: “ITA”, “cn”: “Italy”, “temp”: 19, “signal”: 11, “battery_level”: 0, “c02_level”: 1171, “timestamp” :1475600516 }
10 {“device_id”: 10, “device_type”: “sensor-igauge”, “ip”: “68.28.91.22”, “cca3”: “USA”, “cn”: “United States”, “temp”: 32, “signal”: 26, “battery_level”: 7, “c02_level”: 886, “timestamp” :1475600518 }
11 {“device_id”: 11, “device_type”: “sensor-ipad”, “ip”: “59.144.114.250”, “cca3”: “IND”, “cn”: “India”, “temp”: 46, “signal”: 25, “battery_level”: 4, “c02_level”: 863, “timestamp” :1475600520 }
12 {“device_id”: 12, “device_type”: “sensor-igauge”, “ip”: “193.156.90.200”, “cca3”: “NOR”, “cn”: “Norway”, “temp”: 18, “signal”: 26, “battery_level”: 8, “c02_level”: 1220, “timestamp” :1475600522 }
13 {“device_id”: 13, “device_type”: “sensor-ipad”, “ip”: “67.185.72.1”, “cca3”: “USA”, “cn”: “United States”, “temp”: 34, “signal”: 20, “battery_level”: 8, “c02_level”: 1504, “timestamp” :1475600524 }
14 {“device_id”: 14, “device_type”: “sensor-inest”, “ip”: “68.85.85.106”, “cca3”: “USA”, “cn”: “United States”, “temp”: 39, “signal”: 17, “battery_level”: 8, “c02_level”: 831, “timestamp” :1475600526 }
15 {“device_id”: 15, “device_type”: “sensor-ipad”, “ip”: “161.188.212.254”, “cca3”: “USA”, “cn”: “United States”, “temp”: 27, “signal”: 26, “battery_level”: 5, “c02_level”: 1378, “timestamp” :1475600528 }
16 {“device_id”: 16, “device_type”: “sensor-igauge”, “ip”: “221.3.128.242”, “cca3”: “CHN”, “cn”: “China”, “temp”: 10, “signal”: 24, “battery_level”: 6, “c02_level”: 1423, “timestamp” :1475600530 }
17 {“device_id”: 17, “device_type”: “sensor-ipad”, “ip”: “64.124.180.215”, “cca3”: “USA”, “cn”: “United States”, “temp”: 38, “signal”: 17, “battery_level”: 9, “c02_level”: 1304, “timestamp” :1475600532 }
18 {“device_id”: 18, “device_type”: “sensor-igauge”, “ip”: “66.153.162.66”, “cca3”: “USA”, “cn”: “United States”, “temp”: 26, “signal”: 10, “battery_level”: 0, “c02_level”: 902, “timestamp” :1475600534 }
19 {“device_id”: 19, “device_type”: “sensor-ipad”, “ip”: “193.200.142.254”, “cca3”: “AUT”, “cn”: “Austria”, “temp”: 32, “signal”: 27, “battery_level”: 5, “c02_level”: 1282, “timestamp” :1475600536 }

Printing the schema attests to the two columns of types integer and string, reflecting the Scala case class.

eventsDS.printSchema
root
|-- id: integer (nullable = false)
|-- device: string (nullable = true)

How to use get_json_object()

This method extracts a JSON object from a JSON string based on the JSON path specified, and returns the extracted JSON object as a JSON string. Taking the small example of the Dataset above, suppose we wish to extract only the portions of the data values that make up the JSON object string. Say you want to extract just the id, device_type, ip, and cca3 code. Here is how to do it quickly with get_json_object().

val eventsFromJSONDF = Seq (
(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
(1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
(2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
(3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
(4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
(5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
(6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
(7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
(8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
(9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }""")).toDF("id", "json")

eventsFromJSONDF: org.apache.spark.sql.DataFrame = [id: int, json: string]
display(eventsFromJSONDF)
id json
0 {“device_id”: 0, “device_type”: “sensor-ipad”, “ip”: “68.161.225.1”, “cca3”: “USA”, “cn”: “United States”, “temp”: 25, “signal”: 23, “battery_level”: 8, “c02_level”: 917, “timestamp” :1475600496 }
1 {“device_id”: 1, “device_type”: “sensor-igauge”, “ip”: “213.161.254.1”, “cca3”: “NOR”, “cn”: “Norway”, “temp”: 30, “signal”: 18, “battery_level”: 6, “c02_level”: 1413, “timestamp” :1475600498 }
2 {“device_id”: 2, “device_type”: “sensor-ipad”, “ip”: “88.36.5.1”, “cca3”: “ITA”, “cn”: “Italy”, “temp”: 18, “signal”: 25, “battery_level”: 5, “c02_level”: 1372, “timestamp” :1475600500 }
3 {“device_id”: 3, “device_type”: “sensor-inest”, “ip”: “66.39.173.154”, “cca3”: “USA”, “cn”: “United States”, “temp”: 47, “signal”: 12, “battery_level”: 1, “c02_level”: 1447, “timestamp” :1475600502 }
4 {“device_id”: 4, “device_type”: “sensor-ipad”, “ip”: “203.82.41.9”, “cca3”: “PHL”, “cn”: “Philippines”, “temp”: 29, “signal”: 11, “battery_level”: 0, “c02_level”: 983, “timestamp” :1475600504 }
5 {“device_id”: 5, “device_type”: “sensor-istick”, “ip”: “204.116.105.67”, “cca3”: “USA”, “cn”: “United States”, “temp”: 50, “signal”: 16, “battery_level”: 8, “c02_level”: 1574, “timestamp” :1475600506 }
6 {“device_id”: 6, “device_type”: “sensor-ipad”, “ip”: “220.173.179.1”, “cca3”: “CHN”, “cn”: “China”, “temp”: 21, “signal”: 18, “battery_level”: 9, “c02_level”: 1249, “timestamp” :1475600508 }
7 {“device_id”: 7, “device_type”: “sensor-ipad”, “ip”: “118.23.68.227”, “cca3”: “JPN”, “cn”: “Japan”, “temp”: 27, “signal”: 15, “battery_level”: 0, “c02_level”: 1531, “timestamp” :1475600512 }
8 {“device_id”: 8, “device_type”: “sensor-inest”, “ip”: “208.109.163.218”, “cca3”: “USA”, “cn”: “United States”, “temp”: 40, “signal”: 16, “battery_level”: 9, “c02_level”: 1208, “timestamp” :1475600514 }
9 {“device_id”: 9, “device_type”: “sensor-ipad”, “ip”: “88.213.191.34”, “cca3”: “ITA”, “cn”: “Italy”, “temp”: 19, “signal”: 11, “battery_level”: 0, “c02_level”: 1171, “timestamp” :1475600516 }
val jsDF = eventsFromJSONDF.select($"id", get_json_object($"json", "$.device_type").alias("device_type"),
get_json_object($"json", "$.ip").alias("ip"),
get_json_object($"json", "$.cca3").alias("cca3"))
jsDF: org.apache.spark.sql.DataFrame = [id: int, device_type: string ... 2 more fields]
display(jsDF)
id device_type ip cca3
0 sensor-ipad 68.161.225.1 USA
1 sensor-igauge 213.161.254.1 NOR
2 sensor-ipad 88.36.5.1 ITA
3 sensor-inest 66.39.173.154 USA
4 sensor-ipad 203.82.41.9 PHL
5 sensor-istick 204.116.105.67 USA
6 sensor-ipad 220.173.179.1 CHN
7 sensor-ipad 118.23.68.227 JPN
8 sensor-inest 208.109.163.218 USA
9 sensor-ipad 88.213.191.34 ITA

How to use from_json()

A variant of get_json_object(), this function uses a schema to extract individual columns. Using the from_json() helper within a select() Dataset API call, I can extract or decode the JSON string's attributes and values into DataFrame columns, as dictated by the schema. Because the schema groups all the attributes and values associated with this JSON into a single entity, devices, you can not only retrieve each respective value with the devices.attribute notation but also retrieve all of them with the * notation.

In the example below:

  • Use the schema above to extract the attributes and values from the JSON string and represent them as individual columns under devices
  • select() all the columns
  • filter on the desired attributes using the . notation

Once the data from the JSON string is extracted into its respective DataFrame columns, you can apply DataFrame/Dataset API calls to select, filter, and subsequently display, to your heart's content.

val devicesDF = eventsDS.select(from_json($"device", jsonSchema) as "devices")
.select($"devices.*")
.filter($"devices.temp" > 10 and $"devices.signal" > 15)
devicesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [battery_level: bigint, c02_level: bigint ... 8 more fields]
display(devicesDF)

todo

val devicesUSDF = devicesDF.select($"*").where($"cca3" === "USA").orderBy($"signal".desc, $"temp".desc)
devicesUSDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [battery_level: bigint, c02_level: bigint ... 8 more fields]
display(devicesUSDF)
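Of the helper functions listed at the top, explode() is the only one this dataset never exercises, because the device events contain no array or map columns. As a hedged sketch under that caveat (the tags column below is hypothetical, not part of the device data), explode() produces one output row per element of an array column:

```scala
// Hypothetical array column: each device id carries a small list of tags.
val tagged = Seq(
  (0, Array("iot", "sensor")),
  (1, Array("gauge"))
).toDF("id", "tags")

// explode() flattens the array: one row per (id, tag) pair,
// i.e. (0, iot), (0, sensor), (1, gauge)
display(tagged.select($"id", explode($"tags").alias("tag")))
```

The same call works on map columns, in which case it yields key and value columns instead of a single element column.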

How to use to_json()

Now, let's go the other way: with to_json() you can convert or encode our filtered devices into JSON strings. That is, you can convert a JSON struct into a string. The result could be republished, for instance, to Kafka, or saved to disk as Parquet files. To learn how to write to Kafka and other sinks, read this blog and our series of posts on Structured Streaming.

val stringJsonDF = eventsDS.select(to_json(struct($"*"))).toDF("devices")
stringJsonDF: org.apache.spark.sql.DataFrame = [devices: string]
display(stringJsonDF)
devices
{“id”:0,”device”:”{\”device_id\”: 0, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”68.161.225.1\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 25, \”signal\”: 23, \”battery_level\”: 8, \”c02_level\”: 917, \”timestamp\” :1475600496 }”}
{“id”:1,”device”:”{\”device_id\”: 1, \”device_type\”: \”sensor-igauge\”, \”ip\”: \”213.161.254.1\”, \”cca3\”: \”NOR\”, \”cn\”: \”Norway\”, \”temp\”: 30, \”signal\”: 18, \”battery_level\”: 6, \”c02_level\”: 1413, \”timestamp\” :1475600498 }”}
{“id”:2,”device”:”{\”device_id\”: 2, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”88.36.5.1\”, \”cca3\”: \”ITA\”, \”cn\”: \”Italy\”, \”temp\”: 18, \”signal\”: 25, \”battery_level\”: 5, \”c02_level\”: 1372, \”timestamp\” :1475600500 }”}
{“id”:3,”device”:”{\”device_id\”: 3, \”device_type\”: \”sensor-inest\”, \”ip\”: \”66.39.173.154\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 47, \”signal\”: 12, \”battery_level\”: 1, \”c02_level\”: 1447, \”timestamp\” :1475600502 }”}
{“id”:4,”device”:”{\”device_id\”: 4, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”203.82.41.9\”, \”cca3\”: \”PHL\”, \”cn\”: \”Philippines\”, \”temp\”: 29, \”signal\”: 11, \”battery_level\”: 0, \”c02_level\”: 983, \”timestamp\” :1475600504 }”}
{“id”:5,”device”:”{\”device_id\”: 5, \”device_type\”: \”sensor-istick\”, \”ip\”: \”204.116.105.67\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 50, \”signal\”: 16, \”battery_level\”: 8, \”c02_level\”: 1574, \”timestamp\” :1475600506 }”}
{“id”:6,”device”:”{\”device_id\”: 6, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”220.173.179.1\”, \”cca3\”: \”CHN\”, \”cn\”: \”China\”, \”temp\”: 21, \”signal\”: 18, \”battery_level\”: 9, \”c02_level\”: 1249, \”timestamp\” :1475600508 }”}
{“id”:7,”device”:”{\”device_id\”: 7, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”118.23.68.227\”, \”cca3\”: \”JPN\”, \”cn\”: \”Japan\”, \”temp\”: 27, \”signal\”: 15, \”battery_level\”: 0, \”c02_level\”: 1531, \”timestamp\” :1475600512 }”}
{“id”:8,”device”:” {\”device_id\”: 8, \”device_type\”: \”sensor-inest\”, \”ip\”: \”208.109.163.218\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 40, \”signal\”: 16, \”battery_level\”: 9, \”c02_level\”: 1208, \”timestamp\” :1475600514 }”}
{“id”:9,”device”:”{\”device_id\”: 9, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”88.213.191.34\”, \”cca3\”: \”ITA\”, \”cn\”: \”Italy\”, \”temp\”: 19, \”signal\”: 11, \”battery_level\”: 0, \”c02_level\”: 1171, \”timestamp\” :1475600516 }”}
{“id”:10,”device”:”{\”device_id\”: 10, \”device_type\”: \”sensor-igauge\”, \”ip\”: \”68.28.91.22\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 32, \”signal\”: 26, \”battery_level\”: 7, \”c02_level\”: 886, \”timestamp\” :1475600518 }”}
{“id”:11,”device”:”{\”device_id\”: 11, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”59.144.114.250\”, \”cca3\”: \”IND\”, \”cn\”: \”India\”, \”temp\”: 46, \”signal\”: 25, \”battery_level\”: 4, \”c02_level\”: 863, \”timestamp\” :1475600520 }”}
{“id”:12,”device”:”{\”device_id\”: 12, \”device_type\”: \”sensor-igauge\”, \”ip\”: \”193.156.90.200\”, \”cca3\”: \”NOR\”, \”cn\”: \”Norway\”, \”temp\”: 18, \”signal\”: 26, \”battery_level\”: 8, \”c02_level\”: 1220, \”timestamp\” :1475600522 }”}
{“id”:13,”device”:”{\”device_id\”: 13, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”67.185.72.1\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 34, \”signal\”: 20, \”battery_level\”: 8, \”c02_level\”: 1504, \”timestamp\” :1475600524 }”}
{“id”:14,”device”:”{\”device_id\”: 14, \”device_type\”: \”sensor-inest\”, \”ip\”: \”68.85.85.106\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 39, \”signal\”: 17, \”battery_level\”: 8, \”c02_level\”: 831, \”timestamp\” :1475600526 }”}
{“id”:15,”device”:”{\”device_id\”: 15, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”161.188.212.254\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 27, \”signal\”: 26, \”battery_level\”: 5, \”c02_level\”: 1378, \”timestamp\” :1475600528 }”}
{“id”:16,”device”:”{\”device_id\”: 16, \”device_type\”: \”sensor-igauge\”, \”ip\”: \”221.3.128.242\”, \”cca3\”: \”CHN\”, \”cn\”: \”China\”, \”temp\”: 10, \”signal\”: 24, \”battery_level\”: 6, \”c02_level\”: 1423, \”timestamp\” :1475600530 }”}
{“id”:17,”device”:”{\”device_id\”: 17, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”64.124.180.215\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 38, \”signal\”: 17, \”battery_level\”: 9, \”c02_level\”: 1304, \”timestamp\” :1475600532 }”}
{“id”:18,”device”:”{\”device_id\”: 18, \”device_type\”: \”sensor-igauge\”, \”ip\”: \”66.153.162.66\”, \”cca3\”: \”USA\”, \”cn\”: \”United States\”, \”temp\”: 26, \”signal\”: 10, \”battery_level\”: 0, \”c02_level\”: 902, \”timestamp\” :1475600534 }”}
{“id”:19,”device”:”{\”device_id\”: 19, \”device_type\”: \”sensor-ipad\”, \”ip\”: \”193.200.142.254\”, \”cca3\”: \”AUT\”, \”cn\”: \”Austria\”, \”temp\”: 32, \”signal\”: 27, \”battery_level\”: 5, \”c02_level\”: 1282, \”timestamp\” :1475600536 }”}

Assuming you have a Kafka cluster running at the designated port with a corresponding topic, let's write to the Kafka topic.

stringJsonDF.write
.format("kafka")
.option("kafka.bootstrap.servers", "your_host_name:9092")
.option("topic", "iot-devices")
.save()

org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
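The AnalysisException above is expected: the Kafka sink requires the payload column to be named value (with an optional key column), and our column is named devices. A minimal sketch of the fix, keeping the same hypothetical broker address and topic from the cell above:

```scala
// Rename "devices" to "value" so the Kafka sink can find its payload column.
// "your_host_name:9092" and "iot-devices" are the same placeholders as above.
stringJsonDF
  .selectExpr("devices AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "your_host_name:9092")
  .option("topic", "iot-devices")
  .save()
```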

How to use selectExpr()

Another way to convert or encode a column into a JSON object as a string is with the selectExpr() utility function. For example, I can convert the device column of our DataFrame from above into a JSON string.

val stringsDF = eventsDS.selectExpr("CAST(id AS INT)", "CAST(device AS STRING)")
stringsDF: org.apache.spark.sql.DataFrame = [id: int, device: string]
stringsDF.printSchema
root
|-- id: integer (nullable = false)
|-- device: string (nullable = true)
display(stringsDF)
id device
0 {“device_id”: 0, “device_type”: “sensor-ipad”, “ip”: “68.161.225.1”, “cca3”: “USA”, “cn”: “United States”, “temp”: 25, “signal”: 23, “battery_level”: 8, “c02_level”: 917, “timestamp” :1475600496 }
1 {“device_id”: 1, “device_type”: “sensor-igauge”, “ip”: “213.161.254.1”, “cca3”: “NOR”, “cn”: “Norway”, “temp”: 30, “signal”: 18, “battery_level”: 6, “c02_level”: 1413, “timestamp” :1475600498 }
2 {“device_id”: 2, “device_type”: “sensor-ipad”, “ip”: “88.36.5.1”, “cca3”: “ITA”, “cn”: “Italy”, “temp”: 18, “signal”: 25, “battery_level”: 5, “c02_level”: 1372, “timestamp” :1475600500 }
3 {“device_id”: 3, “device_type”: “sensor-inest”, “ip”: “66.39.173.154”, “cca3”: “USA”, “cn”: “United States”, “temp”: 47, “signal”: 12, “battery_level”: 1, “c02_level”: 1447, “timestamp” :1475600502 }
4 {“device_id”: 4, “device_type”: “sensor-ipad”, “ip”: “203.82.41.9”, “cca3”: “PHL”, “cn”: “Philippines”, “temp”: 29, “signal”: 11, “battery_level”: 0, “c02_level”: 983, “timestamp” :1475600504 }
5 {“device_id”: 5, “device_type”: “sensor-istick”, “ip”: “204.116.105.67”, “cca3”: “USA”, “cn”: “United States”, “temp”: 50, “signal”: 16, “battery_level”: 8, “c02_level”: 1574, “timestamp” :1475600506 }
6 {“device_id”: 6, “device_type”: “sensor-ipad”, “ip”: “220.173.179.1”, “cca3”: “CHN”, “cn”: “China”, “temp”: 21, “signal”: 18, “battery_level”: 9, “c02_level”: 1249, “timestamp” :1475600508 }
7 {“device_id”: 7, “device_type”: “sensor-ipad”, “ip”: “118.23.68.227”, “cca3”: “JPN”, “cn”: “Japan”, “temp”: 27, “signal”: 15, “battery_level”: 0, “c02_level”: 1531, “timestamp” :1475600512 }
8 {“device_id”: 8, “device_type”: “sensor-inest”, “ip”: “208.109.163.218”, “cca3”: “USA”, “cn”: “United States”, “temp”: 40, “signal”: 16, “battery_level”: 9, “c02_level”: 1208, “timestamp” :1475600514 }
9 {“device_id”: 9, “device_type”: “sensor-ipad”, “ip”: “88.213.191.34”, “cca3”: “ITA”, “cn”: “Italy”, “temp”: 19, “signal”: 11, “battery_level”: 0, “c02_level”: 1171, “timestamp” :1475600516 }
10 {“device_id”: 10, “device_type”: “sensor-igauge”, “ip”: “68.28.91.22”, “cca3”: “USA”, “cn”: “United States”, “temp”: 32, “signal”: 26, “battery_level”: 7, “c02_level”: 886, “timestamp” :1475600518 }
11 {“device_id”: 11, “device_type”: “sensor-ipad”, “ip”: “59.144.114.250”, “cca3”: “IND”, “cn”: “India”, “temp”: 46, “signal”: 25, “battery_level”: 4, “c02_level”: 863, “timestamp” :1475600520 }
12 {“device_id”: 12, “device_type”: “sensor-igauge”, “ip”: “193.156.90.200”, “cca3”: “NOR”, “cn”: “Norway”, “temp”: 18, “signal”: 26, “battery_level”: 8, “c02_level”: 1220, “timestamp” :1475600522 }
13 {“device_id”: 13, “device_type”: “sensor-ipad”, “ip”: “67.185.72.1”, “cca3”: “USA”, “cn”: “United States”, “temp”: 34, “signal”: 20, “battery_level”: 8, “c02_level”: 1504, “timestamp” :1475600524 }
14 {“device_id”: 14, “device_type”: “sensor-inest”, “ip”: “68.85.85.106”, “cca3”: “USA”, “cn”: “United States”, “temp”: 39, “signal”: 17, “battery_level”: 8, “c02_level”: 831, “timestamp” :1475600526 }
15 {“device_id”: 15, “device_type”: “sensor-ipad”, “ip”: “161.188.212.254”, “cca3”: “USA”, “cn”: “United States”, “temp”: 27, “signal”: 26, “battery_level”: 5, “c02_level”: 1378, “timestamp” :1475600528 }
16 {“device_id”: 16, “device_type”: “sensor-igauge”, “ip”: “221.3.128.242”, “cca3”: “CHN”, “cn”: “China”, “temp”: 10, “signal”: 24, “battery_level”: 6, “c02_level”: 1423, “timestamp” :1475600530 }
17 {“device_id”: 17, “device_type”: “sensor-ipad”, “ip”: “64.124.180.215”, “cca3”: “USA”, “cn”: “United States”, “temp”: 38, “signal”: 17, “battery_level”: 9, “c02_level”: 1304, “timestamp” :1475600532 }
18 {“device_id”: 18, “device_type”: “sensor-igauge”, “ip”: “66.153.162.66”, “cca3”: “USA”, “cn”: “United States”, “temp”: 26, “signal”: 10, “battery_level”: 0, “c02_level”: 902, “timestamp” :1475600534 }
19 {“device_id”: 19, “device_type”: “sensor-ipad”, “ip”: “193.200.142.254”, “cca3”: “AUT”, “cn”: “Austria”, “temp”: 32, “signal”: 27, “battery_level”: 5, “c02_level”: 1282, “timestamp” :1475600536 }

Another use of selectExpr() is, as the function name suggests, its ability to take expressions as arguments and convert them into their corresponding columns. For example, say I want to express the ratio of c02 level to temperature.

display(devicesDF.selectExpr("c02_level", "round(c02_level/temp) as ratio_c02_temperature").orderBy($"ratio_c02_temperature".desc))
c02_level ratio_c02_temperature
1372 76
1220 68
1249 59
1378 51
1413 47
1504 44
1282 40
917 37
1304 34
1574 31
1208 30
886 28
831 21
863 19

The above query could just as easily be expressed in Spark SQL as in the DataFrame API; therein lies the power of selectExpr() for dealing with and processing numeric values. Let's try creating a temporary view and expressing the same query, this time in SQL.

devicesDF.createOrReplaceTempView("devicesDFT")

Note that the output of cmd 42 is no different from that of cmd 38. Both go through the same Spark SQL engine's Catalyst optimizer and generate equivalent compact code underneath.

%sql select c02_level, 
round(c02_level/temp) as ratio_c02_temperature
from devicesDFT
order by ratio_c02_temperature desc
c02_level ratio_c02_temperature
1372 76
1220 68
1249 59
1378 51
1413 47
1504 44
1282 40
917 37
1304 34
1574 31
1208 30
886 28
831 21
863 19

To verify that all the string conversions are preserved in the DataFrame stringJsonDF above, let's save it to blob storage as Parquet.

stringJsonDF
.write
.mode("overwrite")
.format("parquet")
.save("/tmp/iot")

Check that all the files have been written.

%fs ls /tmp/iot
path name size
dbfs:/tmp/iot/_SUCCESS _SUCCESS 0
dbfs:/tmp/iot/part-00000-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00000-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1567
dbfs:/tmp/iot/part-00001-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00001-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1598
dbfs:/tmp/iot/part-00002-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00002-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1576
dbfs:/tmp/iot/part-00003-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00003-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1602
dbfs:/tmp/iot/part-00004-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00004-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1574
dbfs:/tmp/iot/part-00005-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00005-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1618
dbfs:/tmp/iot/part-00006-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00006-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1589
dbfs:/tmp/iot/part-00007-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet part-00007-5ad82ac7-eea2-4439-9635-0dac9279a742.gz.parquet 1624

Now let's read back and verify what we saved, since each device entry encoded above is an actual string.

val parquetDF = spark.read.parquet("/tmp/iot")
parquetDF: org.apache.spark.sql.DataFrame = [devices: string]

Let's examine the schema to ensure that what was written is no different from what is read back, namely JSON strings.

parquetDF.printSchema
root
|-- devices: string (nullable = true)
devices
{"id":17,"device":"{\"device_id\": 17, \"device_type\": \"sensor-ipad\", \"ip\": \"64.124.180.215\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 38, \"signal\": 17, \"battery_level\": 9, \"c02_level\": 1304, \"timestamp\" :1475600532 }"}
{"id":18,"device":"{\"device_id\": 18, \"device_type\": \"sensor-igauge\", \"ip\": \"66.153.162.66\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 26, \"signal\": 10, \"battery_level\": 0, \"c02_level\": 902, \"timestamp\" :1475600534 }"}
{"id":19,"device":"{\"device_id\": 19, \"device_type\": \"sensor-ipad\", \"ip\": \"193.200.142.254\", \"cca3\": \"AUT\", \"cn\": \"Austria\", \"temp\": 32, \"signal\": 27, \"battery_level\": 5, \"c02_level\": 1282, \"timestamp\" :1475600536 }"}
{"id":12,"device":"{\"device_id\": 12, \"device_type\": \"sensor-igauge\", \"ip\": \"193.156.90.200\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 18, \"signal\": 26, \"battery_level\": 8, \"c02_level\": 1220, \"timestamp\" :1475600522 }"}
{"id":13,"device":"{\"device_id\": 13, \"device_type\": \"sensor-ipad\", \"ip\": \"67.185.72.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 34, \"signal\": 20, \"battery_level\": 8, \"c02_level\": 1504, \"timestamp\" :1475600524 }"}
{"id":14,"device":"{\"device_id\": 14, \"device_type\": \"sensor-inest\", \"ip\": \"68.85.85.106\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 39, \"signal\": 17, \"battery_level\": 8, \"c02_level\": 831, \"timestamp\" :1475600526 }"}
{"id":7,"device":"{\"device_id\": 7, \"device_type\": \"sensor-ipad\", \"ip\": \"118.23.68.227\", \"cca3\": \"JPN\", \"cn\": \"Japan\", \"temp\": 27, \"signal\": 15, \"battery_level\": 0, \"c02_level\": 1531, \"timestamp\" :1475600512 }"}
{"id":8,"device":" {\"device_id\": 8, \"device_type\": \"sensor-inest\", \"ip\": \"208.109.163.218\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 40, \"signal\": 16, \"battery_level\": 9, \"c02_level\": 1208, \"timestamp\" :1475600514 }"}
{"id":9,"device":"{\"device_id\": 9, \"device_type\": \"sensor-ipad\", \"ip\": \"88.213.191.34\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 19, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 1171, \"timestamp\" :1475600516 }"}
{"id":2,"device":"{\"device_id\": 2, \"device_type\": \"sensor-ipad\", \"ip\": \"88.36.5.1\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 18, \"signal\": 25, \"battery_level\": 5, \"c02_level\": 1372, \"timestamp\" :1475600500 }"}
{"id":3,"device":"{\"device_id\": 3, \"device_type\": \"sensor-inest\", \"ip\": \"66.39.173.154\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 47, \"signal\": 12, \"battery_level\": 1, \"c02_level\": 1447, \"timestamp\" :1475600502 }"}
{"id":4,"device":"{\"device_id\": 4, \"device_type\": \"sensor-ipad\", \"ip\": \"203.82.41.9\", \"cca3\": \"PHL\", \"cn\": \"Philippines\", \"temp\": 29, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 983, \"timestamp\" :1475600504 }"}
{"id":15,"device":"{\"device_id\": 15, \"device_type\": \"sensor-ipad\", \"ip\": \"161.188.212.254\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 27, \"signal\": 26, \"battery_level\": 5, \"c02_level\": 1378, \"timestamp\" :1475600528 }"}
{"id":16,"device":"{\"device_id\": 16, \"device_type\": \"sensor-igauge\", \"ip\": \"221.3.128.242\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 10, \"signal\": 24, \"battery_level\": 6, \"c02_level\": 1423, \"timestamp\" :1475600530 }"}
{"id":5,"device":"{\"device_id\": 5, \"device_type\": \"sensor-istick\", \"ip\": \"204.116.105.67\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 50, \"signal\": 16, \"battery_level\": 8, \"c02_level\": 1574, \"timestamp\" :1475600506 }"}
{"id":6,"device":"{\"device_id\": 6, \"device_type\": \"sensor-ipad\", \"ip\": \"220.173.179.1\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 21, \"signal\": 18, \"battery_level\": 9, \"c02_level\": 1249, \"timestamp\" :1475600508 }"}
{"id":10,"device":"{\"device_id\": 10, \"device_type\": \"sensor-igauge\", \"ip\": \"68.28.91.22\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 32, \"signal\": 26, \"battery_level\": 7, \"c02_level\": 886, \"timestamp\" :1475600518 }"}
{"id":11,"device":"{\"device_id\": 11, \"device_type\": \"sensor-ipad\", \"ip\": \"59.144.114.250\", \"cca3\": \"IND\", \"cn\": \"India\", \"temp\": 46, \"signal\": 25, \"battery_level\": 4, \"c02_level\": 863, \"timestamp\" :1475600520 }"}
{"id":0,"device":"{\"device_id\": 0, \"device_type\": \"sensor-ipad\", \"ip\": \"68.161.225.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 25, \"signal\": 23, \"battery_level\": 8, \"c02_level\": 917, \"timestamp\" :1475600496 }"}
“{“”id””:1,””device””:””{\””device_id\””: 1, \””device_type\””: \””sensor-igauge\””, \””ip\””: \””213.161.254.1\””, \””cca3\””: \””NOR\””, \””cn\””: \””Norway\””, \””temp\””: 30, \””signal\””: 18, \””battery_level\””: 6, \””c02_level\””: 1413, \””timestamp\”” :1475600498 }””}”

So far, this tutorial has explored ways to process relatively uncomplicated JSON objects with the get_json_object(), from_json(), to_json(), selectExpr(), and explode() helper functions.
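As a quick refresher, here is a plain-Scala sketch (no Spark required) of what get_json_object() does conceptually: pull a single field out of a flat JSON string. The helper getJsonField and its regex are purely illustrative — Spark's real implementation is a JsonPath evaluator, and a regex like this would break on nested JSON:

```scala
// Hypothetical helper mimicking get_json_object($"device", "$.cca3"):
// extract one top-level field from a *flat* JSON string with a regex.
// Illustration only; it is not a general JSON parser.
def getJsonField(json: String, field: String): Option[String] =
  ("\"" + field + "\"\\s*:\\s*\"?([^\",}]+)\"?").r
    .findFirstMatchIn(json)
    .map(_.group(1).trim)

val event =
  """{"device_id": 0, "device_type": "sensor-ipad", "cca3": "USA", "temp": 25}"""

println(getJsonField(event, "cca3")) // Some(USA)
println(getJsonField(event, "temp")) // Some(25)
```

In Spark the same extraction would be `df.select(get_json_object($"device", "$.cca3"))`, evaluated lazily over every row of the column.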

Let's now shift our focus to more nested structures and examine how simply these same APIs apply to complex JSON.

Nested structures

It is not unreasonable to assume that your nested JSON structures may contain Maps as well as nested JSON. To illustrate, we will use a single string composed of complex and nested data types, including a Map. In real life, this could be a device-event reading with alarming c02 levels or high temperature readings that require a network operations center (NOC) to take immediate action.

import org.apache.spark.sql.types._

val schema = new StructType()
.add("dc_id", StringType) // data center where data was posted to Kafka cluster
.add("source", // info about the source of alarm
MapType( // define this as a Map(Key->value)
StringType,
new StructType()
.add("description", StringType)
.add("ip", StringType)
.add("id", LongType)
.add("temp", LongType)
.add("c02_level", LongType)
.add("geo",
new StructType()
.add("lat", DoubleType)
.add("long", DoubleType)
)
)
)
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(dc_id,StringType,true), StructField(source,MapType(StringType,StructType(StructField(description,StringType,true), StructField(ip,StringType,true), StructField(id,LongType,true), StructField(temp,LongType,true), StructField(c02_level,LongType,true), StructField(geo,StructType(StructField(lat,DoubleType,true), StructField(long,DoubleType,true)),true)),true),true))

Let's create a complex JSON record with these complex data types.

//create a single entry with id and its complex and nested data types

val dataDS = Seq("""
{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":35,
"c02_level": 1475,
"geo": {"lat":38.00, "long":97.00}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": 34,
"c02_level": 1370,
"geo": {"lat":47.41, "long":-122.00}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": 40,
"c02_level": 1346,
"geo": {"lat":33.61, "long":-111.89}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp": 40,
"c02_level": 1574,
"geo": {"lat":35.93, "long":-85.46}
}
}
}""").toDS()
// should only be one item
dataDS.count()
dataDS: org.apache.spark.sql.Dataset[String] = [value: string]
res15: Long = 1
{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":35,
"c02_level": 1475,
"geo": {"lat":38.00, "long":97.00}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": 34,
"c02_level": 1370,
"geo": {"lat":47.41, "long":-122.00}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": 40,
"c02_level": 1346,
"geo": {"lat":33.61, "long":-111.89}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp": 40,
"c02_level": 1574,
"geo": {"lat":35.93, "long":-85.46}
}
}
}

Let's process it. Note that we have a nested structure, geo.

val df = spark                  // spark session 
.read // get DataFrameReader
.schema(schema) // use the defined schema above and read format as JSON
.json(dataDS.rdd) // RDD[String]
df: org.apache.spark.sql.DataFrame = [dc_id: string, source: map<string,struct<description:string,ip:string,id:bigint,temp:bigint,c02_level:bigint,geo:struct<lat:double,long:double>>>]

Let's examine its nested and complex schema.

df.printSchema
root
|-- dc_id: string (nullable = true)
|-- source: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- description: string (nullable = true)
| | |-- ip: string (nullable = true)
| | |-- id: long (nullable = true)
| | |-- temp: long (nullable = true)
| | |-- c02_level: long (nullable = true)
| | |-- geo: struct (nullable = true)
| | | |-- lat: double (nullable = true)
| | | |-- long: double (nullable = true)
dc_id, source
dc-101,{"sensor-igauge":{"description":"Sensor attached to the container ceilings","ip":"68.28.91.22","id":"10","temp":"35","c02_level":"1475","geo":{"lat":38,"long":97}},"sensor-ipad":{"description":"Sensor ipad attached to carbon cylinders","ip":"67.185.72.1","id":"13","temp":"34","c02_level":"1370","geo":{"lat":47.41,"long":-122}},"sensor-inest":{"description":"Sensor attached to the factory ceilings","ip":"208.109.163.218","id":"8","temp":"40","c02_level":"1346","geo":{"lat":33.61,"long":-111.89}},"sensor-istick":{"description":"Sensor embedded in exhaust pipes in the ceilings","ip":"204.116.105.67","id":"5","temp":"40","c02_level":"1574","geo":{"lat":35.93,"long":-85.46}}}

How to use explode()

The explode() function is used to show how to extract nested structures. Better yet, it shines when combined with the to_json() and from_json() functions as we extract attributes and values from a complex JSON structure, so occasionally you will want to use explode() together with to_json() and from_json(). This is one such case.
The explode() function creates a new row for each element in the given map column. Here the map column is source; note that you get one row for each key-value pair in the map — four in this example.
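Before the Spark version below, the semantics of explode() on a map column can be sketched in plain Scala; the Reading case class and its values are illustrative stand-ins for the sensor struct:

```scala
// Plain-Scala sketch of df.select($"dc_id", explode($"source")):
// every key -> value entry in the map becomes its own row, carried
// along with the non-exploded columns (dc_id here).
case class Reading(ip: String, temp: Long, c02Level: Long)

val dcId = "dc-101"
val source = Map(
  "sensor-igauge" -> Reading("68.28.91.22", 35, 1475),
  "sensor-ipad"   -> Reading("67.185.72.1", 34, 1370)
)

// One (dc_id, key, value) row per map entry, just like explode()
val rows = source.toSeq.map { case (key, value) => (dcId, key, value) }
rows.foreach(println)
```

The Spark version does the same thing lazily and distributedly over every row of the DataFrame.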

// select from DataFrame with a single entry, and explode its column source, which is Map, with nested structure.
val explodedDF = df.select($"dc_id", explode($"source"))
display(explodedDF)
dc_id key value
dc-101 sensor-igauge {"description":"Sensor attached to the container ceilings","ip":"68.28.91.22","id":"10","temp":"35","c02_level":"1475","geo":{"lat":38,"long":97}}
dc-101 sensor-ipad {"description":"Sensor ipad attached to carbon cylinders","ip":"67.185.72.1","id":"13","temp":"34","c02_level":"1370","geo":{"lat":47.41,"long":-122}}
dc-101 sensor-inest {"description":"Sensor attached to the factory ceilings","ip":"208.109.163.218","id":"8","temp":"40","c02_level":"1346","geo":{"lat":33.61,"long":-111.89}}
dc-101 sensor-istick {"description":"Sensor embedded in exhaust pipes in the ceilings","ip":"204.116.105.67","id":"5","temp":"40","c02_level":"1574","geo":{"lat":35.93,"long":-85.46}}

Looking at the schema, note that source has now been expanded.

explodedDF.printSchema
root
|-- dc_id: string (nullable = true)
|-- key: string (nullable = false)
|-- value: struct (nullable = true)
| |-- description: string (nullable = true)
| |-- ip: string (nullable = true)
| |-- id: long (nullable = true)
| |-- temp: long (nullable = true)
| |-- c02_level: long (nullable = true)
| |-- geo: struct (nullable = true)
| | |-- lat: double (nullable = true)
| | |-- long: double (nullable = true)

This is a single string aggregating complex data types, including a Map. It could well be a record that warrants attention and action from a network operations center (NOC), since both the temperature and c02 levels are alarming.

Let's access the data in our exploded rows via the Map.

//case class to denote our desired Scala object
case class DeviceAlert(dcId: String, deviceType:String, ip:String, deviceId:Long, temp:Long, c02_level: Long, lat: Double, lon: Double)
//access all values using getItem() method on value, by providing the "key," which is attribute in our JSON object.
val notifydevicesDS = explodedDF.select( $"dc_id" as "dcId",
$"key" as "deviceType",
'value.getItem("ip") as 'ip,
'value.getItem("id") as 'deviceId,
'value.getItem("c02_level") as 'c02_level,
'value.getItem("temp") as 'temp,
'value.getItem("geo").getItem("lat") as 'lat, //note embedded level requires yet another level of fetching.
'value.getItem("geo").getItem("long") as 'lon)
.as[DeviceAlert] // return as a Dataset
defined class DeviceAlert
notifydevicesDS: org.apache.spark.sql.Dataset[DeviceAlert] = [dcId: string, deviceType: string ... 6 more fields]
notifydevicesDS.printSchema
root
|-- dcId: string (nullable = true)
|-- deviceType: string (nullable = false)
|-- ip: string (nullable = true)
|-- deviceId: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- temp: long (nullable = true)
|-- lat: double (nullable = true)
|-- lon: double (nullable = true)

Suppose that as part of your ETL you need to notify or send alerts based on certain alert conditions. One way is to write a user function that iterates over the filtered Dataset and sends individual notifications; in other cases, you could additionally publish the messages to a Kafka topic.
Having exploded the nested JSON into a simple case class, we can now send alerts to a NOC for action. One approach is to use the foreach() method on the Dataset. To do so, however, we need a higher-order function that, given a case class, extracts its alert attributes and issues the alert. While this simple example writes to stdout, in a real scenario you would send alerts over SNMP, via an HTTP POST, or through some API to a paging service such as PagerAlert.

Alert notification functions

// define a Scala Notification Object
object DeviceNOCAlerts {

def sendTwilio(message: String): Unit = {
//TODO: fill as necessary
println("Twilio:" + message)
}

def sendSNMP(message: String): Unit = {
//TODO: fill as necessary
println("SNMP:" + message)
}

def sendKafka(message: String): Unit = {
//TODO: fill as necessary
println("KAFKA:" + message)
}
}
def logAlerts(log: java.io.PrintStream = Console.out, deviceAlert: DeviceAlert, alert: String, notify: String ="twilio"): Unit = {
val message = "[***ALERT***: %s; data_center: %s, device_name: %s, temperature: %d; device_id: %d ; ip: %s ; c02: %d]" format(alert, deviceAlert.dcId, deviceAlert.deviceType,deviceAlert.temp, deviceAlert.deviceId, deviceAlert.ip, deviceAlert.c02_level)
//default log to Stderr/Stdout
log.println(message)
// use an appropriate notification method
val notifyFunc = notify match {
case "twilio" => DeviceNOCAlerts.sendTwilio _
case "snmp" => DeviceNOCAlerts.sendSNMP _
case "kafka" => DeviceNOCAlerts.sendKafka _
}
//send the appropriate alert
notifyFunc(message)
}
defined object DeviceNOCAlerts
logAlerts: (log: java.io.PrintStream, deviceAlert: DeviceAlert, alert: String, notify: String)Unit

Iterate over the alert devices and take action

notifydevicesDS.foreach(d => logAlerts(Console.err, d, "ACTION NEEDED! HIGH TEMPERATURE AND C02 LEVELS", "kafka"))

To see where the messages are logged, go to Clusters -> Spark -> Logs.

Sending alerts as JSON to an Apache Kafka topic

What if you wanted to write these device alerts to a Kafka topic on which a monitoring subscriber is waiting for events to act on?

Here is a simple way to notify listeners on a Kafka topic named "device_alerts". For further detail on using Structured Streaming with Apache Kafka, read part 3 of our blog series on Structured Streaming.
Note: the snippet below shows another use of the selectExpr() function we explored above.

val deviceAlertQuery = notifydevicesDS
.selectExpr("CAST(dcId AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "device_alerts")
.start()

org.apache.spark.sql.AnalysisException: Undefined function: 'to_json'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

Nested device data

Let's look at another set of complex, real-life data: readings from Nest devices. A Nest device emits many JSON events to its collector. That collector could live in a nearby data center, act as a neighborhood central data collector or aggregator, or be a device installed in the home that periodically sends readings over a secure internet connection to a central data center. For illustration, I have limited the attributes to a handful, but the example still shows how to handle complex data — and how to extract the relevant attributes.

Let's first define its complex schema. Looking closely, you'll notice it is not that different from the schema we defined above, except that instead of one map it has three: thermostats, cameras, and smoke alarms.

import org.apache.spark.sql.types._

// a bit longish, nested, and convoluted JSON schema :)
val nestSchema2 = new StructType()
.add("devices",
new StructType()
.add("thermostats", MapType(StringType,
new StructType()
.add("device_id", StringType)
.add("locale", StringType)
.add("software_version", StringType)
.add("structure_id", StringType)
.add("where_name", StringType)
.add("last_connection", StringType)
.add("is_online", BooleanType)
.add("can_cool", BooleanType)
.add("can_heat", BooleanType)
.add("is_using_emergency_heat", BooleanType)
.add("has_fan", BooleanType)
.add("fan_timer_active", BooleanType)
.add("fan_timer_timeout", StringType)
.add("temperature_scale", StringType)
.add("target_temperature_f", DoubleType)
.add("target_temperature_high_f", DoubleType)
.add("target_temperature_low_f", DoubleType)
.add("eco_temperature_high_f", DoubleType)
.add("eco_temperature_low_f", DoubleType)
.add("away_temperature_high_f", DoubleType)
.add("away_temperature_low_f", DoubleType)
.add("hvac_mode", StringType)
.add("humidity", LongType)
.add("hvac_state", StringType)
.add("is_locked", StringType)
.add("locked_temp_min_f", DoubleType)
.add("locked_temp_max_f", DoubleType)))
.add("smoke_co_alarms", MapType(StringType,
new StructType()
.add("device_id", StringType)
.add("locale", StringType)
.add("software_version", StringType)
.add("structure_id", StringType)
.add("where_name", StringType)
.add("last_connection", StringType)
.add("is_online", BooleanType)
.add("battery_health", StringType)
.add("co_alarm_state", StringType)
.add("smoke_alarm_state", StringType)
.add("is_manual_test_active", BooleanType)
.add("last_manual_test_time", StringType)
.add("ui_color_state", StringType)))
.add("cameras", MapType(StringType,
new StructType()
.add("device_id", StringType)
.add("software_version", StringType)
.add("structure_id", StringType)
.add("where_name", StringType)
.add("is_online", BooleanType)
.add("is_streaming", BooleanType)
.add("is_audio_input_enabled", BooleanType)
.add("last_is_online_change", StringType)
.add("is_video_history_enabled", BooleanType)
.add("web_url", StringType)
.add("app_url", StringType)
.add("is_public_share_enabled", BooleanType)
.add("activity_zones",
new StructType()
.add("name", StringType)
.add("id", LongType))
.add("last_event", StringType))))
import org.apache.spark.sql.types._
nestSchema2: org.apache.spark.sql.types.StructType = StructType(StructField(devices,StructType(StructField(thermostats,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(locale,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(last_connection,StringType,true), StructField(is_online,BooleanType,true), StructField(can_cool,BooleanType,true), StructField(can_heat,BooleanType,true), StructField(is_using_emergency_heat,BooleanType,true), StructField(has_fan,BooleanType,true), StructField(fan_timer_active,BooleanType,true), StructField(fan_timer_timeout,StringType,true), StructField(temperature_scale,StringType,true), StructField(target_temperature_f,DoubleType,true), StructField(target_temperature_high_f,DoubleType,true), StructField(target_temperature_low_f,DoubleType,true), StructField(eco_temperature_high_f,DoubleType,true), StructField(eco_temperature_low_f,DoubleType,true), StructField(away_temperature_high_f,DoubleType,true), StructField(away_temperature_low_f,DoubleType,true), StructField(hvac_mode,StringType,true), StructField(humidity,LongType,true), StructField(hvac_state,StringType,true), StructField(is_locked,StringType,true), StructField(locked_temp_min_f,DoubleType,true), StructField(locked_temp_max_f,DoubleType,true)),true),true), StructField(smoke_co_alarms,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(locale,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(last_connection,StringType,true), StructField(is_online,BooleanType,true), StructField(battery_health,StringType,true), StructField(co_alarm_state,StringType,true), StructField(smoke_alarm_state,StringType,true), StructField(is_manual_test_active,BooleanType,true), StructField(last_manual_test_time,StringType,true), 
StructField(ui_color_state,StringType,true)),true),true), StructField(cameras,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(is_online,BooleanType,true), StructField(is_streaming,BooleanType,true), StructField(is_audio_input_enabled,BooleanType,true), StructField(last_is_online_change,StringType,true), StructField(is_video_history_enabled,BooleanType,true), StructField(web_url,StringType,true), StructField(app_url,StringType,true), StructField(is_public_share_enabled,BooleanType,true), StructField(activity_zones,StructType(StructField(name,StringType,true), StructField(id,LongType,true)),true), StructField(last_event,StringType,true)),true),true)),true))

By creating a simple Dataset, you can use all the Dataset methods to do ETL with the utility functions above: from_json(), to_json(), explode(), and selectExpr().

val nestDataDS2 = Seq("""{
"devices": {
"thermostats": {
"peyiJNo0IldT2YlIVtYaGQ": {
"device_id": "peyiJNo0IldT2YlIVtYaGQ",
"locale": "en-US",
"software_version": "4.0",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Hallway Upstairs",
"last_connection": "2016-10-31T23:59:59.000Z",
"is_online": true,
"can_cool": true,
"can_heat": true,
"is_using_emergency_heat": true,
"has_fan": true,
"fan_timer_active": true,
"fan_timer_timeout": "2016-10-31T23:59:59.000Z",
"temperature_scale": "F",
"target_temperature_f": 72,
"target_temperature_high_f": 80,
"target_temperature_low_f": 65,
"eco_temperature_high_f": 80,
"eco_temperature_low_f": 65,
"away_temperature_high_f": 80,
"away_temperature_low_f": 65,
"hvac_mode": "heat",
"humidity": 40,
"hvac_state": "heating",
"is_locked": true,
"locked_temp_min_f": 65,
"locked_temp_max_f": 80
}
},
"smoke_co_alarms": {
"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
"device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
"locale": "en-US",
"software_version": "1.01",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Jane's Room",
"last_connection": "2016-10-31T23:59:59.000Z",
"is_online": true,
"battery_health": "ok",
"co_alarm_state": "ok",
"smoke_alarm_state": "ok",
"is_manual_test_active": true,
"last_manual_test_time": "2016-10-31T23:59:59.000Z",
"ui_color_state": "gray"
}
},
"cameras": {
"awJo6rH0IldT2YlIVtYaGQ": {
"device_id": "awJo6rH",
"software_version": "4.0",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Foyer",
"is_online": true,
"is_streaming": true,
"is_audio_input_enabled": true,
"last_is_online_change": "2016-12-29T18:42:00.000Z",
"is_video_history_enabled": true,
"web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
"app_url": "nestmobile://cameras/device_id?auth=access_token",
"is_public_share_enabled": true,
"activity_zones": { "name": "Walkway", "id": 244083 },
"last_event": "2016-10-31T23:59:59.000Z"
}
}
}
}""").toDS
nestDataDS2: org.apache.spark.sql.Dataset[String] = [value: string]

Let's create a DataFrame from this single nested structure and use all of the above utility functions to process and extract the relevant attributes.

val nestDF2 = spark                            // spark session 
.read // get DataFrameReader
.schema(nestSchema2) // use the defined schema above and read format as JSON
.json(nestDataDS2.rdd)
nestDF2: org.apache.spark.sql.DataFrame = [devices: struct<thermostats: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,can_cool:boolean,can_heat:boolean,is_using_emergency_heat:boolean,has_fan:boolean,fan_timer_active:boolean,fan_timer_timeout:string,temperature_scale:string,target_temperature_f:double,target_temperature_high_f:double,target_temperature_low_f:double,eco_temperature_high_f:double,eco_temperature_low_f:double,away_temperature_high_f:double,away_temperature_low_f:double,hvac_mode:string,humidity:bigint,hvac_state:string,... 3 more fields>>, smoke_co_alarms: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,battery_health:string,co_alarm_state:string,smoke_alarm_state:string,is_manual_test_active:boolean,last_manual_test_time:string,ui_color_state:string>> ... 1 more field>]
{"thermostats":{"peyiJNo0IldT2YlIVtYaGQ":{"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72,"target_temperature_high_f":80,"target_temperature_low_f":65,"eco_temperature_high_f":80,"eco_temperature_low_f":65,"away_temperature_high_f":80,"away_temperature_low_f":65,"hvac_mode":"heat","humidity":"40","hvac_state":"heating","is_locked":"true","locked_temp_min_f":65,"locked_temp_max_f":80}},"smoke_co_alarms":{"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{"device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale":"en-US","software_version":"1.01","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Jane's Room","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"battery_health":"ok","co_alarm_state":"ok","smoke_alarm_state":"ok","is_manual_test_active":true,"last_manual_test_time":"2016-10-31T23:59:59.000Z","ui_color_state":"gray"}},"cameras":{"awJo6rH0IldT2YlIVtYaGQ":{"device_id":"awJo6rH","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Foyer","is_online":true,"is_streaming":true,"is_audio_input_enabled":true,"last_is_online_change":"2016-12-29T18:42:00.000Z","is_video_history_enabled":true,"web_url":"https://home.nest.com/cameras/device_id?auth=access_token","app_url":"nestmobile://cameras/device_id?auth=access_token","is_public_share_enabled":true,"activity_zones":{"name":"Walkway","id":"244083"},"last_event":"2016-10-31T23:59:59.000Z"}}}

As above, let's convert this entire JSON object into a JSON string.

val stringJsonDF = nestDF2.select(to_json(struct($"*"))).toDF("nestDevice")
stringJsonDF: org.apache.spark.sql.DataFrame = [nestDevice: string]
stringJsonDF.printSchema
root
|-- nestDevice: string (nullable = true)
display(stringJsonDF)
{"devices":{"thermostats":{"peyiJNo0IldT2YlIVtYaGQ":{"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72.0,"target_temperature_high_f":80.0,"target_temperature_low_f":65.0,"eco_temperature_high_f":80.0,"eco_temperature_low_f":65.0,"away_temperature_high_f":80.0,"away_temperature_low_f":65.0,"hvac_mode":"heat","humidity":40,"hvac_state":"heating","is_locked":"true","locked_temp_min_f":65.0,"locked_temp_max_f":80.0}},"smoke_co_alarms":{"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{"device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale":"en-US","software_version":"1.01","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7...

Given a nested JSON object with three maps, you can fetch an individual map as a column and then use explode() to access its attributes.

val mapColumnsDF = nestDF2.select($"devices".getItem("smoke_co_alarms").alias ("smoke_alarms"),
$"devices".getItem("cameras").alias ("cameras"),
$"devices".getItem("thermostats").alias ("thermostats"))
mapColumnsDF: org.apache.spark.sql.DataFrame = [smoke_alarms: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,battery_health:string,co_alarm_state:string,smoke_alarm_state:string,is_manual_test_active:boolean,last_manual_test_time:string,ui_color_state:string>>, cameras: map<string,struct<device_id:string,software_version:string,structure_id:string,where_name:string,is_online:boolean,is_streaming:boolean,is_audio_input_enabled:boolean,last_is_online_change:string,is_video_history_enabled:boolean,web_url:string,app_url:string,is_public_share_enabled:boolean,activity_zones:struct<name:string,id:bigint>,last_event:string>> ... 1 more field]
smoke_alarms cameras thermostats
{"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{"device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale":"en-US","software_version":"1.01","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Jane's Room","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"battery_health":"ok","co_alarm_state":"ok","smoke_alarm_state":"ok","is_manual_test_active":true,"last_manual_test_time":"2016-10-31T23:59:59.000Z","ui_color_state":"gray"}} {"awJo6rH0IldT2YlIVtYaGQ":{"device_id":"awJo6rH","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Foyer","is_online":true,"is_streaming":true,"is_audio_input_enabled":true,"last_is_online_change":"2016-12-29T18:42:00.000Z","is_video_history_enabled":true,"web_url":"https://home.nest.com/cameras/device_id?auth=access_token","app_url":"nestmobile://cameras/device_id?auth=access_token","is_public_share_enabled":true,"activity_zones":{"name":"Walkway","id":"244083"},"last_event":"2016-10-31T23:59:59.000Z"}} {"peyiJNo0IldT2YlIVtYaGQ":{"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72,"target_temperature_high_f":80,"target_temperature_low_f":65,"eco_temperature_high_f":80,"eco_temperature_low_f":65,"away_temperature_high_f":80,"away_temperature_low_f":65,"hvac_mode":"heat","humidity":"40","hvac_state":"heating","is_locked":"true","locked_temp_min_f":65,"locked_temp_max_f":80}}
val explodedThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
val explodedCamerasDF = mapColumnsDF.select(explode($"cameras"))
//or you could use the original nestDF2 and use the devices.X notation
val explodedSmokedAlarmsDF = nestDF2.select(explode($"devices.smoke_co_alarms"))
explodedThermostatsDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, locale: string ... 25 more fields>]
explodedCamerasDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, software_version: string ... 12 more fields>]
explodedSmokedAlarmsDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, locale: string ... 11 more fields>]
key value
peyiJNo0IldT2YlIVtYaGQ {"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72,"target_temperature_high_f":80,"target_temperature_low_f":65,"eco_temperature_high_f":80,"eco_temperature_low_f":65,"away_temperature_high_f":80,"away_temperature_low_f":65,"hvac_mode":"heat","humidity":"40","hvac_state":"heating","is_locked":"true","locked_temp_min_f":65,"locked_temp_max_f":80}

To extract specific individual fields from the map, you can use the getItem() method.

val thermostateDF = explodedThermostatsDF.select($"value".getItem("device_id").alias("device_id"), 
$"value".getItem("locale").alias("locale"),
$"value".getItem("where_name").alias("location"),
$"value".getItem("last_connection").alias("last_connected"),
$"value".getItem("humidity").alias("humidity"),
$"value".getItem("target_temperature_f").alias("target_temperature_f"),
$"value".getItem("hvac_mode").alias("mode"),
$"value".getItem("software_version").alias("version"))

val cameraDF = explodedCamerasDF.select($"value".getItem("device_id").alias("device_id"),
$"value".getItem("where_name").alias("location"),
$"value".getItem("software_version").alias("version"),
$"value".getItem("activity_zones").getItem("name").alias("name"),
$"value".getItem("activity_zones").getItem("id").alias("id"))

val smokedAlarmsDF = explodedSmokedAlarmsDF.select($"value".getItem("device_id").alias("device_id"),
$"value".getItem("where_name").alias("location"),
$"value".getItem("software_version").alias("version"),
$"value".getItem("last_connection").alias("last_connected"),
$"value".getItem("battery_health").alias("battery_health"))


thermostateDF: org.apache.spark.sql.DataFrame = [device_id: string, locale: string ... 6 more fields]
cameraDF: org.apache.spark.sql.DataFrame = [device_id: string, location: string ... 3 more fields]
smokedAlarmsDF: org.apache.spark.sql.DataFrame = [device_id: string, location: string ... 3 more fields]
  • display(thermostateDF)
device_id locale location last_connected humidity target_temperature_f mode version
peyiJNo0IldT2YlIVtYaGQ en-US Hallway Upstairs 2016-10-31T23:59:59.000Z 40 72 heat 4.0
  • display(cameraDF)
device_id location version name id
awJo6rH Foyer 4.0 Walkway 244083
  • display(smokedAlarmsDF)
device_id location version last_connected battery_health
RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs Jane's Room 1.01 2016-10-31T23:59:59.000Z ok

Let's join the two DataFrames on the column version.

val joineDFs = thermostateDF.join(cameraDF, "version")

joineDFs: org.apache.spark.sql.DataFrame = [version: string, device_id: string ... 10 more fields]
version device_id locale location last_connected humidity target_temperature_f mode device_id location name id
4.0 peyiJNo0IldT2YlIVtYaGQ en-US Hallway Upstairs 2016-10-31T23:59:59.000Z 40 72 heat awJo6rH Foyer Walkway 244083
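The semantics of that inner join can be sketched in plain Scala with a for-comprehension; the second camera row below is made up for illustration so the filter has something to drop:

```scala
// Plain-Scala sketch of thermostateDF.join(cameraDF, "version"):
// an inner join keeps only the (thermostat, camera) pairs whose
// "version" keys match.
val thermostats = Seq(("4.0", "peyiJNo0IldT2YlIVtYaGQ"))
val cameras     = Seq(("4.0", "awJo6rH"), ("3.9", "hypotheticalCam")) // 2nd row made up

val joined = for {
  (tVersion, thermostatId) <- thermostats
  (cVersion, cameraId)     <- cameras
  if tVersion == cVersion
} yield (tVersion, thermostatId, cameraId)

joined.foreach(println) // only the version-4.0 pair survives
```

Spark performs the same matching, but with shuffle-based join strategies rather than a nested loop.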

Summary

The focus of this short tutorial has been to demonstrate how easily you can use the utility functions to extract JSON attributes from complex, nested structures. Once you have exploded, flattened, or parsed the desired values into their respective DataFrames or Datasets, you can extract and query them as easily as you would any DataFrame or Dataset, using the respective APIs. Check out part 3 of our Structured Streaming series, which shows how to read Nest device logs from Apache Kafka and perform some ETL on them.