Wait the light to fall

数据类型和序列化

焉知非鱼

Datatypes Serialization

数据类型和序列化

Apache Flink 以独特的方式处理数据类型和序列化,包含自己的类型描述符、通用类型提取和类型序列化框架。本文档描述了这些概念和它们背后的原理。

支持的数据类型 #

Flink 对 DataSet 或 DataStream 中的元素类型进行了一些限制。这样做的原因是系统分析类型以确定高效的执行策略。

有七种不同类别的数据类型。

  1. Java Tuples 和 Scala Case 类
  2. Java POJOs
  3. Primitive Types
  4. Regular Classes
  5. Values
  6. Hadoop Writables
  7. 特殊类型

Tuples 和 Case 类 #

Scala case 类(以及 Scala tuples,它是 case 类的一种特殊类型),是包含固定数量的各种类型的字段的复合类型。元组字段由它们的 1-offset 名称寻址,例如第一个字段为 _1。case 类字段用它们的名字来访问。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1

POJOs #

如果 Java 和 Scala 类满足以下要求,Flink 会将其作为一种特殊的 POJO 数据类型。

  • 类必须是公共的。

  • 它必须有一个没有参数的公共构造函数(默认构造函数)。

  • 所有字段要么是公共的,要么必须通过 getter 和 setter 函数来访问。对于一个名为 foo 的字段,getter 和 setter 方法必须命名为 getFoo() setFoo()

  • 字段的类型必须由注册的序列器支持。

POJOs 通常用 PojoTypeInfo 表示,并用 PojoSerializer 序列化(使用 Kryo 作为可配置的回退)。例外的情况是当 POJOs 实际上是 Avro 类型(Avro 特定记录)或作为 “Avro 反射类型 “产生。在这种情况下,POJO 由 AvroTypeInfo 表示,并通过 AvroSerializer 序列化。如果需要,您也可以注册您自己的自定义序列化器;更多信息请参见序列化

Flink 分析 POJO 类型的结构,也就是说,它学习 POJO 的字段。因此,POJO 类型比一般类型更容易使用。此外,Flink 可以比一般类型更有效地处理 POJO。

下面的例子显示了一个简单的 POJO,它有两个公共字段。

class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}

val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

原始类型 #

Flink 支持所有的 Java 和 Scala 基元类型,如 Integer, String 和 Double。

通用类类型 #

Flink 支持大多数 Java 和 Scala 类(API 和自定义)。限制适用于包含不能序列化的字段的类,如文件指针、I/O 流或其他本地资源。遵循 Java Beans 约定的类一般都能很好地工作。

所有没有被确定为 POJO 类型的类(见上面的 POJO 要求)都被 Flink 作为一般类类型处理。Flink 将这些数据类型视为黑盒,无法访问它们的内容(例如,为了有效的排序)。一般类型使用序列化框架 Kryo 进行去/序列化。

值 #

Value 类型手动描述它们的序列化和反序列化。它们不需要通过一个通用的序列化框架,而是通过实现 org.apache.flinktypes.Value 接口的读写方法,为这些操作提供自定义代码。当通用序列化会非常低效时,使用 Value 类型是合理的。一个例子是一个数据类型,它将一个稀疏的元素向量实现为一个数组。知道数组大部分是零,就可以对非零元素使用特殊的编码,而通用序列化会简单地写入所有数组元素。

org.apache.flinktypes.CopyableValue 接口以类似的方式支持手动内部克隆逻辑。

Flink 自带了预先定义的 Value 类型,对应基本数据类型。ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue)。这些 Value 类型作为基本数据类型的可变体。它们的值可以被改变,允许程序员重用对象并减轻垃圾收集器的压力。

Hadoop 可写类型 #

你可以使用实现 org.apache.hadoop.Writable 接口的类型。在 write()readFields() 方法中定义的序列化逻辑将被用于序列化。

特殊类型 #

你可以使用特殊类型,包括 Scala 的 Either、Option 和 Try。Java API 对 Either 有自己的自定义实现。类似于 Scala 的 Either,它代表了两种可能的类型的值,左或右。Either 对于错误处理或需要输出两种不同类型记录的操作符来说非常有用。

类型擦除和类型推断。 #

注意:本节只与 Java 相关。

Java 编译器在编译后会丢弃很多通用类型信息。这在 Java 中被称为类型清除。这意味着在运行时,一个对象的实例不再知道它的通用类型。例如,DataStream<String>DataStream<Long> 的实例在 JVM 看来是一样的。

Flink 在准备执行程序时(调用程序的主方法时)需要类型信息。Flink Java API 试图重建以各种方式扔掉的类型信息,并将其显式存储在数据集和运算符中。你可以通过 DataStream.getType() 来检索类型。该方法返回一个 TypeInformation 的实例,这是 Flink 内部表示类型的方式。

类型推理有其局限性,在某些情况下需要程序员的 “配合”。例如从集合中创建数据集的方法,如 ExecutionEnvironment.fromCollection(),你可以传递一个描述类型的参数。但是像 MapFunction<I, O> 这样的通用函数也可能需要额外的类型信息。

ResultTypeQueryable 接口可以由输入格式和函数实现,以明确地告诉 API 它们的返回类型。函数被调用的输入类型通常可以通过前面操作的结果类型来推断。

Flink 试图推断出很多关于分布式计算过程中交换和存储的数据类型的信息。把它想象成一个数据库,推断表的模式。在大多数情况下,Flink 自己就能无缝地推断出所有必要的信息。有了类型信息,Flink 就可以做一些很酷的事情。

  • Flink 对数据类型了解得越多,序列化和数据布局方案就越好。这对于 Flink 中的内存使用范式相当重要(尽可能在堆内/堆外对序列化数据进行工作,并使序列化非常便宜)。

  • 最后,在大多数情况下,这也免去了用户对序列化框架的担心,也免去了对类型的注册。

一般来说,关于数据类型的信息是在飞行前阶段需要的–也就是说,当程序对 DataStream 和 DataSet 进行调用时,以及在对 execute()print()count()collect() 进行任何调用之前。

最常见的问题 #

用户最经常需要与 Flink 的数据类型处理进行交互的问题是。

  • 注册子类型。如果函数签名只描述了超类型,但它们在执行过程中实际使用了这些子类型,那么让 Flink 意识到这些子类型可能会提高很多性能。为此,可以在 StreamExecutionEnvironment 或 ExecutionEnvironment 上为每个子类型调用 .registerType(clazz)

  • 注册自定义序列器。Flink 对于那些自己不透明处理的类型又回到了 Kryo。并非所有类型都能被 Kryo 无缝处理(因此也能被 Flink 处理)。例如,许多 Google Guava 集合类型在默认情况下不能很好地工作。解决的办法是为导致问题的类型注册额外的序列器。在 StreamExecutionEnvironment 或 ExecutionEnvironment 上调用.getConfig().addDefaultKryoSerializer( clazz, serializer)。许多库中都有额外的 Kryo 序列化器。有关使用自定义序列器的更多细节,请参见自定义序列器

  • 添加类型提示。有时,当 Flink 尽管使用了所有技巧也无法推断出通用类型时,用户必须传递一个类型提示。这一般只在 Java API 中才需要。类型提示一节对此进行了更详细的描述。

  • 手动创建一个 TypeInformation。对于一些 API 调用来说,这可能是必要的,由于 Java 的通用类型擦除,Flink 不可能推断出数据类型。详情请看创建 TypeInformation 或 TypeSerializer

TypeInformation 类是所有类型描述符的基础类。它揭示了类型的一些基本属性,并且可以生成序列器,在特殊化中,可以生成类型的比较器。(注意,Flink 中的比较器的作用远不止定义一个顺序–它们基本上是处理键的实用程序)

在内部,Flink 对类型做了如下区分。

  • 基本类型。所有的 Java 基元和它们的盒子形式,加上 void,String,Date,BigDecimal 和 BigInteger。

  • 基元数组和对象数组:基本类型:所有的 Java 基元和它们的盒子形式,加上 void、String、Date、BigDecimal 和 BigInteger。

  • 复合型

    • Flink Java Tuples(Flink Java API 的一部分):最多 25 个字段,不支持 null 字段。
    • Scala case 类(包括 Scala tuples):不支持 null 字段。
    • Row:具有任意数量字段的元组,支持 null 字段。
    • POJOs:遵循某种 bean-like 模式的类。
  • 辅助类型(Option、Either、Lists、Maps…)

  • 通用类型。这些类型不会由 Flink 本身序列化,而是由 Kryo 序列化。

POJOs 特别值得关注,因为它们支持创建复杂类型和在键的定义中使用字段名:dataSet.join(another).where("name").equalTo("personName")。它们对运行时也是透明的,可以被 Flink 非常有效地处理。

POJO 类型的规则 #

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许 “按名称 “字段引用)。

  • 类是公共的和独立的(没有非静态的内部类)。
  • 该类有一个公共的无参数构造函数。
  • 该类(以及所有超级类)中所有非静态、非瞬态的字段要么是公共的(是非最终的),要么有一个公共的 getter-setter- 方法,遵循 Java beans 中 getter 和 setter 的命名惯例。

请注意,当用户定义的数据类型不能被识别为 POJO 类型时,它必须被处理为 GenericType 并通过 Kryo 进行序列化。

创建一个 TypeInformation 或 TypeSerializer。 #

要为一个类型创建 TypeInformation 对象,请使用语言特定的方式。

在 Scala 中,Flink 使用在编译时运行的宏,并捕捉所有通用类型信息,而它仍然可用。

// important: this import is needed to access the 'createTypeInformation' macro function
import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

你仍然可以使用与 Java 中相同的方法作为后备。

要创建一个 TypeSerializer,只需在 TypeInformation 对象上调用 typeInfo.createSerializer(config)。

config 参数的类型是 ExecutionConfig,并持有程序注册的自定义序列器的信息。在可能的情况下,尽量传递程序的正确的 ExecutionConfig。你通常可以通过调用 getExecutionConfig() 从 DataStream 或 DataSet 中获取它。在函数内部(比如 MapFunction),你可以通过将函数变成 Rich Function,然后调用 getRuntimeContext().getExecutionConfig() 来获取它。

Scala API 中的类型信息 #

Scala 通过类型清单和类标签对运行时类型信息有非常详细的概念。一般来说,类型和方法可以访问其通用参数的类型–因此,Scala 程序不会像 Java 程序那样受到类型擦除的影响。

此外,Scala 允许通过 Scala Macros 在 Scala 编译器中运行自定义代码–这意味着每当你编译一个针对 Flink 的 Scala API 编写的 Scala 程序时,一些 Flink 代码就会被执行。

我们在编译过程中使用宏来查看所有用户函数的参数类型和返回类型–这时当然所有的类型信息都是完全可用的。在宏中,我们为函数的返回类型(或参数类型)创建一个 TypeInformation,并将其作为操作的一部分。

没有隐式值的证据参数错误 #

在 TypeInformation 不能被创建的情况下,程序编译失败,并出现 “could not find implicit value for evidence parameter of type TypeInformation” 的错误。

一个常见的原因是生成 TypeInformation 的代码没有被导入。请确保导入整个 flink.api.scala 包。

import org.apache.flink.api.scala._

另一个常见的原因是通用方法,它可以在下面的章节中进行修复。

通用方法 #

请考虑以下案例。

def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

val data : DataSet[(String, Long) = ...

val result = selectFirst(data)

对于这样的通用方法,每次调用时,函数参数和返回类型的数据类型可能不一样,在定义方法的站点不知道。上面的代码会导致一个错误,即没有足够的隐含证据。

在这种情况下,必须在调用站点生成类型信息并传递给方法。Scala 为此提供了隐式参数。

下面的代码告诉 Scala 将 T 的类型信息带入函数中。然后,类型信息将在方法被调用的站点生成,而不是在方法被定义的站点生成。

def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

Java API 中的类型信息 #

在一般情况下,Java 会擦除通用类型信息,而 Flink 试图通过反射来重建尽可能多的类型信息,使用 Java 保留的少量信息(主要是函数签名和子类信息)。Flink 试图通过反射来重建尽可能多的类型信息,使用 Java 保留的少量信息(主要是函数签名和子类信息)。这个逻辑还包含了一些简单的类型推理,用于函数的返回类型取决于其输入类型的情况。

public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {

    public Tuple2<T, Long> map(T value) {
        return new Tuple2<T, Long>(value, 1L);
    }
}

在有些情况下,Flink 无法重建所有的通用类型信息。在这种情况下,用户必须通过类型提示来帮忙。

Java API 中的类型提示 #

在 Flink 无法重建被擦除的通用类型信息的情况下,Java API 提供了所谓的类型提示。类型提示告诉系统一个函数产生的数据流或数据集的类型。

DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);

returns 语句指定产生的类型,在本例中是通过一个类。提示支持类型定义,通过:

  • 类,适用于非参数化类型(无属类)
  • TypeHint 以 returns(new TypeHint<Tuple2<Integer, SomeType>>(){}) 的形式存在。TypeHint 类可以捕获通用类型信息,并将其保存到运行时(通过匿名子类)。

Java 8 lambdas 的类型提取。 #

Java 8 lambdas 的类型提取与非 lambdas 的工作方式不同,因为 lambdas 不与扩展函数接口的实现类相关联。

目前,Flink 试图找出哪个方法实现了 lambda,并使用 Java 的通用签名来确定参数类型和返回类型。然而,并不是所有的编译器都能为 lambdas 生成这些签名(在写这篇文档时,只有 4.5 以后的 Eclipse JDT 编译器能可靠地生成)。

POJO 类型的序列化 #

PojoTypeInfo 正在为 POJO 内部的所有字段创建序列器。标准类型,如 int、long、String 等,由 Flink 附带的序列器处理。对于所有其他类型,我们回到了 Kryo

如果 Kryo 不能处理类型,你可以要求 PojoTypeInfo 使用 Avro 来序列化 POJO。要做到这一点,你必须调用

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

请注意,Flink 是用 Avro 序列化器自动序列化 Avro 生成的 POJO。

如果您想让整个 POJO 类型被 Kryo 序列化器处理,请设置

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

如果 Kryo 无法序列化你的 POJO,你可以在 Kryo 中添加一个自定义序列化器,使用

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

这些方法有不同的变体。

禁用 Kryo Fallback #

有些情况下,程序可能希望明确避免使用 Kryo 作为通用类型的后备。最常见的情况是希望确保所有类型都能通过 Flink 自己的序列化器或通过用户定义的自定义序列化器进行有效序列化。

下面的设置会在遇到会通过 Kryo 的数据类型时引发异常。

env.getConfig().disableGenericTypes();

使用工厂定义类型信息 #

类型信息工厂允许将用户定义的类型信息插入到 Flink 类型系统中。你必须实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory 来返回你的自定义类型信息。如果相应的类型已经被 @org.apache.flink.api.common.typeinfo.TypeInfo 注解,那么在类型提取阶段就会调用这个工厂。

类型信息工厂可以在 Java 和 Scala API 中使用。

在类型的层次结构中,在向上遍历时将选择最接近的工厂,然而,内置工厂具有最高的优先级。工厂也比 Flink 的内置类型有更高的优先级,因此你应该知道你在做什么。

下面的例子展示了如何在 Java 中使用工厂来注释一个自定义类型 MyTuple 并为其提供自定义类型信息。

注解的自定义类型:

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
  public T0 myfield0;
  public T1 myfield1;
}

工厂供应自定义类型信息:

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {

  @Override
  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
    return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  }
}

方法 createTypeInfo(Type, Map<String, TypeInformation<?>) 为工厂的目标类型创建类型信息。参数提供了关于类型本身的附加信息,以及类型的通用类型参数(如果可用)。

如果你的类型包含了可能需要从 Flink 函数的输入类型中导出的通用参数,请确保同时实现 org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters 来实现通用参数到类型信息的双向映射。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/types_serialization.html