数据类型和序列化

Apache Flink以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。本文档描述了它们背后的概念和基本原理。

Flink尝试推断有关在分布式计算期间交换和存储的数据类型的大量信息。可以把它想象成一个推断表格架构的数据库。在大多数情况下,Flink自己无缝地推断所有必要的信息。拥有类型信息允许Flink做一些很酷的事情:

  • 使用POJO类型并通过引用字段名称(如dataSet.keyBy("username"))来分组/Join/聚合它们。类型信息允许Flink提前检查(用于拼写错误和类型兼容性),而不是稍后在运行时失败。

  • Flink对数据类型的了解越多,序列化和数据布局方案就越好。这对于Flink中的内存使用范例非常重要(尽可能处理堆内部/外部的序列化数据并使序列化非常便宜)。

  • 最后,它还使大多数情况下的用户免于担心序列化框架和必须注册类型。

一般来说,在需要有关数据类型的信息飞行前阶段 -也就是,当程序的调用DataStreamDataSet制成,任何调用之前execute()print()count(),或collect()

最常见的问题

用户需要与Flink的数据类型处理进行交互的最常见问题是:

  • 注册子类型:如果函数签名仅描述超类型,但实际上它们在执行期间使用了这些类型的子类型,则可能会大大提高性能,使Flink了解这些子类型。为此,呼吁.registerType(clazz)StreamExecutionEnvironment或者ExecutionEnvironment每个亚型。

  • 注册自定义序列化程序: Flink会回退到Kryo,因为它本身无法透明地处理这些类型。并非所有类型都由Kryo(以及Flink)无缝处理。例如,默认情况下,许多Google Guava集合类型无法正常工作。解决方案是为导致问题的类型注册其他序列化程序。打电话.getConfig().addDefaultKryoSerializer(clazz, serializer)StreamExecutionEnvironmentExecutionEnvironment。许多库中都提供了其他Kryo序列化程序。有关使用自定义序列化程序的详细信息,请参阅自定义序列化程

  • 添加类型提示:有时,尽管有所有技巧,但Flink无法推断泛型类型时,用户必须传递类型提示。这通常只在Java API中是必需的。该类型提示章节描述了更多的细节。

  • 手动创建TypeInformation这对于某些API调用可能是必要的,因为由于Java泛型类型擦除,Flink无法推断数据类型。有关详细信息,请参阅创建TypeInformation或TypeSerializer

TypeInformation是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器,并且在专业化中,可以生成类型的比较器。注意Flink中的比较器不仅仅是定义一个顺序 - 它们基本上是处理键的实用程序

在内部,Flink在类型之间做出以下区分:

  • 基本类型:所有的Java原语及其盒装形式,加voidStringDateBigDecimal,和BigInteger

  • 基元数组和对象数组

  • 复合类型

    • Flink Java Tuples(Flink Java API的一部分):最多25个字段,不支持空字段

    • Scala Case Class(包括Scala元组):最多22个字段,不支持空字段

    • Row:具有任意数量字段的元组并支持空字段

    • POJO:遵循某种类似bean的模式的类

  • 辅助类型(选项,任一,列表,Map,……)

  • 通用类型:这些不会被Flink本身序列化,而是由Kryo序列化。

POJO特别令人感兴趣,因为它们支持复杂类型的创建以及在键的定义中使用字段名称:dataSet.join(another).where("name").equalTo("personName")它们对运行时也是透明的,并且可以由Flink非常有效地处理。

POJO类型的规则

如果满足以下条件,Flink会将数据类型识别为POJO类型(并允许“按名称”字段引用):

  • 该类是公共的和独立的(没有非静态内部类)
  • 该类有一个公共的无参数构造函数
  • 类(以及所有超类)中的所有非静态非瞬态字段都是公共的(和非最终的)或者具有公共getter和setter方法,该方法遵循getter和setter的Java bean命名约定。请注意,当用户定义的数据类型无法识别为POJO类型时,必须将其作为GenericType处理并使用Kryo进行序列化。

创建TypeInformation或TypeSerializer

要为类型创建TypeInformation对象,请使用特定于语言的方式:

因为Java通常会擦除泛型类型信息,所以需要将类型传递给TypeInformation构造:

对于非泛型类型,您可以传递类:

  1. TypeInformation<String> info = TypeInformation.of(String.class);

对于泛型类型,您需要通过以下方式“捕获”泛型类型信息TypeHint

  1. TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

在内部,这将创建TypeHint的匿名子类,捕获通用信息以将其保存到运行时。

In Scala, Flink uses macros that runs at compile time and captures all generic type information while it isstill available.

  1. // important: this import is needed to access the 'createTypeInformation' macro function
  2. import org.apache.flink.streaming.api.scala._
  3. val stringInfo: TypeInformation[String] = createTypeInformation[String]
  4. val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

You can still use the same method as in Java as a fallback.

要创建一个TypeSerializer,只需调用typeInfo.createSerializer(config)TypeInformation对象即可。

config参数是类型ExecutionConfig和保存有关该计划的注册的自定义序列化的信息。尽可能尝试将程序传递给ExecutionConfig。您通常可以通过电话DataStreamDataSet通过电话获取getExecutionConfig()在内部函数(如MapFunction)中,您可以通过使函数成为Rich函数和调用来获得它getRuntimeContext().getExecutionConfig()



在Scala API中类型信息

尽管类型清单类标记, Scala对运行时类型信息有非常精细的概念通常,类型和方法可以访问其泛型参数的类型 - 因此,Scala程序不会像Java程序那样遭受类型擦除。

此外,Scala允许通过Scala宏在Scala编译器中运行自定义代码 - 这意味着无论何时编译针对Flink的Scala API编写的Scala程序,都会执行一些Flink代码。

我们使用宏来查看编译期间所有用户函数的参数类型和返回类型 - 这是当然所有类型信息都完全可用的时间点。在宏中,我们为函数的返回类型(或参数类型)创建一个TypeInformation,并使其成为 算子操作的一部分。

证据参数误差无隐含值

在无法创建TypeInformation的情况下,程序无法编译,并显示“无法找到TypeInformation类型的证据参数的隐式值”的错误

如果尚未导入生成TypeInformation的代码的常见原因。确保导入整个flink.api.scala包。

  1. import org.apache.flink.api.scala._

另一个常见原因是通用方法,可以按照以下部分所述进行修复。

通用方法

请考虑以下情况:

  1. def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  2. input.map { v => v._1 }
  3. }
  4. val data : DataSet[(String, Long) = ...
  5. val result = selectFirst(data)

对于这样的通用方法,函数参数和返回类型的数据类型对于每个调用可能不相同,并且在定义方法的站点处不知道。上面的代码将导致错误,即没有足够的隐式证据可用。

在这种情况下,必须在调用站点生成类型信息并将其传递给方法。Scala 为此提供隐式参数

以下代码告诉Scala将T的类型信息带入函数。然后,将在调用方法的站点生成类型信息,而不是在定义方法的位置生成类型信息。

  1. def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  2. input.map { v => v._1 }
  3. }


在Java API中类型信息

在一般情况下,Java会擦除泛型类型信息。Flink尝试使用Java保存的少量位(主要是函数签名和子类信息)通过反射重建尽可能多的类型信息。对于函数的返回类型取决于其输入类型的情况,此逻辑还包含一些简单的类型推断:

  1. public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {
  2. public Tuple2<T, Long> map(T value) {
  3. return new Tuple2<T, Long>(value, 1L);
  4. }
  5. }

在某些情况下,Flink无法重建所有通用类型信息。在这种情况下,用户必须通过类型提示帮助

在Java API中类型提示

在Flink无法重建已擦除的泛型类型信息的情况下,Java API提供所谓的类型提示类型提示告诉系统函数生成的数据流或数据集的类型:

  1. DataSet<SomeType> result = dataSet
  2. .map(new MyGenericNonInferrableFunction<Long, SomeType>())
  3. .returns(SomeType.class);

returns语句指定生成的类型,在本例中通过类。提示支持通过类型定义

  • 用于非参数化类型的类(无泛型)
  • TypeHints的形式returns(new TypeHint<Tuple2<Integer, SomeType>>(){})。该TypeHint班可以捕获泛型类型信息,并保存它运行时(通过一个匿名子类)。

Java 8 lambdas的类型提取

Java 8 lambdas的类型提取与非lambdas的工作方式不同,因为lambda不与扩展函数接口的实现类相关联。

目前,Flink试图找出实现lambda的方法,并使用Java通用签名来确定参数类型和返回类型。但是,并非所有编译器都为lambda生成这些签名(从4.5开始,Eclipse JDT编译器只能可靠地编写本文档)。

POJO类型的序列化

PojoTypeInformation正在为POJO中的所有字段创建序列化器。标准类型(如int,long,String等)由我们随Flink提供的序列化程序处理。对于所有其他类型,我们回到Kryo。

如果Kryo无法处理该类型,您可以要求PojoTypeInfo使用Avro序列化POJO。要这样做,你必须打电话

  1. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.getConfig().enableForceAvro();

请注意,Flink会使用Avro序列化程序自动序列化Avro生成的POJO。

如果您希望Kryo序列化程序处理整个 POJO类型,请进行设置

  1. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.getConfig().enableForceKryo();

如果Kryo无法序列化您的POJO,您可以使用自定义序列化程序添加到Kryo

  1. env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

这些方法有不同的变体可供选择。

禁用Kryo回退

在某些情况下,程序可能希望明确避免将Kryo用作泛型类型的回退。最常见的是希望确保通过Flink自己的序列化程序或通过用户定义的自定义序列化程序有效地序列化所有类型。

每当遇到将通过Kryo的数据类型时,下面的设置将引发异常:

  1. env.getConfig().disableGenericTypes();

使用工厂定义类型信息

类型信息工厂允许将用户定义的类型信息插入Flink类型系统。您必须实现org.apache.flink.api.common.typeinfo.TypeInfoFactory以返回自定义类型信息。如果相应的类型已使用@org.apache.flink.api.common.typeinfo.TypeInfo注释进行注释,则在类型提取阶段调用工厂

类型信息工厂可以在Java和Scala API中使用。

在类型的层次结构中,在向上遍历时将选择最接近的工厂,但是,内置工厂具有最高优先级。工厂的优先级也高于Flink的内置类型,因此您应该知道自己在做什么。

以下示例说明如何MyTuple使用Java中的工厂注释自定义类型并为其提供自定义类型信息。

带注释的自定义类型:

  1. @TypeInfo(MyTupleTypeInfoFactory.class)
  2. public class MyTuple<T0, T1> {
  3. public T0 myfield0;
  4. public T1 myfield1;
  5. }

工厂提供自定义类型信息:

  1. public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
  2. @Override
  3. public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
  4. return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  5. }
  6. }

该方法createTypeInfo(Type, Map<String, TypeInformation<?>>)为工厂所针对的类型创建类型信息。参数提供有关类型本身的其他信息以及类型的泛型类型参数(如果可用)。

如果您的类型包含可能需要从Flink函数的输入类型派生的泛型参数,请确保还实现org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters泛型参数到类型信息的双向映射。