在管道的引擎盖下看

介绍

将不同的变换器和预测器链接在一起的能力是任何机器学习(ML)库的重要特征。在FlinkML中,我们希望提供直观的API,同时利用Scala语言的函数来提供管道的类型安全实现。我们希望实现的是一个易于使用的API,它可以保护用户免受飞行前(作业启动前)时间的类型错误的影响,从而消除了长时间运行的作业被提交到集群只是为了看到它们失败的情况通常在ML管道中发生的一系列数据转换中的一些错误。

在本指南中,我们将描述我们在FlinkML中实现可链接变换器和预测器时所做的选择,并提供有关开发人员如何创建自己的算法以利用这些函数的指南。

什么和为什么

那么“ML管道”是什么意思呢?ML上下文中的流水线可以被认为是具有一些数据作为输入的 算子操作链,对该数据执行许多转换,然后输出转换的数据,或者用作预测函数的输入(特征) ,例如学习模型,或者只是输出转换后的数据本身,用于其他一些任务。最终学习者当然也可以成为管道的一部分。ML管道通常可以是复杂的 算子操作集(深入解释),并且可能成为端到端学习系统的错误源。

然后,ML管道的目的是创建一个框架,可用于管理这些 算子操作链引入的复杂性。管道应该使开发人员能够轻松定义可应用于训练数据的链式转换,以便创建将用于训练学习模型的最终特征,然后执行相同的转换集,就像无标记一样容易(测试数据。管道还应简化这些 算子操作链上的交叉验证和模型选择。

最后,通过确保管道链中的连续链接“合在一起”,我们还避免了代价高昂的类型错误。由于管道中的每个步骤都可能是计算量很大的 算子操作,因此我们希望避免运行流水线作业,除非我们确定管道中的所有输入/输出对“都适合”。

FlinkML中的管道

可以在ml.pipeline包中找到FlinkML中管道的构建块FlinkML如下启发的API sklearn这意味着我们有EstimatorTransformerPredictor接口。对于深入看看sklearn API的设计,有兴趣的读者可以参考这个文件。简而言之,它Estimator继承TransformerPredictor继承的基类Estimator定义fit方法,并Transformer定义transform方法并Predictor定义predict方法。

执行模型的实际训练fit方法Estimator,例如在线性回归任务中找到正确的权重,或者在特征缩放器中找到数据的均值和标准偏差。正如命名所显示的那样,实现的类Transformer是变换 算子操作,如缩放输入Predictor实现是学习算法,如多元线性回归可以通过将多个变换器链接在一起来创建管道,并且管道中的最终链接可以是预测变量或另一个变换器。以Predictor结尾的管道无法再进行链接。下面是如何形成管道的示例:

  1. // Training data
  2. val input: DataSet[LabeledVector] = ...
  3. // Test data
  4. val unlabeled: DataSet[Vector] = ...
  5. val scaler = StandardScaler()
  6. val polyFeatures = PolynomialFeatures()
  7. val mlr = MultipleLinearRegression()
  8. // Construct the pipeline
  9. val pipeline = scaler
  10. .chainTransformer(polyFeatures)
  11. .chainPredictor(mlr)
  12. // Train the pipeline (scaler and multiple linear regression)
  13. pipeline.fit(input)
  14. // Calculate predictions for the testing data
  15. val predictions: DataSet[LabeledVector] = pipeline.predict(unlabeled)

正如我们所提到的,FlinkML管道是类型安全的。如果我们试图将类型输出的变压器链接A到另一个具有类型输入的变压器,B那么如果A!= 我们将在飞行前时间得到错误BFlinkML通过使用Scala的含义实现了这种类型安全性。

Scala隐式

如果您不熟悉Scala的含义,我们可以推荐Martin Odersky的“Scala编程” 摘录简而言之,隐式转换通过提供从一种类型到另一种类型的转换来允许Scala中的ad-hoc多态,并且隐式值为编译器提供可以通过隐式参数提供给函数调用的默认值。隐式转换和隐式参数的组合使我们能够以类型安全的方式链接转换和预测 算子操作。

算子操作

正如我们所提到的,trait(抽象类)Estimator定义了一个fit方法。该方法有两个参数列表(即一个curried函数)。第一个参数列表采用输入(训练)DataSet和估计器的参数。第二个参数列表采用一个implicit类型的参数FitOperationFitOperation是一个定义fit方法的类,这是应该实现训练具体估计器的实际逻辑的地方。fit方法Estimator基本上是围绕拟合方法的打包FitOperation所述predict的方法Predictor和所述transform的方法Transform被设计成在类似的方式,与相应的 算子操作类。

在这些方法中, 算子操作对象作为隐式参数提供。Scala将在类型的伴随对象中查找implicits,因此实现这些接口的类应该将这些对象作为隐藏对象提供在伴随对象中。

作为一个例子,我们可以看一下这个StandardScaler课程。StandardScaler延伸Transformer,所以它可以访问它fittransform函数。这两个函数分别伴随对象的方法方法提供对象FitOperationTransformOperation隐式参数,通过fittransformStandardScalertransformVectorsfitVectorStandardScaler

  1. class StandardScaler extends Transformer[StandardScaler] {
  2. ...
  3. }
  4. object StandardScaler {
  5. ...
  6. implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] {
  7. override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T])
  8. : Unit = {
  9. ...
  10. }
  11. implicit def transformVectors[T <: Vector: VectorConverter: TypeInformation: ClassTag] = {
  12. new TransformOperation[StandardScaler, T, T] {
  13. override def transform(
  14. instance: StandardScaler,
  15. transformParameters: ParameterMap,
  16. input: DataSet[T])
  17. : DataSet[T] = {
  18. ...
  19. }
  20. }

请注意,StandardScaler不能覆盖fit的方法Estimatortransform方法Transformer更确切地说,它的的实现FitOperationTransformOperation覆盖其各自的fittransform方法,其然后由被叫fittransform方法EstimatorTransformer类似地,实现的类Predictor应该PredictOperation在其伴随对象中定义隐式对象。

类型和类型安全

除了fittransform运营,我们上面列出的StandardScaler还提供fittransform类型的输入 算子操作LabeledVector这允许我们将算法用于标记或未标记的输入,这将自动发生,具体取决于我们为拟合和变换 算子操作提供的输入类型。编译器选择正确的隐式 算子操作,具体取决于输入类型。

如果我们尝试使用不受支持的类型调用fittransform方法,我们将在启动作业之前收到运行时错误。虽然在编译时也可以捕获这些类型的错误,但是我们能够为用户提供的错误消息的信息量会少得多,这就是我们选择抛出运行时异常的原因。

链接

链接是通过调用chainTransformer实现chainPredictor的类的对象来实现的Transformer这些方法分别返回一个ChainedTransformer一个ChainedPredictor对象。正如我们所提到的,ChainedTransformer对象可以进一步链接,而ChainedPredictor对象则不能。这些类负责为一对连续的变压器或变压器和预测器应用拟合,变换和预测 算子操作。他们还充当递归如果链的长度是大于二,因为每个ChainedTransformer定义了一个transformfit算子操作可以与多个变压器或一个预测器被进一步链接。

值得注意的是,开发人员和用户在实现算法时无需担心链接,所有这些都由FlinkML自动处理。

如何实现管道 算子

为了支持FlinkML的流水线 算子操作,算法必须遵循某种设计模式,我们将在本节中对其进行描述。假设我们想要实现一个改变数据均值的管道 算子。由于居中数据是许多分析管道中常见的预处理步骤,因此我们将其实现为Transformer因此,我们首先创建一个MeanTransformer继承自Transformer

  1. class MeanTransformer extends Transformer[MeanTransformer] {}

由于我们希望能够配置结果数据的均值,因此我们必须添加配置参数。

  1. class MeanTransformer extends Transformer[MeanTransformer] {
  2. def setMean(mean: Double): this.type = {
  3. parameters.add(MeanTransformer.Mean, mean)
  4. this
  5. }
  6. }
  7. object MeanTransformer {
  8. case object Mean extends Parameter[Double] {
  9. override val defaultValue: Option[Double] = Some(0.0)
  10. }
  11. def apply(): MeanTransformer = new MeanTransformer
  12. }

参数在变换器类的伴随对象中定义并扩展Parameter该类。由于参数实例应该作为参数映射的不可变键,因此它们应该实现为case objects如果此组件的用户未设置其他值,则将使用默认值。如果没有指定默认值,意味着defaultValue = None该算法必须相应地处理这种情况。

我们现在可以实例化一个MeanTransformer对象并设置转换数据的平均值。但我们仍然必须实现转型的运作方式。工作流程可分为两个阶段。在第一阶段,变压器学习给定训练数据的平均值。然后可以在第二阶段中使用该知识来相对于配置的结果平均值变换所提供的数据。

平均值的学习可以在fit我们Transformer继承 算子操作中实现Estimator在该fit 算子操作中,针对给定的训练数据训练管道组件。然而,该算法不是通过覆盖该fit方法而是通过提供对应FitOperation于正确类型的实现来实现的看一下该fit方法的定义Estimator,即父类Transformer,揭示了为什么会出现这种情况。

  1. trait Estimator[Self] extends WithParameters with Serializable {
  2. that: Self =>
  3. def fit[Training](
  4. training: DataSet[Training],
  5. fitParameters: ParameterMap = ParameterMap.Empty)
  6. (implicit fitOperation: FitOperation[Self, Training]): Unit = {
  7. FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
  8. fitOperation.fit(this, fitParameters, training)
  9. }
  10. }

我们看到该fit方法是使用类型的输入数据集Training,可选参数列表和第二个参数列表调用的,其中隐式参数类型FitOperation在函数体内,首先注册一些机器学习类型,然后调用参数fit方法FitOperation实例将自身,参数图和训练数据集作为方法的参数。因此,所有的程序逻辑都发生在FitOperation

FitOperation有两个类型参数。第一个定义管道 算子类型,它FitOperation应该工作,第二个类型参数定义数据集数据元的类型。如果我们首先想要实现这个MeanTransformer工作DenseVector,那么我们就必须提供一个实现FitOperation[MeanTransformer, DenseVector]

  1. val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
  2. override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = {
  3. import org.apache.flink.ml.math.Breeze._
  4. val meanTrainingData: DataSet[DenseVector] = input
  5. .map{ x => (x.asBreeze, 1) }
  6. .reduce{
  7. (left, right) =>
  8. (left._1 + right._1, left._2 + right._2)
  9. }
  10. .map{ p => (p._1/p._2).fromBreeze }
  11. }
  12. }

A FitOperation[T, I]具有使用fit类型实例T,参数映射和输入调用方法DataSet[I]在我们的案例T=MeanTransformerI=DenseVector如果我们的拟合步骤取决于在创建时未直接给出的某些参数值,则参数映射是必需的TransformerFitOperation所述的MeanTransformer和的DenseVector给定的输入数据设置的实例,并除以矢量的总数的结果。这样,我们获得了一个DataSet[DenseVector]具有平均值的单个数据元。

但是如果仔细观察实现,我们会发现平均计算的结果永远不会存储在任何地方。如果我们想在后面的步骤中使用这些知识来调整其他输入的平均值,我们必须保持它。这里是哪里类型的参数,MeanTransformer这是考虑到该fit方法的用武之地。我们可以使用此实例来存储状态,该状态由transform对同一对象起作用的后续 算子操作使用但首先我们必须MeanTransformer通过成员字段进行扩展,然后调整FitOperation实现。

  1. class MeanTransformer extends Transformer[Centering] {
  2. var meanOption: Option[DataSet[DenseVector]] = None
  3. def setMean(mean: Double): Mean = {
  4. parameters.add(MeanTransformer.Mean, mu)
  5. }
  6. }
  7. val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
  8. override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = {
  9. import org.apache.flink.ml.math.Breeze._
  10. instance.meanOption = Some(input
  11. .map{ x => (x.asBreeze, 1) }
  12. .reduce{
  13. (left, right) =>
  14. (left._1 + right._1, left._2 + right._2)
  15. }
  16. .map{ p => (p._1/p._2).fromBreeze })
  17. }
  18. }

如果我们查看transform方法Transformer,我们将看到我们还需要一个实现TransformOperation可能的平均转换实现可能如下所示。

  1. val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] {
  2. override def transform(
  3. instance: MeanTransformer,
  4. transformParameters: ParameterMap,
  5. input: DataSet[DenseVector])
  6. : DataSet[DenseVector] = {
  7. val resultingParameters = parameters ++ transformParameters
  8. val resultingMean = resultingParameters(MeanTransformer.Mean)
  9. instance.meanOption match {
  10. case Some(trainingMean) => {
  11. input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean")
  12. }
  13. case None => throw new RuntimeException("MeanTransformer has not been fitted to data.")
  14. }
  15. }
  16. }
  17. class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] {
  18. var trainingMean: DenseVector = null
  19. override def open(parameters: Configuration): Unit = {
  20. trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0)
  21. }
  22. override def map(vector: DenseVector): DenseVector = {
  23. import org.apache.flink.ml.math.Breeze._
  24. val result = vector.asBreeze - trainingMean.asBreeze + resultingMean
  25. result.fromBreeze
  26. }
  27. }

现在我们已经实现了所有实现,以适应实例MeanTransformer的训练数据集DenseVector并对其进行转换。但是,当我们执行fit 算子操作时

  1. val trainingData: DataSet[DenseVector] = ...
  2. val meanTransformer = MeanTransformer()
  3. meanTransformer.fit(trainingData)

我们在运行时收到以下错误:"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"原因是Scala编译器无法FitOperation为方法的隐式参数找到具有正确类型参数的拟合fit因此,它选择了一个回退隐式值,它在运行时为您提供此错误消息。为了使编译器能够识别我们的实现,我们必须将其定义为隐式值并将其放在MeanTransformer's伴随对象的范围内

  1. object MeanTransformer{
  2. implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ...
  3. implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ...
  4. }

现在,我们可以调用fittransform我们的MeanTransformerDataSet[DenseVector]作为输入。此外,我们现在可以将此变换器用作分析管道的一部分,其中我们具有DenseVector输入和预期输出。

  1. val trainingData: DataSet[DenseVector] = ...
  2. val mean = MeanTransformer.setMean(1.0)
  3. val polyFeatures = PolynomialFeatures().setDegree(3)
  4. val pipeline = mean.chainTransformer(polyFeatures)
  5. pipeline.fit(trainingData)

值得注意的是,启用链接不需要额外的代码。系统使用各个组件的 算子操作自动构建管道逻辑。

到目前为止一切正常DenseVector但是,如果我们LabeledVector改用变压器,会发生什么

  1. val trainingData: DataSet[LabeledVector] = ...
  2. val mean = MeanTransformer()
  3. mean.fit(trainingData)

和以前一样,我们在执行程序时会看到以下异常:"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"值得注意的是,这个异常是在飞行前阶段抛出的,这意味着作业尚未提交给运行时系统。这样做的好处是,您不会看到一个运行了几天但由于管道组件不兼容而失败的作业。因此,在一开始就检查类型兼容性是否完整。

为了使MeanTransformer工作LabeledVector也顺利进行,我们必须提供相应的 算子操作。因此,我们必须伴随对象的范围内定义一个FitOperation[MeanTransformer, LabeledVector]TransformOperation[MeanTransformer, LabeledVector, LabeledVector]作为隐式值MeanTransformer

  1. object MeanTransformer {
  2. implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ...
  3. implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ...
  4. }

如果我们想要实现a Predictor而不是a Transformer,那么我们也必须提供a FitOperation此外,a PredictorRequiredPredictOperation实现如何根据测试数据计算预测。