Configuration

By default, the Table & SQL API is preconfigured for producing accurate results with acceptable performance.

Depending on the requirements of a table program, it might be necessary to adjust certain parameters for optimization. For example, unbounded streaming programs may need to ensure that the required state size is capped (see streaming concepts).

Overview

In every table environment, the TableConfig offers options for configuring the current session.

For common or important configuration options, the TableConfig provides getter and setter methods with detailed inline documentation.

For more advanced configuration, users can directly access the underlying key-value map. The following sections list all available options that can be used to adjust Flink Table & SQL API programs.

Attention Because options are read at different points in time when performing operations, it is recommended to set configuration options early, right after instantiating a table environment.

Java

  // instantiate table environment
  TableEnvironment tEnv = ...
  // access flink configuration
  Configuration configuration = tEnv.getConfig().getConfiguration();
  // set low-level key-value options
  configuration.setString("table.exec.mini-batch.enabled", "true");
  configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
  configuration.setString("table.exec.mini-batch.size", "5000");

Scala

  // instantiate table environment
  val tEnv: TableEnvironment = ...
  // access flink configuration
  val configuration = tEnv.getConfig().getConfiguration()
  // set low-level key-value options
  configuration.setString("table.exec.mini-batch.enabled", "true")
  configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
  configuration.setString("table.exec.mini-batch.size", "5000")

Python

  # instantiate table environment
  t_env = ...
  # access flink configuration
  configuration = t_env.get_config().get_configuration()
  # set low-level key-value options
  configuration.set_string("table.exec.mini-batch.enabled", "true")
  configuration.set_string("table.exec.mini-batch.allow-latency", "5 s")
  configuration.set_string("table.exec.mini-batch.size", "5000")

Attention Currently, key-value options are only supported for the Blink planner.

Execution Options

The following options can be used to tune the performance of the query execution.

table.exec.async-lookup.buffer-capacity (Batch, Streaming)
  Default: 100
  Type: Integer
  Description: The maximum number of async I/O operations that the async lookup join can trigger.

table.exec.async-lookup.timeout (Batch, Streaming)
  Default: 3 min
  Type: Duration
  Description: The timeout for the asynchronous operation to complete.

table.exec.disabled-operators (Batch)
  Default: (none)
  Type: String
  Description: Mainly for testing. A comma-separated list of operator names, where each name represents a kind of operator to disable. Operators that can be disabled include “NestedLoopJoin”, “ShuffleHashJoin”, “BroadcastHashJoin”, “SortMergeJoin”, “HashAgg”, and “SortAgg”. By default, no operator is disabled.

table.exec.mini-batch.allow-latency (Streaming)
  Default: 0 ms
  Type: Duration
  Description: The maximum latency that MiniBatch may use to buffer input records. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, this value must be greater than zero.

table.exec.mini-batch.enabled (Streaming)
  Default: false
  Type: Boolean
  Description: Specifies whether to enable the MiniBatch optimization. MiniBatch is an optimization that buffers input records to reduce state access. It is disabled by default. To enable it, set this option to true. NOTE: If mini-batch is enabled, ‘table.exec.mini-batch.allow-latency’ and ‘table.exec.mini-batch.size’ must be set.

table.exec.mini-batch.size (Streaming)
  Default: -1
  Type: Long
  Description: The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, this value must be positive.

table.exec.resource.default-parallelism (Batch, Streaming)
  Default: -1
  Type: Integer
  Description: Sets the default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This option has a higher priority than the parallelism of StreamExecutionEnvironment (in effect, it overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, in which case the parallelism of StreamExecutionEnvironment is used.

table.exec.shuffle-mode (Batch)
  Default: “ALL_EDGES_BLOCKING”
  Type: String
  Description: Sets the exec shuffle mode. Accepted values are:
    • ALL_EDGES_BLOCKING: All edges will use blocking shuffle.
    • FORWARD_EDGES_PIPELINED: Forward edges will use pipelined shuffle, others blocking.
    • POINTWISE_EDGES_PIPELINED: Pointwise edges will use pipelined shuffle, others blocking. Pointwise edges include forward and rescale edges.
    • ALL_EDGES_PIPELINED: All edges will use pipelined shuffle.
    • batch: the same as ALL_EDGES_BLOCKING. Deprecated.
    • pipelined: the same as ALL_EDGES_PIPELINED. Deprecated.
  Note: Blocking shuffle means data is fully produced before it is sent to consumer tasks. Pipelined shuffle means data is sent to consumer tasks as soon as it is produced.

table.exec.sink.not-null-enforcer (Batch, Streaming)
  Default: ERROR
  Type: Enum
  Possible values: [ERROR, DROP]
  Description: The NOT NULL column constraint on a table enforces that null values can’t be inserted into the table. Flink supports ‘error’ (default) and ‘drop’ enforcement behavior. By default, Flink checks values and throws a runtime exception when null values are written into NOT NULL columns. Users can change the behavior to ‘drop’ to silently drop such records without throwing an exception.

table.exec.sort.async-merge-enabled (Batch)
  Default: true
  Type: Boolean
  Description: Whether to asynchronously merge sorted spill files.

table.exec.sort.default-limit (Batch)
  Default: -1
  Type: Integer
  Description: The default limit when the user doesn’t set a limit after ORDER BY. -1 indicates that this configuration is ignored.

table.exec.sort.max-num-file-handles (Batch)
  Default: 128
  Type: Integer
  Description: The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, it may cause intermediate merging. If it is too large, too many files are opened at the same time, which consumes memory and leads to random reading.

table.exec.source.cdc-events-duplicate (Streaming)
  Default: false
  Type: Boolean
  Description: Indicates whether the CDC (Change Data Capture) sources in the job will produce duplicate change events that require the framework to deduplicate them to get a consistent result. A CDC source is a source that produces full change events, including INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE, for example a Kafka source with the Debezium format. The value of this configuration is false by default.

  However, duplicate change events are a common case, because CDC tools (e.g. Debezium) usually work with at-least-once delivery when a failover happens. Thus, in abnormal situations Debezium may deliver duplicate change events to Kafka, and Flink will receive the duplicate events. This may cause a Flink query to produce wrong results or unexpected exceptions.

  Therefore, it is recommended to turn on this configuration if your CDC tool provides at-least-once delivery. Enabling this configuration requires a PRIMARY KEY to be defined on the CDC sources. The primary key is used to deduplicate change events and generate a normalized changelog stream, at the cost of an additional stateful operator.

table.exec.source.idle-timeout (Streaming)
  Default: 0 ms
  Type: Duration
  Description: When a source does not receive any elements for the timeout duration, it is marked as temporarily idle. This allows downstream tasks to advance their watermarks without waiting for watermarks from this source while it is idle. The default value is 0, which means that detecting source idleness is not enabled.

table.exec.spill-compression.block-size (Batch)
  Default: “64 kb”
  Type: String
  Description: The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but the more memory the job consumes.

table.exec.spill-compression.enabled (Batch)
  Default: true
  Type: Boolean
  Description: Whether to compress spilled data. Currently, compressing spilled data is only supported for sort, hash-agg, and hash-join operators.

table.exec.state.ttl (Streaming)
  Default: 0 ms
  Type: Duration
  Description: Specifies a minimum time interval for how long idle state (i.e. state which was not updated) will be retained. State will never be cleared until it has been idle for at least the minimum time, and will be cleared at some time after that. NOTE: Cleaning up state requires additional overhead for bookkeeping. The default value is 0, which means that state is never cleaned up.

table.exec.window-agg.buffer-size-limit (Batch)
  Default: 100000
  Type: Integer
  Description: Sets the window elements buffer size limit used in the group window agg operator.
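
The two MiniBatch triggers described above (buffer size and allowed latency) can be illustrated with a small stand-alone simulation. This is only a sketch of the idea, not Flink's operator: the class MiniBatchCounter and its fields are hypothetical names, the "state" is a plain dictionary, and the latency trigger is checked when a record arrives, whereas a real runtime would more likely use a timer.

```python
from collections import defaultdict


class MiniBatchCounter:
    """Toy per-key counter that buffers records before touching 'state'.

    Hypothetical illustration only; not part of the Flink API.
    """

    def __init__(self, max_size, allow_latency_ms):
        # Mirrors the documented rule: both values must be positive
        # when mini-batch is enabled.
        if max_size <= 0 or allow_latency_ms <= 0:
            raise ValueError("size and allow-latency must be positive")
        self.max_size = max_size
        self.allow_latency_ms = allow_latency_ms
        self.buffer = defaultdict(int)  # key -> count in the current batch
        self.buffered = 0               # number of buffered records
        self.batch_start_ms = None      # arrival time of first buffered record
        self.state = defaultdict(int)   # stands in for keyed state
        self.state_accesses = 0         # how often 'state' was updated

    def on_record(self, key, now_ms):
        # Latency trigger: flush if the current batch has been open too long.
        if (self.batch_start_ms is not None
                and now_ms - self.batch_start_ms >= self.allow_latency_ms):
            self.flush()
        if self.batch_start_ms is None:
            self.batch_start_ms = now_ms
        self.buffer[key] += 1
        self.buffered += 1
        # Size trigger: flush once the maximum number of records is buffered.
        if self.buffered >= self.max_size:
            self.flush()

    def flush(self):
        # One state update per distinct key in the batch, not per record.
        for key, count in self.buffer.items():
            self.state[key] += count
            self.state_accesses += 1
        self.buffer.clear()
        self.buffered = 0
        self.batch_start_ms = None


counter = MiniBatchCounter(max_size=4, allow_latency_ms=5000)
for i, key in enumerate(["a", "a", "b", "a", "b", "b"]):
    counter.on_record(key, now_ms=i * 10)
counter.flush()  # flush the remaining partial batch
```

In this run six records lead to three state updates instead of six, which is the kind of saving the optimization aims for when many records share a key.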

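The effect of table.exec.source.idle-timeout on watermarks can be sketched in a few lines. The function below is a simplified model under stated assumptions, not Flink's implementation: combined_watermark and the dictionary fields are hypothetical names, and the downstream watermark is modelled as the minimum watermark over all sources that are not idle.

```python
def combined_watermark(sources, now_ms, idle_timeout_ms):
    """Return the minimum watermark over non-idle sources.

    sources: list of dicts with 'watermark' and 'last_record_ms' (hypothetical).
    A timeout of 0 means idleness detection is disabled.
    """
    if idle_timeout_ms <= 0:
        active = sources
    else:
        active = [s for s in sources
                  if now_ms - s["last_record_ms"] < idle_timeout_ms]
    if not active:
        return None  # every source is idle; nothing to advance to
    return min(s["watermark"] for s in active)


sources = [
    {"watermark": 1000, "last_record_ms": 0},     # silent for a long time
    {"watermark": 9000, "last_record_ms": 9500},  # recently active
]
stalled = combined_watermark(sources, now_ms=10000, idle_timeout_ms=0)
advanced = combined_watermark(sources, now_ms=10000, idle_timeout_ms=5000)
```

With detection disabled, the silent source holds the combined watermark at 1000; with a 5 second timeout it is ignored and the watermark advances to 9000.
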
Optimizer Options

The following options can be used to adjust the behavior of the query optimizer to get a better execution plan.

table.optimizer.agg-phase-strategy (Batch, Streaming)
  Default: “AUTO”
  Type: String
  Description: Strategy for the aggregate phase. Only AUTO, TWO_PHASE, or ONE_PHASE can be set.
    • AUTO: No special enforcer for the aggregate stage. Whether a two-stage or a one-stage aggregate is chosen depends on cost.
    • TWO_PHASE: Enforces a two-stage aggregate, which has localAggregate and globalAggregate. Note that if an aggregate call cannot be optimized into two phases, a one-stage aggregate is still used.
    • ONE_PHASE: Enforces a one-stage aggregate, which only has CompleteGlobalAggregate.

table.optimizer.distinct-agg.split.bucket-num (Streaming)
  Default: 1024
  Type: Integer
  Description: Configures the number of buckets when splitting distinct aggregation. The number is used in the first-level aggregation to calculate a bucket key ‘hash_code(distinct_key) % BUCKET_NUM’, which is used as an additional group key after splitting.

table.optimizer.distinct-agg.split.enabled (Streaming)
  Default: false
  Type: Boolean
  Description: Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two levels. The first aggregation is shuffled by an additional key, which is calculated from the hash code of distinct_key and the number of buckets. This optimization is very useful when there is data skew in distinct aggregation, and it makes it possible to scale up the job. Default is false.

table.optimizer.join-reorder-enabled (Batch, Streaming)
  Default: false
  Type: Boolean
  Description: Enables join reorder in the optimizer. Disabled by default.

table.optimizer.join.broadcast-threshold (Batch)
  Default: 1048576
  Type: Long
  Description: Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting.

table.optimizer.multiple-input-enabled (Batch)
  Default: true
  Type: Boolean
  Description: When it is true, the optimizer merges the operators with pipelined shuffling into a multiple input operator to reduce shuffling and improve performance. Default value is true.

table.optimizer.reuse-source-enabled (Batch, Streaming)
  Default: true
  Type: Boolean
  Description: When it is true, the optimizer tries to find duplicated table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.

table.optimizer.reuse-sub-plan-enabled (Batch, Streaming)
  Default: true
  Type: Boolean
  Description: When it is true, the optimizer tries to find duplicated sub-plans and reuse them.

table.optimizer.source.predicate-pushdown-enabled (Batch, Streaming)
  Default: true
  Type: Boolean
  Description: When it is true, the optimizer pushes predicates down into the FilterableTableSource. Default value is true.
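
The two-level split of a distinct aggregation can be sketched with plain Python. This is an illustration of the idea only, not the plan Flink generates: bucket_of and count_distinct_split are hypothetical names, and zlib.crc32 stands in for hash_code() as an assumption. The first level groups by the original key plus the bucket key ‘hash_code(distinct_key) % BUCKET_NUM’, and the second level sums the partial counts.

```python
import zlib
from collections import defaultdict

BUCKET_NUM = 1024  # mirrors the default of distinct-agg.split.bucket-num


def bucket_of(distinct_key):
    # A stable hash stands in for hash_code(distinct_key) (assumption).
    return zlib.crc32(str(distinct_key).encode("utf-8")) % BUCKET_NUM


def count_distinct_split(rows):
    """COUNT(DISTINCT distinct_key) GROUP BY group_key, in two levels.

    rows: iterable of (group_key, distinct_key) pairs.
    """
    # Level 1: group by (group_key, bucket) and keep distinct values per bucket.
    partial = defaultdict(set)
    for group_key, distinct_key in rows:
        partial[(group_key, bucket_of(distinct_key))].add(distinct_key)
    # Level 2: group by group_key and sum the per-bucket distinct counts.
    # Equal values always land in the same bucket, so the buckets are disjoint.
    result = defaultdict(int)
    for (group_key, _bucket), values in partial.items():
        result[group_key] += len(values)
    return dict(result)


rows = [("day1", "u1"), ("day1", "u2"), ("day1", "u1"), ("day2", "u3")]
split_counts = count_distinct_split(rows)
```

Because the first level is keyed by the extra bucket, the work for a single hot group_key is spread over up to BUCKET_NUM partitions before the final sum, which is why the split helps with skewed data.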

Table Options

The following options can be used to adjust the behavior of the table planner.

table.dynamic-table-options.enabled (Batch, Streaming)
  Default: false
  Type: Boolean
  Description: Enables or disables the OPTIONS hint, which is used to specify table options dynamically. If disabled, an exception is thrown when any OPTIONS hint is specified.

table.generated-code.max-length (Batch, Streaming)
  Default: 64000
  Type: Integer
  Description: Specifies a threshold at which generated code is split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary.

table.local-time-zone (Batch, Streaming)
  Default: “default”
  Type: String
  Description: The local time zone defines the current session time zone id. It is used when converting to/from TIMESTAMP WITH LOCAL TIME ZONE. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don’t include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The value of the option is either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone id such as “GMT-8:00”.

table.sql-dialect (Batch, Streaming)
  Default: “default”
  Type: String
  Description: The SQL dialect defines how to parse a SQL query. A different SQL dialect may support different SQL grammar. Currently supported dialects are: default and hive.
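
The conversion rule described for table.local-time-zone (stored in UTC, rendered in the session time zone) can be illustrated with the Python standard library. This is a sketch of the rule rather than Flink's code: to_session_string is a hypothetical helper, and a fixed UTC-8 offset is assumed as the equivalent of the custom id “GMT-8:00”.

```python
from datetime import datetime, timedelta, timezone


def to_session_string(instant_utc, session_zone):
    """Render a UTC instant as a zone-less string in the session time zone."""
    return instant_utc.astimezone(session_zone).strftime("%Y-%m-%d %H:%M:%S")


# The instant is held in UTC, as timestamps with local time zone are internally.
instant = datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# Assumed equivalent of the custom timezone id "GMT-8:00".
gmt_minus_8 = timezone(timedelta(hours=-8))

in_session_zone = to_session_string(instant, gmt_minus_8)  # "2021-01-01 04:00:00"
in_utc = to_session_string(instant, timezone.utc)          # "2021-01-01 12:00:00"
```

The same instant yields different strings depending on the session time zone, which is why a query that casts such a timestamp to STRING can change its output when this option changes.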