Configuration

All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key-value pairs in the format key: value.

The configuration is parsed and evaluated when the Flink processes are started. Changes to the configuration file require restarting the relevant processes.

The out-of-the-box configuration will use your default Java installation. You can override the Java runtime to use by setting the environment variable JAVA_HOME or the configuration key env.java.home in conf/flink-conf.yaml.
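
As an illustration, a minimal conf/flink-conf.yaml could contain flat entries such as the following (the Java path is an illustrative placeholder):

    env.java.home: /usr/lib/jvm/java-11    # illustrative path to the Java runtime to use
    jobmanager.rpc.address: localhost
    rest.port: 8081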

Basic Setup

The default configuration supports starting a single-node Flink session cluster without any changes. The options in this section are the ones most commonly needed for a basic distributed Flink setup.

Hostnames / Ports

These options are only necessary for standalone application or session deployments (simple standalone setups or standalone Kubernetes); see the example after this list.

If you use Flink with Yarn, Mesos, or the active Kubernetes integration, the hostnames and ports are automatically discovered.

  • rest.address, rest.port: These are used by the client to connect to Flink. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes) service in front of the JobManager’s REST interface.

  • The jobmanager.rpc.address (defaults to “localhost”) and jobmanager.rpc.port (defaults to 6123) config entries are used by the TaskManager to connect to the JobManager/ResourceManager. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes internal) service for the JobManager. This option is ignored on setups with high-availability where the leader election mechanism is used to discover this automatically.
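
For a simple standalone session cluster, the entries above might look like the following sketch (the hostname is illustrative):

    rest.address: flink-master.example.com          # hostname of the JobManager / its REST endpoint
    rest.port: 8081
    jobmanager.rpc.address: flink-master.example.com
    jobmanager.rpc.port: 6123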

Memory Sizes

The default memory sizes support simple streaming/batch applications, but are too low to yield good performance for more complex applications.

  • jobmanager.memory.process.size: Total size of the JobManager (JobMaster / ResourceManager / Dispatcher) process.
  • taskmanager.memory.process.size: Total size of the TaskManager process.

The total sizes include everything. Flink will subtract some memory for the JVM’s own memory requirements (metaspace and others), and divide and configure the rest automatically between its components (JVM Heap, Off-Heap, and for TaskManagers also network and managed memory, etc.).

These values are configured as memory sizes, for example 1536m or 2g.
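
For example, the two values could be set like this (the sizes are illustrative starting points, not recommendations):

    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 4g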

Parallelism

  • taskmanager.numberOfTaskSlots: The number of slots that a TaskManager offers (default: 1). Each slot can take one task or pipeline. Having multiple slots in a TaskManager can help amortize certain constant overheads (of the JVM, application libraries, or network connections) across parallel tasks or pipelines. See the Task Slots and Resources concepts section for details.

    Running more, smaller TaskManagers with one slot each is a good starting point and leads to the best isolation between tasks. Dedicating the same resources to fewer, larger TaskManagers with more slots can help to increase resource utilization, at the cost of weaker isolation between the tasks (more tasks share the same JVM).

  • parallelism.default: The default parallelism used when no parallelism is specified anywhere (default: 1), as in the example below.
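
A sketch of how both options might be set for a small setup (the values are illustrative):

    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 4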

Checkpointing

You can configure checkpointing directly in code within your Flink job or application. Putting these values here in the configuration defines them as defaults, in case the application does not configure anything. An example follows the list below.

  • state.backend: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are filesystem or rocksdb.
  • state.checkpoints.dir: The directory to write checkpoints to. This takes a path URI like s3://mybucket/flink-app/checkpoints or hdfs://namenode:port/flink/checkpoints.
  • state.savepoints.dir: The default directory for savepoints. Takes a path URI, similar to state.checkpoints.dir.
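
For example, the checkpointing defaults described above could be configured as follows (the savepoint path is an illustrative variation of the checkpoint path shown above):

    state.backend: filesystem
    state.checkpoints.dir: s3://mybucket/flink-app/checkpoints
    state.savepoints.dir: s3://mybucket/flink-app/savepoints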

Web UI

  • web.submit.enable: Enables uploading and starting jobs through the Flink UI (true by default). Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI.
  • web.upload.dir: The directory where to store uploaded jobs. Only used when web.submit.enable is true.

Other

  • io.tmp.dirs: The directories where Flink puts local data, defaults to the system temp directory (java.io.tmpdir property). If a list of directories is configured, Flink will rotate files across the directories.

    The data put in these directories includes, by default, the files created by RocksDB, spilled intermediate results (batch algorithms), and cached jar files.

    This data is NOT relied upon for persistence/recovery, but if this data gets deleted, it typically causes a heavyweight recovery operation. It is hence recommended to set this to a directory that is not automatically periodically purged.

    Yarn, Mesos, and Kubernetes setups automatically configure this value to the local working directories by default.



Common Setup Options

Common options to configure your Flink application or cluster.

Hosts and Ports

Options to configure hostnames and ports for the different Flink components.

The JobManager hostname and port are only relevant for standalone setups without high-availability. In that setup, the config values are used by the TaskManagers to find (and connect to) the JobManager. In all highly-available setups, the TaskManagers discover the JobManager via the High-Availability-Service (for example ZooKeeper).

Setups using resource orchestration frameworks (K8s, Yarn, Mesos) typically use the framework’s service discovery facilities.

You do not need to configure any TaskManager hosts and ports, unless the setup requires the use of specific port ranges or specific network interfaces to bind to.

Key | Default | Type | Description
jobmanager.rpc.address
(none)StringThe config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.port
6123IntegerThe config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
metrics.internal.query-service.port
“0”StringThe port range used for Flink’s internal metric query service. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. By default Flink will pick a random port.
rest.address
(none)StringThe address that should be used by clients to connect to the server.
rest.bind-address
(none)StringThe address to which the server binds itself.
rest.bind-port
“8081”StringThe port to which the server binds itself. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Rest servers are running on the same machine.
rest.port
8081IntegerThe port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port.
taskmanager.data.port
0IntegerThe task manager’s external port used for data exchange operations.
taskmanager.host
(none)StringThe external address of the network interface where the TaskManager is exposed. Because different TaskManagers need different values for this option, usually it is specified in an additional non-shared TaskManager-specific config file.
taskmanager.rpc.port
“0”StringThe external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

Fault Tolerance

These configuration options control Flink’s restart behaviour in case of failures during the execution. By configuring these options in your flink-conf.yaml, you define the cluster’s default restart strategy.

The default restart strategy will only take effect if no job-specific restart strategy has been configured via the ExecutionConfig. An example configuration follows the tables below.

Key | Default | Type | Description
restart-strategy
(none)StringDefines the restart strategy to use in case of job failures.
Accepted values are:
  • none, off, disable: No restart strategy.
  • fixeddelay, fixed-delay: Fixed delay restart strategy. More details can be found here.
  • failurerate, failure-rate: Failure rate restart strategy. More details can be found here.
If checkpointing is disabled, the default value is none. If checkpointing is enabled, the default value is fixed-delay with Integer.MAX_VALUE restart attempts and a delay of 1 s.

Fixed Delay Restart Strategy

Key | Default | Type | Description
restart-strategy.fixed-delay.attempts
1IntegerThe number of times that Flink retries the execution before the job is declared as failed if restart-strategy has been set to fixed-delay.
restart-strategy.fixed-delay.delay
1 sDurationDelay between two consecutive restart attempts if restart-strategy has been set to fixed-delay. Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted. It can be specified using notation: “1 min”, “20 s”

Failure Rate Restart Strategy

Key | Default | Type | Description
restart-strategy.failure-rate.delay
1 sDurationDelay between two consecutive restart attempts if restart-strategy has been set to failure-rate. It can be specified using notation: “1 min”, “20 s”
restart-strategy.failure-rate.failure-rate-interval
1 minDurationTime interval for measuring failure rate if restart-strategy has been set to failure-rate. It can be specified using notation: “1 min”, “20 s”
restart-strategy.failure-rate.max-failures-per-interval
1IntegerMaximum number of restarts in given time interval before failing a job if restart-strategy has been set to failure-rate.
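
As an illustration, a cluster-wide default fixed-delay restart strategy could be configured like this (the attempt count and delay are illustrative):

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s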

Checkpoints and State Backends

These options control the basic setup of state backends and checkpointing behavior.

The options are only relevant for jobs/applications executing in a continuous streaming fashion. Jobs/applications executing in a batch fashion do not use state backends and checkpoints, but different internal data structures that are optimized for batch processing.

Key | Default | Type | Description
state.backend
(none)StringThe state backend to be used to store and checkpoint state.
state.checkpoints.dir
(none)StringThe default directory used for storing the data files and metadata of checkpoints in a Flink-supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers).
state.savepoints.dir
(none)StringThe default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
state.backend.incremental
falseBooleanOption whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.
state.backend.local-recovery
falseBooleanThis option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignores this option.
state.checkpoints.num-retained
1IntegerThe maximum number of completed checkpoints to retain.
taskmanager.state.local.root-dirs
(none)StringThe config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignores this option.
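
For example, a RocksDB-based setup with incremental checkpoints might combine the options above as follows (the retained-checkpoint count is illustrative; the checkpoint path is taken from the Basic Setup section):

    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints
    state.checkpoints.num-retained: 3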

High Availability

High-availability here refers to the ability of the JobManager process to recover from failures.

The JobManager ensures consistency during recovery across TaskManagers. For the JobManager itself to recover consistently, an external service must store a minimal amount of recovery metadata (like “ID of last committed checkpoint”), as well as help to elect and lock which JobManager is the leader (to avoid split-brain situations).

Key | Default | Type | Description
high-availability
“NONE”StringDefines high-availability mode used for the cluster execution. To enable high-availability, set this mode to “ZOOKEEPER” or specify FQN of factory class.
high-availability.cluster-id
“/default”StringThe ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.
high-availability.storageDir
(none)StringFile system path (URI) where Flink persists metadata in high-availability setups.

Options for high-availability setups with ZooKeeper

Key | Default | Type | Description
high-availability.zookeeper.path.root
“/flink”StringThe root path under which Flink stores its entries in ZooKeeper.
high-availability.zookeeper.quorum
(none)StringThe ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
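
A minimal ZooKeeper-based high-availability configuration might look like the following sketch (the quorum, storage path, and cluster id are illustrative):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.cluster-id: /my-flink-cluster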

Memory Configuration

These configuration values control the way that TaskManagers and JobManagers use memory.

Flink tries to shield users as much as possible from the complexity of configuring the JVM for data-intensive processing. In most cases, users should only need to set taskmanager.memory.process.size or taskmanager.memory.flink.size (depending on the setup), and possibly adjust the ratio of JVM heap and Managed Memory via taskmanager.memory.managed.fraction. The other options below can be used for performance tuning and for fixing memory-related errors.

For a detailed explanation of how these options interact, see the documentation on TaskManager and JobManager memory configurations.
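
As a sketch, a common configuration only touches the values mentioned above (the sizes and the fraction are illustrative, not recommendations):

    taskmanager.memory.flink.size: 4096m
    taskmanager.memory.managed.fraction: 0.5
    jobmanager.memory.process.size: 1600m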

Key | Default | Type | Description
jobmanager.memory.enable-jvm-direct-memory-limit
falseBooleanWhether to enable the JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the value of ‘jobmanager.memory.off-heap.size’ option.
jobmanager.memory.flink.size
(none)MemorySizeTotal Flink Memory size for the JobManager. This includes all the memory that a JobManager consumes, except for JVM Metaspace and JVM Overhead. It consists of JVM Heap Memory and Off-heap Memory. See also ‘jobmanager.memory.process.size’ for total process memory size configuration.
jobmanager.memory.heap.size
(none)MemorySizeJVM Heap Memory size for the JobManager. The minimum recommended JVM Heap size is 128 MB (134217728 bytes).
jobmanager.memory.jvm-metaspace.size
256 mbMemorySizeJVM Metaspace Size for the JobManager.
jobmanager.memory.jvm-overhead.fraction
0.1FloatFraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.
jobmanager.memory.jvm-overhead.max
1 gbMemorySizeMax JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.
jobmanager.memory.jvm-overhead.min
192 mbMemorySizeMin JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.
jobmanager.memory.off-heap.size
128 mbMemorySizeOff-heap Memory size for JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by ‘jobmanager.memory.enable-jvm-direct-memory-limit’.
jobmanager.memory.process.size
(none)MemorySizeTotal Process Memory size for the JobManager. This includes all the memory that a JobManager JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this should be set to the container memory. See also ‘jobmanager.memory.flink.size’ for Total Flink Memory size configuration.
taskmanager.memory.flink.size
(none)MemorySizeTotal Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Network Memory. See also ‘taskmanager.memory.process.size’ for total process memory size configuration.
taskmanager.memory.framework.heap.size
128 mbMemorySizeFramework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for TaskExecutor framework, which will not be allocated to task slots.
taskmanager.memory.framework.off-heap.size
128 mbMemorySizeFramework Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for TaskExecutor framework, which will not be allocated to task slots. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.
taskmanager.memory.jvm-metaspace.size
256 mbMemorySizeJVM Metaspace Size for the TaskExecutors.
taskmanager.memory.jvm-overhead.fraction
0.1FloatFraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.jvm-overhead.max
1 gbMemorySizeMax JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.jvm-overhead.min
192 mbMemorySizeMin JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.managed.fraction
0.4FloatFraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not explicitly specified.
taskmanager.memory.managed.size
(none)MemorySizeManaged Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.
taskmanager.memory.network.fraction
0.1FloatFraction of Total Flink Memory to be used as Network Memory. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.network.max
1 gbMemorySizeMax Network Memory size for TaskExecutors. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max to the same value.
taskmanager.memory.network.min
64 mbMemorySizeMin Network Memory size for TaskExecutors. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max to the same value.
taskmanager.memory.process.size
(none)MemorySizeTotal Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory. See also ‘taskmanager.memory.flink.size’ for total Flink memory size configuration.
taskmanager.memory.task.heap.size
(none)MemorySizeTask Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for tasks. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory.
taskmanager.memory.task.off-heap.size
0 bytesMemorySizeTask Off-Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.

Miscellaneous Options

Key | Default | Type | Description
fs.allowed-fallback-filesystems
(none)StringA (semicolon-separated) list of file schemes, for which Hadoop can be used instead of an appropriate Flink plugin. (example: s3;wasb)
fs.default-scheme
(none)StringThe default filesystem scheme, used for paths that do not declare a scheme explicitly. May contain an authority, e.g. host:port in case of an HDFS NameNode.
io.tmp.dirs
‘LOCAL_DIRS’ on Yarn. ‘_FLINK_TMP_DIR’ on Mesos. System.getProperty(“java.io.tmpdir”) in standalone.StringDirectories for temporary files, separated by “,”, “|”, or the system’s java.io.File.pathSeparator.


Security

Options for configuring Flink’s security and secure interaction with external systems.

SSL

Flink’s network connections can be secured via SSL. Please refer to the SSL Setup Docs for a detailed setup guide and background. An example configuration follows the table below.

Key | Default | Type | Description
security.ssl.algorithms
“TLS_RSA_WITH_AES_128_CBC_SHA”StringThe comma separated list of standard SSL algorithms to be supported. Read more here
security.ssl.internal.cert.fingerprint
(none)StringThe sha1 fingerprint of the internal certificate. This further protects the internal communication by requiring that the exact certificate used by Flink is presented. This is necessary where one cannot use a private (self-signed) CA, or where an internal firm-wide CA is required.
security.ssl.internal.enabled
falseBooleanTurns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).
security.ssl.internal.key-password
(none)StringThe secret to decrypt the key in the keystore for Flink’s internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore
(none)StringThe Java keystore file with SSL Key and Certificate, to be used for Flink’s internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore-password
(none)StringThe secret to decrypt the keystore file for Flink’s internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore
(none)StringThe truststore file containing the public CA certificates to verify the peer for Flink’s internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore-password
(none)StringThe password to decrypt the truststore for Flink’s internal endpoints (rpc, data transport, blob server).
security.ssl.protocol
“TLSv1.2”StringThe SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.
security.ssl.rest.authentication-enabled
falseBooleanTurns on mutual SSL authentication for external communication via the REST endpoints.
security.ssl.rest.cert.fingerprint
(none)StringThe sha1 fingerprint of the REST certificate. This further protects the REST endpoints by requiring that only the certificate used by the proxy server is presented. This is necessary where one uses a public CA or an internal firm-wide CA.
security.ssl.rest.enabled
falseBooleanTurns on SSL for external communication via the REST endpoints.
security.ssl.rest.key-password
(none)StringThe secret to decrypt the key in the keystore for Flink’s external REST endpoints.
security.ssl.rest.keystore
(none)StringThe Java keystore file with SSL Key and Certificate, to be used for Flink’s external REST endpoints.
security.ssl.rest.keystore-password
(none)StringThe secret to decrypt the keystore file for Flink’s external REST endpoints.
security.ssl.rest.truststore
(none)StringThe truststore file containing the public CA certificates to verify the peer for Flink’s external REST endpoints.
security.ssl.rest.truststore-password
(none)StringThe password to decrypt the truststore for Flink’s external REST endpoints.
security.ssl.verify-hostname
trueBooleanFlag to enable peer’s hostname verification during ssl handshake.
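
Putting the internal SSL options above together, a setup might look like the following sketch (keystore/truststore paths and passwords are placeholders):

    security.ssl.internal.enabled: true
    security.ssl.internal.keystore: /path/to/conf/internal.keystore      # placeholder path
    security.ssl.internal.keystore-password: keystore_password           # placeholder secret
    security.ssl.internal.key-password: key_password                     # placeholder secret
    security.ssl.internal.truststore: /path/to/conf/internal.truststore  # placeholder path
    security.ssl.internal.truststore-password: truststore_password       # placeholder secret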

Auth with External Systems

ZooKeeper Authentication / Authorization

These options are necessary when connecting to a secured ZooKeeper quorum.

Key | Default | Type | Description
zookeeper.sasl.disable
falseBoolean
zookeeper.sasl.login-context-name
“Client”String
zookeeper.sasl.service-name
“zookeeper”String

Kerberos-based Authentication / Authorization

Please refer to the Flink and Kerberos Docs for a setup guide and a list of external systems to which Flink can authenticate itself via Kerberos. An example configuration follows the table below.

Key | Default | Type | Description
security.kerberos.login.contexts
(none)StringA comma-separated list of login contexts to provide the Kerberos credentials to (for example, Client,KafkaClient to use the credentials for ZooKeeper authentication and for Kafka authentication)
security.kerberos.login.keytab
(none)StringAbsolute path to a Kerberos keytab file that contains the user credentials.
security.kerberos.login.principal
(none)StringKerberos principal name associated with the keytab.
security.kerberos.login.use-ticket-cache
trueBooleanIndicates whether to read from your Kerberos ticket cache.
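
For example, keytab-based authentication for ZooKeeper and Kafka (the Client,KafkaClient contexts mentioned above) might be configured like this (the keytab path and principal are placeholders):

    security.kerberos.login.keytab: /path/to/flink.keytab        # placeholder path
    security.kerberos.login.principal: flink-user@EXAMPLE.COM    # placeholder principal
    security.kerberos.login.contexts: Client,KafkaClient
    security.kerberos.login.use-ticket-cache: false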


Resource Orchestration Frameworks

This section contains options related to integrating Flink with resource orchestration frameworks, like Kubernetes, Yarn, Mesos, etc.

Note that it is not always necessary to integrate Flink with the resource orchestration framework. For example, you can easily deploy Flink applications on Kubernetes without Flink knowing that it runs on Kubernetes (and without specifying any of the Kubernetes config options here). See this setup guide for an example.

The options in this section are necessary for setups where Flink itself actively requests and releases resources from the orchestrators.

YARN

Key | Default | Type | Description
external-resource.<resource_name>.yarn.config-key
(none)StringIf configured, Flink will add this key to the resource profile of the container request to Yarn. The value will be set to the value of external-resource.<resource_name>.amount.
yarn.application-attempt-failures-validity-interval
10000LongTime window in milliseconds within which application attempt failures are counted when restarting the AM. Failures which fall outside of this window are not considered. Set this value to -1 in order to count globally. See here for more information.
yarn.application-attempts
(none)StringNumber of ApplicationMaster restarts. Note that the entire Flink cluster will restart and the YARN Client will lose the connection. Also, the JobManager address will change and you’ll need to set the JM host:port manually. It is recommended to leave this option at 1.
yarn.application-master.port
“0”StringWith this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
yarn.application.id
(none)StringThe YARN application id of the running yarn cluster. This is the YARN cluster where the pipeline is going to be executed.
yarn.application.name
(none)StringA custom name for your YARN application.
yarn.application.node-label
(none)StringSpecify YARN node label for the YARN application.
yarn.application.priority
-1IntegerA non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority. If priority is negative or set to ‘-1’(default), Flink will unset yarn priority setting and use cluster default priority. Please refer to YARN’s official documentation for specific settings required to enable priority scheduling for the targeted YARN version.
yarn.application.queue
(none)StringThe YARN queue on which to put the current pipeline.
yarn.application.type
(none)StringA custom type for your YARN application.
yarn.appmaster.vcores
1IntegerThe number of virtual cores (vcores) used by YARN application master.
yarn.containers.vcores
-1IntegerThe number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the scheduler to org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
yarn.file-replication
-1IntegerNumber of file replication of each local resource file. If it is not configured, Flink will use the default replication value in hadoop configuration.
yarn.flink-dist-jar
(none)StringThe location of the Flink dist jar.
yarn.heartbeat.container-request-interval
500IntegerTime between heartbeats with the ResourceManager in milliseconds if Flink requests containers:
  • The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.
  • The lower this value is, the more excess containers might get allocated, which will eventually be released but put pressure on Yarn.
If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See this link for more information.
yarn.heartbeat.interval
5IntegerTime between heartbeats with the ResourceManager in seconds.
yarn.per-job-cluster.include-user-jar
“ORDER”StringDefines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning (“FIRST”), at the end (“LAST”), or be positioned based on their name (“ORDER”). “DISABLED” means the user-jars are excluded from the system class path.
yarn.properties-file.location
(none)StringWhen a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).
yarn.provided.lib.dirs
(none)List<String>A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to avoid uploading the local Flink jars (e.g. flink-dist, lib/, plugins/), accelerating the job submission process. YARN will also cache them on the nodes so that they don’t need to be downloaded again for each application. An example could be hdfs://$namenode_address/path/of/flink/lib
yarn.security.kerberos.localized-keytab-path
“krb5.keytab”StringLocal (on NodeManager) path where the kerberos keytab file will be localized to. If yarn.security.kerberos.ship-local-keytab is set to true, Flink will ship the keytab file as a YARN local resource. In this case, the path is relative to the local resource directory. If set to false, Flink will try to directly locate the keytab from the path itself.
yarn.security.kerberos.ship-local-keytab
trueBooleanWhen this is true Flink will ship the keytab file configured via security.kerberos.login.keytab as a localized YARN resource.
yarn.ship-directories
(none)List<String>A semicolon-separated list of directories to be shipped to the YARN cluster.
yarn.tags
(none)StringA comma-separated list of tags to apply to the Flink YARN application.
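
As an illustration, a few of the YARN options above could be set in conf/flink-conf.yaml as follows (the application name, queue, and vcore count are illustrative):

    yarn.application.name: my-flink-application
    yarn.application.queue: default
    yarn.containers.vcores: 2
    yarn.application-attempts: 1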

Kubernetes

Key | Default | Type | Description
external-resource.<resource_name>.kubernetes.config-key
(none)StringIf configured, Flink will add “resources.limits.<config-key>” and “resources.requests.<config-key>” to the main container of TaskExecutor and set the value to the value of external-resource.<resource_name>.amount.
kubernetes.cluster-id
(none)StringThe cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. If not set, the client will automatically generate it with a random ID.
kubernetes.config.file
(none)StringThe kubernetes config file will be used to create the client. The default is located at ~/.kube/config
kubernetes.container-start-command-template
“%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%”StringTemplate for the kubernetes jobmanager and taskmanager container start invocation.
kubernetes.container.image
The default value depends on the actually running version. In general it looks like “flink:<FLINK_VERSION>-scala_<SCALA_VERSION>”StringImage to use for Flink containers. The specified image must be based upon the same Apache Flink and Scala versions as used by the application. Visit https://hub.docker.com/_/flink?tab=tags for the images provided by the Flink project.
kubernetes.container.image.pull-policy
IfNotPresent (Enum, possible values: IfNotPresent, Always, Never). The Kubernetes container image pull policy (IfNotPresent, Always, or Never). The default policy is IfNotPresent, to avoid putting pressure on the image repository.
kubernetes.container.image.pull-secrets
(none)List<String>A semicolon-separated list of the Kubernetes secrets used to access private image registries.
kubernetes.context
(none)StringThe desired context from your Kubernetes config file used to configure the Kubernetes client for interacting with the cluster. This could be helpful if one has multiple contexts configured and wants to administrate different Flink clusters on different Kubernetes clusters/contexts.
kubernetes.entry.path
“/opt/flink/bin/kubernetes-entry.sh”StringThe entrypoint script of kubernetes in the image. It will be used as command for jobmanager and taskmanager container.
kubernetes.flink.conf.dir
“/opt/flink/conf”StringThe flink conf directory that will be mounted in pod. The flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.
kubernetes.flink.log.dir
“/opt/flink/log”StringThe directory in the pod where the logs of the jobmanager and taskmanager are saved.
kubernetes.hadoop.conf.config-map.name
(none)StringSpecify the name of an existing ConfigMap that contains custom Hadoop configuration to be mounted on the JobManager(s) and TaskManagers.
kubernetes.jobmanager.annotations
(none)MapThe user-specified annotations that are set to the JobManager pod. The value could be in the form of a1:v1,a2:v2
kubernetes.jobmanager.cpu
1.0DoubleThe number of CPUs used by the job manager.
kubernetes.jobmanager.labels
(none)MapThe labels to be set for JobManager pod. Specified as key:value pairs separated by commas. For example, version:alphav1,deploy:test.
kubernetes.jobmanager.node-selector
(none)MapThe node selector to be set for JobManager pod. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.
kubernetes.jobmanager.service-account
“default”StringService account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server.
kubernetes.jobmanager.tolerations
(none)List<Map>The user-specified tolerations to be set to the JobManager pod. The value should be in the form of key:key1,operator:Equal,value:value1,effect:NoSchedule;key:key2,operator:Exists,effect:NoExecute,tolerationSeconds:6000
kubernetes.namespace
“default”StringThe namespace that will be used for running the jobmanager and taskmanager pods.
kubernetes.rest-service.annotations
(none)MapThe user-specified annotations that are set to the rest Service. The value should be in the form of a1:v1,a2:v2
kubernetes.rest-service.exposed.type
LoadBalancer (Enum, possible values: ClusterIP, NodePort, LoadBalancer). The type of the rest service (ClusterIP or NodePort or LoadBalancer). When set to ClusterIP, the rest service will not be created.
kubernetes.taskmanager.annotations
(none)MapThe user-specified annotations that are set to the TaskManager pod. The value could be in the form of a1:v1,a2:v2
kubernetes.taskmanager.cpu
-1.0DoubleThe number of CPUs used by the task manager. By default, the number of CPUs is set to the number of slots per TaskManager.
kubernetes.taskmanager.labels
(none)MapThe labels to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, version:alphav1,deploy:test.
kubernetes.taskmanager.node-selector
(none)MapThe node selector to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.
kubernetes.taskmanager.tolerations
(none)List<Map>The user-specified tolerations to be set to the TaskManager pod. The value should be in the form of key:key1,operator:Equal,value:value1,effect:NoSchedule;key:key2,operator:Exists,effect:NoExecute,tolerationSeconds:6000
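
A sketch of a configuration for the active Kubernetes integration, using the options above (the cluster id, namespace, and image tag are illustrative):

    kubernetes.cluster-id: my-flink-cluster
    kubernetes.namespace: default
    kubernetes.container.image: flink:1.11.2-scala_2.12    # illustrative tag; match your Flink and Scala versions
    kubernetes.rest-service.exposed.type: NodePort
    kubernetes.jobmanager.service-account: flink           # illustrative service account name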

Mesos

Key | Default | Type | Description
mesos.failover-timeout
604800IntegerThe failover timeout in seconds for the Mesos scheduler, after which running tasks are automatically shut down.
mesos.master
(none)StringThe Mesos master URL. The value should be in one of the following forms:
  • host:port
  • zk://host1:port1,host2:port2,…/path
  • zk://username:password@host1:port1,host2:port2,…/path
  • file:///path/to/file
mesos.resourcemanager.artifactserver.port
0IntegerThe config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port.
mesos.resourcemanager.artifactserver.ssl.enabled
trueBooleanEnables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to be set to true to enable encryption.
mesos.resourcemanager.declined-offer-refuse-duration
5000LongAmount of time to ask the Mesos master to not resend a declined resource offer again. This ensures a declined resource offer isn’t resent immediately after being declined.
mesos.resourcemanager.framework.name
“Flink”StringMesos framework name
mesos.resourcemanager.framework.principal
(none)StringMesos framework principal
mesos.resourcemanager.framework.role
“*”StringMesos framework role definition
mesos.resourcemanager.framework.secret
(none)StringMesos framework secret
mesos.resourcemanager.framework.user
(none)StringMesos framework user
mesos.resourcemanager.tasks.port-assignments
(none)StringComma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos.
mesos.resourcemanager.unused-offer-expiration
120000LongAmount of time to wait for unused expired offers before declining them. This ensures your scheduler will not hoard unused offers.

Mesos TaskManager

Key | Default | Type | Description
mesos.constraints.hard.hostattribute
(none)StringConstraints for task placement on Mesos based on agent attributes. Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target mesos agents. Example: az:eu-west-1a,series:t2
mesos.resourcemanager.tasks.bootstrap-cmd
(none)StringA command which is executed before the TaskManager is started.
mesos.resourcemanager.tasks.container.docker.force-pull-image
falseBooleanInstruct the docker containerizer to forcefully pull the image rather than reuse a cached version.
mesos.resourcemanager.tasks.container.docker.parameters
(none)StringCustom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of “key=value” pairs. The “value” may contain ‘=’.
mesos.resourcemanager.tasks.container.image.name
(none)StringImage name to use for the container.
mesos.resourcemanager.tasks.container.type
“mesos”StringType of the containerization used: “mesos” or “docker”.
mesos.resourcemanager.tasks.container.volumes
(none)StringA comma separated list of [hostpath:]container_path[:RO|RW]. This allows for mounting additional volumes into your container.
mesos.resourcemanager.tasks.cpus
0.0DoubleCPUs to assign to the Mesos workers.
mesos.resourcemanager.tasks.disk
0IntegerDisk space to assign to the Mesos workers in MB.
mesos.resourcemanager.tasks.gpus
0IntegerGPUs to assign to the Mesos workers.
mesos.resourcemanager.tasks.hostname
(none)StringOptional value to define the TaskManager’s hostname. The pattern _TASK_ is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. _TASK_.flink-service.mesos) for name lookups.
mesos.resourcemanager.tasks.taskmanager-cmd
“$FLINK_HOME/bin/mesos-taskmanager.sh”String
mesos.resourcemanager.tasks.uris
(none)StringA comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers.


State Backends

Please refer to the State Backend Documentation for background on State Backends.

RocksDB State Backend

These are the options commonly needed to configure the RocksDB state backend. See the Advanced RocksDB Backend Section for options necessary for advanced low-level configurations and troubleshooting. An example configuration follows the table below.

Key | Default | Type | Description
state.backend.rocksdb.memory.fixed-per-slot
(none)MemorySizeThe fixed total amount of memory, shared among all RocksDB instances per slot. This option overrides the ‘state.backend.rocksdb.memory.managed’ option when configured. If neither this option nor the ‘state.backend.rocksdb.memory.managed’ option are set, then each RocksDB column family state has its own memory caches (as controlled by the column family options).
state.backend.rocksdb.memory.high-prio-pool-ratio
0.1DoubleThe fraction of cache memory that is reserved for high-priority data like index, filter, and compression dictionary blocks. This option only has an effect when ‘state.backend.rocksdb.memory.managed’ or ‘state.backend.rocksdb.memory.fixed-per-slot’ are configured.
state.backend.rocksdb.memory.managed
trueBooleanIf set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.
state.backend.rocksdb.memory.write-buffer-ratio
0.5DoubleThe maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when ‘state.backend.rocksdb.memory.managed’ or ‘state.backend.rocksdb.memory.fixed-per-slot’ are configured.
state.backend.rocksdb.timer-service.factory
ROCKSDB (Enum, possible values: HEAP, ROCKSDB). This determines the factory for the timer service state implementation. Options are either HEAP (heap-based) or ROCKSDB for an implementation based on RocksDB.
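
For example, to let RocksDB use the managed memory of the task slot and adjust how that budget is split (the ratios are illustrative deviations from the defaults):

    state.backend: rocksdb
    state.backend.rocksdb.memory.managed: true
    state.backend.rocksdb.memory.write-buffer-ratio: 0.4
    state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
    state.backend.rocksdb.timer-service.factory: ROCKSDB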


Metrics

Please refer to the metrics system documentation for background on Flink’s metrics infrastructure. An example reporter configuration follows the table below.

Key | Default | Type | Description
metrics.fetcher.update-interval
10000LongUpdate interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 disables the metric fetching completely.
metrics.internal.query-service.port
“0”StringThe port range used for Flink’s internal metric query service. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. By default Flink will pick a random port.
metrics.internal.query-service.thread-priority
1IntegerThe thread priority used for Flink’s internal metric query service. The thread is created by Akka’s thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.
metrics.latency.granularity
“operator”StringDefines the granularity of latency metrics. Accepted values are:
  • single - Track latency without differentiating between sources and subtasks.
  • operator - Track latency while differentiating between sources, but not subtasks.
  • subtask - Track latency while differentiating between sources and subtasks.
metrics.latency.history-size
128IntegerDefines the number of measured latencies to maintain at each operator.
metrics.latency.interval
0LongDefines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.
metrics.reporter.<name>.<parameter>
(none)StringConfigures the parameter <parameter> for the reporter named <name>.
metrics.reporter.<name>.class
(none)StringThe reporter class to use for the reporter named <name>.
metrics.reporter.<name>.interval
10 sDurationThe reporter interval to use for the reporter named <name>.
metrics.reporters
(none)StringAn optional list of reporter names. If configured, only reporters whose name matches any of the names in the list will be started. Otherwise, all reporters that could be found in the configuration will be started.
metrics.scope.delimiter
“.”StringDelimiter used to assemble the metric identifier.
metrics.scope.jm
“<host>.jobmanager”StringDefines the scope format string that is applied to all metrics scoped to a JobManager.
metrics.scope.jm.job
“<host>.jobmanager.<job_name>”StringDefines the scope format string that is applied to all metrics scoped to a job on a JobManager.
metrics.scope.operator
“<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>”StringDefines the scope format string that is applied to all metrics scoped to an operator.
metrics.scope.task
“<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>”StringDefines the scope format string that is applied to all metrics scoped to a task.
metrics.scope.tm
“<host>.taskmanager.<tm_id>”StringDefines the scope format string that is applied to all metrics scoped to a TaskManager.
metrics.scope.tm.job
“<host>.taskmanager.<tm_id>.<job_name>”StringDefines the scope format string that is applied to all metrics scoped to a job on a TaskManager.
metrics.system-resource
falseBooleanFlag indicating whether Flink should report system resource metrics such as machine’s CPU, memory or network usage.
metrics.system-resource-probing-interval
5000LongInterval between probing of system resource metrics specified in milliseconds. Has an effect only when ‘metrics.system-resource’ is enabled.
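
As an example of the metrics.reporter.<name>.<parameter> pattern above, the following sketch configures a reporter named my_jmx, assuming the JMX reporter class that ships with Flink (org.apache.flink.metrics.jmx.JMXReporter) and an illustrative port range:

    metrics.reporters: my_jmx
    metrics.reporter.my_jmx.class: org.apache.flink.metrics.jmx.JMXReporter    # assumed reporter class
    metrics.reporter.my_jmx.port: 9020-9040                                    # illustrative reporter-specific parameter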

RocksDB Native Metrics

Flink can report metrics from RocksDB’s native code, for applications using the RocksDB state backend. The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs.

Note: Enabling RocksDB’s native metrics may cause degraded performance and should be set carefully.

Key | Default | Type | Description
state.backend.rocksdb.metrics.actual-delayed-write-rate
falseBooleanMonitor the current actual delayed write rate. 0 means no delay.
state.backend.rocksdb.metrics.background-errors
falseBooleanMonitor the number of background errors in RocksDB.
state.backend.rocksdb.metrics.block-cache-capacity
falseBooleanMonitor block cache capacity.
state.backend.rocksdb.metrics.block-cache-pinned-usage
falseBooleanMonitor the memory size for the entries being pinned in block cache.
state.backend.rocksdb.metrics.block-cache-usage
falseBooleanMonitor the memory size for the entries residing in block cache.
state.backend.rocksdb.metrics.column-family-as-variable
falseBooleanWhether to expose the column family as a variable.
state.backend.rocksdb.metrics.compaction-pending
falseBooleanTrack pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.
state.backend.rocksdb.metrics.cur-size-active-mem-table
falseBooleanMonitor the approximate size of the active memtable in bytes.
state.backend.rocksdb.metrics.cur-size-all-mem-tables
falseBooleanMonitor the approximate size of the active and unflushed immutable memtables in bytes.
state.backend.rocksdb.metrics.estimate-live-data-size
falseBooleanEstimate of the amount of live data in bytes.
state.backend.rocksdb.metrics.estimate-num-keys
falseBooleanEstimate the number of keys in RocksDB.
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes
falseBooleanEstimated total number of bytes compaction needs to rewrite to get all levels down to under target size. Not valid for other compactions than level-based.
state.backend.rocksdb.metrics.estimate-table-readers-mem
falseBooleanEstimate the memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks), in bytes.
state.backend.rocksdb.metrics.is-write-stopped
falseBooleanTrack whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise.
state.backend.rocksdb.metrics.mem-table-flush-pending
falseBooleanMonitor the number of pending memtable flushes in RocksDB.
state.backend.rocksdb.metrics.num-deletes-active-mem-table
falseBooleanMonitor the total number of delete entries in the active memtable.
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables
falseBooleanMonitor the total number of delete entries in the unflushed immutable memtables.
state.backend.rocksdb.metrics.num-entries-active-mem-table
falseBooleanMonitor the total number of entries in the active memtable.
state.backend.rocksdb.metrics.num-entries-imm-mem-tables
falseBooleanMonitor the total number of entries in the unflushed immutable memtables.
state.backend.rocksdb.metrics.num-immutable-mem-table
falseBooleanMonitor the number of immutable memtables in RocksDB.
state.backend.rocksdb.metrics.num-live-versions
falseBooleanMonitor number of live versions. Version is an internal data structure. See RocksDB file version_set.h for details. More live versions often mean more SST files are held from being deleted, by iterators or unfinished compactions.
state.backend.rocksdb.metrics.num-running-compactions
falseBooleanMonitor the number of currently running compactions.
state.backend.rocksdb.metrics.num-running-flushes
falseBooleanMonitor the number of currently running flushes.
state.backend.rocksdb.metrics.num-snapshots
falseBooleanMonitor the number of unreleased snapshots of the database.
state.backend.rocksdb.metrics.size-all-mem-tables
falseBooleanMonitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes.
state.backend.rocksdb.metrics.total-sst-files-size
falseBooleanMonitor the total size (bytes) of all SST files. WARNING: may slow down online queries if there are too many files.


History Server

The history server keeps the information of completed jobs (graphs, runtimes, statistics). To enable it, you have to enable “job archiving” in the JobManager (jobmanager.archive.fs.dir).

See the History Server Docs for details.
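
For example, using the options from the table below, the JobManager and the HistoryServer can be pointed at the same archive directory (the directory is illustrative):

    jobmanager.archive.fs.dir: hdfs:///completed-jobs/
    historyserver.archive.fs.dir: hdfs:///completed-jobs/
    historyserver.web.port: 8082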

Key | Default | Type | Description
historyserver.archive.clean-expired-jobs
falseBooleanWhether the HistoryServer should clean up jobs that are no longer present in historyserver.archive.fs.dir.
historyserver.archive.fs.dir
(none)StringComma separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via jobmanager.archive.fs.dir.
historyserver.archive.fs.refresh-interval
10000LongInterval in milliseconds for refreshing the archived job directories.
historyserver.web.address
(none)StringAddress of the HistoryServer’s web interface.
historyserver.web.port
8082IntegerPort of the HistoryServer’s web interface.
historyserver.web.refresh-interval
10000LongThe refresh interval for the HistoryServer web-frontend in milliseconds.
historyserver.web.ssl.enabled
falseBooleanEnable HTTPS access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true.
historyserver.web.tmpdir
(none)StringThis configuration parameter allows defining the Flink web directory to be used by the history server web interface. The web interface will copy its static files into the directory.


Experimental

Options for experimental features in Flink.

Queryable State

Queryable State is an experimental feature that lets you access Flink’s internal state like a key/value store. See the Queryable State Docs for details. An example follows the table below.

Key | Default | Type | Description
queryable-state.client.network-threads
0IntegerNumber of network (Netty’s event loop) Threads for queryable state client.
queryable-state.enable
falseBooleanOption whether the queryable state proxy and server should be enabled where possible and configurable.
queryable-state.proxy.network-threads
0IntegerNumber of network (Netty’s event loop) Threads for queryable state proxy.
queryable-state.proxy.ports
“9069”StringThe port range of the queryable state proxy. The specified range can be a single port: “9123”, a range of ports: “50100-50200”, or a list of ranges and ports: “50100-50200,50300-50400,51234”.
queryable-state.proxy.query-threads
0IntegerNumber of query Threads for queryable state proxy. Uses the number of slots if set to 0.
queryable-state.server.network-threads
0IntegerNumber of network (Netty’s event loop) Threads for queryable state server.
queryable-state.server.ports
“9067”StringThe port range of the queryable state server. The specified range can be a single port: “9123”, a range of ports: “50100-50200”, or a list of ranges and ports: “50100-50200,50300-50400,51234”.
queryable-state.server.query-threads
0IntegerNumber of query Threads for queryable state server. Uses the number of slots if set to 0.


Debugging & Expert Tuning

The options below here are meant for expert users and for fixing/debugging problems. Most setups should not need to configure these options.

Class Loading

Flink dynamically loads the code for jobs submitted to a session cluster. In addition, Flink tries to hide many dependencies in the classpath from the application. This helps to reduce dependency conflicts between the application code and the dependencies in the classpath.

Please refer to the Debugging Classloading Docs for details.
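
As an illustrative sketch (the package prefixes below are placeholders, not recommendations):
# resolve user-code classes from the job jar first (the default)
classloader.resolve-order: child-first
# additionally resolve these hypothetical prefixes through the parent classloader
classloader.parent-first-patterns.additional: org.example.shadedlib.;com.example.common.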

KeyDefaultTypeDescription
classloader.fail-on-metaspace-oom-error
trueBooleanFail Flink JVM processes if ‘OutOfMemoryError: Metaspace’ is thrown while trying to load a user code class.
classloader.parent-first-patterns.additional
(none)StringA (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to “classloader.parent-first-patterns.default”.
classloader.parent-first-patterns.default
“java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c”StringA (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern we recommend to use “classloader.parent-first-patterns.additional” instead.
classloader.resolve-order
“child-first”StringDefines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar (“child-first”) or the application classpath (“parent-first”). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).

Advanced State Backends Options

KeyDefaultTypeDescription
state.backend.async
trueBooleanOption whether the state backend should use an asynchronous snapshot method where possible and configurable. Some state backends may not support asynchronous snapshots, or only support asynchronous snapshots, and ignore this option.
state.backend.fs.memory-threshold
20 kbMemorySizeThe minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.
state.backend.fs.write-buffer-size
4096IntegerThe default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option ‘state.backend.fs.memory-threshold’.

Advanced RocksDB State Backends Options

Advanced options to tune RocksDB and RocksDB checkpoints.
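
For example, pointing RocksDB at fast local storage and using more transfer threads could look like this (the path is a placeholder and the thread count is illustrative):
# local working directory for RocksDB files on each TaskManager
state.backend.rocksdb.localdir: /mnt/local-ssd/rocksdb
# use more threads per stateful operator to upload/download checkpoint files
state.backend.rocksdb.checkpoint.transfer.thread.num: 4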

KeyDefaultTypeDescription
state.backend.rocksdb.checkpoint.transfer.thread.num
1IntegerThe number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.
state.backend.rocksdb.localdir
(none)StringThe local directory (on the TaskManager) where RocksDB puts its files.
state.backend.rocksdb.options-factory
“org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory”StringThe options factory class for RocksDB to create DBOptions and ColumnFamilyOptions. The default options factory is org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and it reads the options configured via ‘RocksDBConfigurableOptions’.
state.backend.rocksdb.predefined-options
“DEFAULT”StringThe predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones.

RocksDB Configurable Options

These options give fine-grained control over the behavior and resources of ColumnFamilies. With the introduction of state.backend.rocksdb.memory.managed and state.backend.rocksdb.memory.fixed-per-slot (Apache Flink 1.10), it should only be necessary to use the options here for advanced performance tuning. These options can also be specified in the application program via RocksDBStateBackend.setOptions(OptionsFactory).
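
As a hedged sketch of what such tuning could look like in conf/flink-conf.yaml (the values are illustrative, not recommendations):
# larger and more numerous write buffers, at the cost of more memory per ColumnFamily
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
# level compaction with a larger level base size
state.backend.rocksdb.compaction.style: LEVEL
state.backend.rocksdb.compaction.level.max-size-level-base: 512mb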

KeyDefaultTypeDescription
state.backend.rocksdb.block.blocksize
(none)MemorySizeThe approximate size (in bytes) of user data packed per block. RocksDB has default blocksize as ‘4KB’.
state.backend.rocksdb.block.cache-size
(none)MemorySizeThe amount of the cache for data blocks in RocksDB. RocksDB has default block-cache size as ‘8MB’.
state.backend.rocksdb.compaction.level.max-size-level-base
(none)MemorySizeThe upper-bound of the total size of level base files in bytes. RocksDB has default configuration as ‘256MB’.
state.backend.rocksdb.compaction.level.target-file-size-base
(none)MemorySizeThe target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as ‘64MB’.
state.backend.rocksdb.compaction.level.use-dynamic-size
(none)BooleanIf true, RocksDB will pick the target size of each level dynamically. Starting from an empty DB, RocksDB makes the last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. It then repeats this process for the second-to-last level, and so on. RocksDB has default configuration as ‘false’. For more information, please refer to RocksDB’s doc.
state.backend.rocksdb.compaction.style
(none)

Enum

Possible values: [LEVEL, UNIVERSAL, FIFO]
The specified compaction style for the DB. Candidate compaction styles are LEVEL, FIFO and UNIVERSAL, and RocksDB chooses ‘LEVEL’ as the default style.
state.backend.rocksdb.files.open
(none)IntegerThe maximum number of open files (per TaskManager) that can be used by the DB, ‘-1’ means no limit. RocksDB has default configuration as ‘-1’.
state.backend.rocksdb.thread.num
(none)IntegerThe maximum number of concurrent background flush and compaction jobs (per TaskManager). RocksDB has default configuration as ‘1’.
state.backend.rocksdb.write-batch-size
2 mbMemorySizeThe max size of the consumed memory for RocksDB batch write, will flush just based on item count if this config set to 0.
state.backend.rocksdb.writebuffer.count
(none)IntegerThe maximum number of write buffers that are built up in memory. RocksDB has default configuration as ‘2’.
state.backend.rocksdb.writebuffer.number-to-merge
(none)IntegerThe minimum number of write buffers that will be merged together before writing to storage. RocksDB has default configuration as ‘1’.
state.backend.rocksdb.writebuffer.size
(none)MemorySizeThe amount of data built up in memory (backed by an unsorted log on disk) before converting to sorted on-disk files. RocksDB has a default writebuffer size of ‘64MB’.

Advanced Fault Tolerance Options

These parameters can help with problems related to failover and to components erroneously considering each other as failed.
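
As an illustrative example of relaxing failure detection on a congested network (the values are not recommendations):
# send heartbeats less frequently ...
heartbeat.interval: 20000
# ... and tolerate longer gaps before a component is considered failed
heartbeat.timeout: 120000
# restart only the tasks affected by a failure (the default strategy)
jobmanager.execution.failover-strategy: region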

KeyDefaultTypeDescription
cluster.io-pool.size
(none)IntegerThe size of the IO executor pool used by the cluster to execute blocking IO operations (Master as well as TaskManager processes). By default it will use 4 * the number of CPU cores (hardware contexts) that the cluster process has access to. Increasing the pool size allows to run more IO operations concurrently.
cluster.registration.error-delay
10000LongThe pause, in milliseconds, after a registration attempt caused an exception (other than a timeout).
cluster.registration.initial-timeout
100LongInitial registration timeout between cluster components in milliseconds.
cluster.registration.max-timeout
30000LongMaximum registration timeout between cluster components in milliseconds.
cluster.registration.refused-registration-delay
30000LongThe pause, in milliseconds, after a registration attempt was refused.
cluster.services.shutdown-timeout
30000LongThe shutdown timeout for cluster services like executors in milliseconds.
heartbeat.interval
10000LongTime interval for requesting heartbeat from sender side.
heartbeat.timeout
50000LongTimeout for requesting and receiving heartbeat for both sender and receiver sides.
jobmanager.execution.failover-strategy
“region”StringThis option specifies how the job computation recovers from task failures. Accepted values are:
  • ‘full’: Restarts all tasks to recover the job.
  • ‘region’: Restarts all tasks that could be affected by the task failure. More details can be found here.

Advanced Scheduling Options

These parameters can help with fine-tuning scheduling for specific situations.
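
For instance (a sketch; the values are illustrative):
# spread slots evenly across all available TaskExecutors
cluster.evenly-spread-out-slots: true
# fail slot requests that cannot be fulfilled within five minutes (the default)
slot.request.timeout: 300000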

KeyDefaultTypeDescription
cluster.evenly-spread-out-slots
falseBooleanEnable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available TaskExecutors.
slot.idle.timeout
50000LongThe timeout in milliseconds for an idle slot in the Slot Pool.
slot.request.timeout
300000LongThe timeout in milliseconds for requesting a slot from the Slot Pool.
slotmanager.number-of-slots.max
2147483647IntegerDefines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.

Advanced High-availability Options

KeyDefaultTypeDescription
high-availability.jobmanager.port
“0”StringThe port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of ‘jobmanager.rpc.port’. A value of ‘0’ means that a random free port is chosen. TaskManagers discover this port through the high-availability services (leader election), so a random port or a port range works without requiring any additional means of service discovery.

Advanced High-availability ZooKeeper Options

KeyDefaultTypeDescription
high-availability.zookeeper.client.acl
“open”StringDefines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).
high-availability.zookeeper.client.connection-timeout
15000IntegerDefines the connection timeout for ZooKeeper in ms.
high-availability.zookeeper.client.max-retry-attempts
3IntegerDefines the number of connection retries before the client gives up.
high-availability.zookeeper.client.retry-wait
5000IntegerDefines the pause between consecutive retries in ms.
high-availability.zookeeper.client.session-timeout
60000IntegerDefines the session timeout for the ZooKeeper session in ms.
high-availability.zookeeper.path.checkpoint-counter
“/checkpoint-counter”StringZooKeeper root path (ZNode) for checkpoint counters.
high-availability.zookeeper.path.checkpoints
“/checkpoints”StringZooKeeper root path (ZNode) for completed checkpoints.
high-availability.zookeeper.path.jobgraphs
“/jobgraphs”StringZooKeeper root path (ZNode) for job graphs
high-availability.zookeeper.path.latch
“/leaderlatch”StringDefines the znode of the leader latch which is used to elect the leader.
high-availability.zookeeper.path.leader
“/leader”StringDefines the znode of the leader which contains the URL to the leader and the current leader session ID.
high-availability.zookeeper.path.mesos-workers
“/mesos-workers”StringThe ZooKeeper root path for persisting the Mesos worker information.
high-availability.zookeeper.path.running-registry
“/running_job_registry/“String

Advanced SSL Security Options

KeyDefaultTypeDescription
security.ssl.internal.close-notify-flush-timeout
-1IntegerThe timeout (in ms) for flushing the close_notify that was triggered by closing a channel. If the close_notify was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)
security.ssl.internal.handshake-timeout
-1IntegerThe timeout (in ms) during SSL handshake. (-1 = use system default)
security.ssl.internal.session-cache-size
-1IntegerThe size of the cache used for storing SSL session objects. According to https://github.com/netty/netty/issues/832, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).
security.ssl.internal.session-timeout
-1IntegerThe timeout (in ms) for the cached SSL session objects. (-1 = use system default)
security.ssl.provider
“JDK”StringThe SSL engine provider to use for the ssl transport:
  • JDK: default Java-based SSL engine
  • OPENSSL: openSSL-based SSL engine using system libraries
OPENSSL is based on netty-tcnative and comes in two flavours:
  • dynamically linked: This will use your system’s openSSL libraries (if compatible) and requires opt/flink-shaded-netty-tcnative-dynamic-*.jar to be copied to lib/
  • statically linked: Due to potential licensing issues with openSSL (see LEGAL-393), we cannot ship pre-built libraries. However, you can build the required library yourself and put it into lib/:
    git clone https://github.com/apache/flink-shaded.git && cd flink-shaded && mvn clean package -Pinclude-netty-tcnative-static -pl flink-shaded-netty-tcnative-static

Advanced Options for the REST endpoint and Client

KeyDefaultTypeDescription
rest.await-leader-timeout
30000LongThe time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint
rest.client.max-content-length
104857600IntegerThe maximum content length in bytes that the client will handle.
rest.connection-timeout
15000LongThe maximum time in ms for the client to establish a TCP connection.
rest.idleness-timeout
300000LongThe maximum time in ms for a connection to stay idle before failing.
rest.retry.delay
3000LongThe time in ms that the client waits between retries (See also rest.retry.max-attempts).
rest.retry.max-attempts
20IntegerThe number of retries the client will attempt if a retryable operation fails.
rest.server.max-content-length
104857600IntegerThe maximum content length in bytes that the server will handle.
rest.server.numThreads
4IntegerThe number of threads for the asynchronous processing of requests.
rest.server.thread-priority
5IntegerThread priority of the REST server’s executor for processing asynchronous requests. Lowering the thread priority will give Flink’s main components more CPU time whereas increasing will allocate more time for the REST server’s processing.

Advanced Options for Flink Web UI

KeyDefaultTypeDescription
web.access-control-allow-origin
“*”StringAccess-Control-Allow-Origin header for all responses from the web-frontend.
web.backpressure.cleanup-interval
600000IntegerTime, in milliseconds, after which cached stats are cleaned up if not accessed.
web.backpressure.delay-between-samples
50IntegerDelay between samples to determine back pressure in milliseconds.
web.backpressure.num-samples
100IntegerNumber of samples to take to determine back pressure.
web.backpressure.refresh-interval
60000IntegerTime, in milliseconds, after which available stats are deprecated and need to be refreshed (by resampling).
web.checkpoints.history
10IntegerNumber of checkpoints to remember for recent history.
web.history
5IntegerNumber of archived jobs for the JobManager.
web.log.path
(none)StringPath to the log file (may be in /log for standalone but under log directory when using YARN).
web.refresh-interval
3000LongRefresh interval for the web-frontend in milliseconds.
web.submit.enable
trueBooleanFlag indicating whether jobs can be uploaded and run from the web-frontend.
web.timeout
600000LongTimeout for asynchronous operations by the web monitor in milliseconds.
web.tmpdir
System.getProperty(“java.io.tmpdir”)StringFlink web directory which is used by the webmonitor.
web.upload.dir
(none)StringDirectory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.

Full JobManager Options

JobManager

KeyDefaultTypeDescription
jobmanager.archive.fs.dir
(none)StringDirectory in which the JobManager stores the archives of completed jobs.
jobmanager.execution.attempts-history-size
16IntegerThe maximum number of prior execution attempts kept in history.
jobmanager.execution.failover-strategy
“region”StringThis option specifies how the job computation recovers from task failures. Accepted values are:
  • ‘full’: Restarts all tasks to recover the job.
  • ‘region’: Restarts all tasks that could be affected by the task failure. More details can be found here.
jobmanager.rpc.address
(none)StringThe config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.port
6123IntegerThe config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobstore.cache-size
52428800LongThe job store cache size in bytes which is used to keep completed jobs in memory.
jobstore.expiration-time
3600LongThe time in seconds after which a completed job expires and is purged from the job store.
jobstore.max-capacity
2147483647IntegerThe max number of completed jobs that can be kept in the job store.

Blob Server

The Blob Server is a component in the JobManager. It is used for distribution of objects that are too large to be attached to a RPC message and that benefit from caching (like Jar files or large serialized code objects).
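
A small, hedged example of pinning the blob server to a fixed port (the port number is arbitrary):
# use a fixed port instead of a random one ("0")
blob.server.port: 6124
# only offload RPC payloads larger than 1 MiB to the BlobServer (the default)
blob.offload.minsize: 1048576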

KeyDefaultTypeDescription
blob.client.connect.timeout
0IntegerThe connection timeout in milliseconds for the blob client.
blob.client.socket.timeout
300000IntegerThe socket timeout in milliseconds for the blob client.
blob.fetch.backlog
1000IntegerThe config parameter defining the backlog of BLOB fetches on the JobManager.
blob.fetch.num-concurrent
50IntegerThe config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
blob.fetch.retries
5IntegerThe config parameter defining the number of retries for failed BLOB fetches.
blob.offload.minsize
1048576IntegerThe minimum size for messages to be offloaded to the BlobServer.
blob.server.port
“0”StringThe config parameter defining the server port of the blob service.
blob.service.cleanup.interval
3600LongCleanup interval of the blob caches at the task managers (in seconds).
blob.service.ssl.enabled
trueBooleanFlag to override ssl support for the blob service transport.
blob.storage.directory
(none)StringThe config parameter defining the storage directory to be used by the blob server.

ResourceManager

These configuration keys control basic ResourceManager behavior, independent of the resource orchestration framework in use (YARN, Mesos, etc.).

KeyDefaultTypeDescription
resourcemanager.job.timeout
“5 minutes”StringTimeout for jobs which don’t have a job manager as leader assigned.
resourcemanager.rpc.port
0IntegerDefines the network port to connect to for communication with the resource manager. By default, it uses the port of the JobManager, because the same ActorSystem is used. It is not possible to use this configuration key to define port ranges.
resourcemanager.standalone.start-up-time
-1LongTime in milliseconds of the start-up period of a standalone cluster. During this time, the resource manager of the standalone cluster expects new task executors to register, and will not fail slot requests that cannot be satisfied by any currently registered slots. After this time, it will immediately fail pending and newly arriving requests that cannot be satisfied by registered slots. If not set, ‘slotmanager.request-timeout’ will be used by default.
resourcemanager.taskmanager-timeout
30000LongThe timeout for an idle task manager to be released.
slotmanager.number-of-slots.max
2147483647IntegerDefines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.

Full TaskManagerOptions

KeyDefaultTypeDescription
task.cancellation.interval
30000LongTime interval between two successive task cancellation attempts in milliseconds.
task.cancellation.timeout
180000LongTimeout in milliseconds after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watch dog.
task.cancellation.timers.timeout
7500LongTime we wait for the timers in milliseconds to finish all pending timer threads when the stream task is cancelled.
taskmanager.data.port
0IntegerThe task manager’s external port used for data exchange operations.
taskmanager.data.ssl.enabled
trueBooleanEnable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true
taskmanager.debug.memory.log
falseBooleanFlag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
taskmanager.debug.memory.log-interval
5000LongThe interval (in ms) for the log thread to log the current memory usage.
taskmanager.host
(none)StringThe external address of the network interface where the TaskManager is exposed. Because different TaskManagers need different values for this option, usually it is specified in an additional non-shared TaskManager-specific config file.
taskmanager.jvm-exit-on-oom
falseBooleanWhether to kill the TaskManager when the task thread throws an OutOfMemoryError.
taskmanager.memory.segment-size
32 kbMemorySizeSize of memory buffers used by the network stack and the memory manager.
taskmanager.network.bind-policy
“ip”StringThe automatic address binding policy used by the TaskManager if “taskmanager.host” is not set. The value should be one of the following:
  • “name” - uses hostname as binding address
  • “ip” - uses host’s ip address as binding address
taskmanager.numberOfTaskSlots
1IntegerThe number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager’s machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.registration.timeout
5 minDurationDefines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.
taskmanager.rpc.port
“0”StringThe external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

Data Transport Network Stack

These options are for the network stack that handles the streaming and batch data exchanges between TaskManagers.
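
For illustration, a configuration that trades some CPU for less IO in batch shuffles and adds floating buffers for high-latency links could look like this (values are illustrative, not recommendations):
# compress data for blocking (batch) shuffles
taskmanager.network.blocking-shuffle.compression.enabled: true
# more floating buffers per gate help with higher round-trip times
taskmanager.network.memory.floating-buffers-per-gate: 16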

KeyDefaultTypeDescription
taskmanager.network.blocking-shuffle.compression.enabled
falseBooleanBoolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO-bound scenarios when the data compression ratio is high. Currently, shuffle data compression is an experimental feature and the config option can be changed in the future.
taskmanager.network.blocking-shuffle.type
“file”StringThe blocking shuffle type, either “mmap” or “file”. The “auto” setting selects the proper type automatically based on the system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted for by the configured memory limits, but some resource frameworks like YARN track this memory usage and kill the container once memory exceeds some threshold. Also note that this option is experimental and might be changed in the future.
taskmanager.network.detailed-metrics
falseBooleanBoolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.
taskmanager.network.memory.buffers-per-channel
2IntegerNumber of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be set to at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.
taskmanager.network.memory.floating-buffers-per-gate
8IntegerNumber of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.
taskmanager.network.memory.max-buffers-per-channel
10IntegerThe maximum number of buffers that can be used for each channel. If a channel exceeds this number, the task becomes unavailable, causing back pressure and blocking data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and a high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records spanning multiple buffers, or a single timer producing a large amount of data.
taskmanager.network.netty.client.connectTimeoutSec
120IntegerThe Netty client connection timeout.
taskmanager.network.netty.client.numThreads
-1IntegerThe number of Netty client threads.
taskmanager.network.netty.num-arenas
-1IntegerThe number of Netty arenas.
taskmanager.network.netty.sendReceiveBufferSize
0IntegerThe Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.
taskmanager.network.netty.server.backlog
0IntegerThe netty server connection backlog.
taskmanager.network.netty.server.numThreads
-1IntegerThe number of Netty server threads.
taskmanager.network.netty.transport
“auto”StringThe Netty transport type, either “nio” or “epoll”. The “auto” setting selects the proper mode automatically based on the platform. Note that the “epoll” mode can achieve better performance and less GC, and has more advanced features which are only available on modern Linux.
taskmanager.network.request-backoff.initial
100IntegerMinimum backoff in milliseconds for partition requests of input channels.
taskmanager.network.request-backoff.max
10000IntegerMaximum backoff in milliseconds for partition requests of input channels.

RPC / Akka

Flink uses Akka for RPC between components (JobManager/TaskManager/ResourceManager). Flink does not use Akka for data transport.
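
For example, on slow or congested networks the RPC timeouts and the maximum frame size are the values most commonly raised (the numbers below are illustrative):
# give RPC futures more time before they fail with a timeout
akka.ask.timeout: 60 s
# allow larger RPC messages; the value requires a size-unit specifier
akka.framesize: 20485760b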

KeyDefaultTypeDescription
akka.ask.callstack
trueBooleanIf true, call stacks for asynchronous asks are captured. That way, when an ask fails (for example, times out), you get a proper exception describing the original method call and call site. Note that with millions of concurrent RPC calls, this may add to the memory footprint.
akka.ask.timeout
“10 s”StringTimeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).
akka.client-socket-worker-pool.pool-size-factor
1.0DoubleThe pool size factor is used to determine the thread pool size using the following formula: ceil(available processors * factor). The resulting size is then bounded by the pool-size-min and pool-size-max values.
akka.client-socket-worker-pool.pool-size-max
2IntegerMax number of threads to cap factor-based number to.
akka.client-socket-worker-pool.pool-size-min
1IntegerMin number of threads to cap factor-based number to.
akka.fork-join-executor.parallelism-factor
2.0DoubleThe parallelism factor is used to determine the thread pool size using the following formula: ceil(available processors * factor). The resulting size is then bounded by the parallelism-min and parallelism-max values.
akka.fork-join-executor.parallelism-max
64IntegerMax number of threads to cap factor-based parallelism number to.
akka.fork-join-executor.parallelism-min
8IntegerMin number of threads to cap factor-based parallelism number to.
akka.framesize
“10485760b”StringMaximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.
akka.jvm-exit-on-fatal-error
trueBooleanExit JVM on fatal Akka errors.
akka.log.lifecycle.events
falseBooleanTurns on Akka’s remote logging of events. Set this value to ‘true’ for debugging.
akka.lookup.timeout
“10 s”StringTimeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d).
akka.retry-gate-closed-for
50LongMilliseconds a gate should be closed for after a remote connection was disconnected.
akka.server-socket-worker-pool.pool-size-factor
1.0DoubleThe pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.
akka.server-socket-worker-pool.pool-size-max
2IntegerMax number of threads to cap factor-based number to.
akka.server-socket-worker-pool.pool-size-min
1IntegerMin number of threads to cap factor-based number to.
akka.ssl.enabled
trueBooleanTurns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true.
akka.startup-timeout
(none)StringTimeout after which the startup of a remote component is considered being failed.
akka.tcp.timeout
“20 s”StringTimeout for all outbound connections. If you experience problems connecting to a TaskManager due to a slow network, you should increase this value.
akka.throughput
15IntegerNumber of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.
akka.transport.heartbeat.interval
“1000 s”StringHeartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d).
akka.transport.heartbeat.pause
“6000 s”StringAcceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d).
akka.transport.threshold
300.0DoubleThreshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value.


JVM and Logging Options

KeyDefaultTypeDescription
env.hadoop.conf.dir
(none)StringPath to the Hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via an environment variable.
env.java.opts
(none)StringJava options to start the JVM of all Flink processes with.
env.java.opts.client
(none)StringJava options to start the JVM of the Flink Client with.
env.java.opts.historyserver
(none)StringJava options to start the JVM of the HistoryServer with.
env.java.opts.jobmanager
(none)StringJava options to start the JVM of the JobManager with.
env.java.opts.taskmanager
(none)StringJava options to start the JVM of the TaskManager with.
env.log.dir
(none)StringDefines the directory where the Flink logs are saved. It has to be an absolute path. (Defaults to the log directory under Flink’s home)
env.log.max
5IntegerThe maximum number of old log files to keep.
env.ssh.opts
(none)StringAdditional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).
env.yarn.conf.dir
(none)StringPath to the YARN configuration directory. It is required to run Flink on YARN. You can also set it via an environment variable.

Forwarding Environment Variables

You can configure environment variables to be set on the JobManager and TaskManager processes started on Yarn/Mesos.

  • containerized.master.env.: Prefix for passing custom environment variables to Flink’s JobManager process. For example for passing LD_LIBRARY_PATH as an env variable to the JobManager, set containerized.master.env.LD_LIBRARY_PATH: “/usr/lib/native” in the flink-conf.yaml.

  • containerized.taskmanager.env.: Similar to the above, this configuration prefix allows setting custom environment variables for the workers (TaskManagers).
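
Written out in conf/flink-conf.yaml, the example above looks like this (the variable and path are just the illustration from the bullet points):
# forwarded to the JobManager process
containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
# forwarded to the TaskManager processes
containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native"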



Deprecated Options

These options relate to parts of Flink that are not actively developed any more. These options may be removed in a future release.

DataSet API Optimizer

KeyDefaultTypeDescription
compiler.delimited-informat.max-line-samples
10IntegerThe maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.max-sample-len
2097152IntegerThe maximal length of a line sample that the compiler takes for delimited inputs. If the length of a single sample exceeds this value (possible because of misconfiguration of the parser), the sampling aborts. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.min-line-samples
2IntegerThe minimum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters

DataSet API Runtime Algorithms

KeyDefaultTypeDescription
taskmanager.runtime.hashjoin-bloom-filters
falseBooleanFlag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles.
taskmanager.runtime.max-fan
128IntegerThe maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small.
taskmanager.runtime.sort-spilling-threshold
0.8FloatA sort operation starts spilling when this fraction of its memory budget is full.

DataSet File Sinks

KeyDefaultTypeDescription
fs.output.always-create-directory
falseBooleanFile writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to “true”, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to “false”, the writer will create the file directly at the output path, without creating a containing directory.
fs.overwrite-files
falseBooleanSpecifies whether file output writers should overwrite existing files by default. Set to “true” to overwrite by default, “false” otherwise.


Backup

Client

KeyDefaultTypeDescription
client.retry-period
2 sDurationThe interval between consecutive retries of failed attempts to execute commands through the CLI or Flink’s clients, wherever retry is supported (default: 2 s).
client.timeout
1 minDurationTimeout on the client side.

Execution

KeyDefaultTypeDescription
execution.attached
falseBooleanSpecifies if the pipeline is submitted in attached or detached mode.
execution.job-listeners
(none)List<String>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.
execution.shutdown-on-attached-exit
falseBooleanIf the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
execution.target
(none)StringThe deployment target for the execution, e.g. “local” for local execution.
KeyDefaultTypeDescription
execution.savepoint.ignore-unclaimed-state
falseBooleanAllows skipping savepoint state that cannot be restored. Enable this if you removed an operator from your pipeline after the savepoint was triggered.
execution.savepoint.path
(none)StringPath to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
KeyDefaultTypeDescription
execution.buffer-timeout
100 msDurationThe maximum interval (in milliseconds) between flushes of the output buffers. By default the output buffers flush frequently to provide low latency and to aid a smooth developer experience. Setting the parameter can result in three logical modes:
  • A positive value triggers flushing periodically by that interval
  • 0 triggers flushing after every record thus minimizing latency
  • -1 ms triggers flushing only when the output buffer is full thus maximizing throughput
execution.checkpointing.snapshot-compression
falseBooleanWhether to use compression for the state snapshot data.

Pipeline

KeyDefaultTypeDescription
pipeline.auto-generate-uids
trueBooleanWhen auto-generated UIDs are disabled, users are forced to manually specify UIDs on DataStream applications.

It is highly recommended that users specify UIDs before deploying to production since they are used to match state in savepoints to operators in a job. Because auto-generated IDs are likely to change when modifying a job, specifying custom IDs allows an application to evolve over time without discarding state.
pipeline.auto-type-registration
trueBooleanControls whether Flink is automatically registering all types in the user programs with Kryo.
pipeline.auto-watermark-interval
0 msDurationThe interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.
pipeline.cached-files
(none)List<String>Files to be registered at the distributed cache under the given name. The files will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.

Example:
name:file1,path:file:///tmp/file1;name:file2,path:hdfs:///tmp/file2
pipeline.classpaths
(none)List<String>A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.
pipeline.closure-cleaner-level
RECURSIVE

Enum

Possible values: [NONE, TOP_LEVEL, RECURSIVE]
Configures the mode in which the closure cleaner works
  • NONE - disables the closure cleaner completely
  • TOP_LEVEL - cleans only the top-level class without recursing into fields
  • RECURSIVE - cleans all the fields recursively
pipeline.default-kryo-serializers
(none)List<String>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo default serializers

Example:
class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2
pipeline.force-avro
falseBooleanForces Flink to use the Apache Avro serializer for POJOs.

Important: Make sure to include the flink-avro module.
pipeline.force-kryo
falseBooleanIf enabled, forces TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. In some cases this might be preferable. For example, when using interfaces with subclasses that cannot be analyzed as POJO.
pipeline.generic-types
trueBooleanIf the use of generic types is disabled, Flink will throw an UnsupportedOperationException whenever it encounters a data type that would go through Kryo for serialization.

Disabling generic types can be helpful to eagerly find and eliminate the use of types that would go through Kryo serialization during runtime. Rather than checking types individually, using this option will throw exceptions eagerly in the places where generic types are used.

We recommend to use this option only during development and pre-production phases, not during actual production use. The application program and/or the input data may be such that new, previously unseen, types occur at some point. In that case, setting this option would cause the program to fail.
pipeline.global-job-parameters
(none)MapRegister a custom, serializable user configuration object. The configuration can be accessed in operators
pipeline.jars
(none)List<String>A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.
pipeline.max-parallelism
-1IntegerThe program-wide maximum parallelism used for operators which haven’t specified a maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.
pipeline.object-reuse
falseBooleanWhen enabled, objects that Flink internally uses for deserialization and for passing data to user-code functions will be reused. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behaviour.
pipeline.operator-chaining
trueBooleanOperator chaining allows non-shuffle operations to be co-located in the same thread fully avoiding serialization and de-serialization.
pipeline.registered-kryo-types
(none)List<String>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
pipeline.registered-pojo-types
(none)List<String>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
KeyDefaultTypeDescription
pipeline.time-characteristic
ProcessingTime

Enum

Possible values: [ProcessingTime, IngestionTime, EventTime]
The time characteristic for all created streams, e.g., processing time, event time, or ingestion time.

If you set the characteristic to IngestionTime or EventTime, this will set a default watermark update interval of 200 ms. If this is not suitable for your application, you should change it using pipeline.auto-watermark-interval.

Checkpointing

KeyDefaultTypeDescription
execution.checkpointing.externalized-checkpoint-retention
(none)

Enum

Possible values: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus#FAILED or JobStatus#SUSPENDED). In this case, you have to manually clean up the checkpoint state, both the meta data and the actual program state.

The mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).

The target directory for externalized checkpoints is configured via state.checkpoints.dir.
execution.checkpointing.interval
(none)DurationThe interval at which checkpoints are periodically scheduled.

This setting defines the base interval. Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints and execution.checkpointing.min-pause
execution.checkpointing.max-concurrent-checkpoints
1IntegerThe maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire.
execution.checkpointing.min-pause
0 msDurationThe minimal pause between checkpointing attempts. This setting defines how soon the checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger another checkpoint with respect to the maximum number of concurrent checkpoints (see execution.checkpointing.max-concurrent-checkpoints).

If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure that a minimum amount of time passes where no checkpoint is in progress at all.
execution.checkpointing.mode
EXACTLY_ONCE

Enum

Possible values: [EXACTLY_ONCE, AT_LEAST_ONCE]
The checkpointing mode (exactly-once vs. at-least-once).
execution.checkpointing.prefer-checkpoint-for-recovery
falseBooleanIf enabled, a job recovery should fallback to checkpoint when there is a more recent savepoint.
execution.checkpointing.timeout
10 minDurationThe maximum time that a checkpoint may take before being discarded.
execution.checkpointing.tolerable-failed-checkpoints
(none)IntegerThe tolerable checkpoint failure number. If set to 0, no checkpoint failures are tolerated.
execution.checkpointing.unaligned
falseBooleanEnables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.

Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.

Unaligned checkpoints can only be enabled if execution.checkpointing.mode is EXACTLY_ONCE and if execution.checkpointing.max-concurrent-checkpoints is 1
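
Taken together, a sketch of checkpointing defaults in conf/flink-conf.yaml could look like this (the interval, pause, and retention are illustrative choices, not recommendations):
# trigger a checkpoint every minute, with at least 10 s between attempts
execution.checkpointing.interval: 1 min
execution.checkpointing.min-pause: 10 s
execution.checkpointing.mode: EXACTLY_ONCE
# keep externalized checkpoints when the job is cancelled
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION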

Back to top