Wait the light to fall

探索 Apache Spark 中的状态流

焉知非鱼

更新(2017.08.01): Spark v2.2 最近推出了一个叫做 mapGroupsWithState 的有状态流的新抽象, 我最近有一篇博客也是关于它的。我强烈建议你检查一下。

介绍 #

Apache Spark 由几个模块组成,每个模块都有不同的用途。 它的功能强大的模块之一是 Streaming API,它使开发人员能够在称为 Discretized Stream 或 DStream 的抽象下使用连续流(或准确地说是微批次)。

在这篇文章中,我将深入探讨 Spark Streaming 的一个特殊属性,它是有状态的 Streaming API。 有状态流使我们能够维护微批之间的状态,从而使我们能够形成数据的会话化。

免责声明-为了遵循本文的流程,应该对 Spark 的工作原理有基本的了解,并对 DStream 抽象具有一般的了解。 如果没有,请继续阅读,不用担心,我会等你…

欢迎回来! 让我们继续。

通过例子理解 #

为了了解如何使用 API​​,让我们创建一个简单的传入数据示例,该示例要求我们进行会话化。我们的输入数据流将是 UserEvent 类型:

case class UserEvent(id: Int, data: String, isLast: Boolean)

每个事件描述一个唯一的用户。我们通过用户 ID 标识用户,并用 String 表示发生的事件的内容。我们还想知道用户何时结束会话,因此我们提供了一个 isLast 标志来指示会话结束。

我们负责汇总所有用户事件的状态将是 UserSession 类型的状态:

case class UserSession(userEvents: Seq[UserEvent])

其中包含特定用户发生的事件序列。在此示例中,我们假设数据源是来自于 Kafka 使用的 JSON 编码数据流。

我们的 Id 属性将用作键,而 UserEvent 将是我们的值。两个放在一块,我们得到一个 DStream[(Int, UserEvent)]

在我们开始之前,有两个重要的关键点:

1.检查点是有状态流的初步选择

来自于 Spark 文档:

流式应用程序必须24/7全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为此,Spark Streaming 需要将足够的信息检查点指向容错存储系统,以便可以从故障中恢复。

Spark 的检查点机制是在我们 spark 工作的整个生命周期中保证容错能力的框架方法。当我们以24/7全天候运行时,某些故障可能无法直接控制,例如网络故障或数据中心崩溃。为了保证采用一种干净的恢复方式,Spark 可以在选择的每个时间间隔内将我们的数据检查到一个永久数据存储,例如 Amazon S3,HDFS 或 Azure Blob 存储(如果我们这样做的话)。

检查点是任何无状态转换的功能,但是必须为有状态流提供检查点目录,否则您的应用程序将无法启动。

提供检查点目录就像使用目录位置调用 StreamingContext 一样容易:

val sparkContext = new SparkContext()
val ssc = new StreamingContext(sparkContext, Duration(4000))
ssc.checkpoint("path/to/persistent/storage")

需要注意的重要一件事是,只有在你未修改现有代码的情况下,检查点数据才可用,并且主要适合于从作业失败中恢复。修改代码后(即将新版本上传到 Spark 集群),检查点数据不再兼容,必须删除这些数据后才能开始工作。

  1. DStream 中的键值对

一个常见的错误是想知道为什么在使用 DStream 时为什么看不到有状态的转换方法(我们将很快看到的 updateStateByKeymapWithState)。有状态转换要求我们对 DStream 进行操作,该 DStream 封装了一个键值对,格式为 DStream[(K,V)],其中 K 是键的类型,V 是值的类型。通过使用此类流,Spark 可以根据键对数据进行随机排序,因此给定键的所有数据都可以在同一工作节点上使用,并允许您进行有意义的聚合。

好的,我们准备好了。我们去写一些代码。

简要回顾过去 #

在 Spark 1.6.0 之前,唯一可用的状态转换是 PairDStreamFunctions.updateStateByKey

最简单的形式(我们将要介绍)的签名如下所示:

def updateStateByKey[S](updateFunc: (Seq[V], Option[S])  Option[S])

updateStateByKey 需要一个接受以下参数的函数:

  1. Seq[V]-当前批次中给定键收到的新值的列表。
  2. Option[S]-我们在每次迭代中更新的状态。

对于我们的工作的第一次调用,状态将为 None,表示它是给定键的第一批。之后,完全由我们来管理其值。完成给定键的特定状态后,我们需要返回 None 来指示 Spark 我们不再需要该状态。

对于我们的场景而言,一个简单的实现看起来像这样:

def updateUserEvents(newEvents: Seq[UserEvent],
                    state: Option[UserSession]): Option[UserSession] = {
  /*
  Append the new events to the state. If this the first time we're invoked for the key
  we fallback to creating a new UserSession with the new events.
  */
  val newState = state
    .map(prev => UserSession(prev.userEvents ++ newEvents))
    .orElse(Some(UserSession(newEvents)))

/*
If we received the `isLast` event in the current batch, save the session to the underlying store and return None to delete the state.
Otherwise, return the accumulated state so we can keep updating it in the next batch.
*/
  if (newEvents.exists(_.isLast)) {
    saveUserSession(state)
    None
  } else newState
}

在每个批次中,我们都希望获取给定用户的状态,并将旧事件和新事件都合并到新的 Option[UserSession] 中。然后,我们要检查是否已到达此用户会话的末尾,因此我们在任何 UserEvent 上检查 isLast 标志的新到达序列。如果收到最后一条消息,则将用户操作保存到一些持久性存储中,然后返回 None 表示我们已完成。如果尚未收到结束消息,则只需返回新创建的状态即可进行下一次迭代。

我们的Spark DAG(有向无环图)如下所示:

val kafkaStream =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

kafkaStream
  .map(deserializeUserEvent)
  .updateStateByKey(updateUserEvents)

第一个 map 用于将 JSON 解析为 (Int, UserEvent) 的元组,其中 IntUserEvent.id。然后,将元组传递给我们的 updateStateByKey 来完成其余的工作。

updateStateByKey 的缺点

  1. 使用 updateStateByKey 的主要缺点是,对于每个新的传入批处理,转换都会迭代整个状态存储,而不管给定键的新值是否已被消耗。这可能会影响性能,尤其是随着时间处理大量状态时。有多种技术可以提高性能,但这仍然是一个难题。

  2. 没有内置的超时机制-请考虑一下我们的示例中的情况,即如果表示用户会话结束的事件丢失或由于某种原因没有到达。 updateStateByKey 迭代所有键这一事实的一个好处是,我们可以自己实现这样的超时,但这绝对是框架该做的功能。

  3. 你收到的就是返回的内容-由于 updateStateByKey 的返回值与我们存储的状态相同。对于我们的 Option[UserSession],我们被迫将其返回到下游。但是,如果状态完成后,我想输出一个不同的类型并在另一个转换中使用它,会发生什么?目前,这是不可能的。

引入 mapWithState #

mapWithState 是在 Spark 1.6.0 中作为实验性 API 发布的 updateStateByKeys 的后继产品。这是从在 Spark 中使用状态流进行的过程中汲取的教训,并带来了新的有前途的产品。

mapWithState 带有 updateStateByKey 中缺少的功能:

  1. 内置的超时机制-我们可以告诉 mapWithState 我们想要保持状态的时间,以防万一没有新数据出现。一旦达到超时,最后一次调用 mapWithState 时会带有一个特殊标志(稍后将会看到)。

  2. 部分更新-仅迭代在当前批次中具有新数据的键。这意味着不再需要在每个批处理间隔内迭代整个状态存储,这是一个很好的性能优化。

  3. 选择你的返回类型-现在,无论我们的状态对象是哪种类型,我们都可以选择所需的返回类型。

  4. 初始状态-我们可以选择一个自定义的 RDD 来在启动时初始化我们的状态转换。

让我们看一下构成新 API 的不同部分。

mapWithState 的签名:

mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType])

updateStateByKey 要求我们传递带有消息序列的函数和状态(以 Option[S] 形式)相反,现在我们需要传递 StateSpec

表示对 DStream(Scala)或 JavaPairDStream(Java)的 DStream 转换 mapWithState 操作的所有规范的抽象类。 使用 StateSpec.apply()StateSpec.create() 创建此类的实例。

Scala中的示例:

// A mapping function that maintains an integer state and return a String
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  // Use state.exists(), state.get(), state.update() and state.remove()
  // to manage state, and return the necessary string
}

val spec = StateSpec.function(mappingFunction)

有趣的是 StateSpec.function,这是用于创建 StateSpec 的工厂方法。 它需要具有以下签名的功能:

mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType

mappingFunction 需要几个参数。 让我们构造它们以匹配我们的示例:

  1. KeyType-显然是键类型 Int
  2. Option[ValueType]-传入数据类型,Option[UserEvent]
  3. State[StateType]-迭代之间要保留的状态,State[UserSession]
  4. MappedType-我们的返回类型,可以是任何类型。 对于我们的示例,我们将传递一个 Option[UserSession]

mapWithState 和 updateStateByKey 之间的区别

  1. 我们键的值,以前没有暴露出来。
  2. Option[S] 的形式传入的新值,以前是 Seq[S]
  3. 现在,我们的状态被封装在 State[StateType] 类型的对象中。
  4. 我们可以从转换中返回想要的任何类型,而不再局限于我们所持有的状态的类型。

(存在一个更高级的 API,我们还可以收到一个 Time 对象,但这里不再赘述。请在此处随意检查不同的重载)。

使用 State 对象探索状态管理 #

以前,管理我们的状态意味着要使用 Option[S]。为了更新状态,我们将创建一个新实例,并从转换中返回该实例。当我们想要删除状态时,我们将返回 None。由于我们现在可以自由地从 mapWithState 返回任何类型,因此我们需要一种与 Spark 交互的方式,以表达我们希望在每次迭代中对状态进行的处理。为此,我们有 State[S] 对象。

该对象公开了几种方法:

  • def exist(): Boolean-检查状态是否存在。
  • def get(): S-获取状态(如果存在),否则将抛出 java.util.NoSuchElementException。 (我们需要注意这一点!)
  • def isTimingOut(): Boolean-当前批处理后,状态是否超时并且将被系统删除。
  • def remove(): Unit-删除状态(如果存在)。
  • def update(newState: S): Unit-用新值更新状态。
  • def getOption(): Option[S]-获取状态为 scala.Option 类型。

我们很快就会看到。

更改我们的代码以符合新的 API #

让我们重建以前的 updateUserEvents 以符合新的 API。现在,我们的新方法签名如下所示:

def updateUserEvents(key: Int, value: Option[UserEvent], state: State[UserSession]): Option[UserSession]

现在,我们不再接收 Seq[UserEvent],而是分别接收每个事件。

让我们继续进行更改:

def updateUserEvents(key: Int,
                     value: Option[UserEvent],
                     state: State[UserSession]): Option[UserSession] = {
  /*
  Get existing user events, or if this is our first iteration
  create an empty sequence of events.
  */
  val existingEvents: Seq[UserEvent] =
    state
      .getOption()
      .map(_.userEvents)
      .getOrElse(Seq[UserEvent]())

  /*
  Extract the new incoming value, appending the new event with the old
  sequence of events.
  */
  val updatedUserSession: UserSession =
    value
      .map(newEvent => UserSession(newEvent +: existingEvents))
      .getOrElse(UserSession(existingEvents))

/*
Look for the end event. If found, return the final `UserSession`,
If not, update the internal state and return `None`
*/      
  updatedUserSession.userEvents.find(_.isLast) match {
    case Some(_) =>
      state.remove()
      Some(updatedUserSession)
    case None =>
      state.update(updatedUserSession)
      None
  }
}

对于 mapWithState 的每次迭代:

  • 如果这是我们的第一次迭代,则状态将为空。我们需要创建它并追加新事件。如果不是,那么事件已经存在,请从 State[UserSession] 中提取它们,然后将新事件与旧事件一起添加。
  • 查找 isLast 事件标志。如果存在,请删除 UserSession 状态并返回 Option[UserSession]。否则,更新状态并返回 None

返回 Option[UserSession] 转换的选择取决于我们。我们可以选择返回 Unit 并从 mapWithState 发送完整的 UserSession,就像使用 updateStateByKey 一样。但是,我更喜欢我们可以将 UserSession 传递给另一个转换,以根据需要执行更多工作。

我们的新Spark DAG现在看起来像这样:

val stateSpec = StateSpec.function(updateUserEvents _)

kafkaStream
  .map(deserializeUserEvent)
  .mapWithState(stateSpec)

但是,还需要添加一件事。 由于我们没有将 UserSession 保存在转换中,因此我们需要添加其他转换以将其存储在持久性存储中。 为此,我们可以使用 foreachRDD

kafkaStream
  .map(deserializeUserEvent)
  .mapWithState(stateSpec)
  .foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      rdd.foreach(maybeUserSession => maybeUserSession.foreach(saveUserSession))
    }
  }

(如果您不想为 RDD 中的每个值打开与基础持久性存储的连接,那么请考虑使用 rdd.foreachPartition 而不是 rdd.foreach(但这超出了本文的范围)

超时结束 #

实际上,当处理大量数据时,我们必须保护自己免受数据丢失的影响。在当前的实现中,如果 isLast 甚至都没有显示,我们最终将使用户操作“停留”在该状态。

添加超时很简单:

  • 构造 StateSpec 时添加超时。
  • 处理有状态转换中的超时。

第一步很容易达到:

import org.apache.spark.streaming._
val stateSpec =
  StateSpec
    .function(updateUserEvents _)
    .timeout(Minutes(5))

(Minutes 是 Scala 的 Duration 类的 Spark 包装器类。)

对于我们的 updateUserEvents,我们需要监视 State[S].isTimingOut 标志以了解我们正在超时。关于超时,我想提两件事:

  • 重要的是要注意,一旦发生超时,我们的 value 参数将为 None(解释为什么我们会收到 Option[S] 而不是 S 来获得值。有关更多信息,请参见此处)。
  • 如果由于超时而调用 mapWithState,则我们不能调用 state.remove(),它将由框架代表我们完成。从 State.remove 的文档中:

如果状态已被删除(即 remove() 已经被调用)或由于超时而将被删除(即 isTimingOut() 为 true),则无法更新状态。

让我们修改代码:

def updateUserEvents(key: Int,
                     value: Option[UserEvent],
                     state: State[UserSession]): Option[UserSession] = {
  def updateUserSession(newEvent: UserEvent): Option[UserSession] = {
    val existingEvents: Seq[UserEvent] =
      state
        .getOption()
        .map(_.userEvents)
        .getOrElse(Seq[UserEvent]())

    val updatedUserSession = UserSession(newEvent +: existingEvents)

    updatedUserSession.userEvents.find(_.isLast) match {
      case Some(_) =>
        state.remove()
        Some(updatedUserSession)
      case None =>
        state.update(updatedUserSession)
        None
    }
  }

  value match {
    case Some(newEvent) => updateUserSession(newEvent)
    case _ if state.isTimingOut() => state.getOption()
  }
}

我已将用户操作的更新提取到本地方法 updateUserSession 中,如果由于新的传入值而调用该方法,则将调用该方法。否则,我们将超时,需要返回到目前为止累积的用户事件。

包装起来 #

希望我能传达出 Spark 有状态流的一般用法。有状态流,尤其是新的 mapWithState 转换为希望使用 Spark 处理有状态数据的最终用户带来了强大的功能,同时享受了 Spark 带来的弹性,分布和容错能力的保证。

在即将发布的 Spark 2.0.0 发行版中,还有更多改进,例如状态版本控制,这将使我们能够标记累积的数据,并且仅保留所存储状态的一个子集。如果您对更多内容感兴趣,请参阅“用于流聚合的状态存储”建议。

如果您想玩转,可以在 GitHub 上找到包含代码的完整工作库。

此外, DataBricks 帖子中对 updateStateByKeymapWithState 性能特征进行了很好的比较。