Wait the light to fall

用于外部数据访问的异步 I/O

焉知非鱼

Asynchronous Io for External Data Access

本页解释了如何使用 Flink 的 API 与外部数据存储进行异步 I/O。对于不熟悉异步或事件驱动编程的用户来说,一篇关于 Futures 和事件驱动编程的文章可能是有用的准备。

注:关于异步 I/O 实用程序的设计和实现的细节可以在提案和设计文件 FLIP-12:异步I/O设计和实现中找到

异步I/O操作的必要性 #

在与外部系统交互时(例如用存储在数据库中的数据来丰富流事件时),需要注意与外部系统的通信延迟不会主导流应用的总工作。

奈何访问外部数据库中的数据,例如在 MapFunction 中,通常意味着同步交互。一个请求被发送到数据库,MapFunction 等待直到收到响应。在许多情况下,这种等待占据了函数的绝大部分时间。

与数据库的异步交互意味着一个并行函数实例可以同时处理许多请求,并同时接收响应。这样一来,等待时间就可以与发送其他请求和接收响应叠加起来。最起码,等待时间可以摊在多个请求上。这在大多数情况下会导致更高的流吞吐量。

img

注意:通过仅仅将 MapFunction 扩展到很高的并行度来提高吞吐量,在某些情况下也是可行的,但通常要付出很高的资源代价:拥有更多的并行 MapFunction 实例意味着更多的任务、线程、Flink 内部网络连接、与数据库的网络连接、缓冲区以及一般的内部记账开销。

前提条件 #

如上节所述,要实现对数据库(或键/值存储)的适当异步 I/O,需要向该数据库提供一个支持异步请求的客户端。许多流行的数据库都提供了这样的客户端。

在没有这样的客户端的情况下,可以尝试通过创建多个客户端,并用线程池处理同步调用,将同步客户端变成有限的并发客户端。然而,这种方法通常比一个合适的异步客户端效率低。

异步 I/O API #

Flink 的 Async I/O API 允许用户使用异步请求客户端与数据流。该 API 处理与数据流的集成,以及处理顺序、事件时间、容错等。

假设自己有一个目标数据库的异步客户端,需要三个部分来实现对数据库的异步 I/O 的流转换。

  • 一个 AsyncFunction 的实现,用来调度请求。
  • 一个回调,获取操作结果并将其交给 ResultFuture
  • 在 DataStream 上应用异步 I/O 操作作为转换。

下面的代码示例说明了基本模式。

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

重要提示:ResultFuture.complete 的第一次调用就完成了。所有后续的完成调用将被忽略。

以下两个参数控制异步操作。

  • 超时: 超时定义了异步请求在被认为失败之前可能需要的时间。这个参数可以防范死机/失败的请求。

  • Capacity(容量):该参数定义了异步请求在被认为失败之前可能需要的时间。这个参数定义了多少个异步请求可以同时进行。尽管异步I/O方法通常会带来更好的吞吐量,但操作者仍然可以成为流应用的瓶颈。限制并发请求的数量可以确保操作者不会积累越来越多的待处理请求的积压,但一旦容量耗尽,就会触发背压。

超时处理 #

当一个异步 I/O 请求超时时,默认情况下会抛出一个异常并重新启动作业。如果你想处理超时,你可以重写 AsyncFunction#timeout 方法。

结果的顺序 #

AsyncFunction 发出的并发请求经常以某种未定义的顺序完成,基于哪个请求先完成。为了控制结果记录以何种顺序发出,Flink 提供了两种模式。

  • Unordered: 异步请求一结束,结果记录就会被发出。在异步 I/O 操作符之后,流中记录的顺序与之前不同。这种模式以处理时间为基本时间特性时,延迟最低,开销最小。使用 AsyncDataStream.unorderedWait(...) 来实现这种模式。

  • Ordered: 在这种情况下,流的顺序被保留下来。结果记录的发出顺序与异步请求被触发的顺序相同(运算符输入记录的顺序)。为了达到这个目的,操作符会缓冲一个结果记录,直到它前面的所有记录都被发出来(或定时发出来)。这通常会在检查点中引入一些额外的延迟和一些开销,因为与无序模式相比,记录或结果在检查点状态下维持的时间更长。使用 AsyncDataStream.orderedWait(...) 来处理这种模式。

事件时间 #

当流媒体应用程序使用事件时间工作时,水印将由异步 I/O 操作符正确处理。具体来说,这意味着两种顺序模式的以下内容。

  • 无序的:水印不会超越记录,反之亦然,这意味着水印会建立一个顺序边界。只有在水印之间才会发出无序的记录。发生在某一水印之后的记录,只有在该水印被发射之后才会被发射。而水印则只有在该水印之前的所有输入的结果记录被发出之后才会被发出。

这意味着在有水印的情况下,无序模式会引入一些与有序模式相同的延迟和管理开销。该开销的数量取决于水印的频率。

  • 有序的: 水印和记录的顺序被保留下来 就像记录之间的顺序被保留一样 与处理时间相比,开销没有明显变化。

请记住,摄取时间是事件时间的一种特殊情况,其自动生成的水印是基于源处理时间的。

容错保证 #

异步 I/O 操作符提供了完全精确的一次容错保证,它将飞行中的异步请求记录存储在检查点中,并在故障恢复时恢复/重新触发请求。它将飞行中的异步请求记录存储在检查点中,并在从故障中恢复时恢复/重新触发请求。

实现技巧 #

对于有 Executor(或 Scala 中的 ExecutionContext)用于回调的 Futures 实现,我们建议使用 DirectExecutor,因为回调通常只做最少的工作,而且DirectExecutor 避免了额外的线程间交接开销。回调通常只将结果交给 ResultFuture,后者将其添加到输出缓冲区。从那里开始,包括记录排放和与检查点记账的交互在内的繁重逻辑无论如何都发生在一个专用线程池中。

可以通过 org.apache.flink.runtime.concurrent.Executors.directExecutor()com.google.common.util.concurrent.MoreExecutors.directExecutor() 获得 DirectExecutor。

注意事项 #

AsyncFunction 不叫多线程。

我们在这里要明确指出的一个常见的困惑是,AsyncFunction 不是以多线程的方式调用的。AsyncFunction 只存在一个实例,并且对于流的各个分区中的每一条记录,它都会被依次调用。除非 asyncInvoke(...) 方法快速返回并依赖于回调(由客户端),否则不会导致正确的异步 I/O。

例如,以下模式会导致阻塞 asyncInvoke(...) 函数,从而使异步行为无效。

  • 使用一个数据库客户端,其查找/查询方法的调用会被阻塞,直到结果被接收回来为止

  • asyncInvoke(...) 方法中阻止/等待异步客户端返回的未来型对象。

出于一致性的考虑,AsyncFunction 的操作符(AsyncWaitOperator)目前必须位于操作符链的头部。

由于在 FLINK-13063 问题中给出的原因,我们目前必须打破 AsyncWaitOperator 的操作符链,以防止潜在的一致性问题。这是对以前支持链的行为的改变。需要旧行为并接受潜在的违反一致性保证的用户可以手动实例化并将 AsyncWaitOperator 添加到作业图中,并通过 AsyncWaitOperator#setChainingStrategy(ChainingStrategy.ALWAYS) 将链式策略设置回链式。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html