Wait the light to fall

Querying Parquet With Millisecond Latency

焉知非鱼

Querying Parquet With Millisecond Latency

原文链接: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/

以毫秒级的延迟查询 Parquet

注:本文最初发表于 InfluxData 博客

我们相信,直接查询 Apache Parquet 文件中的数据可以达到与大多数专门文件格式类似或更好的存储效率和查询性能。虽然这需要大量的工程努力,但 Parquet 的开放格式和广泛的生态系统支持的好处使它成为一类广泛的数据系统的明显选择。

在这篇文章中,我们解释了快速查询以 Parquet 格式存储的数据所需的几种高级技术,我们在 Apache Arrow Rust Parquet 阅读器中实现了这些技术。这些技术结合在一起,使得 Rust 的实现成为查询 Parquet 文件的最快实现之一,即使不是最快的,也是最快的–无论是在本地磁盘还是远程对象存储。它能够在几毫秒内查询到数 GB 的 Parquet 文件。

我们要感谢 InfluxData 对这项工作的支持。InfluxData 对开源软件有着深刻而持续的承诺,它赞助了我们撰写这篇博文的大部分时间,以及作为构建 InfluxDB IOx 存储引擎一部分的许多贡献。

背景介绍

Apache Parquet 是一种越来越流行的用于存储分析数据集的开放格式,并且已经成为低成本、与 DBMS 无关的数据存储的事实标准。Parquet 最初是为 Hadoop 生态系统创建的,现在由于其令人信服的组合,Parquet 的范围广泛地扩展到整个数据分析生态系统。

  • 高压缩率
  • 对 S3 等商品 blob 存储的适应性
  • 广泛的生态系统和工具支持
  • 可在许多不同的平台和工具上移植
  • 支持任意的结构化数据

越来越多的其他系统,如 DuckDBRedshift,允许直接查询存储在 Parquet 中的数据,但与他们的原生(自定义)文件格式相比,支持通常仍是次要考虑因素。这样的格式包括 DuckDB .duckdb 文件格式、Apache IOT TsFileGorilla 格式以及其他。

这是第一次,访问同样复杂的查询技术,以前只在闭源的商业实现中可用,现在可以作为开放源码。所需的工程能力来自于具有全球贡献者社区的大型、运行良好的开源项目,如 Apache ArrowApache Impala

Parquet 文件格式

在深入了解从 Parquet 中有效读取数据的细节之前,了解文件的布局很重要。该文件格式经过精心设计,可以快速定位所需信息,跳过不相关的部分,并有效解码剩下的部分。

  • Parquet 文件中的数据被分成水平的切片,称为 RowGroup
  • 每个 RowGroup 包含模式中每一列的单个 ColumnChunk

例如,下图说明了一个 Parquet 文件,其中有三列 “A”、“B” 和 “C” 存储在两个 RowGroup 中,总共有 6 个 ColumnChunk

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓          ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃ RowGroup ┃
┃┃             │                            │ ┃     1    ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃ColumnChunk 1  ColumnChunk 2 ColumnChunk 3  ┃          ┃
┃┃ (Column "A")   (Column "B")  (Column "C")  ┃          ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛          ┃
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓          ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃ RowGroup ┃
┃┃             │                            │ ┃     2    ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃ColumnChunk 4  ColumnChunk 5 ColumnChunk 6  ┃          ┃
┃┃ (Column "A")   (Column "B")  (Column "C")  ┃          ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛          ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

ColumnChunk 的逻辑值使用许多可用的编码之一写入一个或多个数据页,并按顺序附加在文件中。在 Parquet 文件的末尾有一个页脚,它包含了重要的元数据,如:。

  • 文件的模式信息,如列名和类型
  • 文件中的 RowGroupColumnChunk 的位置

页脚也可能包含其他专门的数据结构。

  • 每个 ColumnChunk 的可选统计数据,包括最小/最大值和 null 值个数
  • 可选的指向 OffsetIndexes 的指针,包含每个单独页面的位置
  • 可选的指向 ColumnIndex 的指针,包含每个页面的行数和汇总统计。
  • 可选的指向 BloomFilterData 的指针,它可以快速检查一个值是否存在于 ColumnChunk 中。

例如,上图中 2 个 Row Group 和 6 个 ColumnChunk 的逻辑结构可以存储在一个 Parquet 文件中,如下图所示(不按比例)。ColumnChunk 的页面在前面,后面是页脚。数据、编码方案的有效性以及 Parquet 编码器的设置决定了每个 ColumnChunk 所需的页面数量和大小。在本例中,ColumnChunk 1 需要 2 页,而 ColumnChunk 6 只需要 1 页。除了其他信息外,页脚还包含每个数据页的位置和列的类型。

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 1 ("A")             ◀─┃─ ─ ─│
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 1 ("A")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 2 ("B")               ┃     │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 3 ("C")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 3 ("C")               ┃     │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 3 ("C")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 4 ("A")             ◀─┃─ ─ ─│─ ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │  │
┃ Data Page for ColumnChunk 5 ("B")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 5 ("B")               ┃     │  │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │  │
┃ Data Page for ColumnChunk 5 ("B")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 6 ("C")               ┃     │  │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃     │  │
┃┃Footer                                        ┃ ┃
┃┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃     │  │
┃┃ ┃File Metadata                             ┃ ┃ ┃
┃┃ ┃ Schema, etc                              ┃ ┃ ┃     │  │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓     ┃ ┃ ┃
┃┃ ┃ ┃Row Group 1 Metadata              ┃     ┃ ┃ ┃     │  │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓             ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ Location of ┃     ┃ ┃ ┃     │  │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ first Data  ┣ ─ ─ ╋ ╋ ╋ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Page, row   ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┃Column "B" Metadata┃ counts,     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ sizes,      ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ min/max     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃ values, etc ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛             ┃     ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛     ┃ ┃ ┃        │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓     ┃ ┃ ┃
┃┃ ┃ ┃Row Group 2 Metadata              ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Location of ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ first Data  ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ Page, row   ┣ ─ ─ ╋ ╋ ╋ ─ ─ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ counts,     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "B" Metadata┃ sizes,      ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ min/max     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ values, etc ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃             ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛             ┃     ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛     ┃ ┃ ┃
┃┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ ┃
┃┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

在创建 Parquet 文件时,有许多重要的标准需要考虑,如如何优化数据的顺序/集群,并将其结构化为 RowGroup 和 Data Page。这种"物理设计"的考虑很复杂,值得自己写一系列的文章,在这篇博文中不涉及。相反,我们专注于如何使用可用的结构来使查询变得非常快。

优化查询

在任何查询处理系统中,以下技术通常可以提高性能:

    1. 减少必须从二级存储中传输处理的数据(减少 I/O)
    1. 减少解码数据的计算负荷(减少 CPU)。
    1. 对数据的读取和解码进行交错/管道化处理(提高并行性)

同样的原则也适用于查询 Parquet 文件,我们将在下面介绍。

解码优化

Parquet 通过使用复杂的编码技术,如运行长度压缩、字典编码、delta 编码等,实现了惊人的压缩率。因此,由 CPU 负责的解码任务可以主导查询延迟。Parquet 读取器可以使用一些技术来改善这一任务的延迟和吞吐量,正如我们在 Rust 实现中所做的那样。

矢量解码

大多数分析系统一次将多个值解码成列式内存格式,如 Apache Arrow,而不是逐行处理数据。这通常被称为矢量或列式处理,它是有益的,因为它:

  • 摊薄调度开销,以便在被解码的列的类型上进行切换
  • 通过从一个 ColumnChunk 中读取连续的值来提高缓存的位置性
  • 通常允许在一条指令中解码多个值
  • 用一个大的分配来避免许多小的堆分配,对于可变长度的类型,如字符串和字节数组,产生了显著的节省

因此,Rust Parquet 阅读器实现了专门的解码器,用于将 Parquet 直接读成列式内存格式(Arrow Arrays)。

流式解码

ColumnChunk 之间,哪些行被存储在哪些 Pages 中没有关系。例如,第 10,000 行的逻辑值可能在 A 列的第一页和 B 列的第三页。

矢量解码最简单的方法,也是最初在 Parquet 解码器中经常实现的方法,就是一次对整个 RowGroup(或 ColumnChunk)进行解码。

然而,鉴于 Parquet 的高压缩率,一个 RowGroup 很可能包含数百万行。一次性解码这么多行是不理想的,因为它:

  • 需要大量的中间 RAM:典型的内存格式的优化处理,如 Apache Arrow,需要比其 Parquet 编码的形式多得多。
  • 增加了查询延迟。后续的处理步骤(如过滤或聚合)只有在整个 RowGroup(或 ColumnChunk)被解码后才能开始。

因此,最好的 Parquet 读取器支持"流式"数据输出,按要求产生可配置大小的行批。批的大小必须足够大,以摊销解码的开销,但又要足够小,以便有效地使用内存,并允许下游处理在随后的批次解码时同时开始。

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
┃ Data Page for ColumnChunk 1 │◀┃─                   ┌── ─── ─── ─── ─── ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │   ┏━━━━━━━┓        ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃     ┃       ┃      │                   │
┃ Data Page for ColumnChunk 1 │ ┃ │   ┃       ┃   ─ ▶│ │   │ │   │ │   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃  ─ ─┃       ┃─ ┤   │  ─ ─   ─ ─   ─ ─  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │   ┃       ┃           A    B     C   │
┃ Data Page for ColumnChunk 2 │◀┃─    ┗━━━━━━━┛  │   └── ─── ─── ─── ─── ┘
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │    Parquet
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃      Decoder   │            ...
┃ Data Page for ColumnChunk 3 │ ┃ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                │   ┌── ─── ─── ─── ─── ┐
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │                    ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃ Data Page for ColumnChunk 3 │◀┃─               │   │                   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                 ─ ▶│ │   │ │   │ │   │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                    │  ─ ─   ─ ─   ─ ─  │
┃ Data Page for ColumnChunk 3 │ ┃                         A    B     C   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                    └── ─── ─── ─── ─── ┘
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

      Parquet file                                    Smaller in memory
                                                         batches for
                                                         processing

虽然流媒体的解释并不复杂,但解码的有状态性,特别是跨多列和任意嵌套的数据,其中行和值之间的关系并不固定,需要复杂的中间缓冲和大量的工程努力来正确处理。

字典保存

字典编码,也被称为分类编码,是一种技术,列中的每个值不直接存储,而是在一个单独的列表中存储一个索引,称为"字典"。这种技术对于有重复值的列来说,实现了第三种正常形式的许多好处(低基数),对于像 “City” 这样的字符串列特别有效。

ColumnChunk 中的第一页可以选择是一个字典页,包含一个列的类型的值列表。在这个 ColumnChunk 中的后续页面可以对这个字典的索引进行编码,而不是直接对值进行编码。

考虑到这种编码的有效性,如果 Parquet 解码器只是简单地将字典数据解码为本地类型,那么它就会低效地反复复制相同的值,这对字符串数据来说尤其是灾难性的。为了有效地处理字典编码的数据,在解码时必须保留编码。方便的是,许多列式格式,如 Arrow DictionaryArray,支持这种兼容的编码。

在向 Arrow 数组读取数据时,保留字典编码可以极大地提高性能,在某些情况下可以超过 60 倍,而且使用的内存也大大减少。

保存字典的主要复杂因素是,字典是按 ColumnChunk 存储的,因此字典在 RowGroup 之间发生变化。阅读器必须为跨越多个 RowGroup 的批次自动重新计算字典,同时还要优化批次大小均匀地划分为每个 RowGroup 的行数的情况。此外,一个列可能只有部分的字典编码,这使得实现更加复杂。关于这种技术及其复杂性的更多信息可以在关于将这种技术应用于 C++ Parquet 阅读器的博文中找到。

投影下推

最基本的 Parquet 优化,也是最常见的 Parquet 文件的优化,是投影下推,它可以减少 I/O 和 CPU 的需求。投影在这里意味着"选择一些但不是所有的列"。鉴于 Parquet 是如何组织数据的,只读取和解码被引用列所需的 ColumnChunk 是很简单的。

例如,考虑一个 SQL 查询,其形式为

SELECT B from table where A > 35

这个查询只需要 A 列和 B 列的数据(而不是 C 列),而且这个投影可以"向下推"给 Parquet 阅读器。

具体来说,使用页脚中的信息,Parquet 阅读器可以完全跳过获取(I/O)和解码(CPU)存储 C 列数据的数据页(在我们的例子中是 ColumnChunk 3 和 ColumnChunk 6)。

                             ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
                             ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ┌─────▶ Data Page for ColumnChunk 1 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 1 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 2 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       │     ┃ Data Page for ColumnChunk 3 ("C") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
   A query that        │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
  accesses only        │     ┃ Data Page for ColumnChunk 3 ("C") ┃
 columns A and B       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
can read only the      │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
 relevant pages,  ─────┤     ┃ Data Page for ColumnChunk 3 ("C") ┃
skipping any Data      │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
Page for column C      │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 4 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 5 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 5 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       └─────▶ Data Page for ColumnChunk 5 ("B") ┃
                             ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                             ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                             ┃ Data Page for ColumnChunk 6 ("C") ┃
                             ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                             ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

谓词下推

与投影下推类似,谓词下推也避免了从 Parquet 文件中获取和解码数据,但它是使用过滤器表达式。这种技术通常需要与查询引擎(如 DataFusion)更紧密地结合,以确定有效的谓词并在扫描过程中评估它们。不幸的是,如果没有精心设计的 API,Parquet 解码器和查询引擎最终可能会紧密耦合,阻止重复使用(例如,在 Cloudera Parquet Predicate Pushdown 文档中有不同的 Impala 和 Spark 实现)。Rust Parquet 阅读器使用 RowSelection API 来避免这种耦合。

RowGroup 修剪

许多基于 Parquet 的查询引擎所支持的最简单形式的谓词下推,使用存储在页脚的统计数据来跳过整个 RowGroup。我们称这种操作为 RowGroup 修剪,它类似于许多经典的数据仓库系统中的分区修剪

对于上面的查询例子,如果某个特定的 RowGroup 中 A 的最大值小于 35,解码器可以跳过从整个 RowGroup 中获取和解码任何 ColumnChunk

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃Row Group 1 Metadata                      ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "A" Metadata    Min:0 Max:15   ┃◀╋ ┐
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃       Using the min
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ │     and max values
┃ ┃Column "B" Metadata                   ┃ ┃       from the
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ │     metadata,
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃       RowGroup 1  can
┃ ┃Column "C" Metadata                   ┃ ┃ ├ ─ ─ be entirely
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃       skipped
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │     (pruned) when
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓       searching for
┃Row Group 2 Metadata                      ┃ │     rows with A >
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃       35,
┃ ┃Column "A" Metadata   Min:10 Max:50   ┃◀╋ ┘
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "B" Metadata                   ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "C" Metadata                   ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

请注意,对最小值和最大值的修剪对许多数据布局和列类型是有效的,但不是所有的。具体来说,它对具有许多不同的伪随机值的列(如标识符或 uuid)并不有效。值得庆幸的是,对于这种用例,Parquet 也支持每一个 ColumnChunk 布隆过滤器。我们正在积极努力,在 Apache Rust 的实现中增加对布隆过滤器的支持。

页面修剪

一个更复杂的谓词下推形式使用页脚元数据中的可选页面索引来排除整个数据页。解码器只对其他列的相应行进行解码,经常跳过整个页面。

由于各种原因,不同 ColumnChunk 中的页面经常包含不同数量的行,这一事实使这种优化变得复杂。虽然页面索引可以从一列中识别出所需的页面,但从一列中修剪一个页面并不能立即排除其他列中的整个页面。

页面修剪的过程如下:

  • 使用谓词与页面索引相结合来确定要跳过的页面
  • 使用偏移索引来确定哪些行的范围对应于未跳过的页面
  • 计算非跳过页的范围的交集,并只对这些行进行解码

最后一点的实现是非常不容易的,特别是对于嵌套的列表来说,一行可能对应多个值。幸运的是,Rust Parquet 阅读器在内部隐藏了这种复杂性,并且可以解码任意的 RowSelection

例如,要扫描存储在 5 个数据页中的 A 列和 B 列,如下所示:

如果谓词是 A > 35:

  • 第1页使用页面索引(最大值为20)进行修剪,留下一个[200->起]的 RowSelection
  • Parquet 阅读器完全跳过第3页(因为它的最后一行索引是99)。
  • (只)通过读取第 2、4、5 页来读取相关的行。

如果谓词是 A > 35 AND B = "F",那么页面索引就更有效了:

  • 使用 A > 35,产生的 RowSelection 为 [200->onwards],如前所述。
  • 使用 B = "F",在 B 的剩余的第4页和第5页,产生一个 [100-244] 的 RowSelection
  • 将这两个 RowSelection 相交,得到一个合并的 RowSelection [200-244]。
  • Parquet 阅读器只对第2页和第4页的这50行进行解码。
┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
┃  │  │              │     │  │              │     ┃
┃     │              │  │     │     Page     │  │
   │  │              │     │  │      3       │     ┃
┃     │              │  │     │   min: "A"   │  │  ┃
┃  │  │              │     │  │   max: "C"   │     ┃
┃     │     Page     │  │     │ first_row: 0 │  │
   │  │      1       │     │  │              │     ┃
┃     │   min: 10    │  │     └──────────────┘  │  ┃
┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
┃     │ first_row: 0 │  │     │              │  │
   │  │              │     │  │     Page     │     ┃
┃     │              │  │     │      4       │  │  ┃
┃  │  │              │     │  │   min: "D"   │     ┃
┃     │              │  │     │   max: "G"   │  │
   │  │              │     │  │first_row: 100│     ┃
┃     └──────────────┘  │     │              │  │  ┃
┃  │  ┌──────────────┐     │  │              │     ┃
┃     │              │  │     └──────────────┘  │
   │  │     Page     │     │  ┌──────────────┐     ┃
┃     │      2       │  │     │              │  │  ┃
┃  │  │   min: 30    │     │  │     Page     │     ┃
┃     │   max: 40    │  │     │      5       │  │
   │  │first_row: 200│     │  │   min: "H"   │     ┃
┃     │              │  │     │   max: "Z"   │  │  ┃
┃  │  │              │     │  │first_row: 250│     ┃
┃     └──────────────┘  │     │              │  │
   │                       │  └──────────────┘     ┃
┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃       ColumnChunk            ColumnChunk         ┃
┃            A                      B
 ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛

PARQUET-1404 中跟踪了从 Arrow C++ 以及扩展的 pyarrow/pandas 读取和写入这些索引的支持。

延迟物化

前面两种形式的谓词下推只在解码值之前对为 RowGroupColumnChunk 和 Data Pages 存储的元数据进行操作。然而,同样的技术也可以扩展到一个或多个列的值,在解码后但在解码其他列之前,这通常被称为"延迟物化"。

这种技术在以下情况下特别有效。

  • 谓词是非常有选择性的,即过滤掉大量的行
  • 每一行都很大,要么是由于宽行(例如 JSON blobs),要么是许多列
  • 被选中的数据是聚在一起的
  • 谓词所需的列是相对便宜的解码,例如 PrimitiveArray/DictionaryArray

SPARK-36527Impala 中对这种技术的好处有更多讨论。

例如,给定上面的谓词 A > 35 AND B = "F",其中引擎使用页面索引来确定在 [100-244] 的 RowSelection 中只有50行可以匹配,使用延迟物化,Parquet 解码器:

  • 对A列的50个值进行解码
  • 对这50个值进行 A > 35 的评估
  • 在这种情况下,只有5行通过,结果是 RowSelection:
  • RowSelection[205-206]
  • RowSelection[238-240]
  • 对于这些选择,只对 B 列的5行进行解码
  Row Index
             ┌────────────────────┐            ┌────────────────────┐
       200   │         30         │            │        "F"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
       205   │         37         │─ ─ ─ ─ ─ ─▶│        "F"         │
             ├────────────────────┤            ├────────────────────┤
       206   │         36         │─ ─ ─ ─ ─ ─▶│        "G"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
       238   │         36         │─ ─ ─ ─ ─ ─▶│        "F"         │
             ├────────────────────┤            ├────────────────────┤
       239   │         36         │─ ─ ─ ─ ─ ─▶│        "G"         │
             ├────────────────────┤            ├────────────────────┤
       240   │         40         │─ ─ ─ ─ ─ ─▶│        "G"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
      244    │         26         │            │        "D"         │
             └────────────────────┘            └────────────────────┘


                   Column A                          Column B
                    Values                            Values

在某些情况下,例如我们的例子,B 存储单字符值,延迟物化机械的成本可能超过解码的节省。然而,当上面列出的一些条件得到满足时,节省的费用就会很可观。查询引擎必须决定哪些谓词要下推,以何种顺序应用它们以获得最佳结果。

虽然这不在本文的范围内,但同样的技术可以应用于多个谓词以及多列的谓词。更多信息请参见 Parquet 板块中的 RowFilter 接口,以及 DataFusion 中的 row_filter 实现。

I/O下推

虽然 Parquet 是为在 HDFS 分布式文件系统上的高效访问而设计的,但它与 AWS S3 等商品 blob 存储系统配合得非常好,因为它们具有非常相似的特性。

  • 相对较慢的"随机访问"读取:在每次请求中读取大的(MBs)数据段比为较小的部分发出许多请求要有效得多。
  • 检索第一个字节之前有很大的延迟
  • 每个请求的成本很高。通常按请求收费,而不考虑读取的字节数,这激励了更少的请求,每次读取大的连续的数据部分。

为了从这类系统中获得最佳的读取效果,Parquet 读取器必须:

    1. 尽量减少 I/O 请求的数量,同时应用各种下推技术以避免获取大量未使用的数据。
    1. 与适当的任务调度机制相结合,对获取的数据进行交错的 I/O 和处理,以避免管道瓶颈。

由于这些都是工程和集成方面的重大挑战,许多 Parquet 阅读器仍然需要将文件完整地取到本地存储中。

为了处理这些文件而获取整个文件并不理想,原因有以下几点:

    1. 高延时。在获取整个文件之前不能开始解码(Parquet 元数据在文件的末尾,所以解码器必须在解码其他部分之前看到末尾)。
    1. 浪费的工作。取回整个文件可以获取所有必要的数据,但也可能有很多不必要的数据,这些数据在读取页脚后会被跳过。这不必要地增加了成本。
    1. 需要昂贵的"本地附加"存储(或内存)。许多云环境不提供具有本地附加存储的计算资源–它们要么依赖昂贵的网络块存储,如 AWS EBS,要么就是将本地存储限制在某些类别的虚拟机上。

为了避免缓冲整个文件,需要一个复杂的 Parquet 解码器,它与 I/O 子系统集成,可以在最初获取和解码元数据,然后对相关的数据块进行范围化的获取,与 Parquet 数据的解码交错进行。这种优化需要精心设计,以便从对象存储中获取足够大的数据块,使每个请求的开销不至于支配减少传输字节的收益。SPARK-36529 更详细地描述了顺序处理的挑战。

                       ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
                                                                │
                       │
               Step 1: Fetch                                    │
 Parquet       Parquet metadata
 file on ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▼━━━━━━━┓
 Remote  ┃      ▒▒▒▒▒▒▒▒▒▒          ▒▒▒▒▒▒▒▒▒▒               ░░░░░░░░░░ ┃
 Object  ┃      ▒▒▒data▒▒▒          ▒▒▒data▒▒▒               ░metadata░ ┃
  Store  ┃      ▒▒▒▒▒▒▒▒▒▒          ▒▒▒▒▒▒▒▒▒▒               ░░░░░░░░░░ ┃
         ┗━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                     │                     └ ─ ─ ─
                                                  │
                     │                   Step 2: Fetch only
                      ─ ─ ─ ─ ─ ─ ─ ─ ─ relevant data blocks

这张图中没有包括凝聚请求和确保实际执行所需的最小请求量等细节。

Rust Parquet crate 提供了一个异步 Parquet 阅读器,可以有效地从任何支持范围请求的 AsyncFileReader 中读取数据。

  • 高效地从任何支持范围请求的存储介质中读取数据
  • 与 Rust 的 future 生态系统集成,以避免在网络 I/O 上等待的阻塞线程,并轻松地可以交错使用 CPU 和网络
  • 同时请求多个范围,以允许实现凝聚相邻的范围,并行获取范围,等等。
  • 使用前面描述的下推技术,尽可能地消除获取数据的现象
  • 与Apache Arrow object_store crate 轻松集成,你可以在这里阅读更多信息

为了给人一种可能的感觉,下图显示了从远程文件获取页脚元数据的时间线,使用该元数据来确定要读取哪些数据页,然后同时获取数据和解码。这个过程往往必须同时对多个文件进行,以匹配网络延迟、带宽和可用的 CPU。

                           begin
          metadata        read of   end read
            read  ─ ─ ─ ┐   data    of data          │
 begin    complete         block     block
read of                 │   │        │               │
metadata  ─ ─ ─ ┐                                       At any time, there are
             │          │   │        │               │     multiple network
             │  ▼       ▼   ▼        ▼                  requests outstanding to
  file 1     │ ░░░░░░░░░░   ▒▒▒read▒▒▒   ▒▒▒read▒▒▒  │    hide the individual
             │ ░░░read░░░   ▒▒▒data▒▒▒   ▒▒▒data▒▒▒        request latency
             │ ░metadata░                         ▓▓decode▓▓
             │ ░░░░░░░░░░                         ▓▓▓data▓▓▓
             │                                       │
             │
             │ ░░░░░░░░░░  ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒read▒▒▒▒│▒▒▒▒▒▒▒▒▒▒▒▒▒▒
   file 2    │ ░░░read░░░  ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒data▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
             │ ░metadata░                            │              ▓▓▓▓▓decode▓▓▓▓▓▓
             │ ░░░░░░░░░░                                           ▓▓▓▓▓▓data▓▓▓▓▓▓▓
             │                                       │
             │
             │                                     ░░│░░░░░░░  ▒▒▒read▒▒▒  ▒▒▒▒read▒▒▒▒▒
   file 3    │                                     ░░░read░░░  ▒▒▒data▒▒▒  ▒▒▒▒data▒▒▒▒▒      ...
             │                                     ░m│tadata░            ▓▓decode▓▓
             │                                     ░░░░░░░░░░            ▓▓▓data▓▓▓
             └───────────────────────────────────────┼──────────────────────────────▶Time


                                                     │

总结

我们希望你喜欢阅读 Parquet 文件格式,以及用于快速查询 Parquet 文件的各种技术。

我们相信,大多数 Parquet 的开源实现之所以不具备本帖所描述的功能的广度,是因为它需要付出巨大的努力,而这在以前只有资金雄厚的商业企业才可能做到,它们的实现是闭源的。

然而,随着 Apache Arrow 社区的发展和质量的提高,无论是 Rust 的从业者还是更广泛的 Arrow 社区,我们能够合作并建立一个尖端的开源实现是令人振奋和无比满意的。本博客中描述的技术是许多工程师的贡献,他们分布在公司、业余爱好者和世界上的几个存储库中,特别是 Apache Arrow DataFusionApache ArrowApache Arrow Ballista

如果你有兴趣加入 DataFusion 社区,请与我们联系