Configuration

By default, the Table & SQL API is preconfigured for producing accurate results with acceptable performance.

Depending on the requirements of a table program, it might be necessary to adjust certain parameters for optimization. For example, unbounded streaming programs may need to ensure that the required state size is capped (see streaming concepts).

Overview

In every table environment, the TableConfig offers options for configuring the current session.

For common or important configuration options, the TableConfig provides getter and setter methods with detailed inline documentation.

For more advanced configuration, users can directly access the underlying key-value map. The following sections list all available options that can be used to adjust Flink Table & SQL API programs.

Attention: Because options are read at different points in time when performing operations, it is recommended to set configuration options early, right after instantiating a table environment.

Java

  // instantiate table environment
  TableEnvironment tEnv = ...
  // access flink configuration
  Configuration configuration = tEnv.getConfig().getConfiguration();
  // set low-level key-value options
  configuration.setString("table.exec.mini-batch.enabled", "true");
  configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
  configuration.setString("table.exec.mini-batch.size", "5000");

Scala

  // instantiate table environment
  val tEnv: TableEnvironment = ...
  // access flink configuration
  val configuration = tEnv.getConfig().getConfiguration()
  // set low-level key-value options
  configuration.setString("table.exec.mini-batch.enabled", "true")
  configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
  configuration.setString("table.exec.mini-batch.size", "5000")

Python

  # instantiate table environment
  t_env = ...
  # access flink configuration
  configuration = t_env.get_config().get_configuration()
  # set low-level key-value options
  configuration.set_string("table.exec.mini-batch.enabled", "true")
  configuration.set_string("table.exec.mini-batch.allow-latency", "5 s")
  configuration.set_string("table.exec.mini-batch.size", "5000")
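The three mini-batch options set above only make sense together: according to the Execution Options table, when table.exec.mini-batch.enabled is true, table.exec.mini-batch.allow-latency must be greater than zero and table.exec.mini-batch.size must be positive. The following is a hypothetical, stdlib-only Python sketch (not part of Flink) that checks a plain dictionary of such string options before they are applied. The helper names are illustrative, and the duration parser is an assumption that only understands the "ms", "s" and "min" units that appear on this page.

```python
import re

# Assumption: durations are "<integer> <unit>" with only the units used on
# this page ("ms", "s", "min"); Flink itself accepts more spellings.
_UNIT_MS = {"ms": 1, "s": 1000, "min": 60_000}


def parse_duration_ms(text: str) -> int:
    """Convert a duration string such as "5 s" or "-1 ms" to milliseconds."""
    match = re.fullmatch(r"\s*(-?\d+)\s*(ms|s|min)\s*", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_MS[unit]


def check_mini_batch(options: dict) -> list:
    """Hypothetical helper: list violations of the documented mini-batch rules."""
    # Fallback defaults are the ones listed in the Execution Options table.
    enabled = options.get("table.exec.mini-batch.enabled", "false").lower() == "true"
    latency_ms = parse_duration_ms(
        options.get("table.exec.mini-batch.allow-latency", "-1 ms"))
    size = int(options.get("table.exec.mini-batch.size", "-1"))
    problems = []
    if enabled:
        if latency_ms <= 0:
            problems.append("allow-latency must be greater than zero")
        if size <= 0:
            problems.append("size must be positive")
    return problems


options = {
    "table.exec.mini-batch.enabled": "true",
    "table.exec.mini-batch.allow-latency": "5 s",
    "table.exec.mini-batch.size": "5000",
}
print(check_mini_batch(options))  # []
print(check_mini_batch({"table.exec.mini-batch.enabled": "true"}))
# ['allow-latency must be greater than zero', 'size must be positive']
```

Enabling MiniBatch while leaving the other two options at their defaults ("-1 ms" and -1) is exactly the case the second call reports.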

Attention: Currently, key-value options are only supported for the Blink planner.

Execution Options

The following options can be used to tune the performance of the query execution.

Each option below is listed with its key, the execution modes it applies to (Batch and/or Streaming), its default value, its type, and a description.

table.exec.async-lookup.buffer-capacity
  Batch, Streaming | Default: 100 | Type: Integer
  The maximum number of async I/O operations that the async lookup join can trigger.

table.exec.async-lookup.timeout
  Batch, Streaming | Default: "3 min" | Type: String
  The async timeout for the asynchronous operation to complete.

table.exec.disabled-operators
  Batch | Default: (none) | Type: String
  Mainly for testing. A comma-separated list of operator names, each of which represents a kind of operator to disable. Operators that can be disabled include "NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", and "SortAgg". By default, no operator is disabled.

table.exec.mini-batch.allow-latency
  Streaming | Default: "-1 ms" | Type: String
  The maximum latency that MiniBatch can use to buffer input records. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, this value must be greater than zero.

table.exec.mini-batch.enabled
  Streaming | Default: false | Type: Boolean
  Specifies whether to enable the MiniBatch optimization. MiniBatch is an optimization that buffers input records to reduce state access. It is disabled by default; to enable it, set this option to true. NOTE: If MiniBatch is enabled, 'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size' must also be set.

table.exec.mini-batch.size
  Streaming | Default: -1 | Type: Long
  The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, this value must be positive.

table.exec.resource.default-parallelism
  Batch, Streaming | Default: -1 | Type: Integer
  Sets the default parallelism for all operators (such as aggregate, join, and filter). This option has a higher priority than the parallelism of the StreamExecutionEnvironment (in effect, it overrides that parallelism). A value of -1 indicates that no default parallelism is set, in which case the parallelism of the StreamExecutionEnvironment is used.

table.exec.shuffle-mode
  Batch | Default: "batch" | Type: String
  Sets the shuffle mode for execution. Only "batch" or "pipeline" can be set. batch: the job runs stage by stage. pipeline: the job runs in streaming mode, but this may cause a resource deadlock in which the receiver waits for resources to start while the sender holds resources waiting to send data to the receiver.

table.exec.sort.async-merge-enabled
  Batch | Default: true | Type: Boolean
  Whether to asynchronously merge sorted spill files.

table.exec.sort.default-limit
  Batch | Default: -1 | Type: Integer
  The default limit applied when the user does not set a limit after ORDER BY. A value of -1 indicates that this option is ignored.

table.exec.sort.max-num-file-handles
  Batch | Default: 128 | Type: Integer
  The maximum fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, it may cause intermediate merging. If it is too large, too many files are open at the same time, which consumes memory and leads to random reads.

table.exec.source.idle-timeout
  Streaming | Default: "-1 ms" | Type: String
  When a source does not receive any elements for the timeout duration, it is marked as temporarily idle. This allows downstream tasks to advance their watermarks without waiting for watermarks from this source while it is idle.

table.exec.spill-compression.block-size
  Batch | Default: "64 kb" | Type: String
  The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but the more memory the job consumes.

table.exec.spill-compression.enabled
  Batch | Default: true | Type: Boolean
  Whether to compress spilled data. Currently, compression of spilled data is only supported for the sort, hash-agg, and hash-join operators.

table.exec.window-agg.buffer-size-limit
  Batch | Default: 100000 | Type: Integer
  Sets the limit on the window element buffer size used in the group window aggregation operator.
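The MiniBatch options above describe buffering input records and flushing them at the allowed latency interval or once the maximum number of buffered records is reached, in order to reduce state access. The following Python sketch is a hypothetical, simplified model of that idea for a per-key count, not Flink's operator implementation: MiniBatchCounter, on_interval, and state_writes are illustrative names, and the timer that would call on_interval once per table.exec.mini-batch.allow-latency is assumed rather than implemented.

```python
from collections import Counter
from typing import Dict, List


class MiniBatchCounter:
    """Hypothetical MiniBatch-style COUNT(*) per key (not Flink's operator).

    Records are buffered and folded into `state` one batch at a time, so each
    key's state is updated at most once per batch instead of once per record.
    """

    def __init__(self, max_size: int):
        self.max_size = max_size          # plays the role of table.exec.mini-batch.size
        self.state: Dict[str, int] = {}   # stands in for keyed state
        self.state_writes = 0             # number of per-key state updates
        self._buffer: List[str] = []

    def add(self, key: str) -> None:
        self._buffer.append(key)
        if len(self._buffer) >= self.max_size:
            self.flush()  # size trigger

    def on_interval(self) -> None:
        # Assumed to be called once per allow-latency interval by a timer.
        if self._buffer:
            self.flush()

    def flush(self) -> None:
        # Pre-aggregate the batch, then touch each key's state once.
        for key, n in Counter(self._buffer).items():
            self.state[key] = self.state.get(key, 0) + n
            self.state_writes += 1
        self._buffer.clear()


agg = MiniBatchCounter(max_size=4)
for key in ["a", "b", "a", "a", "b", "a"]:
    agg.add(key)      # the 4th record triggers a size-based flush
agg.on_interval()     # the interval flush handles the remaining 2 records
print(agg.state, agg.state_writes)  # {'a': 4, 'b': 2} 4
```

Without batching, the same six records would cause six state updates; with a batch size of 4 and one interval flush, they cause four.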

Optimizer Options

The following options can be used to adjust the behavior of the query optimizer to get a better execution plan.

Each option below is listed with its key, the execution modes it applies to (Batch and/or Streaming), its default value, its type, and a description.

table.optimizer.agg-phase-strategy
  Batch, Streaming | Default: "AUTO" | Type: String
  Strategy for the aggregate phase. Only AUTO, TWO_PHASE, or ONE_PHASE can be set. AUTO: no special enforcer for the aggregate stage; whether a two-stage or a one-stage aggregate is chosen depends on cost. TWO_PHASE: enforces a two-stage aggregate, which has localAggregate and globalAggregate. Note that if an aggregate call does not support optimization into two phases, a one-stage aggregate is still used. ONE_PHASE: enforces a one-stage aggregate, which only has CompleteGlobalAggregate.

table.optimizer.distinct-agg.split.bucket-num
  Streaming | Default: 1024 | Type: Integer
  Configures the number of buckets used when splitting distinct aggregation. The number is used in the first-level aggregation to calculate a bucket key 'hash_code(distinct_key) % BUCKET_NUM', which is used as an additional group key after splitting.

table.optimizer.distinct-agg.split.enabled
  Streaming | Default: false | Type: Boolean
  Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two levels. The first aggregation is shuffled by an additional key, which is calculated from the hash code of distinct_key and the number of buckets. This optimization is very useful when there is data skew in distinct aggregation, and it gives the ability to scale up the job. Default is false.

table.optimizer.join-reorder-enabled
  Batch, Streaming | Default: false | Type: Boolean
  Enables join reordering in the optimizer. Disabled by default.

table.optimizer.join.broadcast-threshold
  Batch | Default: 1048576 | Type: Long
  Configures the maximum size in bytes of a table that will be broadcast to all worker nodes when performing a join. Set this value to -1 to disable broadcasting.

table.optimizer.reuse-source-enabled
  Batch, Streaming | Default: true | Type: Boolean
  When true, the optimizer tries to find duplicated table sources and reuse them. This only takes effect when table.optimizer.reuse-sub-plan-enabled is true.

table.optimizer.reuse-sub-plan-enabled
  Batch, Streaming | Default: true | Type: Boolean
  When true, the optimizer tries to find duplicated sub-plans and reuse them.

table.optimizer.source.predicate-pushdown-enabled
  Batch, Streaming | Default: true | Type: Boolean
  When true, the optimizer pushes down predicates into the FilterableTableSource. Default is true.
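The two-level split controlled by table.optimizer.distinct-agg.split.enabled and table.optimizer.distinct-agg.split.bucket-num can be illustrated with a small, single-process Python sketch of COUNT(DISTINCT value) GROUP BY group_key. It is a simplified model under stated assumptions, not the optimizer's actual plan rewrite: Python's built-in hash() stands in for hash_code(distinct_key), BUCKET_NUM is 4 instead of the default 1024 to keep the example small, and count_distinct_split is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

# Stands in for table.optimizer.distinct-agg.split.bucket-num (default 1024).
BUCKET_NUM = 4


def count_distinct_split(rows: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """COUNT(DISTINCT value) GROUP BY group_key, computed in two levels."""
    # Level 1: group by (group_key, bucket). Python's hash() stands in for
    # hash_code(distinct_key); each bucket keeps its own set of distinct values.
    partial: Dict[Tuple[str, int], Set[str]] = defaultdict(set)
    for group_key, value in rows:
        bucket = hash(value) % BUCKET_NUM
        partial[(group_key, bucket)].add(value)

    # Level 2: a value always maps to the same bucket, so the per-bucket
    # distinct sets never overlap and their sizes can simply be summed.
    result: Dict[str, int] = defaultdict(int)
    for (group_key, _bucket), values in partial.items():
        result[group_key] += len(values)
    return dict(result)


rows = [("u1", "x"), ("u1", "y"), ("u1", "x"), ("u2", "z"), ("u2", "z")]
print(count_distinct_split(rows))  # {'u1': 2, 'u2': 1}
```

Because the first level groups by (group_key, bucket), the work for a single hot group key can be spread over up to BUCKET_NUM groups, which is the skew benefit the option description refers to.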

Python Options

Each option below is listed with its key, its default value, its type, and a description.

python.fn-execution.buffer.memory.size
  Default: "15mb" | Type: String
  The amount of memory to be allocated by the input buffer and output buffer of a Python worker. The memory is accounted as managed memory if the actual memory allocated to an operator is no less than the total memory of a Python worker. Otherwise, this option has no effect.

python.fn-execution.bundle.size
  Default: 1000 | Type: Integer
  The maximum number of elements to include in a bundle for Python user-defined function execution. The elements are processed asynchronously. One bundle of elements is processed before the next bundle is processed. A larger value can improve throughput, at the cost of more memory usage and higher latency.

python.fn-execution.bundle.time
  Default: 1000 | Type: Long
  Sets the waiting timeout (in milliseconds) before processing a bundle for Python user-defined function execution. The timeout defines how long the elements of a bundle are buffered before being processed. Lower timeouts lead to lower tail latencies but may affect throughput.

python.fn-execution.framework.memory.size
  Default: "64mb" | Type: String
  The amount of memory to be allocated by the Python framework. The sum of this value and "python.fn-execution.buffer.memory.size" is the total memory of a Python worker. The memory is accounted as managed memory if the actual memory allocated to an operator is no less than the total memory of a Python worker. Otherwise, this option has no effect.
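According to the descriptions above, the total memory of a Python worker is the sum of python.fn-execution.framework.memory.size and python.fn-execution.buffer.memory.size, which with the defaults is 64mb + 15mb = 79mb, and the memory is only accounted as managed memory if the operator's allocated memory is at least that total. A minimal Python sketch of this arithmetic follows. The helper names are hypothetical, and the size parser assumes binary units (1 kb = 1024 bytes) and handles only the b/kb/mb/gb suffixes.

```python
import re

# Assumption: binary units, as in "15mb" or "64 kb"; only these suffixes are handled.
_UNIT_BYTES = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}


def parse_memory(text: str) -> int:
    """Parse a size such as "15mb" into bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*(b|kb|mb|gb)\s*", text.lower())
    if match is None:
        raise ValueError(f"unsupported memory size: {text!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_BYTES[unit]


def python_worker_memory(framework: str = "64mb", buffer: str = "15mb") -> int:
    """Total memory of a Python worker: framework memory plus buffer memory."""
    return parse_memory(framework) + parse_memory(buffer)


def accounted_as_managed(operator_memory: str,
                         framework: str = "64mb", buffer: str = "15mb") -> bool:
    """True if the operator's memory is no less than the Python worker's total."""
    return parse_memory(operator_memory) >= python_worker_memory(framework, buffer)


print(python_worker_memory() // 1024 ** 2)  # 79
print(accounted_as_managed("128mb"))        # True
print(accounted_as_managed("64mb"))         # False
```

With the defaults, an operator that is allocated less than 79mb falls into the "Otherwise, this option has no effect" case described for both memory options.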