Wait the light to fall

在结构化流中使用复杂数据格式

焉知非鱼

在本系列结构化流博客文章的第1部分中,我们演示了使用结构化流编写端到端流式 ETL 管道是多么容易,它将 JSON CloudTrail 日志转换为 Parquet 表。 该博客强调,构建此类管道的主要挑战之一是从各种来源和复杂格式中读取和转换数据。 在这篇博文中,我们将更详细地研究这个问题,并展示如何使用 Apache Spark SQL 的内置函数来解决所有数据转换挑战。

具体来说,我们将讨论以下内容:

  • 有哪些不同的数据格式及其权衡
  • 如何使用Spark SQL轻松使用它们
  • 如何为您的用例选择正确的最终格式

Data sources and formats

数据以各种不同的格式提供。 电子表格可以用 XML,CSV,TSV 表示; 应用程序指标可以用原始文本或 JSON 写出。 每个用例都有针对它定制的特定数据格式。 在大数据领域,我们通常会遇到 Parquet,ORC,Avro,JSON,CSV,SQL 和 NoSQL 数据源以及纯文本文件等格式。 我们可以将这些数据格式大致分为三类:结构化,半结构化和非结构化数据。 让我们试着了解每个类别的好处和缺点。

img

结构化数据 #

结构化数据源定义数据的模式。利用有关底层数据的额外信息,结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使得从列的子集中提取值变得更加容易。首先逐行读取每个记录,然后从感兴趣的特定列中提取值可以读取比查询仅对一小部分列感兴趣时所需的更多数据。基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。例如,由于结构的刚性,演变模式可能具有挑战性。

非结构化数据 #

相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据(例如,CSV文件中的逗号),以定义数据的组织。报纸文章,医疗记录,图像blob,应用程序日志通常被视为非结构化数据。这些类型的源通常要求数据周围的上下文是可解析的。也就是说,您需要知道该文件是图像或是报纸文章。大多数数据源都是非结构化的。具有非结构化格式的成本是从这些数据源中提取价值变得麻烦,因为需要许多转换和特征提取技术来解释这些数据集

半结构化数据 #

半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。结果,每个数据记录都使用其架构信息进行扩充。 JSON和XML是很受欢迎的例子。半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。这些格式在许多应用程序中非常常见,因为存在许多用于处理这些记录的轻量级解析器,并且它们还具有人类可读的优点。但是,这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc查询而构建的。

与Spark SQL交换数据格式

在我们之前的博客文章中,我们讨论了如何将Cloudtrail Logs从JSON转换为Parquet,将我们的即席查询的运行时间缩短了10倍。 Spark SQL允许用户从批处理和流式查询中提取这些数据源类中的数据。 它本身支持以Parquet,ORC,JSON,CSV和文本格式读取和写入数据,并且Spark包中还存在大量其他连接器。 您还可以使用JDBC DataSource连接到SQL数据库。

Apache Spark可用于交换数据格式,如下所示:

events = spark.readStream \
  .format("json") \           # or parquet, kafka, orc...
  .option() \                 # format specific options
  .schema(my_schema) \        # required
  .load("path/to/data")

output =                    # perform your transformations

output.writeStream \          # write out your data 
  .format("parquet") \
  .start("path/to/write")

无论是批量数据还是流数据,我们都知道如何读取和写入不同的数据源和格式,但不同的源支持不同类型的模式和数据类型。 传统数据库仅支持原始数据类型,而像JSON这样的格式允许用户在列中嵌套对象,具有值数组或表示一组键值对。 用户通常必须介于这些数据类型之间以有效地存储和表示他们的数据。 幸运的是,Spark SQL可以轻松处理原始数据类型和复杂数据类型。 现在让我们快速概述一下如何从复杂数据类型转换为原始数据类型,反之亦然。

Transforming complex data types

img

在使用半结构化格式时,通常会有复杂的数据类型,如结构体,map和数组。例如,您可能正在将API请求记录到Web服务器。此API请求将包含HTTP标头,它将是一个字符串字符串 map。请求有效负载可以包含JSON形式的表单数据,其可以包含嵌套字段或数组。某些源或格式可能支持也可能不支持复杂数据类型。在将数据存储在特定数据类型中时,某些格式可能会提供性能优势。例如,使用Parquet时,所有struct列都将获得与顶级列相同的处理。因此,如果您在嵌套字段上有过滤器,您将获得与顶级列相同的好处。但是,map被视为两个数组列,因此您将无法获得有效的过滤语义。

让我们看一些关于Spark SQL如何允许您使用一些数据转换技术随意调整数据的示例。

Selecting from nested columns #

点(.)可用于访问结构体和 map 的嵌套列。

// input
{
  "a": {
     "b": 1
  }
}

Python: events.select(“a.b”) Scala: events.select(“a.b”) SQL: select a.b from events

// output
{
  "b": 1
}

Flattening structs #

星号(*)可用于选择结构体中的所有子字段。

// input
{
  "a": {
     "b": 1,
     "c": 2
  }
}

Python: events.select(“a.”) Scala: events.select(“a.”) SQL: select a.* from events

// output
{
  "b": 1,
  "c": 2
}

Nesting columns #

struct 函数或SQL中的括号可用于创建新结构体。

// input
{
  "a": 1,
  "b": 2,
  "c": 3
}

Python: events.select(struct(col(“a”).alias(“y”)).alias(“x”)) Scala: events.select(struct(‘a as ‘y) as ‘x) SQL: select named_struct(“y”, a) as x from events

// output
{
  "x": {
    "y": 1
  }
}

Nesting all columns #

星号(*)也可用于包含嵌套结构体中的所有列。

// input
{
  "a": 1,
  "b": 2
}

Python: events.select(struct("").alias(“x”)) Scala: events.select(struct("") as ‘x) SQL: select struct(*) as x from events

// output
{
  "x": {
    "a": 1,
    "b": 2
  }
}

Selecting a single array or map element #

getItem() 或方括号(即 [])可用于从数组或map中选择单个元素。

// input
{
  "a": [1, 2]
}

Python: events.select(col(“a”).getItem(0).alias(“x”)) Scala: events.select(‘a.getItem(0) as ‘x) SQL: select a[0] as x from events

// output
{ "x": 1 }
// input
{
  "a": {
    "b": 1
  }
}

Python: events.select(col(“a”).getItem(“b”).alias(“x”)) Scala: events.select(‘a.getItem(“b”) as ‘x) SQL: select a[‘b’] as x from events

// output
{ "x": 1 }

Creating a row for each array or map element #

explode() 可用于为数组中的每个元素或每个键值对创建一个新行。 这类似于 HiveQL 中的 LATERAL VIEW EXPLODE。

// input
{
  "a": [1, 2]
}

Python: events.select(explode(“a”).alias(“x”)) Scala: events.select(explode(‘a) as ‘x) SQL: select explode(a) as x from events

// output
[{ "x": 1 }, { "x": 2 }]
// input
{
  "a": {
    "b": 1,
    "c": 2
  }
}

Python: events.select(explode(“a”).alias(“x”, “y”)) Scala: events.select(explode(‘a) as Seq(“x”, “y”)) SQL: select explode(a) as (x, y) from events

// output
[{ "x": "b", "y": 1 }, { "x": "c", "y": 2 }]

Selecting one field from each item in an array #

在数组上使用点表示法时,我们返回一个新数组,其中从每个数组元素中选择了该字段。

// input
{
  "a": [
    {"b": 1},
    {"b": 2}
  ]
}

Python: events.select(“a.b”) Scala: events.select(“a.b”) SQL: select a.b from events

// output
{
  "b": [1, 2]
}

Power of to_json() and from_json() #

如果您真的想要保留列的复杂结构但是需要将其编码为字符串来存储它,该怎么办? 你注定了吗? 当然不是! Spark SQL提供了类似 to_json() 的函数来将结构编码为字符串,而 from_json() 则将结构作为复杂类型进行检索。 在读取或写入像Kafka这样的流式源时,使用JSON字符串作为列非常有用。 每个Kafka键值记录都会增加一些元数据,例如Kafka中的摄取时间戳,Kafka中的偏移量等。如果包含数据的“value”字段是JSON,则可以使用from_json() 来提取您的数据,丰富它,清理它,然后再将其下游推送到Kafka或将其写入文件。

Encode a struct as json #

to_json() 可用于将结构转换为JSON字符串。 在将数据写入Kafka时,如果要将多个列重新编码为单个列,此方法特别有用。 SQL中目前不提供此方法。

// input
{
  "a": {
    "b": 1
  }
}

Python: events.select(to_json(“a”).alias(“c”)) Scala: events.select(to_json(‘a) as ‘c)

// output
{
  "c": "{\"b\":1}"
}

Decode json column as a struct #

from_json() 可用于将带有JSON数据的字符串列转换为结构。 然后,您可以如上所述展平结构以具有单独的列。 SQL中目前不提供此方法。

/ input
{
  "a": "{\"b\":1}"
}

Python: schema = StructType().add(“b”, IntegerType()) events.select(from_json(“a”, schema).alias(“c”)) Scala: val schema = new StructType().add(“b”, IntegerType) events.select(from_json(‘a, schema) as ‘c)

// output
{
  "c": {
    "b": 1
  }
}

有时您可能希望将JSON字符串的一部分保留为JSON,以避免模式中的过多复杂性。

// input
{
  "a": "{\"b\":{\"x\":1,\"y\":{\"z\":2}}}"
}

Python: schema = StructType().add(“b”, StructType().add(“x”, IntegerType()) .add(“y”, StringType())) events.select(from_json(“a”, schema).alias(“c”)) Scala: val schema = new StructType().add(“b”, new StructType().add(“x”, IntegerType) .add(“y”, StringType)) events.select(from_json(‘a, schema) as ‘c)

// output
{
  "c": {
    "b": {
      "x": 1,
      "y": "{\"z\":2}"
    }
  }
}

Parse a set of fields from a column containing JSON #

json_tuple() 可用于提取带有JSON数据的字符串列中可用的字段。

// input
{
  "a": "{\"b\":1}"
}

Python: events.select(json_tuple(“a”, “b”).alias(“c”)) Scala: events.select(json_tuple(‘a, “b”) as ‘c) SQL: select json_tuple(a, “b”) as c from events

// output
{ "c": 1 }

有时字符串列可能不会自我描述为JSON,但可能仍然具有格式良好的结构。 例如,它可以是使用特定Log4j格式生成的日志消息。 Spark SQL可以轻松地为您构建这些字符串!

Parse a well-formed string column #

regexp_extract() 可用于使用正则表达式解析字符串。

// input
[{ "a": "x: 1" }, { "a": "y: 2" }]

Python: events.select(regexp_extract(“a”, “([a-z]):”, 1).alias(“c”)) Scala: events.select(regexp_extract(‘a, “([a-z]):”, 1) as ‘c) SQL: select regexp_extract(a, “([a-z]):”, 1) as c from events

// output
[{ "c": "x" }, { "c": "y" }]

这是很多转变! 现在让我们看看一些现实生活中的用例,以便充分利用所有这些数据格式和数据处理功能。

利用所有这些力量

在Databricks,我们从我们的服务中收集日志,并在客户受到影响之前使用它们执行实时监控以检测问题。日志文件是非结构化文件,但它们是可解析的,因为它们具有明确定义的Log4j格式。我们运行日志收集器服务,将每个日志条目和有关JSON中的条目(例如源)的其他元数据发送到Kinesis。然后将这些JSON记录作为文件批量上载到S3。查询这些JSON日志以回答任何问题是繁琐的:这些文件包含重复项,并且对于回答任何查询,即使它涉及单个列,整个JSON记录也可能需要反序列化。

为解决此问题,我们运行一个管道来读取这些JSON记录并对元数据执行重复数据删除。现在我们留下原始日志记录,可以是JSON格式或非结构化文本。如果我们正在处理JSON,我们使用from_json()和上面描述的几个转换来格式化我们的数据。如果是文本,我们使用 regexp_extract() 等方法将我们的Log4j格式解析为更结构化的形式。完成所有转换和重组后,我们将记录保存在按日期划分的Parquet中。在回答诸如“我们在10:00-10:30之间看到的针对此特定服务的ERROR消息有多少”这样的问题时,这为我们提供了10-100倍的加速?加速可归因于:

  • 我们不会再为反序列化JSON记录付出代价了
  • 我们不必对原始日志消息执行复杂的字符串比较
  • 我们只需要在查询中提取两列:时间和日志级别

以下是我们在客户中看到的一些常见用例:

“我想用我的数据运行机器学习管道。我的数据已经过预处理,我将在整个管道中使用我的所有功能。“

当您访问整行数据时,Avro是一个不错的选择。

“我有一个IoT用例,我的传感器向我发送事件。对于每个事件,重要的元数据是不同的。“

如果您希望架构具有灵活性,可以考虑使用JSON来存储数据。

“我想在报纸文章或产品评论的情绪分析上训练语音识别算法。”

如果您的数据可能没有固定的架构,也没有固定的模式/结构,则可能更容易将其存储为纯文本文件。您可能还有一个管道,可以对此非结构化数据执行特征提取,并将其存储为Avro,以准备您的机器学习管道。

结论

在这篇博文中,我们讨论了Spark SQL如何允许您使用来自多种源和格式的数据,并轻松地执行这些数据格式之间的转换和交换。我们分享了如何在Databricks中策划我们的数据,并考虑了其他您可能希望以不同方式执行操作的生产用例。

Spark SQL以任何可能的格式为您提供访问数据的必要工具,并为下游应用程序做好准备,无论是流数据的低延迟还是旧历史数据的高吞吐量!

在本系列的未来博客文章中,我们将详细介绍:

  • 监控流媒体应用程序
  • 将结构化流与Apache Kafka集成
  • 使用结构化流式计算事件时间聚合

如果您想了解有关结构化流媒体的更多信息,请参阅以下几个有用的链接。

最后,尝试我们的示例笔记本,演示如何在Databricks中使用Python,Scala或SQL转换复杂数据类型: