Wait the light to fall

数据管道和 ETL

焉知非鱼

Data Pipelines & ETL

对于 Apache Flink 来说,一个非常常见的用例是实现 ETL(提取、转换、加载)管道,从一个或多个源中获取数据,进行一些转换和/或丰富,然后将结果存储在某个地方。在这一节中,我们将看看如何使用 Flink 的 DataStream API 来实现这种应用。

请注意,Flink的 Table 和 SQL API很适合许多 ETL 用例。但无论你最终是否直接使用 DataStream API,对这里介绍的基础知识有一个扎实的理解都是有价值的。

无状态转换 #

本节介绍了 map() 和 flatmap(),它们是用来实现无状态转换的基本操作。本节中的例子假设你熟悉 flink-training 仓库中的实战练习中使用的出租车乘车数据。

map() #

在第一个练习中,你过滤了一个打车事件的流,在同一个代码库中,有一个 GeoUtils 类,它提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),该方法将一个 location (longitude, latitude) 映射到一个网格单元,该单元指的是一个大约100x100米大小的区域。

现在让我们通过为每个事件添加 startCell 和 endCell 字段来丰富我们的打车对象流。你可以创建一个 EnrichedRide 对象,扩展 TaxiRide,添加这些字段。

public static class EnrichedRide extends TaxiRide {
    public int startCell;
    public int endCell;

    public EnrichedRide() {}

    public EnrichedRide(TaxiRide ride) {
        this.rideId = ride.rideId;
        this.isStart = ride.isStart;
        ...
        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
    }

    public String toString() {
        return super.toString() + "," +
            Integer.toString(this.startCell) + "," +
            Integer.toString(this.endCell);
    }
}

然后,您可以创建一个应用程序,将流转化为:

val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(...));

val enrichedNYCRides: DataStream[EnrichedRide]  = rides
    .filter(new RideCleansingSolution.NYCFilter())
    .map(new Enrichment());

enrichedNYCRides.print();

使用这个 MapFunction:

class Enrichment extends MapFunction[TaxiRide, EnrichedRide] {

    override def map(taxiRide: TaxiRide) {
        return new EnrichedRide(taxiRide);
    }
}

flatmap() #

MapFunction 只适用于执行一对一的转换:对于每一个进入的流元素,map() 将发出一个转换后的元素。否则,你将需要使用 flatmap():

val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(...));

val enrichedNYCRides: DataStream[EnrichedRide] = rides
    .flatMap(new NYCEnrichment());

enrichedNYCRides.print();

加上一个 FlatMapFunction:

class NYCEnrichment extends FlatMapFunction[TaxiRide, EnrichedRide] {

    override def flatMap(taxiRide: TaxiRide, out: Collector[EnrichedRide]) {
        val valid: FilterFunction[TaxiRide] = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

通过这个接口提供的 Collector,flatmap() 方法可以随心所欲地发射许多流元素,包括完全不发射元素。

Keyed Streams #

keyBy() #

通常,能够围绕一个属性对一个流进行分区是非常有用的,这样所有具有相同属性值的事件就会被归为一组。例如,假设你想找到从每个网格单元开始的最长的出租车乘车时间。从 SQL 查询的角度考虑,这意味着要对 startCell 进行某种 GROUP BY,而在 Flink 中,这是用 keyBy(KeySelector) 来完成的。

rides
    .flatMap(new NYCEnrichment())
    .keyBy("startCell")

每一个 keyBy 都会引起一次网络洗牌,对流进行重新分区。一般来说,这是很昂贵的,因为它涉及到网络通信以及序列化和反序列化。

img

在上面的例子中,键是由一个字段名 “startCell” 指定的。这种键选择的风格有一个缺点,那就是编译器无法推断用于键选择的字段的类型,因此 Flink 会将键值作为元组传递,这可能会很笨拙。最好是使用一个正确类型的 KeySelector,例如:

rides
    .flatMap(new NYCEnrichment())
    .keyBy(
        new KeySelector<EnrichedRide, int>() {

            @Override
            public int getKey(EnrichedRide enrichedRide) throws Exception {
                return enrichedRide.startCell;
            }
        })

可以用 lambda 更简洁地表达出来。

rides
    .flatMap(new NYCEnrichment())
    .keyBy(enrichedRide -> enrichedRide.startCell)

Keys are computed #

KeySelectors 并不局限于从你的事件中提取一个键,相反,它们可以用任何你想要的方式来计算键,只要产生的键是确定性的,并且有有效的 hashCode()equals() 的实现。这个限制排除了生成随机数,或者返回数组或枚举的 KeySelectors,但是你可以使用元组或 POJOs 来生成复合键,例如,只要它们的元素遵循这些相同的规则。

键必须以确定性的方式产生,因为每当需要它们时,它们就会被重新计算,而不是附加到流记录上。

例如,我们不是创建一个新的 EnrichedRide 类,该类有一个 startCell 字段,然后我们将其用作键:

keyBy(enrichedRide -> enrichedRide.startCell)

相反, 我们可以这样做:

keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed 流的聚合 #

这段代码为每个 end-of-ride 事件创建一个新的元组流,其中包含 startCell 和持续时间(分钟)。

import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

现在可以产生一个流,其中只包含那些对每个 startCell 来说是有史以来(至此)最长的乘车记录。

有多种方式可以表达作为键的字段。之前你看到了一个 EnrichedRide POJO 的例子,在这个例子中,要用作键的字段是用它的名字指定的。这个例子涉及到 Tuple2 对象,元组中的索引(从0开始)被用来指定键。

minutesByStartCell
  .keyBy(0) // startCell
  .maxBy(1) // duration
  .print();

现在,每当持续时间达到一个新的最大值时,输出流就会包含一个针对每个键的记录–如这里的50797单元格所示。

...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)

(Implicit) State #

这是本次训练中第一个涉及有状态流的例子。虽然状态被透明地处理,但 Flink 必须跟踪每个不同键的最大持续时间。

每当状态涉及到你的应用时,你应该考虑状态可能会变得多大。每当键空间是无限制的,那么 Flink 需要的状态量也是无限制的。

当处理流时,一般来说,在有限的窗口上考虑聚合比在整个流上考虑更有意义。

reduce() 和其他聚合器 #

上文中使用的 maxBy() 只是 Flink 的 KeyedStreams 上众多聚合函数中的一个例子。还有一个更通用的 reduce() 函数,你可以用它来实现自己的自定义聚合。

状态转换 #

你的应用程序当然能够在没有让 Flink 参与管理状态的情况下使用状态–但 Flink 为它所管理的状态提供了一些引人注目的功能。

  • 本地化。Flink 状态被保存在处理它的机器的本地,并且可以以内存速度被访问。
  • 耐用。Flink 状态是容错的,即每隔一段时间就会自动检查一次,一旦失败就会恢复。
  • 纵向可扩展。Flink 状态可以保存在嵌入式 RocksDB 实例中,通过增加更多的本地磁盘来扩展。
  • 横向可扩展。随着集群的增长和收缩,Flink 状态会被重新分配。
  • 可查询。Flink 状态可以通过可查询状态 API 进行外部查询。

在本节中,您将学习如何使用 Flink 的 API 管理 keyed 状态。

Rich 函数 #

此时你已经看到了 Flink 的几个函数接口,包括 FilterFunctionMapFunctionFlatMapFunction。这些都是单一抽象方法模式的例子。

对于每一个接口,Flink 还提供了一个所谓的"富"变体,例如,RichFlatMapFunction,它有一些额外的方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

open() 在操作符初始化期间被调用一次。这是一个加载一些静态数据的机会,或者, 例如打开一个外部服务的连接。

getRuntimeContext() 提供了对一整套潜在的有趣的东西的访问,但最值得注意的是它是如何创建和访问由 Flink 管理的状态。

一个带有 Keyed State 的例子 #

在这个例子中,想象一下,你有一个事件流,你想去掉重复,所以你只保留每个键的第一个事件。这里有一个应用程序可以做到这一点,使用一个名为 DeduplicatorRichFlatMapFunction:

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();
  
    env.execute();
}

为了达到这个目的,Deduplicator 将需要以某种方式记住,对于每个键来说,是否已经有了该键的事件。它将使用 Flink 的 keyed state 接口来做到这一点。

当你在使用像这样的 keyed 流时,Flink 将为每个被管理的状态项目维护一个键/值存储。

Flink 支持几种不同类型的 keyed state,本例使用的是最简单的一种,即 ValueState。这意味着对于每个键,Flink 将存储一个单一的对象–在本例中,一个类型为 Boolean 的对象。

我们的 Deduplicator 类有两个方法:open()flatMap()open 方法通过定义一个 ValueStateDescriptor` 来建立对托管状态的使用。构造函数的参数为这个 keyed state 项指定了一个名称(“keyHasBeenSeen”),并提供了可用于序列化这些对象的信息(在本例中,Types.BOOLEAN)。

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

flatMap 方法调用 keyHasBeenSeen.value() 时,Flink 的运行时会在上下文中查找 key 的这块状态值,只有当它为 null 时,它才会去收集事件到输出。在这种情况下,它还会将 keyHasBeenSeen 更新为 true。

这种访问和更新 key-partitioned 状态的机制可能看起来相当神奇,因为在我们的 Deduplicator 的实现中,key 并不是显式可见的。当 Flink 的运行时调用我们的 RichFlatMapFunctionopen 方法时,没有任何事件,因此那一刻上下文中没有 key。但是当它调用 flatMap 方法时,被处理的事件的 key 对运行时来说是可用的,并在幕后用于确定 Flink 的状态后端中的哪个条目被操作。

当部署到分布式集群时,会有很多这个 Deduplicator 的实例,每个实例将负责整个键空间的一个不相干子集。因此,当你看到一个 ValueState 的单项,如:

ValueState<Boolean> keyHasBeenSeen;

理解这不仅仅是一个单一的布尔值,而是一个分布式的、分片式的、键/值存储。

清除状态 #

上面的例子有一个潜在的问题。如果键的空间是无限制的,会发生什么?Flink 是在某个地方为每一个被使用的不同键存储一个布尔的实例。如果有一个有界的键集,那么这将是很好的,但是在键集以无界的方式增长的应用中,有必要为不再需要的键清除状态。这是通过调用状态对象上的 clear() 来实现的,如:

keyHasBeenSeen.clear()

例如,你可能想在给定键的一段时间不活动后这样做。当你在事件驱动的应用程序一节中学习 ProcessFunction 时,你将看到如何使用 Timer 来实现这一点。

此外,还有一个状态存活时间(TTL)选项,你可以用状态描述符来配置,指定什么时候自动清除陈旧键的状态。

Non-keyed State #

也可以在 non-keyed 的上下文中使用托管状态。这有时被称为 operator state。所涉及的接口有些不同,由于用户定义的函数需要 non-keyed state 是不常见的,所以这里不做介绍。这个功能最常用于源和接收器(sink)的实现。

Connected Streams #

有时不是应用这样的预定义变换:

img

你希望能够动态地改变变换的某些方面–通过流的阈值,或规则,或其他参数。Flink 中支持这种模式的是一种叫做连接流(connected streams)的东西,其中一个 operator 有两个输入流,就像这样:

img

连接流也可以用来实现流式连接(streaming joins.)。

例子 #

在这个例子中,控制流被用来指定必须从 streamOfWords 中过滤掉的单词。一个名为 ControlFunction 的 RichCoFlatMapFunction 被应用到连接的流中来完成这个任务。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
  
    control
        .connect(datastreamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

注意,被连接的两个流必须以兼容的方式进行 keyed。keyBy 的作用是对流的数据进行分区,当 keyed 流连接时,必须以同样的方式进行分区。这样就可以保证两个流中具有相同 key 的事件都会被发送到同一个实例中。那么,这就使得将该键上的两个流连接起来成为可能,例如。

在这种情况下,两个流的类型都是 DataStream[String],并且两个流都以字符串为键。如下所示,这个 RichCoFlatMapFunction 在 keyed state 下存储了一个布尔值,而这个布尔值是由两个流共享的。

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

RichCoFlatMapFunction 是 FlatMapFunction 的一种,它可以应用于一对连接的流,并且它可以访问富函数接口。这意味着它可以被做成有状态的。

屏蔽的(blocked)布尔正在被用来记住控制流上提到的键(在这里是单词),这些词被过滤出 streamOfWords 流。这就是 keyed state,它在两个流之间是共享的,这就是为什么两个流要共享同一个键空间。

flatMap1flatMap2 被 Flink 运行时调用,分别来自两个连接流的元素–在我们的例子中,来自控制流的元素被传入 flatMap1,来自 streamOfWords 的元素被传入 flatMap2。这是由使用 control.connect(datastreamOfWords) 连接两个流的顺序决定的。

重要的是要认识到,你无法控制调用 flatMap1flatMap2 回调的顺序。这两个输入流在相互竞争,Flink 运行时将对来自一个流或另一个流的事件的消耗做它想做的事。在时间和/或顺序很重要的情况下,你可能会发现有必要在托管的 Flink 状态下缓冲事件,直到你的应用程序准备好处理它们。(注意:如果你真的很绝望,可以通过使用实现 InputSelectable 接口的自定义 Operator 来对双输入 operator 消耗输入的顺序进行一些有限的控制。)

实践 #

与本节配套的实践练习是“乘车与票价练习”

进一步阅读 #