Wait the light to fall

流和流之间的连接内部研讨

焉知非鱼

在最近关于Apache Spark结构化流的3篇文章中,我们发现了流连接:内部连接,外部连接和状态管理策略。发现所有这些操作背后发生的事情是总结该系列的一个好方法。 这篇文章首先介绍了流连接过程中涉及的类。接下来是专注于与联接有关的状态管理内部的部分。文章以关于连接机制的一小段结尾。

所涉及的类 #

在流连接所涉及的类中,我们可以区分3个非常重要的类:SymmetricHashJoinStateManager,StreamingSymmetricHashJoinExec 和 StreamingJoinHelper。所有这些都用于流查询执行的不同阶段。

首先,查询的流式表示形式 IncrementalExecution 实例存储对该状态的引用。如果查询具有某些流间连接,则此状态在每次执行时都表示为 StreamingSymmetricHashJoinExec 的实例。该实例在每次执行中都不同,不同点是偏移量统计信息和状态水位谓词。谓词的计算公式为:

def getStateWatermarkPredicates(
    leftAttributes: Seq[Attribute],
    rightAttributes: Seq[Attribute],
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    condition: Option[Expression],
    eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates

此方法通过应用不同的规则来计算用于从状态存储中丢弃太晚的行的状态水位谓词。 它首先检查查询的相等性 JOIN 中涉及的所有列中是否至少有一个用水位注释标记的列。 如果是,它将自动认为必须将状态键水位策略应用于迟到的行(您可以在 Apache Spark 结构化流的外连接中了解它们)。 如果不是,则检查一个连接侧是否定义了水位列。 如果满足上述条件之一,并且优先选择前者,则使用常规的 org.apache.spark.sql.execution.streaming.WatermarkSupport#watermarkExpression(optionalWatermarkExpression: Option[Expression], optionalWatermarkMs: Option[Long]) 方法。

JOIN 子句中的相等重要性 流到流连接的实际实现仅接受相等关系作为连接键。 这意味着如果我们有2个流:stream#1(field1[int], field2[timestamp]), stream#2(field10[int], field20[timestamp]), ,则只有 field1 和 field10 之间的相等关系以及 field2 和 field20 将被视为连接键。 如果在 JOIN 的 ON 部分中表示不等式,则会将其转换为 WHERE 条件。

例如,以下查询:

val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark",
window($"mainEventTimeWatermark", "3 seconds").as("mainWatermarkWindow")).withWatermark("mainWatermarkWindow", "3 seconds")
val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark",
window($"joinedEventTimeWatermark", "3 seconds").as("joinedWatermarkWindow")).withWatermark("joinedWatermarkWindow", "3 seconds")
 
val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") && expr("joinedWatermarkWindow > mainWatermarkWindow"))
 
val query = stream.writeStream.trigger(Trigger.ProcessingTime(5000L)).foreach(RowProcessor).start()

会被转换为:

+- Exchange hashpartitioning(joinedKey#966, 200)
   +- EventTimeWatermark joinedWatermarkWindow#23: struct, interval 3 seconds
      +- Union
         :- *(7) Project [joinedKey#966, joinedEventTime#967L, joinedEventTimeWatermark#968, named_struct(start, precisetimestampconversion(((((CASE WHEN 
         // ...
         :  +- *(7) Filter (isnotnull(joinedEventTimeWatermark#968) && isnotnull(joinedKey#966))
         :     +- LocalTableScan [joinedKey#966, joinedEventTime#967L, joinedEventTimeWatermark#968]
// ...

虽然类似的查询,但在JOIN子句中具有相等条件,但将在数据混洗级别进行:

val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
  mainEventsDataset("mainEventTimeWatermark") === joinedEventsDataset("joinedEventTimeWatermark"))

并且物理执行计划为:

== Physical Plan ==
StreamingSymmetricHashJoin [mainKey#1447, mainEventTimeWatermark#1449-T2000ms], [joinedKey#1451, joinedEventTimeWatermark#1453], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/tmp/temporary-4bd3025b-1e05-4410-9ee4-04dd4191c63b/state, runId = b85412cd-94b1-4112-8d81-1e5c2e94a0f9, opId = 0, ver = 2, numPartitions = 200], 8000, state cleanup [ left key predicate: (input[1, timestamp, true] <= 8000000), right key predicate: (input[1, timestamp, true] <= 8000000) ]
:- Exchange hashpartitioning(mainKey#1447, mainEventTimeWatermark#1449-T2000ms, 200)
:  +- *(5) Filter isnotnull(mainEventTimeWatermark#1449-T2000ms)
:     +- EventTimeWatermark mainEventTimeWatermark#1449: timestamp, interval 2 seconds
:        +- Union
// ...
:              +- LocalTableScan [mainKey#2479, mainEventTime#2480L, mainEventTimeWatermark#2481]
+- Exchange hashpartitioning(joinedKey#1451, joinedEventTimeWatermark#1453, 200)
   +- Union
      :- *(6) Filter (isnotnull(joinedEventTimeWatermark#1453) && isnotnull(joinedKey#1451))
      :  +- LocalTableScan [joinedKey#1451, joinedEventTime#1452L, joinedEventTimeWatermark#1453]

旧状态清理 #

定义水位表达式后,引擎将其转换为状态键水位的实例 (JoinStateKeyWatermarkPredicate(expr: Expression)) 或状态值水位 (JoinStateValueWatermarkPredicate(expr: Expression))。需要强调的是,每个查询方都有自己的谓词,因为每个流可以具有不同的延迟特性。

这种构建的谓词稍后用于 org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec。OneSideHashJoiner 实例由 StreamingSymmetricHashJoinExec 引用。在删除过程中,引擎将调用适当的 SymmetricHashJoinStateManager 的方法:

  • removeByKeyCondition(removalCondition: UnsafeRow => Boolean) 用于键水位
  • removeByValueCondition(removalCondition: UnsafeRow => Boolean) 用于值水位

两种方法的参数都是从基础水位类型构建的谓词。 对于值一,它可以表示为(joinedWatermarkWindow#23-T3000ms> mainWatermarkWindow#14-T3000ms)。 内部将状态存储为 multi-map,即一个具有零个或多个值的键。 这样引擎就只能删除一部分存储状态。

连接 #

到目前为止,我们已经了解了状态管理,但是如何连接自己呢?在信息块的第一部分中,简要介绍了它们的执行情况。正如我们在此处看到的那样,不可避免地要通过哈希交换(即混洗)进行连接。具有给定连接键的所有行都移到同一分区,这就是魔术发生的地方。物理执行是通过 processPartitions(leftInputIter: Iterator[InternalRow], rightInputIter: Iterator[InternalRow]) 方法进行的。它由2个 OneSideHashJoiner 实例提供帮助,每个实例代表联接的每一侧。

连接的结果是一行 org.apache.spark.sql.catalyst.expressions.JoinedRow 类型,该类型使用 withLeft(InternalRow)withRight(InternalRow) 方法构造。但是在发生之前,OneSideHashJoiner 首先检索所有非迟到的行,并为它们中的每一个检索,并检查是否满足过滤条件。这些条件是连接前过滤器(仅适用于已处理连接侧的过滤器)和连接后过滤器(整体连接条件)。稍后,连接器尝试通过执行以下操作为每条有效行在另一侧找到匹配的行:

val key = keyGenerator(thisRow)
val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
  generateJoinedRow(thisRow, thatRow)
}.filter(postJoinFilter)

如果实际匹配的行(原始版本,未连接)可以保留在状态存储中,则连接器稍后进行验证。 仅当2个水位谓词(键和值,已在文章中提到)有效(即行本身未过期)时,该参数才会保留在该位置。 这就是内连接的全部内容。 对于外连接,将启动特殊处理。 匹配的行由不匹配完成,发生在这里:

val outputIter: Iterator[InternalRow] = joinType match {
  case Inner =>
    innerOutputIter
  case LeftOuter =>
    def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
      rightSideJoiner.get(leftKeyValue.key).exists { rightValue =>
        postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
      }
    }
    // BK: removeOldState returns an iterator with expired rows 
    //     It clearly shows that without watermark it wouldn't be possible
    //     to emit not matched ones in the case of outer join.
    val removedRowIter = leftSideJoiner.removeOldState()
    val outerOutputIter = removedRowIter
      .filterNot(pair => matchesWithRightSideState(pair))
      .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
 
    innerOutputIter ++ outerOutputIter
  case RightOuter =>
    // BK: does the same as LeftOuter but by switching sides
  case _ => throwBadJoinTypeException()
}

使用完所有连接的行后,将执行回调以从状态存储中删除旧状态:

val cleanupIter = joinType match {
  case Inner =>
    leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
  // BK: for outer only joined rows are removed - the removal of outer side
  //     was made before, at generating not fully matched rows
  case LeftOuter => rightSideJoiner.removeOldState()
  case RightOuter => leftSideJoiner.removeOldState()
  case _ => throwBadJoinTypeException()
}  

无限数据源中的状态管理非常重要。 通过计算水位谓词,它有助于减少存储的行数。 但是,如本文第一部分所示,它不是流连接功能中涉及的单个部分。 另一个在寻找匹配。 在第3部分中介绍,它描述了StreamingSymmetricHashJoinExec 的 processPartitions 方法中实现的连接逻辑。 如我们所知,该操作首先检索有效行(过滤器+允许的延迟),然后只有主行在另一侧查找匹配的行。 完成此操作后,将返回一个包含所有连接行的迭代器,并且在迭代结束时将调用清理旧行的函数。