Wait the light to fall

侧输出

焉知非鱼

Side Outputs

Side Output #

除了 DataStream 操作产生的主流(main stream)外,还可以产生任意数量的附加侧输出结果流。结果流中的数据类型不必与主流中的数据类型相匹配,不同侧输出的类型也可以不同。当您要分割数据流时,这种操作非常有用,通常您必须复制数据流,然后从每个数据流中过滤掉您不想要的数据。

在使用侧输出时,首先需要定义一个 OutputTag,用来识别侧输出流。

val outputTag = OutputTag[String]("side-output")

请注意 OutputTag 是如何根据侧输出流所包含的元素类型进行类型化的。

可以通过以下函数向侧输出发送数据。

你可以使用 Context 参数(在上面的函数中暴露给用户)向一个由 OutputTag 标识的侧输出发送数据。下面是一个从 ProcessFunction 中发射侧输出数据的例子。

val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")

val mainDataStream = input
  .process(new ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      // emit data to regular output
      out.collect(value)

      // emit data to side output
      ctx.output(outputTag, "sideout-" + String.valueOf(value))
    }
  })

为了检索侧输出流,你可以在 DataStream 操作的结果上使用 getSideOutput(OutputTag)。这将给你一个 DataStream,它的类型是侧输出流的结果。

val outputTag = OutputTag[String]("side-output")

val mainDataStream = ...

val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html