Wait the light to fall

为什么你可能会误用 Spark Streaming API

焉知非鱼

免责声明:是的,我知道这个话题有些争议,而且我知道大多数信息都在 Spark 文档中针对其 Streaming API 进行了介绍,但是当我看到这个错误发生了很多遍之后,我感到写这篇博客的冲动很强烈。

我经常会看到来自 Spark Streaming 的新手在 StackOverflow 上提问,大致如下所示:

问题:“我正在尝试执行 XYZ,但无法正常工作,该怎么办? 这是我的代码:”

val sparkContext = new SparkContext("MyApp")
val streamingContext = new StreamingContext(sparkContext, Seconds(4))

val dataStream = streamingContext.socketTextStream("127.0.0.1", 1337)
dataStream.foreachRDD { rdd => 
  // Process RDD Here
}

嗯,好吧,这是怎么了? #

当我开始学习 Spark 时,我的第一个着陆点是有关 RDD(弹性分布式数据集)如何工作的解释。通常的例子是单词统计,其中所有操作都在 RDD 上执行。我认为可以肯定的是,这是许多其他学习 Spark 的人的切入点(尽管如今 DataFrame\Sets 已成为初学者的首选方法)。

当人们飞跃使用 Spark Streaming 时,可能有点不清楚 DStream 的附加抽象意味着什么。这导致许多人寻求他们可以掌握的东西,他们遇到的最熟悉的方法是 foreachRDD,该方法将 RDD 作为输入并产生 Unit(典型的副作用方法的结果)。然后,他们可以再次在他们已经感到满意和理解的 RDD 级别上工作。这完全遗漏了 DStreams 的要点,这就是为什么我想简要了解一下我们可以在 DStream 本身上做些什么而无需探究底层 RDD 的原因。

进入 DStream #

DStream 是微批次上的 Spark 抽象。它使用流媒体资源,例如网络套接字,Kafka 或 Kinesis(等等)为我们提供了连续的数据流,我们在分配给 StreamingContext 的每个批处理间隔读取该数据流。

为了使用 DStream API,我们必须了解抽象的工作原理。 DStream 基本上是 RDD 的序列。在给定的批处理间隔下,将消耗一个 RDD,并将其传递给我们提供给 DStream 的所有转换。当我们这样做时:

val dataStream = streamingContext.socketTextStream("127.0.0.1", 1337)
dataStream
 .flatMap(_.split(" "))
 .filter(_ == "")
 .map((word, 1L))
 .count

这意味着我们也将 flatMapfiltermap 和计数应用到基础 RDD 本身上! DStream 上的这些转换至少和 RDD 一样多,这些是我们应在 Streaming 应用程序中使用的转换。 DStreams 的转换下的 Spark Streaming 文档页面上有所有操作的完整列表。

对键值对执行更多操作 #

类似于 PairRDDFunctions(在RDD内对(隐式)对上的转换)进行转换,我们具有许多此类方法的对等 PairDStreamFunctions,主要是:

  • CombineByKey-使用自定义函数合并 DStream RDD 中每个键的元素。
  • groupByKey-通过在每个 RDD 上应用 groupByKey 来返回新的 DStream。
  • mapValues-通过将 map 函数应用于 “this” DStream 中每个键值对的值而返回新的 DStream,而无需更改键。
  • mapWithState-通过将函数应用于此流的每个键值元素,同时为每个唯一键维护一些状态数据,来返回 MapWithStateDStream。
  • reduceByKey-通过对每个 RDD 应用 reduceByKey 返回一个新的 DStream。使用提供的 reduce 函数合并每个键的值。 org.apache.spark.Partitioner 用于控制每个 RDD 的分区。

还有更多让您享受和利用。

棒极了!那么,为什么我还需要 foreachRDD? #

与 RDD 相似,当 Spark 构建其执行图时,我们会区分常规转换和输出转换。前者在构建图时会懒惰地进行计算,而后者在图的实现过程中会发挥作用。如果我们的 DStream 图仅应用了常规转换,则在运行时会得到一个异常,说明未定义输出转换。

当我们完成数据集的提取和转换后,foreachRDD 非常有用,现在我们希望将其加载到外部源。假设我想将转换后的消息发送到 RabbitMQ 作为流的一部分,我将迭代底层的 RDD 分区并发送每条消息:

transformedDataStream.
  foreachRDD { rdd: RDD[String] =>
    val rabbitClient = new RabbitMQClient()
    rdd.foreachPartition { partition: Iterator[String] =>
      partition.foreach(msg => rabbitClient.send(msg))
    }
  }

在对它执行所有转换逻辑后,transformedDataStream 是一个任意 DStream。所有这些转换的结果为 DStream[String]。在 foreachRDD 中,我们得到一个 RDD[String],然后在其中迭代每个分区,从而创建 RabbitMQClient 来在分区迭代器中发送每个消息。

Spark Streaming 文档页面上列出了更多这些输出转换,它们非常有用。

包装起来 #

Spark Streamings DStream 抽象为以流方式处理数据提供了强大的转换。当我们在 Spark 中进行流处理时,我们正在处理许多单个的微批量 RDD,这些 RDD 我们可以在我们的系统中不断地推理。当我们在 DStream 上应用转换时,它一直渗透到所传递的每个 RDD,而无需我们自己对其应用转换。最后,当我们要获取转换后的数据并对它执行一些副作用操作时,应保留使用 foreachRDD,主要是通过有线将数据发送到数据库,pub-sub 等。明智地使用它,只有在您确实需要时才使用!