Apache Spark 2.4.0 特性 - bucket pruning
— 焉知非鱼https://www.waitingforcode.com/apache-spark-sql/apache-spark-2.4.0-features-bucket-pruning/read
桶定义 #
均衡的分区让我们更快地处理数据。例如,我们可以收集物联网事件,并把它们按照日期分区并存储在树一样的结构中:
/events/2018/10/29
/events/2018/10/30
/events/2018/10/31
如果我们想按物联网设备号划分相同的数据,我们该怎么办? 从技术上讲,这是可行的,但从概念上讲,它可能不如基于日期的分区有效。 设备密钥是具有非常高的基数(可能的唯一值的数量)的值,我们最终将得到一棵具有数百或数千个子目录的树。 对于不是分区的最佳选择的值问题,解决方案之一是桶存储,也称为群集。
桶是“分区内的分区”。 区别在于桶的数量是固定的。 在大多数情况下,这些值会使用基于哈希的简单策略分配给存储桶。 您可以在下面找到 Apache Spark 中存储桶的示例:
"Spark" should "create buckets in partitions for orders Dataset" in {
val tableName = s"orders${System.currentTimeMillis()}"
val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1")).toDF("order_id", "user_id")
orders.write.mode(SaveMode.Overwrite).bucketBy(2, "user_id").saveAsTable(tableName)
val metadata = TestedSparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
metadata.bucketSpec shouldBe defined
metadata.bucketSpec.get.numBuckets shouldEqual 2
metadata.bucketSpec.get.bucketColumnNames(0) shouldEqual "user_id"
}
在分布式数据处理框架中,这种方法往往有助于避免洗牌阶段。例如,当桶用于2个数据集与 Spark SQL 中排序合并连接时,因为这两个数据集已经可以位于同一个分区中, 因此洗牌可能没有必要。当然,这两个数据集必须具有相同的分区数并使用哈希分区算法。
桶剪枝实现 #
在 Apache Spark 2.4.0 之前,当桶列之一参与查询时,Spark 引擎并没有作出任何优化。毕竟,由于桶存储是确定性的,引擎只能读取桶文件存储筛选值。
该特性作为新的私有方法被加入 FileSourceStrategy
中 ,只有当给定的数据集仅具有1个桶列并且至少具有2个桶时才调用该方法:
private def genBucketSet(
normalizedFilters: Seq[Expression],
bucketSpec: BucketSpec): Option[BitSet] = {
在方法中,Apache Spark 通过调用这两个方法之一来判断查询执行中所涉及到的桶:getBucketSetFromIterable
或 getBucketSetFromValue
。他们根据所定义的过滤方法,可以是相等或 “is in” 约束使用。桶 id 编号内置于 BucketingUtils.getBucketIdFromValue(bucketColumn: Attribute, numBuckets: Int, value: Any)
方法中,并返回之前方法的结果。
过滤后的的桶作为位集传递给 FileSourceScanExec
, 使用它们来过滤掉不存储该查询数据的桶文件:
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
f => bucketSet.get(f._1)
}
} else {
filesGroupedToBuckets
}
val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
}
桶剪枝例子 #
为了在实战中看到优化,我们将使用与这篇文章中的第一节相同的例子,即订单表:
"Spark 2.4.0" should "not read buckets filtered out" in {
val tableName = s"orders${System.currentTimeMillis()}"
val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1"), (5L, "user4"), (6L, "user5"))
.toDF("order_id", "user_id")
orders.write.mode(SaveMode.Overwrite).bucketBy(3, "user_id").saveAsTable(tableName)
val filteredBuckets = TestedSparkSession.sql(s"SELECT * FROM ${tableName} WHERE user_id = 'user1'")
filteredBuckets.queryExecution.executedPlan.toString() should include("SelectedBucketsCount: 1 out of 3")
}
正如你看到的,这些断言检查物理计划中是否包含“SelectedBucketsCount”文本。它在释放的 2.4.0 版本中加入,以指示桶修剪功能。
桶剪枝只是 Apache Spark 2.4.0 的新功能之一。它有助于仅处理带有过滤条目的桶,并且因此减少处理的分区的数目。实现由传递一个持有的所有可处理桶ID以负责扫描文件集合的操作的位集组成。