Parallel Execution

This section describes how the parallel execution of programs can be configured in Flink. A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several parallel instances for execution, and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism.
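
To make "parallel instances" concrete, here is a minimal plain-Java model (not Flink API) that splits a task's input records across a given number of instances in round-robin order. The class ParallelInstancesSketch and its distribute method are hypothetical; in Flink, how records actually reach each instance depends on the partitioning between operators (for example, keyed or rebalanced).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink API: splits a task's input records across
// `parallelism` parallel instances in round-robin order, so that each
// instance processes a subset of the input.
final class ParallelInstancesSketch {

    static List<List<String>> distribute(List<String> input, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        // One bucket of records per parallel instance.
        List<List<String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        // Record k goes to instance k mod parallelism.
        for (int k = 0; k < input.size(); k++) {
            instances.get(k % parallelism).add(input.get(k));
        }
        return instances;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c", "d", "e");
        System.out.println(distribute(input, 2)); // [[a, c, e], [b, d]]
    }
}
```

With a parallelism of 2, the five records are split into two subsets; raising the parallelism only changes how many subsets there are, not the records themselves.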

If you want to use savepoints, you should also consider setting a maximum parallelism (or max parallelism). When restoring from a savepoint, you can change the parallelism of specific operators or of the whole program, and this setting specifies an upper bound on that parallelism. This is required because Flink internally partitions state into key-groups, and the number of key-groups cannot be unbounded because that would be detrimental to performance.
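
The following plain-Java sketch (not Flink's code) models why the max parallelism is an upper bound. Each key is assigned to one of maxParallelism key-groups, and key-groups are then mapped onto the current parallel instances; if the parallelism exceeded the number of key-groups, some instances would receive no state at all. The class KeyGroupSketch is hypothetical, and it assumes key.hashCode() where Flink applies an additional internal hash.

```java
// Simplified model of key-group based state partitioning (not Flink's code).
// Assumption: key.hashCode() with Math.floorMod stands in for the hash
// function Flink applies internally.
final class KeyGroupSketch {

    // A key always maps to the same key-group as long as maxParallelism
    // stays the same, independent of the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key-groups are mapped onto the current parallel instances. A
    // parallelism above maxParallelism is rejected, since some instances
    // would then own no key-group.
    static int instanceFor(int keyGroup, int parallelism, int maxParallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                "parallelism must be between 1 and maxParallelism");
        }
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        // After rescaling from 4 to 8 instances, the key keeps its key-group;
        // only the instance that owns that key-group changes.
        System.out.println(instanceFor(keyGroup, 4, maxParallelism));
        System.out.println(instanceFor(keyGroup, 8, maxParallelism));
    }
}
```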

Setting the Parallelism

The parallelism of a task can be specified in Flink on different levels:

Operator Level

The parallelism of an individual operator, data source, or data sink can be defined by calling its setParallelism() method. For example:

Java:

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  DataStream<String> text = [...]
  DataStream<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1).setParallelism(5);

  wordCounts.print();

  env.execute("Word Count Example");

Scala:

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val text = [...]
  val wordCounts = text
      .flatMap{ _.split(" ") map { (_, 1) } }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1).setParallelism(5)

  wordCounts.print()

  env.execute("Word Count Example")

Execution Environment Level

Flink programs are executed in the context of an execution environment. An execution environment defines a default parallelism for all operators, data sources, and data sinks it executes. The execution environment parallelism can be overridden by explicitly configuring the parallelism of an operator.

The default parallelism of an execution environment can be specified by calling the setParallelism() method. To execute all operators, data sources, and data sinks with a parallelism of 3, set the default parallelism of the execution environment as follows:

Java:

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(3);

  DataStream<String> text = [...]
  DataStream<Tuple2<String, Integer>> wordCounts = [...]
  wordCounts.print();

  env.execute("Word Count Example");

Scala:

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(3)

  val text = [...]
  val wordCounts = text
      .flatMap{ _.split(" ") map { (_, 1) } }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
  wordCounts.print()

  env.execute("Word Count Example")

Client Level

The parallelism can be set at the Client when submitting jobs to Flink. The Client can be either a Java or a Scala program. One example of such a Client is Flink’s Command-Line Interface (CLI).

For the CLI client, the parallelism parameter can be specified with -p. For example:

  ./bin/flink run -p 10 ../examples/*WordCount-java*.jar

In a Java/Scala program, the parallelism is set as follows:

Java:

  try {
      PackagedProgram program = new PackagedProgram(file, args);
      InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
      Configuration config = new Configuration();

      Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

      // set the parallelism to 10 here
      client.run(program, 10, true);

  } catch (ProgramInvocationException e) {
      e.printStackTrace();
  }

Scala:

  try {
    val program = new PackagedProgram(file, args)
    val jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    val config = new Configuration()

    val client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

  } catch {
    case e: Exception => e.printStackTrace()
  }

System Level

A system-wide default parallelism for all execution environments can be defined by setting the parallelism.default property in ./conf/flink-conf.yaml. See the Configuration documentation for details.
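
The levels above can be summarized as a precedence rule. The sketch below is a hypothetical helper, not part of Flink, and it assumes the order operator level, then execution environment level, then client level (-p), then the system-level parallelism.default; the first level that is actually set determines the parallelism.

```java
import java.util.OptionalInt;

// Hypothetical helper (not part of Flink) modelling which level's setting wins.
// Assumed precedence: operator > execution environment > client (-p)
// > system (parallelism.default in flink-conf.yaml).
final class EffectiveParallelismSketch {

    static int resolve(OptionalInt operatorLevel,
                       OptionalInt environmentLevel,
                       OptionalInt clientLevel,
                       int systemDefault) {
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();      // setParallelism() on the operator
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();   // env.setParallelism()
        }
        if (clientLevel.isPresent()) {
            return clientLevel.getAsInt();        // e.g. ./bin/flink run -p 10
        }
        return systemDefault;                     // parallelism.default
    }

    public static void main(String[] args) {
        // env.setParallelism(3) together with -p 10 on the CLI: 3 is used.
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.of(3), OptionalInt.of(10), 1)); // 3
        // setParallelism(5) on the operator overrides all other levels.
        System.out.println(resolve(OptionalInt.of(5), OptionalInt.of(3), OptionalInt.of(10), 1)); // 5
    }
}
```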

Setting the Maximum Parallelism

The maximum parallelism can be set in the same places where you can set a parallelism (except at the client level and the system level). Instead of calling setParallelism(), you call setMaxParallelism() to set the maximum parallelism.

The default setting for the maximum parallelism is roughly operatorParallelism + (operatorParallelism / 2), with a lower bound of 128 and an upper bound of 32768.
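
As a worked illustration, the sketch below applies the rule exactly as stated above: operatorParallelism + operatorParallelism / 2, clamped to the range [128, 32768]. The class name is hypothetical, and Flink's own computation may adjust the value further (which is why the rule is described as "roughly"), so treat the outputs as illustrations of the stated formula only.

```java
// Sketch of the default max-parallelism rule as stated in the text:
// roughly parallelism + parallelism / 2, clamped to [128, 32768].
// Flink's actual computation may round this value further.
final class DefaultMaxParallelismSketch {

    static final int LOWER_BOUND = 128;
    static final int UPPER_BOUND = 32768;

    static int defaultMaxParallelism(int operatorParallelism) {
        if (operatorParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        // Compute in long so very large inputs cannot overflow before clamping.
        long estimate = (long) operatorParallelism + operatorParallelism / 2;
        return (int) Math.min(Math.max(estimate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        System.out.println(defaultMaxParallelism(10));    // 128 (lower bound applies)
        System.out.println(defaultMaxParallelism(200));   // 300
        System.out.println(defaultMaxParallelism(30000)); // 32768 (upper bound applies)
    }
}
```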

Attention: Setting the maximum parallelism to a very large value can be detrimental to performance, because some state backends have to keep internal data structures that scale with the number of key-groups (which are the internal implementation mechanism for rescalable state).
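
To see why the cost scales with the number of key-groups rather than with the parallelism, the sketch below splits maxParallelism key-groups into contiguous ranges, one per parallel instance. The class name and the exact range arithmetic are assumptions of this illustration; the point is that the ranges always add up to maxParallelism key-groups in total, so a very large maximum parallelism means many key-groups even when the job runs with few instances.

```java
// Illustrative sketch (not Flink code): assigns maxParallelism key-groups to
// parallel instances as contiguous, inclusive ranges. The total number of
// key-groups is always maxParallelism, whatever the parallelism is.
final class KeyGroupRangeSketch {

    // Returns {first, last} key-group (inclusive) owned by one instance.
    static int[] rangeFor(int instanceIndex, int parallelism, int maxParallelism) {
        int first = (instanceIndex * maxParallelism + parallelism - 1) / parallelism;
        int last = ((instanceIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 3;
        int total = 0;
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeFor(i, parallelism, maxParallelism);
            total += r[1] - r[0] + 1;
            System.out.println("instance " + i + ": key-groups " + r[0] + ".." + r[1]);
        }
        System.out.println("total key-groups: " + total); // 128
    }
}
```

With maxParallelism = 128 and a parallelism of 3, the instances own key-groups 0..42, 43..85, and 86..127.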