为Flink程序注册自定义序列化程序

如果您在Flink程序中使用自定义类型无法通过Flink类型序列化程序进行序列化,则Flink将回退到使用通用Kryo序列化程序。您可以注册自己的序列化程序或序列化系统,如Google Protobuf或Apache Thrift with Kryo。为此,只需在ExecutionConfigFlink程序中注册类型类和序列化程序即可。

  1. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. // register the class of the serializer as serializer for a type
  3. env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
  4. // register an instance as serializer for a type
  5. MySerializer mySerializer = new MySerializer();
  6. env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);

请注意,您的自定义序列化程序必须扩展Kryo的Serializer类。对于Google Protobuf或Apache Thrift,已经为您完成了这项工作:

  1. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. // register the Google Protobuf serializer with Kryo
  3. env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
  4. // register the serializer included with Apache Thrift as the standard serializer
  5. // TBaseSerializer states it should be initialized as a default Kryo serializer
  6. env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

要使上述示例起作用,您需要在Maven项目文件(pom.xml)中包含必要的依赖项。在依赖项部分中,为Apache Thrift添加以下内容:

  1. <dependency>
  2. <groupId>com.twitter</groupId>
  3. <artifactId>chill-thrift</artifactId>
  4. <version>0.5.2</version>
  5. </dependency>
  6. <!-- libthrift is required by chill-thrift -->
  7. <dependency>
  8. <groupId>org.apache.thrift</groupId>
  9. <artifactId>libthrift</artifactId>
  10. <version>0.6.1</version>
  11. <exclusions>
  12. <exclusion>
  13. <groupId>javax.servlet</groupId>
  14. <artifactId>servlet-api</artifactId>
  15. </exclusion>
  16. <exclusion>
  17. <groupId>org.apache.httpcomponents</groupId>
  18. <artifactId>httpclient</artifactId>
  19. </exclusion>
  20. </exclusions>
  21. </dependency>

对于Google Protobuf,您需要以下Maven依赖项:

  1. <dependency>
  2. <groupId>com.twitter</groupId>
  3. <artifactId>chill-protobuf</artifactId>
  4. <version>0.5.2</version>
  5. </dependency>
  6. <!-- We need protobuf for chill-protobuf -->
  7. <dependency>
  8. <groupId>com.google.protobuf</groupId>
  9. <artifactId>protobuf-java</artifactId>
  10. <version>2.5.0</version>
  11. </dependency>

请根据需要调整两个库的版本。

使用Kryo JavaSerializer的问题

如果您JavaSerializer为自定义类型注册Kryo 即使您的自定义类型类包含在提交的用户代码jar中,您也可能遇到ClassNotFoundException。这是由于Kryo的已知问题JavaSerializer,可能会错误地使用错误的类加载器。

在这种情况下,您应该使用它org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer来解决问题。这是JavaSerializer在Flink中重新实现的,确保使用用户代码类加载器。

有关详细信息,请参阅FLINK-6025