并行执行

本节描述了在 Flink 中配置程序的并行执行。一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

设置并行度

一个 task 的并行度可以从多个层次指定:

算子层次

单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<String> text = [...]
  3. DataStream<Tuple2<String, Integer>> wordCounts = text
  4. .flatMap(new LineSplitter())
  5. .keyBy(value -> value.f0)
  6. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  7. .sum(1).setParallelism(5);
  8. wordCounts.print();
  9. env.execute("Word Count Example");
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val text = [...]
  3. val wordCounts = text
  4. .flatMap{ _.split(" ") map { (_, 1) } }
  5. .keyBy(_._1)
  6. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  7. .sum(1).setParallelism(5)
  8. wordCounts.print()
  9. env.execute("Word Count Example")

执行环境层次

此节所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(3);
  3. DataStream<String> text = [...]
  4. DataStream<Tuple2<String, Integer>> wordCounts = [...]
  5. wordCounts.print();
  6. env.execute("Word Count Example");
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setParallelism(3)
  3. val text = [...]
  4. val wordCounts = text
  5. .flatMap{ _.split(" ") map { (_, 1) } }
  6. .keyBy(_._1)
  7. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  8. .sum(1)
  9. wordCounts.print()
  10. env.execute("Word Count Example")

客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

  1. ./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

  1. try {
  2. PackagedProgram program = new PackagedProgram(file, args);
  3. InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
  4. Configuration config = new Configuration();
  5. Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
  6. // set the parallelism to 10 here
  7. client.run(program, 10, true);
  8. } catch (ProgramInvocationException e) {
  9. e.printStackTrace();
  10. }
  1. try {
  2. PackagedProgram program = new PackagedProgram(file, args)
  3. InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
  4. Configuration config = new Configuration()
  5. Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())
  6. // set the parallelism to 10 here
  7. client.run(program, 10, true)
  8. } catch {
  9. case e: Exception => e.printStackTrace
  10. }

系统层次

可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。你可以通过查阅配置文档获取更多细节。

设置最大并行度

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768

注意 为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。