Wait the light to fall

Apache Spark 2.4.0 特性 - bucket pruning

焉知非鱼

https://www.waitingforcode.com/apache-spark-sql/apache-spark-2.4.0-features-bucket-pruning/read

桶定义 #

均衡的分区让我们更快地处理数据。例如,我们可以收集物联网事件,并把它们按照日期分区并存储在树一样的结构中:

/events/2018/10/29
/events/2018/10/30
/events/2018/10/31

如果我们想按物联网设备号划分相同的数据,我们该怎么办? 从技术上讲,这是可行的,但从概念上讲,它可能不如基于日期的分区有效。 设备密钥是具有非常高的基数(可能的唯一值的数量)的值,我们最终将得到一棵具有数百或数千个子目录的树。 对于不是分区的最佳选择的值问题,解决方案之一是桶存储,也称为群集。

桶是“分区内的分区”。 区别在于桶的数量是固定的。 在大多数情况下,这些值会使用基于哈希的简单策略分配给存储桶。 您可以在下面找到 Apache Spark 中存储桶的示例:

"Spark" should "create buckets in partitions for orders Dataset" in {
  val tableName = s"orders${System.currentTimeMillis()}"
  val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1")).toDF("order_id", "user_id")
 
  orders.write.mode(SaveMode.Overwrite).bucketBy(2, "user_id").saveAsTable(tableName)
 
  val metadata = TestedSparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
  metadata.bucketSpec shouldBe defined
  metadata.bucketSpec.get.numBuckets shouldEqual 2
  metadata.bucketSpec.get.bucketColumnNames(0) shouldEqual "user_id"
}

在分布式数据处理框架中,这种方法往往有助于避免洗牌阶段。例如,当桶用于2个数据集与 Spark SQL 中排序合并连接时,因为这两个数据集已经可以位于同一个分区中, 因此洗牌可能没有必要。当然,这两个数据集必须具有相同的分区数并使用哈希分区算法。

桶剪枝实现 #

在 Apache Spark 2.4.0 之前,当桶列之一参与查询时,Spark 引擎并没有作出任何优化。毕竟,由于桶存储是确定性的,引擎只能读取桶文件存储筛选值。

该特性作为新的私有方法被加入 FileSourceStrategy 中 ,只有当给定的数据集仅具有1个桶列并且至少具有2个桶时才调用该方法:

private def genBucketSet(
    normalizedFilters: Seq[Expression],
    bucketSpec: BucketSpec): Option[BitSet] = {

在方法中,Apache Spark 通过调用这两个方法之一来判断查询执行中所涉及到的桶:getBucketSetFromIterablegetBucketSetFromValue。他们根据所定义的过滤方法,可以是相等或 “is in” 约束使用。桶 id 编号内置于 BucketingUtils.getBucketIdFromValue(bucketColumn: Attribute, numBuckets: Int, value: Any) 方法中,并返回之前方法的结果。

过滤后的的桶作为位集传递给 FileSourceScanExec, 使用它们来过滤掉不存储该查询数据的桶文件:

val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
      val bucketSet = optionalBucketSet.get
      filesGroupedToBuckets.filter {
        f => bucketSet.get(f._1)
      }
    } else {
      filesGroupedToBuckets
    }
    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
    }

桶剪枝例子 #

为了在实战中看到优化,我们将使用与这篇文章中的第一节相同的例子,即订单表:

"Spark 2.4.0" should "not read buckets filtered out" in {
  val tableName = s"orders${System.currentTimeMillis()}"
  val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1"), (5L, "user4"), (6L, "user5"))
    .toDF("order_id", "user_id")
 
  orders.write.mode(SaveMode.Overwrite).bucketBy(3, "user_id").saveAsTable(tableName)
 
  val filteredBuckets = TestedSparkSession.sql(s"SELECT * FROM ${tableName} WHERE user_id = 'user1'")
 
  filteredBuckets.queryExecution.executedPlan.toString() should include("SelectedBucketsCount: 1 out of 3")
}

正如你看到的,这些断言检查物理计划中是否包含“SelectedBucketsCount”文本。它在释放的 2.4.0 版本中加入,以指示桶修剪功能。

桶剪枝只是 Apache Spark 2.4.0 的新功能之一。它有助于仅处理带有过滤条目的桶,并且因此减少处理的分区的数目。实现由传递一个持有的所有可处理桶ID以负责扫描文件集合的操作的位集组成。