Wait the light to fall

流到流的状态管理

焉知非鱼

上周,我们在 Apache Spark 结构化流中发现了 2 种流对流连接类型。如这些帖子所述,有时可能会省略状态管理逻辑(对于内连接),但通常建议减少内存压力。 Apache Spark 提出了 3 种不同的状态管理策略,这些策略将在以下各节中详细介绍。

这篇文章分为 4 部分。第一个回顾了流对流连接情况下的状态特异性。接下来的 3 个讨论 3 种状态管理策略。

状态和流连接 #

Apache Spark 结构化流中的外连接这篇文章所述,每个潜在的可连接行都缓存在状态存储中。每当找到匹配的行时,都会进行连接并发射结果。内连接类型就是这种情况。对于外连接,逻辑略有不同,由于匹配的行或由于过期状态而发出结果。过期状态表示我们不希望收到给定条目的匹配事件的时刻。希望此行为也适用于内连接,但不同之处在于其可选特性。

没有这种“过期状态”的概念,引擎将使行无限期地匹配,并且由于数据源是无界的,因此不可避免地迟早会失败。因此,Apache Spark 提供了 3 种不同的策略来管理状态过期(水位)。

状态键水位 #

这些策略中的第一个称为状态键水位。在以下情况下将其应用于查询:

  • 在至少一个连接流中之一中定义了一个水位列-它可以是时间戳列或窗口列。如果仅在一侧定义水位,则 Apache Spark 能够从该水位推导另一侧的水位。
  • 水位列在 JOIN 子句中用作相等约束

此策略的名称来自直接在 JOIN 子句条件中使用水位-因此是状态键。为了说明这一点,我们可以用下面的代码片段举几个例子:

"state key watermark" should "be built from watermark used in join" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
    .withWatermark("mainEventTimeWatermark", "2 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
    .withWatermark("joinedEventTimeWatermark", "2 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    mainEventsDataset("mainEventTimeWatermark") === joinedEventsDataset("joinedEventTimeWatermark"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      val processingTimeFrom1970 = 10000L // 10 sec
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970) 
      println("progress changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      // We send keys: 2, 3, 4, 5, 7  in late to see watermark applied
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query) - 5000L
      while (query.isActive) {
        //println(s"Sending key=${key} (${new Timestamp(startingTime)}) for watermark ${query.lastProgress.eventTime.get("watermark")}")
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime)
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  groupedByKeys.keys should not contain allOf("key2", "key3", "key4", "key5", "key6", "key7")
  // Check some initial keys that should be sent after the first generated watermark
  groupedByKeys.keys should contain allOf("key8", "key9", "key10") 
}

对于仅在一侧定义水位的情况,结果应与上述相同:

"state key watermark" should "be built from watermark used in one side of join" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
    .withWatermark("mainEventTimeWatermark", "2 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    mainEventsDataset("mainEventTimeWatermark") === joinedEventsDataset("joinedEventTimeWatermark"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      val processingTimeFrom1970 = 10000L // 10 sec
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
      println("progress changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      // We send keys: 2, 3, 4, 5, 7  in late to see watermark applied
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query) - 5000L
      while (query.isActive) {
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime)
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  groupedByKeys.keys should not contain allOf("key2", "key3", "key4", "key5", "key6", "key7")
  // Check some initial keys that should be sent after the first generated watermark
  groupedByKeys.keys should contain allOf("key8", "key9", "key10") 
}

状态键水位也适用于窗口式水印列:

"state key watermark" should "be built from watermark used in join window" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark",
    window($"mainEventTimeWatermark", "5 seconds").as("watermarkWindow")).withWatermark("watermarkWindow", "5 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark",
    window($"joinedEventTimeWatermark", "5 seconds").as("watermarkWindow")).withWatermark("watermarkWindow", "5 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    mainEventsDataset("watermarkWindow") === joinedEventsDataset("watermarkWindow"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      val processingTimeFrom1970 = 0
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
      println("progress changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query)
      while (query.isActive) {
        val joinedKeyTime = if (key % 2 == 0) {
          startingTime
        } else {
          // for odd keys we define the time for previous window
          startingTime - 6000L
        }
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime, Some(joinedKeyTime))
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  val allKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key).keys
  val oddNumberKeys = allKeys.map(key => key.substring(3).toInt).filter(key => key > 1 && key % 2 != 0)
  oddNumberKeys shouldBe empty
}

状态值水位 #

当 JOIN 子句不包含水位字段上的相等条件时,将使用流到流连接中的另一种状态管理策略。取而代之的是,查询具有一个称为范围条件的条件,表示为水位列上的不等式。因此,该策略的名称是状态值水位。它的使用受以下条件制约:

  • 水位列的存在-与以前一样,此处允许使用时间戳和窗口
  • 在 JOIN 子句中的水位列上定义范围条件-水位列必须被连接,否则按值相等

JOIN 子句中定义的范围条件会自动影响连接边之一的水位。当此条件表示为 leftTimeWatermark > rightTimeWatermark + 10 minutes 时,我们自动知道左侧将仅接受晚于右侧水位 + 10 分钟发生的事件。也就是说,如果右侧水位为 10:00,则左侧水位会自动变为 10:10。它也起相反地作用,即左水位影响右水位。没有什么比一个简单的示例更能说明问题了:

"state value watermark" should "be built from a watermark column and range condition" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
    .withWatermark("mainEventTimeWatermark", "2 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
    .withWatermark("joinedEventTimeWatermark", "2 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    expr("joinedEventTimeWatermark > mainEventTimeWatermark + interval 2 seconds"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      val processingTimeFrom1970 = 10000L // 10 sec
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
      println("progre ss changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query)
      while (query.isActive) {
        val joinedSideEventTime = if (startingTime % 2000 == 0) {
          startingTime + 3000L
        } else {
          // the value computed like this is evidently after the watermark, so should be accepted in the stream
          // but since the range condition is stricter, the row will be ignored
          startingTime - 1000L
        }
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime, Some(joinedSideEventTime))
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  val processedKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key).keys
  val keyNumbers = processedKeys.map(key => key.substring(3).toInt)
  val oddKeyNumbers = keyNumbers.filter(keyNumber => keyNumber % 2 != 0)
  oddKeyNumbers shouldBe empty
}

如果我们在双方定义两个不同的水位值,则此策略也将起作用。 区别在于,引擎将采用一个共同的水位值,该值是连接流中最小的水位。 我们可以观察到,在以下测试案例中,连接流的水位为 1970-01-01T00:00:01.000Z:

"state value watermark" should "be built from different watermark columns and range condition" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
    .withWatermark("mainEventTimeWatermark", "2 seconds")
  // To see what happens, let's set the watermark of joined side to 10 times more than the main dataset
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
    .withWatermark("joinedEventTimeWatermark", "20 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    expr("joinedEventTimeWatermark > mainEventTimeWatermark + interval 2 seconds"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  var firstWatermark: Option[String] = None
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      // 21 sec ==> watermark is MAX(event_time) - 20'' and lower value will never change it
      val processingTimeFrom1970 = 21000L
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
      println("progress changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      firstWatermark = Some(query.lastProgress.eventTime.get("watermark"))
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query)
      while (query.isActive) {
        val joinedSideEventTime = if (startingTime % 2000 == 0) {
          startingTime + 3000L
        } else {
          startingTime - 1000L
        }
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime, Some(joinedSideEventTime))
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  firstWatermark shouldBe defined
  firstWatermark.get shouldEqual "1970-01-01T00:00:01.000Z"
  val processedKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key).keys
  // In this case we don't expect event numbers because odd numbers goes to the first sending condition and the others
  // to the second one
  val keyNumbers = processedKeys.map(key => key.substring(3).toInt)
  val evenKeyNumbers = keyNumbers.filter(keyNumber => keyNumber % 2 == 0)
  evenKeyNumbers shouldBe empty
}

显然,在状态键水位示例中无法使用 2 个不同的水位,因为我们通过水位相等性来连接流。

状态值水位也可以应用于窗口水位,但是,如在时间戳列中一样,它必须在 JOIN 子句中表示为不等式:

"state value watermark" should "be built from a watermark window column and range condition" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark",
    window($"mainEventTimeWatermark", "3 seconds").as("mainWatermarkWindow")).withWatermark("mainWatermarkWindow", "3 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark",
    window($"joinedEventTimeWatermark", "3 seconds").as("joinedWatermarkWindow")).withWatermark("joinedWatermarkWindow", "3 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    expr("joinedWatermarkWindow > mainWatermarkWindow"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      val processingTimeFrom1970 = 10000L // 10 sec
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
      println("progress changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query)
      while (query.isActive) {
        val joinedSideEventTime = if (key % 2 == 0) {
          startingTime + 4000L
        } else {
          startingTime - 4000L
        }
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime, Some(joinedSideEventTime))
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  val processedKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key).keys
  processedKeys.nonEmpty shouldBe true
  val keyNumbers = processedKeys.map(key => key.substring(3).toInt)
  val oddKeyNumbers = keyNumbers.filter(keyNumber => keyNumber % 2 != 0)
  oddKeyNumbers shouldBe empty
}

混合 #

最后一个策略称为混合策略,它在定义了两个先前策略时发生。 但是,由于其更严格的特性,引擎仅使用一个-状态键水位。 在 getOneSideStateWatermarkPredicate(oneSideInputAttributes: Seq[Attribute], oneSideJoinKeys: Seq[Expression], otherSideInputAttributes: Seq[Attribute]) 中清楚地显示了它:

if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 in the StreamingSymmetricHashJoinExec docs
  // ...
} else if (isWatermarkDefinedOnInput) { // case 2 in the StreamingSymmetricHashJoinExec docs
  // ...
} else {
  None
}

以下测试显示 JOIN 子句中两种策略的存在:

"mixed watermark" should "use stricter state key watermark" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
    .withWatermark("mainEventTimeWatermark", "2 seconds")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
    .withWatermark("joinedEventTimeWatermark", "2 seconds")
 
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    mainEventsDataset("mainEventTimeWatermark") === joinedEventsDataset("joinedEventTimeWatermark") &&
    expr("joinedEventTimeWatermark >= mainEventTimeWatermark - interval 2 seconds"))
 
  val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
      var key = 0
      val processingTimeFrom1970 = 10000L // 10 sec
      stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
      println("progress changed, got watermark" + query.lastProgress.eventTime.get("watermark"))
      key = 2
      // We send keys: 2, 3, 4, 5, 7  in late to see watermark applied
      var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query) - 5000L
      while (query.isActive) {
        stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime)
        startingTime += 1000L
        key += 1
      }
    }
  }).start()
  query.awaitTermination(90000)
 
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  groupedByKeys.keys should not contain allOf("key2", "key3", "key4", "key5", "key6", "key7")
  // Check some initial keys that should be sent after the first generated watermark
  groupedByKeys.keys should contain allOf("key8", "key9", "key10")
  println(s"got keys=${groupedByKeys.mkString("\n")}")
}

流到流连接是有趣的 Apache Spark 结构化流功能。 但是,如果没有适当管理的状态生命周期,它们可能会成为噩梦。 无限数据源是无限硬件资源的代名词,因此成本高昂且维护复杂。 这就是引擎提供三种不同的策略来处理状态的原因:状态键水位,状态值水位和混合水位。 它们都使用水位的概念来检测晚到的行。 它们自动成为要在下一个处理循环中丢弃的行。