Wait the light to fall

数据源

焉知非鱼

Data Sources

注:这描述了新的数据源 API ,作为 FLIP-27 的一部分在 Flink 1.11 中引入。这个新的 API 目前处于 BETA 状态。 大多数现有的源连接器还没有(截至 Flink 1.11 )使用这个新的 API 实现,而是使用以前的 API ,基于 SourceFunction 。 本页介绍了 Flink 的数据源 API 及其背后的概念和架构。如果你对 Flink 中的数据源是如何工作的,或者你想实现一个新的数据源,请阅读本页面。

如果您正在寻找预定义的源连接器,请查看连接器文档

数据源概念 #

核心部件

一个数据源有三个核心组件。SplitSplitEnumeratorSourceReader

  • Split 是数据源所消耗的一部分数据,就像一个文件或一个日志分区。Split 是源分配工作和并行读取数据的粒度。

  • SourceReader 请求 Split 并进行处理,例如读取 Split 所代表的文件或日志分区。SourceReaderSourceOperators 的 Task Manager 上并行运行,并产生事件/记录的并行流。

  • SplitEnumerator 生成 Split 并将它们分配给 SourceReader 。它作为单个实例在任务管理器上运行,负责维护待处理的 Split 的积压,并以平衡的方式将它们分配给读者。

Source 类是将上述三个组件联系在一起的 API 入口点。

img

统一的跨流和批处理

数据源 API 以统一的方式支持无界流源和有界批处理源。

这两种情况的区别很小:在有界/批处理的情况下,枚举器生成一组固定的 split ,而且每个 split 必然是有限的。在无界流的情况下,这两种情况中的一种是不正确的( split 不是有限的,或者枚举器不断产生新的 split )。

例子 #

下面是一些简化的概念性例子,以说明在流式和批处理情况下,数据源组件如何交互。

请注意,这并不能准确地描述 Kafka 和 File 源的实现是如何工作的;部分内容是简化的,用于说明目的。

绑定的文件源

源有一个要读取的目录的 URI/路径,以及一个定义如何解析文件的格式。

  • Split 是一个文件,或者一个文件的一个区域(如果数据格式支持分割文件)。
  • SplitEnumerator 列出了给定目录路径下的所有文件。它将 Split 分配给下一个请求 Split 的读者。一旦所有的 Split 都分配完毕,它就会用 NoMoreSplits 来响应请求。
  • SourceReader 请求一个 Split ,并读取被分配的 Split (文件或文件区域),并使用给定的格式进行解析。如果它没有得到另一个 Split ,而是得到一个 NoMoreSplits 消息,它就结束了。

非绑定流文件源

这个源的工作方式和上面描述的一样,除了 SplitEnumerator 从不响应 NoMoreSplits ,而是周期性地列出给定 URI/Path 下的内容以检查新文件。一旦发现新文件,它就会为它们生成新的 Splits ,并可以将它们分配给可用的 SourceReaders

无界流 Kafka 源

该源有一个 Kafka Topic (或 Topic 列表或 Topic regex )和一个 Deserializer 来解析记录。

  • 一个 Split 就是一个 Kafka Topic 分区。
  • SplitEnumerator 连接到 brokers ,以列出所有涉及订阅的主题分区。枚举器可以选择重复这个操作来发现新添加的主题/分区。
  • SourceReader 使用 KafkaConsumer 读取分配的 split (主题分区),并使用提供的 Deserializer 反序列化记录。分割(Topic Partitions) 没有终点,所以读取器永远不会到达数据的终点。

绑定的 Kafka 源

和上面一样,只是每个 Split (主题分区)有一个定义的结束偏移量。一旦 SourceReader 达到一个 Split 的结束偏移量,它就会完成该 Split 。一旦所有分配的 Split 结束, SourceReader 就结束了。

数据源 API #

本节介绍了 FLIP-27 中新引入的 Source API 的主要接口,并为开发者提供了 Source 开发的技巧。

Source #

Source API 是一个工厂风格的接口,用于创建以下组件。

  • Split Enumerator
  • 源读取器
  • 分离式序列器
  • 枚举器检查点序列器

除此之外, Source 还提供了源的边界属性,这样 Flink 可以选择合适的模式来运行 Flink 作业。

Source 的实现应该是可序列化的,因为 Source 实例在运行时被序列化并上传到 Flink 集群。

SplitEnumerator #

SplitEnumerator 有望成为 Source 的"大脑"。SplitEnumerator 的典型实现会做以下工作。

  • SourceReader 注册处理
  • SourceReader 失败处理
    • 当 SourceReader 失败时,将调用 addSplitsBack() 方法。SplitEnumerator 应该收回未被失败的 SourceReader 承认的分割分配。
  • SourceEvent 处理
    • SourceEvents 是在 SplitEnumerator 和 SourceReader 之间发送的自定义事件。实现可以利用这种机制来进行复杂的协调。
  • 分割发现和分配
    • SplitEnumerator 可以根据各种事件将 split 分配给 SourceReaders ,包括发现新的 split 、新的 SourceReader 注册、 SourceReader 失败等。

SplitEnumerator 可以借助 SplitEnumeratorContext 完成上述工作, SplitEnumeratorContext 是在创建或恢复 SplitEnumerator 时提供给 Source 的。SplitEnumeratorContext 允许 SplitEnumerator 检索读取器的必要信息并执行协调动作。Source 实现应该将 SplitEnumeratorContext 传递给 SplitEnumerator 实例。

虽然 SplitEnumerator 实现可以通过只在它的方法被调用时采取协调动作的被动方式很好地工作,但一些 SplitEnumerator 实现可能希望主动采取行动。例如,一个 SplitEnumerator 可能希望定期运行 split discovery ,并将新的 split 分配给 SourceReaders 。这样的实现可能会发现调用 Async() 方法 SplitEnumeratorContext 很方便。下面的代码片段展示了 SplitEnumerator 实现如何在不维护自己的线程的情况下实现这一点。

class MySplitEnumerator implements SplitEnumerator<MySplit> {
    private final long DISCOVER_INTERVAL = 60_000L;

    /**
     * A method to discover the splits.
     */
    private List<MySplit> discoverSplits() {...}
    
    @Override
    public void start() {
        ...
        enumContext.callAsync(this::discoverSplits, splits -> {
            Map<Integer, List<MockSourceSplit>> assignments = new HashMap<>();
            int parallelism = enumContext.currentParallelism();
            for (MockSourceSplit split : splits) {
                int owner = split.splitId().hashCode() % parallelism;
                assignments.computeIfAbsent(owner, new ArrayList<>()).add(split);
            }
            enumContext.assignSplits(new SplitsAssignment<>(assignments));
        }, 0L, DISCOVER_INTERVAL);
        ...
    }
    ...
}

SourceReader #

SourceReader 是一个运行在 Task Manager 中的组件,用于消耗来自 Splits 的记录。

SourceReader 暴露了一个基于拉的消费接口。一个 Flink 任务在循环中不断调用 pollNext(ReaderOutput) 来轮询 SourceReader 的记录。pollNext(ReaderOutput) 方法的返回值表示源阅读器的状态。

  • MORE_AVAILABLE - SourceReader 立即有更多的记录可用。
  • NOTHING_AVAILABLE - SourceReader 此时没有更多的记录可用,但将来可能会有更多的记录。
  • END_OF_INPUT - SourceReader 已经用完了所有的记录,达到了数据的终点。这意味着 SourceReader 可以被关闭。

为了保证性能,会给 pollNext(ReaderOutput) 方法提供一个 ReaderOutput ,所以如果有必要, SourceReader 可以在一次调用 pollNext() 的过程中发出多条记录。例如,有时外部系统的工作粒度是块。一个块可能包含多条记录,但源码只能在块的边界处进行检查点。在这种情况下, SourceReader 可以一次将一个块中的所有记录排放到 ReaderOutput 。但是,除非必要, SourceReader 的实现应该避免在一次 pollNext(ReaderOutput) 的调用中发射多条记录。这是因为从 SourceReader 中进行轮询的任务线程是在事件循环中工作的,不能阻塞。

SourceReader 的所有状态都应该维护在 SourceSplits 里面,这些状态在 snapshotState() 调用时返回。这样做可以在需要时将 SourceSplits 重新分配给其他 SourceReaders 。

在创建 SourceReader 时,会向 Source 提供一个 SourceReaderContext 。预计 Source 将把上下文传递给 SourceReader 实例。SourceReader 可以通过 SourceReaderContext 向其 SplitEnumerator 发送 SourceEvent 。Source 的一个典型的设计模式是让 SourceReaders 向 SplitEnumerator 报告它们的本地信息, SplitEnumerator 有一个全局视图来做决策。

SourceReader API 是一个低级的 API ,它允许用户手动处理 split ,并有自己的线程模型来获取和交接记录。为了方便 SourceReader 的实现, Flink 提供了一个 SourceReaderBase 类,大大减少了编写 SourceReader 的工作量。强烈建议连接器开发人员利用 SourceReaderBase ,而不是从头开始编写 SourceReaders 。更多细节请查看 Split Reader API 部分。

使用 Source #

为了从 Source 创建 DataStream ,需要将 Source 传递给 StreamExecutionEnvironment。例如:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.fromSource(
      mySource,
      WatermarkStrategy.noWatermarks(),
      "MySourceName")
...

Split 读取器 API #

核心的 SourceReader API 是完全异步的,需要实现者手动管理异步拆分读取。然而,在实践中,大多数 Source 使用执行阻塞操作,比如在客户端(例如 KafkaConsumer )上阻塞 poll() 调用,或者在分布式文件系统( HDFS , S3 ,…)上阻塞 I/O 操作。为了与异步的 Source API 兼容,这些阻塞(同步)操作需要发生在单独的线程中,线程将数据交给异步部分的阅读器。

SplitReader 是用于简单的基于同步读取/轮询的源码实现的高级 API ,比如文件读取、 Kafka 等。

核心是 SourceReaderBase 类,它接收一个 SplitReader 并创建运行 SplitReader 的 fetcher 线程,支持不同的消费线程模型。

SplitReader #

SplitReader API 只有三个方法。

  • 一个阻塞获取方法,返回一个 RecordsWithSplitIds
  • 一种非阻塞方法,用于处理拆分变化。
  • 一个非阻塞的唤醒方法,用于唤醒阻塞的获取操作。

SplitReader 只专注于从外部系统读取记录,因此比 SourceReader 简单得多。详情请查看该类的 Java 文档。

SourceReaderBase #

SourceReader 的实现很常见,它做了以下工作。

  • 拥有一个线程池,以阻塞的方式从外部系统的分割处获取数据。
  • 处理内部获取线程和其他方法调用之间的同步,如 pollNext(ReaderOutput) 。
  • 维护每个 split 的水印,以便进行水印对齐。
  • 维护每个分身的状态,以便检查点。

为了减少编写一个新的 SourceReader 的工作, Flink 提供了一个 SourceReaderBase 类作为 SourceReader 的基础实现。SourceReaderBase 开箱即完成了上述所有工作。如果要编写一个新的 SourceReader ,只需要让 SourceReader 实现继承 SourceReaderBase ,填充一些方法,然后实现一个高级的 SplitReader 就可以了。

SplitFetcherManager #

SourceReaderBase 支持一些开箱即用的线程模型,这取决于与之合作的 SplitFetcherManager 的行为。SplitFetcherManager 帮助创建和维护一个 SplitFetcher 池,每个 SplitFetcher 用一个 SplitReader 来获取。它还决定了如何将 split 分配给每个 split fetcher 。

举个例子,如下图所示,一个 SplitFetcherManager 可能有固定数量的线程,每个线程从分配给 SourceReader 的一些 split 中获取。

img

下面的代码片段实现了这个线程模型。

/**
 * A SplitFetcherManager that has a fixed size of split fetchers and assign splits 
 * to the split fetchers based on the hash code of split IDs.
 */
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
        extends SplitFetcherManager<E, SplitT> {
    private final int numFetchers;

    public FixedSizeSplitFetcherManager(
            int numFetchers,
            FutureNotifier futureNotifier,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(futureNotifier, elementsQueue, splitReaderSupplier);
        this.numFetchers = numFetchers;
        // Create numFetchers split fetchers.
        for (int i = 0; i < numFetchers; i++) {
            startFetcher(createSplitFetcher());
        }
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // Group splits by their owner fetchers.
        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
        splitsToAdd.forEach(split -> {
            int ownerFetcherIndex = split.hashCode() % numFetchers;
            splitsByFetcherIndex
                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
                    .add(split);
        });
        // Assign the splits to their owner fetcher.
        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
        });
    }
}

而使用这个线程模型的 SourceReader 可以创建如下。

public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public FixedFetcherSizeSourceReader(
            FutureNotifier futureNotifier,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                futureNotifier,
                elementsQueue,
                new FixedSizeSplitFetcherManager<>(
                        config.getInteger(SourceConfig.NUM_FETCHERS),
                        futureNotifier,
                        elementsQueue,
                        splitFetcherSupplier),
                recordEmitter,
                config,
                context);
    }

    @Override
    protected void onSplitFinished(Collection<String> finishedSplitIds) {
        // Do something in the callback for the finished splits.
    }

    @Override
    protected SplitStateT initializedState(SplitT split) {
        ...
    }

    @Override
    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
        ...
    }
}

显然, SourceReader 的实现也可以在 SplitFetcherManager 和 SourceReaderBase 之上轻松实现自己的线程模型。

事件时间和水印 #

事件时间分配和水印生成作为数据源的一部分发生。离开源读取器的事件流具有事件时间戳,并且(在流执行期间)包含水印。有关事件时间和水印的介绍,请参见及时流处理

重要事项 基于传统 SourceFunction 的应用程序通常会在后面单独的步骤中通过 stream.assignTimestampsAndWatermarks(WatermarkStrategy) 生成时间戳和水印。这个函数不应该被用于新的源,因为时间戳将被分配,并且它将覆盖之前的分割感知水印。

API #

在 DataStream API 创建期间, WatermarkStrategy 被传递给 Source,并创建 TimestampAssignerWatermarkGenerator

environment.fromSource(
    Source<OUT, ?, ?> source,
    WatermarkStrategy<OUT> timestampsAndWatermarks,
    String sourceName)

TimestampAssigner 和 WatermarkGenerator 作为 ReaderOutput(或 SourceOutput) 的一部分透明地运行,因此源码实现者不必实现任何时间戳提取和水印生成代码。

事件时间戳 #

事件时间戳的分配有两个步骤。

  1. SourceReader 可以通过调用 SourceOutput.collect(event, timestamp) 将源记录时间戳附加到事件上。这只与基于记录且有时间戳的数据源有关,如 Kafka 、 Kinesis 、 Pulsar 或 Pravega 。不基于记录且有时间戳的数据源(如文件)没有源记录时间戳。这一步是源连接器实现的一部分,而不是由使用源的应用程序参数化。

  2. 由应用程序配置的 TimestampAssigner 分配最终的时间戳。TimestampAssigner 看到原始源记录时间戳和事件。分配者可以使用源记录时间戳或访问事件的一个字段获得最终的事件时间戳。

这种两步法允许用户同时引用源系统的时间戳和事件数据中的时间戳作为事件时间戳。

注意:当使用没有源记录时间戳的数据源(如文件),并选择源记录时间戳作为最终的事件时间戳时,事件将得到一个默认的时间戳,等于 LONG_MIN (=-9,223,372,036,854,775,808 )。

水印生成 #

水印生成器仅在流式执行期间激活。批量执行会停用水印生成器;下面描述的所有相关操作都将成为有效的无操作。

数据源 API 支持每次拆分单独运行水印生成器。这使得 Flink 可以单独观察每个分体的事件时间进度,这对于正确处理事件时间偏斜和防止空闲分区拖累整个应用的事件时间进度非常重要。

img

当使用 Split Reader API 实现一个源连接器时,会自动处理这个问题。所有基于 Split Reader API 的实现都具有开箱即用的 split-aware 水印。

对于一个低级别的 SourceReader API 的实现来说,要使用 split-aware 水印的生成,该实现必须将不同的 split 事件输出到不同的输出中: Split-local SourceOutputs 。分割本地输出可以通过 createOutputForSplit(splitId) 和 releaseOutputForSplit(splitId) 方法在主 ReaderOutput 上创建和释放。详情请参考该类和方法的 JavaDocs 。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/sources.html