给 DataSet 中的元素编号

在一些算法中,可能需要为数据集元素分配唯一标识符。 本文档阐述了如何将 DataSetUtils 用于此目的。

以密集索引编号

zipWithIndex 为元素分配连续的标签,接收数据集作为输入并返回一个新的 (unique id, initial value) 二元组的数据集。 这个过程需要分为两个(子)过程,首先是计数,然后是标记元素,由于计数操作的同步性,这个过程不能被 pipelined(流水线化)。

可供备选的 zipWithUniqueId 是以 pipelined 的方式进行工作的。当唯一标签足够时,首选 zipWithUniqueId 。 例如,下面的代码:

Java

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(2);
  3. DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
  4. DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
  5. result.writeAsCsv(resultPath, "\n", ",");
  6. env.execute();

Scala

  1. import org.apache.flink.api.scala._
  2. val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  3. env.setParallelism(2)
  4. val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
  5. val result: DataSet[(Long, String)] = input.zipWithIndex
  6. result.writeAsCsv(resultPath, "\n", ",")
  7. env.execute()

Python

  1. from flink.plan.Environment import get_environment
  2. env = get_environment()
  3. env.set_parallelism(2)
  4. input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
  5. result = input.zip_with_index()
  6. result.write_text(result_path)
  7. env.execute()

可能会生成这些元组:(0,G),(1,H),(2,A),(3,B),(4,C),(5,D),(6,E),(7,F)

以唯一标识符编号

在许多情况下,可能不需要指定连续的标签。 zipWithUniqueId 以 pipelined 的方式工作,加快了标签分配的过程。该方法接收一个数据集作为输入,并返回一个新的 (unique id, initial value) 二元组的数据集。 例如,下面的代码:

Java

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(2);
  3. DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
  4. DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
  5. result.writeAsCsv(resultPath, "\n", ",");
  6. env.execute();

Scala

  1. import org.apache.flink.api.scala._
  2. val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  3. env.setParallelism(2)
  4. val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
  5. val result: DataSet[(Long, String)] = input.zipWithUniqueId
  6. result.writeAsCsv(resultPath, "\n", ",")
  7. env.execute()

可能会产生这些元组:(0,G),(1,A),(2,H),(3,B),(5,C),(7,D),(9,E),(11,F)