Wait the light to fall

Apache Spark 2.4.0 特性 - foreachBatch

焉知非鱼

当我第一次听说 foreachBatch 功能时,我以为这是结构化流模块中 foreachPartition 的实现。但是,经过一些分析,我发现我错了,因为此新功能解决了其他但也很重要的问题。您会发现更多。

在 Apache Spark 2.4.0 功能系列的这一新文章中,我将展示 foreachBatch 方法的实现。在第一部分中,我将简要介绍有关此功能的要点。我还将在其中添加有关实现的一些详细信息。在接下来的2部分中,我将展示.foreachBatch 数据接收器解决的问题。

定义 #

在 2.4.0 发行版之前,foreach 是我们可以放置一些自定义逻辑的单一接收器。它很容易使用,因为它看起来像包装在类中的 foreach 循环。另外,foreach 接收器非常适合连续执行,因为我们将重点放在每次一行所带来的信息上。但是,由于基于微批处理的管道的适应性通常更差一些,因为我们经常需要对整个累积的微批处理进行某些处理。

2.4.0 版本使用新的 org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink 接收器解决了微批处理的这些问题。它的主要思想很简单。引擎累积在给定的微批次中处理的数据,并将其作为数据集传递到接收器。这不仅意味着您可以对整个数据应用一种逻辑,而且还可以在流传输管道中执行一些纯批处理,例如将数据写入不可流式的数据存储中。

除了具体的数据集之外,foreachBatch 消费者方法还接受一个名为 batchId 的属性。此参数包含生成数据集的微批处理的 ID。您可以使用此属性实现一次传递语义,因为默认情况下,引擎以至少一次语义运行。

最后,由于 ForeachBatchSink 解决了微批次问题,因此您不能将其与连续触发器一起使用。在 DataStreamWriter 内部进行的简单检查显示:

} else if (source == "foreachBatch") {
  assertNotPartitioned("foreachBatch")
  if (trigger.isInstanceOf[ContinuousTrigger]) {
    throw new AnalysisException("'foreachBatch' is not supported with continuous trigger")
  }

您还可以看到接收器不支持分区管道(.partitionBy(...))。 如果您对引入此逻辑的任务感兴趣,可以在“另请阅读”部分中找到一个链接。

用例:不流式接收器 #

我已经在上一节中提到了 foreachBatch 的第一个用例。 当您要将处理后的数据保存到关系数据库或键值存储之类的不可流式接收器中时,此新接收器很有用。 为了简单起见,我将使用内存中单例键值存储:

"foreachBatch" should "save the data into a key-value memory store" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  inputStream.addData(1, 2, 3, 4)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1, 2, 3, 4)
        Thread.sleep(1000L)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(2000L))
    .foreachBatch((dataset, batchId) => {
      dataset.foreachPartition(rows => {
        rows.foreach(row => {
          InMemoryKeyedStore.addValue(s"batch_${batchId}_${row.getAs[Int]("number")}", "")
        })
      })
    })
    .start()
 
  query.awaitTermination(20000L)
 
  // According to the defined timeout, we should have at least 10 processed batches
  val distinctKeys = InMemoryKeyedStore.allValues.keys.map(key => key.dropRight(2)).toSeq.distinct
  distinctKeys should contain atLeastOneElementOf(Seq("batch_0", "batch_1", "batch_2", "batch_3", "batch_4", "batch_5",
  "batch_6", "batch_7", "batch_8", "batch_9"))
}

当然,您应该注意传递语义。 如果您的接收器是基于键的接收器,并且您的处理始终生成相同的数据,那么您是安全的。 即使您需要重新处理记录,也总是会在存储中得到相同的结果。 如果不是这种情况,则应在写入数据之前添加一个额外的步骤来对数据删除重复数据。

用例:来自单个源的多个接收器(又名副输出) #

第二个用例与 Apache Beam 中的副输出非常相似,因为 foreachBatch 提供了将微批处理写入不同位置的可能性。 您只必须记住将批处理数据集缓存在接收器中。 否则,您可能会完全重新计算它。 下一个代码段提供了使用Apache Spark foreachBatch 接收器的副输出实现的示例:

"foreachBatch" should "generate 2 outputs" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  inputStream.addData(1, 2, 3, 4)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1, 2, 3, 4)
        Thread.sleep(1000L)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(2000L))
    .foreachBatch((dataset, batchId) => {
      dataset.persist()
      dataset.write.mode(SaveMode.Overwrite).json(s"/tmp/spark/side-output/json/batch_${batchId}")
      dataset.write.mode(SaveMode.Overwrite).parquet(s"/tmp/spark/side-output/parquet/batch_${batchId}")
      dataset.unpersist()
    })
    .start()
 
  query.awaitTermination(20000L)
 
  def jsonBatch(batchNr: Int) = s"/tmp/spark/side-output/json/batch_${batchNr}"
  val jsonFiles = FileUtils.getFile(new File("/tmp/spark/side-output/json")).listFiles().toSeq.map(file => file.getAbsolutePath)
  jsonFiles should contain allElementsOf (Seq(jsonBatch(0), jsonBatch(1), jsonBatch(2), jsonBatch(3),
    jsonBatch(4), jsonBatch(5), jsonBatch(6), jsonBatch(7), jsonBatch(8)))
  def parquetBatch(batchNr: Int) = s"/tmp/spark/side-output/parquet/batch_${batchNr}"
  val parquetFiles = FileUtils.getFile(new File("/tmp/spark/side-output/parquet")).listFiles.toSeq.map(file => file.getAbsolutePath)
  parquetFiles should contain allElementsOf(Seq(parquetBatch(0), parquetBatch(1), parquetBatch(2), parquetBatch(3),
    parquetBatch(4), parquetBatch(5), parquetBatch(6), parquetBatch(7), parquetBatch(8)))
}

该代码看起来与上一节中的代码相似。 它还会将数据写入不可流式接收器。 区别在于,使用相同的输出将数据以2种不同的格式写入2个不同的位置。 如果您想为多个工作流准备数据,这将很有用。 就我而言,我为事务和 Parquet 生成了 JSON 文件,用于分析目的。

根据官方文档,重要的是缓存和取消持久化批处理数据集,以确保引擎不会每次都重新计算它。

foreachBatch 接收器是结构化流模块中的缺失部分。 2.4.0 版本中添加的此功能是流和批处理世界之间的桥梁。 如本文所示,它有助于将流数据集成到管道的批处理部分中。 现在,Apache Spark 不再需要手动创建“批处理”,而是为我们完成了它,并在 foreachBatch 接收器中公开了对整个 Dataset API 的访问。