Wait the light to fall

测试

焉知非鱼

Testing

测试

测试是每个软件开发过程中不可缺少的一部分,因此 Apache Flink 提供的工具可以在测试金字塔的多个层次上测试你的应用程序代码。

测试用户自定义函数 #

通常,我们可以假设 Flink 在用户定义的函数之外产生正确的结果。因此,建议尽可能用单元测试来测试那些包含主要业务逻辑的类。

单元测试无状态、Timeless UDFs。 #

例如,我们来看看下面的无状态 MapFunction。

class IncrementMapFunction extends MapFunction[Long, Long] {

    override def map(record: Long): Long = {
        record + 1
    }
}

通过传递合适的参数和验证输出,用你最喜欢的测试框架对这样的函数进行单元测试是非常容易的。

class IncrementMapFunctionTest extends FlatSpec with Matchers {

    "IncrementMapFunction" should "increment values" in {
        // instantiate your function
        val incrementer: IncrementMapFunction = new IncrementMapFunction()

        // call the methods that you have implemented
        incremeter.map(2) should be (3)
    }
}

同样,使用 org.apache.flink.util.Collector 的用户定义函数(例如 FlatMapFunction 或 ProcessFunction)可以通过提供一个模拟对象而不是真实的 Collector 来轻松测试。一个与 IncrementMapFunction 功能相同的 FlatMapFunction 可以进行如下单元测试。

class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {

    "IncrementFlatMapFunction" should "increment values" in {
       // instantiate your function
      val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction()

      val collector = mock[Collector[Integer]]

      //verify collector was called with the right output
      (collector.collect _).expects(3)

      // call the methods that you have implemented
      flattenFunction.flatMap(2, collector)
  }
}

单元测试 有状态或及时的 UDF 和自定义操作符 #

测试一个用户定义函数的功能是比较困难的,因为它涉及到测试用户代码和 Flink 运行时之间的交互。为此,Flink 提供了一个所谓的测试线束的集合,它可以用来测试这样的用户定义函数以及自定义操作符。

  • OneInputStreamOperatorTestHarness(用于 DataStreams 上的操作符)
  • KeyedOneInputStreamOperatorTestHarness(用于 KeyedStreams 上的操作者)
  • TwoInputStreamOperatorTestHarness (适用于两个 DataStreams 的 ConnectedStreams 操作者)
  • KeyedTwoInputStreamOperatorTestHarness (用于两个 KeyedStream 的 ConnectedStreams 上的操作员)

为了使用测试套件,需要一组额外的依赖关系(测试范围)。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.11.0</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>1.11.0</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.11.0</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>

现在,测试线束可以用来将记录和水印推送到你的用户定义函数或自定义运算符中,控制处理时间,最后对运算符的输出进行断言(包括侧输出)。

class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {

  private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
  private var statefulFlatMap: StatefulFlatMapFunction = null

  before {
    //instantiate user-defined function
    statefulFlatMap = new StatefulFlatMap

    // wrap user defined function into a the corresponding operator
    testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))

    // optionally configured the execution environment
    testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

    // open the test harness (will also call open() on RichFunctions)
    testHarness.open();
  }

  "StatefulFlatMap" should "do some fancy stuff with timers and state" in {


    //push (timestamped) elements into the operator (and hence user defined function)
    testHarness.processElement(2, 100);

    //trigger event time timers by advancing the event time of the operator with a watermark
    testHarness.processWatermark(100);

    //trigger proccesign time timers by advancing the processing time of the operator directly
    testHarness.setProcessingTime(100);

    //retrieve list of emitted records for assertions
    testHarness.getOutput should contain (3)

    //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
    //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
  }
}

KeyedOneInputStreamOperatorTestHarness 和 KeyedTwoInputStreamOperatorTestHarness 是通过额外提供一个包括键类的 TypeInformation 的 KeySelector 来实例化的。

class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {

  private var testHarness: OneInputStreamOperatorTestHarness[String, Long, Long] = null
  private var statefulFlatMapFunction: FlattenFunction = null

  before {
    //instantiate user-defined function
    statefulFlatMapFunction = new StateFulFlatMap

    // wrap user defined function into a the corresponding operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction),new MyStringKeySelector(), Types.STRING())

    // open the test harness (will also call open() on RichFunctions)
    testHarness.open();
  }

  //tests

}

在 Flink 代码库中还可以找到更多使用这些测试线束的例子,例如。

  • org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest 是一个很好的例子,用于测试依赖于处理或事件时间的操作员和用户定义的函数。
  • org.apache.flink.streaming.api.function.sink.filesystem.LocalStreamingFileSinkTest 展示了如何使用 AbstractStreamOperatorTestHarness 测试自定义的 sink。具体来说,它使用 AbstractStreamOperatorTestHarness.snapshot 和 AbstractStreamOperatorTestHarness.initializeState 来测试它与 Flink 的检查点机制的交互。

注意: AbstractStreamOperatorTestHarness 和它的派生类目前不是公共 API 的一部分,可能会发生变化。

单元测试 ProcessFunction #

鉴于其重要性,除了之前的测试线束可以直接用于测试 ProcessFunction 外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses 的测试线束工厂,可以更方便地进行测试线束实例化。考虑到这个例子。

注意: 要使用这个测试线束,你还需要引入上一节中提到的依赖关系。

class PassThroughProcessFunction extends ProcessFunction[Integer, Integer] {

    @throws[Exception]
    override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {
      out.collect(value)
    }
}

使用 ProcessFunctionTestHarnesses 对这样的函数进行单元测试是非常容易的,通过传递合适的参数并验证输出。

class PassThroughProcessFunctionTest extends FlatSpec with Matchers {

  "PassThroughProcessFunction" should "forward values" in {

    //instantiate user-defined function
    val processFunction = new PassThroughProcessFunction

    // wrap user defined function into a the corresponding operator
    val harness = ProcessFunctionTestHarnesses.forProcessFunction(processFunction)

    //push (timestamped) elements into the operator (and hence user defined function)
    harness.processElement(1, 10)

    //retrieve list of emitted records for assertions
    harness.extractOutputValues() should contain (1)
  }
}

关于如何使用 ProcessFunctionTestHarnesses 来测试 ProcessFunction 的不同风味,如 KeyedProcessFunction、KeyedCoProcessFunction、BroadcastProcessFunction 等的更多例子,鼓励用户查看 ProcessFunctionTestHarnessesTest。

JUnit 规则 MiniClusterWithClientResource #

Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 JUnit 规则,用于针对本地的、嵌入式的迷你集群测试完整的作业,名为 MiniClusterWithClientResource。

要使用 MiniClusterWithClientResource,需要一个额外的依赖(测试范围)。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.11.0</version>
</dependency>

让我们以前面几节中同样简单的 MapFunction 为例。

class IncrementMapFunction extends MapFunction[Long, Long] {

    override def map(record: Long): Long = {
        record + 1
    }
}

现在可以在本地 Flink 集群中测试使用该 MapFunction 的简单管道,具体如下。

class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(1)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }


  "IncrementFlatMapFunction pipeline" should "incrementValues" in {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1, 21, 22)
       .map(new IncrementMapFunction())
       .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf (2, 22, 23)
    }
}
// create a testing sink
class CollectSink extends SinkFunction[Long] {

  override def invoke(value: Long): Unit = {
    synchronized {
      CollectSink.values.add(value)
    }
  }
}

object CollectSink {
    // must be static
    val values: util.List[Long] = new util.ArrayList()
}

关于 MiniClusterWithClientResource 的集成测试的几点说明。

  • 为了不把你的整个流水线代码从生产中复制到测试中,请在你的生产代码中使源和汇可插拔,并在你的测试中注入特殊的测试源和测试汇。

  • 这里使用了 CollectSink 中的静态变量,因为 Flink 在将所有操作符分布在集群中之前,会将它们序列化。通过静态变量与本地 Flink 迷你集群实例化的运算符进行通信是解决这个问题的一种方法。另外,你可以将数据写到与你的测试汇的临时目录中的文件中。

  • 如果你的作业使用事件时间计时器,你可以实现一个自定义的并行源函数来发射水印。

  • 建议始终以并行度 >1 的方式在本地测试你的流水线,以识别只有并行执行的流水线才会出现的错误。

  • 优先选择 @ClassRule 而不是 @Rule,这样多个测试可以共享同一个 Flink 集群。这样做可以节省大量的时间,因为 Flink 集群的启动和关闭通常会支配实际测试的执行时间。

  • 如果你的管道包含自定义状态处理,你可以通过启用检查点并在迷你集群内重新启动作业来测试其正确性。为此,你需要通过从你的管道中的(仅测试的)用户定义函数中抛出一个异常来触发失败。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/testing.html