Inner joins between streams in Spark Structured Streaming

焉知非鱼

Apache Kafka Streams supports joins between streams, and the community expected the same from Apache Spark. This feature was implemented and shipped in the latest 2.3.0 release and, a few months later, now is a good moment to discuss it.

This post consists of 2 parts. The first one presents the idea of joins in streaming pipelines. The next one discusses one of the two supported types: the inner join.

Joins and streams #

Executing a join operation on streams is challenging for many reasons. The most obvious one is latency. With a bounded data source all the data to join is in place, whereas with stream-based sources the data is constantly in motion, and sometimes (often?) the two sides move at different speeds. That may be caused by technical issues (e.g. ingestion pipeline problems) or simply by functional requirements where the related events are not always generated within a similar period of time. One example of such a functional constraint is the ordering process in an e-commerce store, where an order is rarely completed right away. The join operation must therefore somehow handle related but strongly asynchronous events.

Another important point to address is state management. Since the data of a given event may arrive at any (possibly very late) moment and the storage available to keep it is limited, the engine must figure out what to do with the accumulated state and, above all, when to discard it. That particular moment corresponds to the point at which we don't expect to receive any new events for a given join key.

Apache Spark Structured Streaming solves both problems in the 2.3.0 release by providing the ability to join 2 or more streams. Stream-to-stream joins can be characterized by the following points (a minimal sketch follows the list):

  • the join semantics are the same as for batch joins
  • the output is generated as soon as a matching element is found (inner join)
  • watermarks and time range conditions are used to join late data and to determine when no more events can occur for a given key (state discarding)
  • different join types are supported: inner and outer
  • joins can be cascaded, i.e. applied on more than 2 streams
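
To make these points concrete, here is a minimal, hedged sketch of a stream-to-stream inner join. It is not taken from the tests below: the two rate sources, the column aliases and the console sink are illustrative assumptions, but the join call itself is the standard Dataset API:

import org.apache.spark.sql.SparkSession

object StreamToStreamInnerJoinSketch {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Stream-to-stream inner join sketch").master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // Two unbounded sources; the built-in "rate" source generates (timestamp, value)
    // rows and is convenient for local experiments
    val mainEvents = sparkSession.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("value AS mainKey", "timestamp AS mainEventTime")
    val joinedEvents = sparkSession.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("value AS joinedKey", "timestamp AS joinedEventTime")

    // Same semantics as a batch join: a row is emitted as soon as both sides match
    val joinedStream = mainEvents.join(joinedEvents, $"mainKey" === $"joinedKey")

    joinedStream.writeStream
      .outputMode("append") // the only output mode supported for stream-to-stream joins in 2.3.x
      .format("console")
      .start()
      .awaitTermination()
  }
}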

However, as of Apache Spark 2.3.1, stream-to-stream joins come with several limitations (the first one is illustrated right after this list):

  • they can only be used with the append output mode (support for the other modes is planned)
  • only map-like operations are supported before the join, e.g. we cannot make an aggregation before joining
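
A short, hedged illustration of the first limitation, reusing the mainEvents and joinedEvents rate sources from the previous sketch: the same join accepted in append mode is rejected at analysis time when another output mode is requested:

// Accepted: append is the only supported mode for stream-to-stream joins
val joined = mainEvents.join(joinedEvents, $"mainKey" === $"joinedKey")
joined.writeStream.outputMode("append").format("console").start()

// Rejected with an AnalysisException as of 2.3.x (the exact message may vary between versions)
joined.writeStream.outputMode("update").format("console").start()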

Inner joins on streams #

The first supported type of stream-to-stream join is the inner join. Since it's a strict join that emits nothing when there is no matching row, the joined columns don't require any time constraint. However, omitting one is risky, because even rows that will never match may then be kept in the state store for a very long time. That's why it's recommended to define a condition telling how long the state for a given key should be retained. The simplest case of an inner join, without any state cleanup, is shown below:

it should "output the result as soon as it arrives without watermark" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val stream = mainEventsStream.toDS().join(joinedEventsStream.toDS(), $"mainKey" === $"joinedKey")
 
  val query = stream.writeStream.foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      var key = 0
      while (query.isActive) {
        // Here the main event is always sent before the joined one
        // But we also send an event for key - 10 in order to check whether the main event is still kept in the state store
        joinedEventsStream.addData(Events.joined(s"key${key-10}"))
        val mainEventTime = System.currentTimeMillis()
        mainEventsStream.addData(MainEvent(s"key${key}", mainEventTime, new Timestamp(mainEventTime)))
        Thread.sleep(1000L)
        joinedEventsStream.addData(Events.joined(s"key${key}"))
        key += 1
      }
    }
  }).start()
  query.awaitTermination(60000)
 
  // As you can see in this test, when neither a watermark nor a range condition is defined, the state isn't cleared
  // That's why we can still see data coming 9/10 seconds after the first joined event for the same key
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  val keysWith2Entries = groupedByKeys.filter(keyWithEntries => keyWithEntries._2.size == 2)
  keysWith2Entries.foreach(keyWithEntries => {
    val entries = keyWithEntries._2
    val metric1 = entries(0)
    val metric2 = entries(1)
    val diffBetweenEvents = metric2.joinedEventMillis - metric1.joinedEventMillis
    val timeDiffSecs = diffBetweenEvents/1000
    (timeDiffSecs >= 9 && timeDiffSecs <= 10) shouldBe true
  })
}
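
The unbounded state growth observed above is exactly why a time constraint is recommended. Below is a hedged sketch of what such a constraint could look like for these two streams: the watermark durations and the 4-second range are arbitrary illustrative values, while the column names follow the MainEvent/JoinedEvent schema used in the tests:

import org.apache.spark.sql.functions.expr

// Both sides declare a watermark on their event time column...
val mainEventsWithWatermark = mainEventsStream.toDS()
  .withWatermark("mainEventTimeWatermark", "2 seconds")
val joinedEventsWithWatermark = joinedEventsStream.toDS()
  .withWatermark("joinedEventTimeWatermark", "2 seconds")

// ...and the join condition adds a time range constraint, so the engine
// can compute when the state for a given key can never match again and
// can therefore safely be discarded
val boundedStateJoin = mainEventsWithWatermark.join(joinedEventsWithWatermark,
  expr("""mainKey = joinedKey AND
    joinedEventTimeWatermark >= mainEventTimeWatermark AND
    joinedEventTimeWatermark <= mainEventTimeWatermark + interval 4 seconds"""))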

In the first test the streams were joined on a simple key condition. Another join type uses windows. We can see how they work in the test below, where the rows with odd keys are sent too late, i.e. outside the window in which the event of the non-nullable join side was emitted. As a result we should only receive the rows with even keys:

it should "join rows per windows" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", window($"mainEventTimeWatermark", "5 seconds"),
    $"mainEventTime", $"mainEventTimeWatermark")
  val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", window($"joinedEventTimeWatermark", "5 seconds"),
    $"joinedEventTime", $"joinedEventTimeWatermark")
  val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
    mainEventsDataset("window") === joinedEventsDataset("window"))
 
  val query = stream.writeStream.foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      var key = 0
      var iterationTimeFrom1970 = 1000L // 1 sec
      while (query.isActive) {
        val (key1, key2) = (key + 1, key + 2)
        // The join window lasts 5 seconds. The joined event sent here for the key produced 3 iterations
        // earlier carries a much later event time, so it falls outside of that key's window and is
        // excluded from the join. Thus, at the end we should retrieve only rows with even keys
        joinedEventsStream.addData(Events.joined(s"key${key1-6}", eventTime = iterationTimeFrom1970))
        mainEventsStream.addData(MainEvent(s"key${key1}", iterationTimeFrom1970, new Timestamp(iterationTimeFrom1970)),
          MainEvent(s"key${key2}", iterationTimeFrom1970, new Timestamp(iterationTimeFrom1970)))
        Thread.sleep(1000L)
        joinedEventsStream.addData(Events.joined(s"key${key2}", eventTime = iterationTimeFrom1970))
        iterationTimeFrom1970 += iterationTimeFrom1970
        key += 2
      }
    }
  }).start()
  query.awaitTermination(60000)
 
  // Because the rows with odd keys are joined too late (outside the 5-second window), we should
  // only find rows with even keys here
  val processedEventsKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  processedEventsKeys.keys.foreach(key => {
    val keyNumber = key.substring(3).toInt
    keyNumber % 2 == 0 shouldBe true
  })
}

As stated in the first part, stream-to-stream joins don't support all possible operations before the join. As shown in the following test cases, they can only be used together with map or filter transformations:

it should "filter and map before joining" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val mainEventsWithMappedKey = mainEventsStream.toDS().filter(mainEvent => mainEvent.mainKey.length > 3)
    .map(mainEvent => mainEvent.copy(mainKey = s"${mainEvent.mainKey}_copy"))
  // For the nullable side we deliberately omit the filtering - it shows that the query
  // works even when the two sides don't apply exactly the same transformations
  val joinedEventsWithMappedKey = joinedEventsStream.toDS()
    .map(joinedEvent => joinedEvent.copy(joinedKey = s"${joinedEvent.joinedKey}_copy"))
 
  val stream = mainEventsWithMappedKey.join(joinedEventsWithMappedKey, $"mainKey" === $"joinedKey")
 
  val query = stream.writeStream.foreach(RowProcessor).start()
 
  while (!query.isActive) {}
  new Thread(new Runnable() {
    override def run(): Unit = {
      var key = 0
      while (query.isActive) {
        val eventTime = System.currentTimeMillis()
        mainEventsStream.addData(MainEvent(s"key${key}", eventTime, new Timestamp(eventTime)))
        joinedEventsStream.addData(Events.joined(s"key${key}", eventTime = eventTime))
        Thread.sleep(1000L)
        key += 1
      }
    }
  }).start()
  query.awaitTermination(60000)
 
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  groupedByKeys.keys.foreach(key => {
    key should endWith("_copy")
  })
}
 
it should "fail when the aggregations are made before the join" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  val exception = intercept[AnalysisException] {
    val mainEventsWithMappedKey = mainEventsStream.toDS()
    val joinedEventsWithMappedKey = joinedEventsStream.toDS().groupBy($"joinedKey").count()
 
    val stream = mainEventsWithMappedKey.join(joinedEventsWithMappedKey, $"mainKey" === $"joinedKey")
 
    stream.writeStream.foreach(RowProcessor).start()
  }
  exception.getMessage() should include("Append output mode not supported when there are streaming aggregations " +
    "on streaming DataFrames/DataSets without watermark")
}

This first introductory post about stream-to-stream joins covered the inner join. The first part took a wider perspective and showed why such joins are hard to implement: as we could learn, the main concerns are late events and state management, which directly impact both the quality of the results and the hardware requirements. The second part presented one of the 2 implemented join types, the inner join. The examples showed the limitations of stream-to-stream joins as well as their behavior in the strictest situation, where both sides must match.