Checkpoints

Overview

Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.

See Checkpointing for how to enable and configure checkpoints for your program.

Retained Checkpoints

Checkpoints are by default not retained and are only used to resume a job from failures. They are deleted when a program is cancelled. You can, however, configure periodic checkpoints to be retained. Depending on the configuration these retained checkpoints are _not_ automatically cleaned up when the job fails or is cancelled. This way, you will have a checkpoint around to resume from if your job fails.

    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

The ExternalizedCheckpointCleanup mode configures what happens with checkpoints when you cancel the job:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.

  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
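In practice, retention is configured together with the checkpointing interval on the same execution environment. The sketch below shows the two settings side by side; the class name and the 60-second interval are illustrative, not prescribed by this page:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (interval chosen for illustration).
        env.enableCheckpointing(60_000);

        // Keep the latest checkpoint even when the job is cancelled.
        // Remember: retained checkpoints must be cleaned up manually.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```

With `DELETE_ON_CANCELLATION` instead, the same code would keep checkpoints only across failures, matching the default behavior described above.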

Directory Structure

Similarly to savepoints, a checkpoint consists of a metadata file and, depending on the state backend, some additional data files. The metadata file and data files are stored in the directory that is configured via state.checkpoints.dir in the configuration files, and can also be specified per job in the code.

Configure globally via configuration files

    state.checkpoints.dir: hdfs:///checkpoints/

Configure per job when constructing the state backend

    env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

Difference to Savepoints

Checkpoints have a few differences from savepoints. They

  • use a state backend specific (low-level) data format and may be incremental.
  • do not support Flink-specific features like rescaling.

Resuming from a retained checkpoint

A job may be resumed from a checkpoint just as from a savepoint by using the checkpoint's metadata file instead (see the savepoint restore guide). Note that if the metadata file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).

    $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
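As a concrete illustration, assuming a checkpoint was retained under the hdfs:///checkpoints/ directory configured above, the metadata file lives in a chk-<n> subdirectory per job. The job ID, checkpoint number, and jar name below are hypothetical placeholders:

```shell
# Resume from the metadata file of retained checkpoint number 42
# (replace <job-id> and the jar with your actual values).
bin/flink run -s hdfs:///checkpoints/<job-id>/chk-42/_metadata my-job.jar
```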