Wait the light to fall

Custom Serializer

焉知非鱼

Custom Serializer

为你的 Flink 程序注册一个自定义的序列器

如果你在 Flink 程序中使用的自定义类型不能被 Flink 类型序列化器序列化,Flink 就会回到使用通用的 Kryo 序列化器。你可以用 Kryo 注册你自己的序列化器或像 Google Protobuf 或 Apache Thrift 这样的序列化系统。要做到这一点,只需在 Flink 程序的 ExecutionConfig 中注册类型类和序列化器。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);

请注意,你的自定义序列化器必须扩展 Kryo 的序列化器类。在 Google Protobuf 或 Apache Thrift 的情况下,这已经为你完成了。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

为了使上面的例子有效,你需要在 Maven 项目文件(pom.xml)中加入必要的依赖关系。在依赖关系部分,为 Apache Thrift 添加以下内容。

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-thrift</artifactId>
	<version>0.7.6</version>
	<!-- exclusions for dependency conversion -->
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
	<groupId>org.apache.thrift</groupId>
	<artifactId>libthrift</artifactId>
	<version>0.11.0</version>
	<exclusions>
		<exclusion>
			<groupId>javax.servlet</groupId>
			<artifactId>servlet-api</artifactId>
		</exclusion>
		<exclusion>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</exclusion>
	</exclusions>
</dependency>

对于 Google Protobuf,你需要以下 Maven 依赖。

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.7.6</version>
	<!-- exclusions for dependency conversion -->
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.7.0</version>
</dependency>

请根据需要调整两个库的版本。

使用 Kryo 的 JavaSerializer 的问题。 #

如果你为你的自定义类型注册了 Kryo 的 JavaSerializer,你可能会遇到 ClassNotFoundExceptions,即使你的自定义类型类包含在提交的用户代码 jar 中。这是由于 Kryo 的 JavaSerializer 的一个已知问题,它可能会错误地使用错误的 classloader。

在这种情况下,你应该使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 来代替解决这个问题。这是 Flink 中重新实现的 JavaSerializer,它可以确保使用用户代码类加载器。

更多细节请参考 FLINK-6025

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html