API 迁移指南

API changes for serializer snapshots

This would be relevant mostly for users implementing custom TypeSerializers for their state.

The old TypeSerializerConfigSnapshot abstraction is now deprecated, and will be fully removed in the futurein favor of the new TypeSerializerSnapshot. For details and guides on how to migrate, please seeMigrating from deprecated serializer snapshot APIs before Flink 1.7.

There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in theirspecific documentations. The following is a consolidated list of API changes and links to details for migration whenupgrading to Flink 1.3.

TypeSerializer interface changes

This would be relevant mostly for users implementing custom TypeSerializers for their state.

Since Flink 1.3, two additional methods have been added that are related to serializer compatibilityacross savepoint restores. Please seeHandling serializer upgrades and compatibilityfor further details on how to implement these methods.

ProcessFunction is always a RichFunction

In Flink 1.2, ProcessFunction and its rich variant RichProcessFunction was introduced.Since Flink 1.3, RichProcessFunction was removed and ProcessFunction is now always a RichFunction with access tothe lifecycle methods and runtime context.

The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.Please visit the CEP Migration docs for details.

In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts arenow clean of specific logger dependencies.

Example and quickstart archetypes already have loggers specified and should not be affected.For other custom projects, make sure to add logger dependencies. For example, in Maven’s pom.xml, you can add:

  1. <dependency>
  2. <groupId>org.slf4j</groupId>
  3. <artifactId>slf4j-log4j12</artifactId>
  4. <version>1.7.7</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>log4j</groupId>
  8. <artifactId>log4j</artifactId>
  9. <version>1.2.17</version>
  10. </dependency>

As mentioned in the State documentation, Flink has two types of state:keyed and non-keyed state (also called operator state). Both types are available toboth operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern thedeprecation of the aligned window operators from Flink 1.1 (see Aligned Processing Time Window Operators).

The migration process will serve two goals:

  • allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,

  • make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by itsFlink 1.1 predecessor.

After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2simply by taking a savepoint with your Flink 1.1 job and giving it toyour Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where itsFlink 1.1 predecessor left off.

Example User Functions

As running examples for the remainder of this document we will use the CountMapper and the BufferingSinkfunctions. The first is an example of a function with keyed state, whilethe second has non-keyed state. The code for the aforementioned two functions in Flink 1.1 is presented below:

  1. public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
  2. private transient ValueState<Integer> counter;
  3. private final int numberElements;
  4. public CountMapper(int numberElements) {
  5. this.numberElements = numberElements;
  6. }
  7. @Override
  8. public void open(Configuration parameters) throws Exception {
  9. counter = getRuntimeContext().getState(
  10. new ValueStateDescriptor<>("counter", Integer.class, 0));
  11. }
  12. @Override
  13. public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
  14. int count = counter.value() + 1;
  15. counter.update(count);
  16. if (count % numberElements == 0) {
  17. out.collect(Tuple2.of(value.f0, count));
  18. counter.update(0); // reset to 0
  19. }
  20. }
  21. }
  22. public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  23. Checkpointed<ArrayList<Tuple2<String, Integer>>> {
  24. private final int threshold;
  25. private ArrayList<Tuple2<String, Integer>> bufferedElements;
  26. BufferingSink(int threshold) {
  27. this.threshold = threshold;
  28. this.bufferedElements = new ArrayList<>();
  29. }
  30. @Override
  31. public void invoke(Tuple2<String, Integer> value) throws Exception {
  32. bufferedElements.add(value);
  33. if (bufferedElements.size() == threshold) {
  34. for (Tuple2<String, Integer> element: bufferedElements) {
  35. // send it to the sink
  36. }
  37. bufferedElements.clear();
  38. }
  39. }
  40. @Override
  41. public ArrayList<Tuple2<String, Integer>> snapshotState(
  42. long checkpointId, long checkpointTimestamp) throws Exception {
  43. return bufferedElements;
  44. }
  45. @Override
  46. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  47. bufferedElements.addAll(state);
  48. }
  49. }

The CountMapper is a RichFlatMapFunction which assumes a grouped-by-key input stream of the form(word, 1). The function keeps a counter for each incoming key (ValueState<Integer> counter) and ifthe number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emittedcontaining the word itself and the number of occurrences.

The BufferingSink is a SinkFunction that receives elements (potentially the output of the CountMapper)and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.This is a common way to avoid many expensive calls to a database or an external storage system. To do thebuffering in a fault-tolerant manner, the buffered elements are kept in a list (bufferedElements) which isperiodically checkpointed.

State API Migration

To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.After doing these changes, you will be able to change the parallelism of your job (scale up or down) and youare guaranteed that the new version of your job will start from where its predecessor left off.

Keyed State: Something to note before delving into the details of the migration process is that if your functionhas only keyed state, then the exact same code from Flink 1.1 also works for Flink 1.2 with full supportfor the new features and full backwards compatibility. Changes could be made just for better code organization,but this is just a matter of style.

With the above said, the rest of this section focuses on the non-keyed state.

Rescaling and new state abstractions

The first modification is the transition from the old Checkpointed<T extends Serializable> state interfaceto the new ones. In Flink 1.2, a stateful function can implement either the more general CheckpointedFunctioninterface, or the ListCheckpointed<T extends Serializable> interface, which is semantically closer to the oldCheckpointed one.

In both cases, the non-keyed state is expected to be a List of serializable objects, independent from each other,thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at whichnon-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the BufferingSinkcontains elements (test1, 2) and (test2, 2), when increasing the parallelism to 2, (test1, 2) may end up in task 0,while (test2, 2) will go to task 1.

More details on the principles behind rescaling of both keyed state and non-keyed state can be found inthe State documentation.

ListCheckpointed

The ListCheckpointed interface requires the implementation of two methods:

  1. List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  2. void restoreState(List<T> state) throws Exception;

Their semantics are the same as their counterparts in the old Checkpointed interface. The only differenceis that now snapshotState() should return a list of objects to checkpoint, as stated earlier, andrestoreState has to handle this list upon recovery. If the state is not re-partitionable, you can alwaysreturn a Collections.singletonList(MY_STATE) in the snapshotState(). The updated code for BufferingSinkis included below:

  1. public class BufferingSinkListCheckpointed implements
  2. SinkFunction<Tuple2<String, Integer>>,
  3. ListCheckpointed<Tuple2<String, Integer>>,
  4. CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
  5. private final int threshold;
  6. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  7. private List<Tuple2<String, Integer>> bufferedElements;
  8. public BufferingSinkListCheckpointed(int threshold) {
  9. this.threshold = threshold;
  10. this.bufferedElements = new ArrayList<>();
  11. }
  12. @Override
  13. public void invoke(Tuple2<String, Integer> value) throws Exception {
  14. this.bufferedElements.add(value);
  15. if (bufferedElements.size() == threshold) {
  16. for (Tuple2<String, Integer> element: bufferedElements) {
  17. // send it to the sink
  18. }
  19. bufferedElements.clear();
  20. }
  21. }
  22. @Override
  23. public List<Tuple2<String, Integer>> snapshotState(
  24. long checkpointId, long timestamp) throws Exception {
  25. return this.bufferedElements;
  26. }
  27. @Override
  28. public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
  29. if (!state.isEmpty()) {
  30. this.bufferedElements.addAll(state);
  31. }
  32. }
  33. @Override
  34. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  35. // this is from the CheckpointedRestoring interface.
  36. this.bufferedElements.addAll(state);
  37. }
  38. }

As shown in the code, the updated function also implements the CheckpointedRestoring interface. This is for backwardscompatibility reasons and more details will be explained at the end of this section.

CheckpointedFunction

The CheckpointedFunction interface requires again the implementation of two methods:

  1. void snapshotState(FunctionSnapshotContext context) throws Exception;
  2. void initializeState(FunctionInitializationContext context) throws Exception;

As in Flink 1.1, snapshotState() is called whenever a checkpoint is performed, but now initializeState() (which isthe counterpart of the restoreState()) is called every time the user-defined function is initialized, rather than onlyin the case that we are recovering from a failure. Given this, initializeState() is not only the place where differenttypes of state are initialized, but also where state recovery logic is included. An implementation of theCheckpointedFunction interface for BufferingSink is presented below.

  1. public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  2. CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
  3. private final int threshold;
  4. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  5. private List<Tuple2<String, Integer>> bufferedElements;
  6. public BufferingSink(int threshold) {
  7. this.threshold = threshold;
  8. this.bufferedElements = new ArrayList<>();
  9. }
  10. @Override
  11. public void invoke(Tuple2<String, Integer> value) throws Exception {
  12. bufferedElements.add(value);
  13. if (bufferedElements.size() == threshold) {
  14. for (Tuple2<String, Integer> element: bufferedElements) {
  15. // send it to the sink
  16. }
  17. bufferedElements.clear();
  18. }
  19. }
  20. @Override
  21. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  22. checkpointedState.clear();
  23. for (Tuple2<String, Integer> element : bufferedElements) {
  24. checkpointedState.add(element);
  25. }
  26. }
  27. @Override
  28. public void initializeState(FunctionInitializationContext context) throws Exception {
  29. checkpointedState = context.getOperatorStateStore().
  30. getSerializableListState("buffered-elements");
  31. if (context.isRestored()) {
  32. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  33. bufferedElements.add(element);
  34. }
  35. }
  36. }
  37. @Override
  38. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  39. // this is from the CheckpointedRestoring interface.
  40. this.bufferedElements.addAll(state);
  41. }
  42. }

The initializeState takes as argument a FunctionInitializationContext. This is used to initializethe non-keyed state “container”. This is a container of type ListState where the non-keyed state objectsare going to be stored upon checkpointing:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

After initializing the container, we use the isRestored() method of the context to check if we arerecovering after a failure. If this is true, i.e. we are recovering, the restore logic is applied.

As shown in the code of the modified BufferingSink, this ListState recovered during stateinitialization is kept in a class variable for future use in snapshotState(). There the ListState is clearedof all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.

As a side note, the keyed state can also be initialized in the initializeState() method. This can be doneusing the FunctionInitializationContext given as argument, instead of the RuntimeContext, which is the casefor Flink 1.1. If the CheckpointedFunction interface was to be used in the CountMapper example,the old open() method could be removed and the new snapshotState() and initializeState() methodswould look like this:

  1. public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
  2. implements CheckpointedFunction {
  3. private transient ValueState<Integer> counter;
  4. private final int numberElements;
  5. public CountMapper(int numberElements) {
  6. this.numberElements = numberElements;
  7. }
  8. @Override
  9. public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
  10. int count = counter.value() + 1;
  11. counter.update(count);
  12. if (count % numberElements == 0) {
  13. out.collect(Tuple2.of(value.f0, count));
  14. counter.update(0); // reset to 0
  15. }
  16. }
  17. @Override
  18. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  19. // all managed, nothing to do.
  20. }
  21. @Override
  22. public void initializeState(FunctionInitializationContext context) throws Exception {
  23. counter = context.getKeyedStateStore().getState(
  24. new ValueStateDescriptor<>("counter", Integer.class, 0));
  25. }
  26. }

Notice that the snapshotState() method is empty as Flink itself takes care of snapshotting managed keyed stateupon checkpointing.

So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.The question that remains is “Can I make sure that my modified (Flink 1.2) job will start from where my alreadyrunning job from Flink 1.1 stopped?”.

The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has toimplement the CheckpointedRestoring interface, as shown in the code above. This has a single method, thefamiliar restoreState() from the old Checkpointed interface from Flink 1.1. As shown in the modified code ofthe BufferingSink, the restoreState() method is identical to its predecessor.

Aligned Processing Time Window Operators

In Flink 1.1, and only when operating on processing time with no specified evictor or trigger,the command timeWindow() on a keyed stream would instantiate a special type of WindowOperator. This could beeither an AggregatingProcessingTimeWindowOperator or an AccumulatingProcessingTimeWindowOperator. Both ofthese operators are referred to as aligned window operators as they assume their input elements arrive inorder. This is valid when operating in processing time, as elements get as timestamp the wall-clock time atthe moment they arrive at the window operator. These operators were restricted to using the memory state backend, andhad optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.

In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the genericWindowOperator. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparentlyread the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a formatthat is compatible with the generic WindowOperator, and resume execution using the generic WindowOperator.

Note Although deprecated, you can still use the aligned window operatorsin Flink 1.2 through special WindowAssigners introduced for exactly this purpose. These assigners are theSlidingAlignedProcessingTimeWindows and the TumblingAlignedProcessingTimeWindows assigners, for sliding and tumblingwindows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way toresume execution from a Flink 1.1 savepoint while using these operators.

Attention The aligned window operators provide no rescaling capabilitiesand no backwards compatibility with Flink 1.1.

The code to use the aligned window operators in Flink 1.2 is presented below:

  1. // for tumbling windows
  2. DataStream<Tuple2<String, Integer>> window1 = source
  3. .keyBy(0)
  4. .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  5. .apply(your-function)
  6. // for sliding windows
  7. DataStream<Tuple2<String, Integer>> window1 = source
  8. .keyBy(0)
  9. .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  10. .apply(your-function)
  1. // for tumbling windows
  2. val window1 = source
  3. .keyBy(0)
  4. .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  5. .apply(your-function)
  6. // for sliding windows
  7. val window2 = source
  8. .keyBy(0)
  9. .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  10. .apply(your-function)