Wait the light to fall

自定义序列化管理状态

焉知非鱼

Custom Serialization for Managed State

本页面的目标是为需要使用自定义状态序列化的用户提供指导,涵盖了如何提供自定义状态序列化器,以及实现允许状态模式演化的序列化器的指南和最佳实践。

如果你只是简单地使用 Flink 自带的序列化器,这个页面是不相关的,可以忽略。

使用自定义状态序列化器 #

当注册一个 managed operator 或 keyed state时,需要一个 StateDescriptor 来指定状态的名称,以及状态的类型信息。类型信息被 Flink 的类型序列化框架用来为状态创建合适的序列化器。

也可以完全绕过这一点,让 Flink 使用自己的自定义序列化器来序列化被管理的状态,只需用自己的 TypeSerializer 实现直接实例化 StateDescriptor 即可。

class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}

val descriptor = new ListStateDescriptor[(String, Integer)](
    "state-name",
    new CustomTypeSerializer)
)

checkpointedState = getRuntimeContext.getListState(descriptor)

状态序列化器和模式演进 #

本节解释了与状态序列化和模式演进相关的面向用户的抽象,以及关于 Flink 如何与这些抽象交互的必要内部细节。

当从保存点恢复时,Flink 允许改变用于读取和写入先前注册状态的序列化器,因此用户不会被锁定在任何特定的序列化模式上。当状态被还原时,将为该状态注册一个新的序列化器(即在还原作业中用于访问状态的 StateDescriptor 所附带的序列化器)。这个新的序列化器可能与之前的序列化器的模式不同。因此,在实现状态序列化器时,除了读取/写入数据的基本逻辑外,另一个需要注意的重要问题是未来如何改变序列化模式。

说到 schema,在这里,这个术语可以互换,指的是状态类型的数据模型和状态类型的序列化二进制格式。一般来说,模式,可以为少数情况而改变。

  1. 状态类型的数据模式发生了变化,即从 POJO 中增加或删除一个作为状态的字段。
  2. 一般来说,数据模式发生变化后,需要升级序列器的序列化格式。
  3. 序列器的配置发生了变化。

为了让新的执行有状态的写入模式的信息,并检测模式是否发生了变化,在对操作符的状态进行保存点时,需要将状态序列器的快照和状态字节一起写入。这就是抽象出来的一个 TypeSerializerSnapshot,在下一小节解释。

TypeSerializerSnapshot 抽象 #

public interface TypeSerializerSnapshot<T> {
    int getCurrentVersion();
    void writeSnapshot(DataOuputView out) throws IOException;
    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
    TypeSerializer<T> restoreSerializer();
}
public abstract class TypeSerializer<T> {    
    
    // ...
    
    public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}

序列器的 TypeSerializerSnapshot 是一个时间点信息,它作为状态序列器的写模式的唯一真理来源,以及还原一个序列器所必须的任何额外信息,这些信息将与给定的时间点相同。关于在还原时应该写入和读取什么作为序列器快照的逻辑是在 writeSnapshot和readSnapshot 方法中定义的。

请注意,快照本身的写模式也可能需要随着时间的推移而改变(例如,当你希望在快照中添加更多关于序列器的信息时)。为了方便,快照是有版本的,在 getCurrentVersion 方法中定义了当前的版本号。在还原时,当从保存点读取序列器快照时,将向 readSnapshot 方法提供写入快照的模式的版本,以便读取实现可以处理不同的版本。

在还原时,检测新的序列器的模式是否改变的逻辑应该在 resolveSchemaCompatibility 方法中实现。当之前的注册状态在还原执行的操作符中再次注册新的序列化器时,新的序列化器会通过这个方法提供给之前序列化器的快照。该方法返回一个代表兼容性解决结果的 TypeSerializerSchemaCompatibility,它可以是以下之一。

  1. TypeSerializerSchemaCompatibility.compatibleAsIs():这个结果标志着新的序列化器是兼容的,这意味着新的序列化器与之前的序列化器具有相同的模式。有可能在resolveSchemaCompatibility方法中重新配置了新的序列化器,使其兼容。
  2. TypeSerializerSchemaCompatibility.compatibleAfterMigration():这个结果标志着新的序列化器具有不同的序列化模式,可以从旧的模式迁移,使用之前的序列化器(识别旧的模式)将字节读入状态对象,然后用新的序列化器(识别新的模式)将对象重新写回字节。
  3. TypeSerializerSchemaCompatibility.incompatible():这个结果标志着新的序列化器有不同的序列化模式,但不可能从旧模式迁移。

最后一点细节是在需要迁移的情况下,如何获得之前的序列化器。序列化器的 TypeSerializerSnapshot 的另一个重要作用是,它可以作为一个工厂来恢复以前的序列化器。更具体地说,TypeSerializerSnapshot 应该实现 restoreSerializer 方法来实例化一个序列化器实例,该实例能够识别之前序列化器的模式和配置,因此可以安全地读取之前序列化器写入的数据。

总结一下,本节总结了 Flink,或者更具体地说,状态后端如何与抽象进行交互。根据状态后端的不同,交互略有不同,但这与状态序列化器及其序列化器快照的实现是正交的。

离堆状态后端(如 RocksDBStateBackend)

  1. 用具有模式A的状态序列器注册新的状态。
  • 注册的 TypeSerializer 用于在每次状态访问时读取/写入状态。
  • 状态被写入模式A中。
  1. 拍摄一个保存点
  • 序列器快照是通过 TypeSerializer#snapshotConfiguration 方法提取的。
  • 序列器快照被写入保存点,以及已经序列化的状态字节(模式A)。
  1. 恢复的执行用新的状态序列化器重新访问恢复的状态字节,新的状态序列化器具有模式B。
  • 前一个状态序列器的快照被还原。
  • 状态字节在还原时不被反序列化,只被加载回状态后端(因此,仍在模式A中)。
  • 接收到新的序列化器后,通过 TypeSerializer#resolveSchemaCompatibility 提供给被还原的前一个序列化器的快照,检查模式是否兼容。
  1. 将后端中的状态字节从模式A迁移到模式B。
  • 如果兼容性决议反映模式已经改变,并且可以进行迁移,则进行模式迁移。通过 TypeSerializerSnapshot#restoreSerializer(),将从序列化器快照中获取之前识别模式A的状态序列化器,并用于反序列化状态字节到对象,进而用新的序列化器再次重写,识别模式B,完成迁移。在继续处理之前,所有访问状态的条目全部迁移完毕。
  • 如果解析信号为不兼容,则状态访问失败,出现异常。

堆状态后端(如 MemoryStateBackend、FsStateBackend):

  1. 用具有模式A的状态序列器注册新的状态。
  • 注册的 TypeSerializer 由状态后端维护。
  1. 拍摄一个保存点,将所有状态用模式A序列化。
  • 序列器快照是通过 TypeSerializer#snapshotConfiguration 方法提取的。
  • 序列化器快照被写入保存点。
  • 现在状态对象被序列化到保存点,写入模式A中。
  1. 在还原时,将状态反序列化为堆中的对象。
  • 前一个状态序列器的快照被恢复。
  • 通过 TypeSerializerSnapshot#restoreSerializer() 从序列化器快照中获取之前的序列化器,该序列化器识别模式A,用于将状态字节反序列化为对象。
  • 从现在开始,所有的状态都已经被反序列化了。
  1. 恢复后的执行用新的状态序列化器重新访问以前的状态,新的状态序列化器具有模式B。
  • 在接收到新的序列化器后,通过 TypeSerializer#resolveSchemaCompatibility 提供给恢复之前序列化器的快照,以检查模式的兼容性。
  • 如果兼容性检查发出需要迁移的信号,在这种情况下什么都不会发生,因为对于堆后端来说,所有的状态已经被反序列化为对象。
  • 如果解析信号为不兼容,则状态访问失败,出现异常。
  1. 再拍摄一个保存点,将所有状态用模式B序列化。
  • 与步骤2.相同,但现在状态字节都在模式B中。

预先定义方便的 TypeSerializerSnapshot 类 #

Flink 提供了两个抽象的基础 TypeSerializerSnapshot 类,可以用于典型场景。SimpleTypeSerializerSnapshot 和 CompositeTypeSerializerSnapshot。

提供这些预定义快照作为其序列化器快照的序列化器必须始终有自己独立的子类实现。这与不在不同的序列化器之间共享快照类的最佳实践相对应,这将在下一节中得到更详尽的解释。

实现 SimpleTypeSerializerSnapshot #

SimpleTypeSerializerSnapshot 是为没有任何状态或配置的序列化器准备的,本质上意味着序列化器的序列化模式完全由序列化器的类来定义。

当使用 SimpleTypeSerializerSnapshot 作为你的序列化器的快照类时,兼容性解决只有2种可能的结果。

  • TypeSerializerSchemaCompatibility.compatibleAsIs(),如果新的序列化器类保持相同,或
  • TypeSerializerSchemaCompatibility.incompatible(),如果新的序列化器类与之前的序列化器类不同。

下面以 Flink 的 IntSerializer 为例,介绍 SimpleTypeSerializerSnapshot 的使用方法。

public class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
    public IntSerializerSnapshot() {
        super(() -> IntSerializer.INSTANCE);
    }
}

IntSerializer 没有状态或配置。序列化格式完全由序列化器类自己定义,只能由另一个 IntSerializer 读取。因此,它适合 SimpleTypeSerializerSnapshot 的使用情况。

SimpleTypeSerializerSnapshot 的基础超级构造函数期望得到一个相应序列器实例的 Supplier,不管快照当前是在还原还是在快照期间写入。该 Supplier 用于创建还原序列化器,以及类型检查,以验证新序列化器是否属于相同的预期序列化器类。

实现 CompositeTypeSerializerSnapshot #

CompositeTypeSerializerSnapshot 是为那些依赖于多个嵌套序列化器的序列化器而设计的。

在进一步解释之前,我们将依赖于多个嵌套序列化器的序列化器称为此上下文中的"外部"序列化器。这方面的例子可以是 MapSerializer、ListSerializer、GenericArraySerializer 等。例如,考虑 MapSerializer –键和值序列化器将是嵌套序列化器,而MapSerializer本身是 “外部 “序列化器。

在这种情况下,外层序列化器的快照也应该包含嵌套序列化器的快照,这样就可以独立检查嵌套序列化器的兼容性。在解决外层序列化器的兼容性时,需要考虑每个嵌套序列化器的兼容性。

提供 CompositeTypeSerializerSnapshot 是为了协助实现这类复合序列器的快照。它处理嵌套序列化器快照的读写,以及考虑到所有嵌套序列化器的兼容性,解析最终的兼容性结果。

下面以 Flink 的 MapSerializer 为例,介绍如何使用 CompositeTypeSerializerSnapshot。

public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {

    private static final int CURRENT_VERSION = 1;

    public MapSerializerSnapshot() {
        super(MapSerializer.class);
    }

    public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
        super(mapSerializer);
    }

    @Override
    public int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    @Override
    protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
        TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
        return new MapSerializer<>(keySerializer, valueSerializer);
    }

    @Override
    protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
    }
}

当实现一个新的序列器快照作为 CompositeTypeSerializerSnapshot 的子类时,必须实现以下三个方法。

  • #getCurrentOuterSnapshotVersion()。该方法定义了当前外部序列化器快照的序列化二进制格式的版本。
  • #getNestedSerializers(TypeSerializer)。给定外部序列化器,返回其嵌套的序列化器。
  • #createOuterSerializerWithNestedSerializers(TypeSerializer[])。给定嵌套的序列化器,创建一个外部序列化器的实例。

上面的例子是一个 CompositeTypeSerializerSnapshot,除了嵌套的序列化器的快照外,没有额外的信息需要快照。因此,可以预期其外部快照版本永远不需要上报。然而,其他一些序列化器,包含一些额外的静态配置,需要和嵌套的组件序列化器一起持久化。一个例子是 Flink 的 GenericArraySerializer,除了嵌套的元素序列化器之外,它还包含了数组元素类型的类作为配置。

在这些情况下,需要在 CompositeTypeSerializerSnapshot 上实现另外三个方法。

  • #writeOuterSnapshot(DataOutputView):定义如何写入外部快照信息。
  • #readOuterSnapshot(int, DataInputView, ClassLoader):定义如何读取外部快照信息。
  • #resolveOuterSchemaCompatibility(TypeSerializer):根据外部快照信息检查兼容性。

默认情况下,CompositeTypeSerializerSnapshot 假设没有任何外部快照信息可读/可写,因此上述方法的默认实现为空。如果子类有外部快照信息,那么这三个方法必须全部实现。

下面以 Flink 的 GenericArraySerializer 为例,说明 CompositeTypeSerializerSnapshot 如何用于确实有外部快照信息的复合序列器快照。

public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {

    private static final int CURRENT_VERSION = 1;

    private Class<C> componentClass;

    public GenericArraySerializerSnapshot() {
        super(GenericArraySerializer.class);
    }

    public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
        super(genericArraySerializer);
        this.componentClass = genericArraySerializer.getComponentClass();
    }

    @Override
    protected int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    @Override
    protected void writeOuterSnapshot(DataOutputView out) throws IOException {
        out.writeUTF(componentClass.getName());
    }

    @Override
    protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
    }

    @Override
    protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
        return (this.componentClass == newSerializer.getComponentClass())
            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
            : OuterSchemaCompatibility.INCOMPATIBLE;
    }

    @Override
    protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
        return new GenericArraySerializer<>(componentClass, componentSerializer);
    }

    @Override
    protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
    }
}

在上面的代码片段中,有两个重要的事情需要注意。首先,由于这个 CompositeTypeSerializerSnapshot 实现的外快照信息是作为快照的一部分写入的,所以每当外快照信息的序列化格式发生变化时,由 getCurrentOuterSnapshotVersion() 定义的外快照版本必须被上调。

其次,请注意我们在写组件类时避免使用 Java 序列化,只写类名,在读回快照时动态加载。避免使用 Java 序列化来编写序列化器快照的内容,总的来说是一个很好的做法。关于这方面的更多细节将在下一节介绍。

实施说明和最佳实践 #

  1. Flink 通过将序列器快照实例化,恢复序列器快照,其类名为

序列器的快照,是注册状态如何被序列化的唯一真实来源,是读取保存点中状态的入口。为了能够恢复和访问以前的状态,必须能够恢复以前状态序列化器的快照。

Flink 通过首先实例化 TypeSerializerSnapshot 与其类名(与快照字节一起写入)来恢复序列器快照。因此,为了避免受到意外的类名更改或实例化失败, TypeSerializerSnapshot 类应该。

  • 避免被实现为匿名类或嵌套类。
  • 有一个公共的空值构造函数用于实例化。
  1. 避免在不同的序列化器之间共享同一个 TypeSerializerSnapshot 类。

由于模式兼容性检查要通过序列化器快照,让多个序列化器返回同一个 TypeSerializerSnapshot 类作为它们的快照,会使 TypeSerializerSnapshot#resolveSchemaCompatibilityTypeSerializerSnapshot#restoreSerializer() 方法的实现变得复杂。

这也将是一个不好的分离关注点,一个单一序列化器的序列化模式、配置以及如何恢复它,应该合并在自己专门的TypeSerializerSnapshot类中。

  1. 避免使用 Java 序列化来制作序列化器快照内容

在编写持久化的序列化器快照的内容时,完全不应该使用 Java 序列化。例如,一个序列化器需要持久化一个目标类型的类作为其快照的一部分。关于类的信息应该通过写入类名来持久化,而不是直接使用 Java 将类序列化。在读取快照时,会读取类名,并通过名称来动态加载类。

这种做法保证了序列化器快照总是可以安全读取。在上面的例子中,如果类型类是使用 Java 序列化来持久化的,一旦类的实现发生了变化,根据 Java 序列化的具体规定,快照可能不再可读,不再二进制兼容。

本节是一个从 Flink 1.7 之前存在的序列化器和序列化器快照的 API 迁移指南。

在 Flink 1.7 之前,序列化器快照是以 TypeSerializerConfigSnapshot 的形式实现的(现在已经被废弃了,将来最终会被移除,完全被新的 TypeSerializerSnapshot 接口取代)。此外,序列化器模式兼容性检查的责任住在 TypeSerializer 内部,在 TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) 方法中实现。

新旧抽象之间的另一个主要区别是,被废弃的 TypeSerializerConfigSnapshot 不具备实例化之前的序列化器的能力。因此,在你的序列化器仍然返回 TypeSerializerConfigSnapshot 的子类作为它的快照的情况下,序列化器实例本身将总是使用 Java 序列化写入 savepoints,以便在还原时可以使用以前的序列化器。这是很不可取的,因为还原作业是否成功,很容易受到前一个序列化器类的可用性的影响,或者说,一般来说,序列化器实例是否可以在还原时使用 Java 序列化读回。这意味着你的状态只能使用同一个序列化器,一旦你想升级序列化器类或进行模式迁移,可能会出现问题。

为了面向未来,并能灵活地迁移你的状态序列器和模式,强烈建议从旧的抽象中迁移。做到这一点的步骤如下。

  1. 实现 TypeSerializerSnapshot 的新子类。这将是你的序列化器的新快照。
  2. TypeSerializer#snapshotConfiguration() 方法中返回新的 TypeSerializerSnapshot 作为你的 serializer 快照。
  3. 从 Flink 1.7 之前存在的保存点恢复作业,然后再取一个保存点。注意,在这一步,旧的序列化器的 TypeSerializerConfigSnapshot 必须仍然存在于 classpath 中,并且不能删除 TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) 方法的实现。这个过程的目的是将旧保存点中写的 TypeSerializerConfigSnapshot 替换为序列化器新实现的 TypeSerializerSnapshot
  4. 一旦你有一个用 Flink 1.7 拍摄的保存点,保存点将包含 TypeSerializerSnapshot 作为状态序列化器快照,序列化器实例将不再写入保存点中。在这一点上,现在可以安全地删除旧抽象的所有实现(从序列化器中删除旧的 TypeSerializerConfigSnapshot 实现,因为将作为 TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot))。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/custom_serialization.html