SDK for Flink DataStream Integration

Use this SDK when you want your Stateful Functions application to consume events from, or output events to, Flink DataStreams. With it, you can combine pipelines written with the Flink DataStream API or higher-level libraries (such as the Table API or CEP; anything that produces a DataStream) with the programming constructs provided by Stateful Functions to build complex streaming applications.

To use it, add the Flink DataStream Integration SDK as a dependency to your application:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-datastream</artifactId>
    <version>2.2.0</version>
  </dependency>

SDK Overview

The following sections cover the important parts of getting started with the SDK. For the full code and a working example, please take a look at this example.

Preparing a DataStream Ingress

All DataStreams used as ingresses must contain stream elements of type RoutableMessage. Each RoutableMessage carries information about the target function’s address alongside the input event payload.

You can use the RoutableMessageBuilder to transform your DataStreams:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  DataStream<String> names = env.addSource(...);

  // Wrap each name in a RoutableMessage addressed to function type
  // example/greet, using the name itself as the target id.
  DataStream<RoutableMessage> namesIngress =
      names.map(name ->
          RoutableMessageBuilder.builder()
              .withTargetAddress(new FunctionType("example", "greet"), name)
              .withMessageBody(name)
              .build()
      );

In the above example, we transformed a DataStream<String> into a DataStream<RoutableMessage> by mapping each element of the original stream to a RoutableMessage targeted at the function type (example:greet), with the element itself used as the target id.
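To make the addressing idea concrete, here is a minimal plain-Java sketch. It is not SDK code: the Routed class and the groupByAddress helper are hypothetical stand-ins, written only to show how pairing a payload with a target address (function type plus id) determines which function instance receives which events. It assumes, as in the example above, that each name is used both as the target id and as the payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of address-based routing; none of these types belong to the SDK. */
public class RoutingSketch {

    /** Hypothetical stand-in for a routable message: target function type, target id, payload. */
    static final class Routed {
        final String namespace;
        final String name;
        final String id;
        final Object payload;

        Routed(String namespace, String name, String id, Object payload) {
            this.namespace = namespace;
            this.name = name;
            this.id = id;
            this.payload = payload;
        }

        /** Renders the logical address as namespace/name/id. */
        String address() {
            return namespace + "/" + name + "/" + id;
        }
    }

    /** Mirrors the map step above: the name is both the target id and the payload. */
    static Routed toGreetMessage(String name) {
        return new Routed("example", "greet", name, name);
    }

    /** Groups payloads by target address, preserving first-seen order of addresses. */
    static Map<String, List<Object>> groupByAddress(List<String> names) {
        Map<String, List<Object>> inbox = new LinkedHashMap<>();
        for (String n : names) {
            Routed message = toGreetMessage(n);
            inbox.computeIfAbsent(message.address(), k -> new ArrayList<>()).add(message.payload);
        }
        return inbox;
    }

    public static void main(String[] args) {
        // Both "alice" events end up under the same address; "bob" gets its own.
        System.out.println(groupByAddress(Arrays.asList("alice", "bob", "alice")));
    }
}
```

In this sketch, two events carrying the same name share one address, which is the property that lets a function keep state per id.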

Binding Functions, Ingresses, and Egresses

Once you have transformed your stream ingresses, you can bind functions that consume the stream events, as well as the DataStream egresses that receive their outputs:

  FunctionType GREET = new FunctionType("example", "greet");
  FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
  EgressIdentifier<String> GREETINGS = new EgressIdentifier<>("example", "greetings", String.class);

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  DataStream<String> names = env.addSource(...);

  DataStream<RoutableMessage> namesIngress =
      names.map(name ->
          RoutableMessageBuilder.builder()
              .withTargetAddress(GREET, name)
              .withMessageBody(name)
              .build()
      );

  StatefulFunctionEgressStreams egresses =
      StatefulFunctionDataStreamBuilder.builder("example")
          .withDataStreamAsIngress(namesIngress)
          // Remote function served over HTTP
          .withRequestReplyRemoteFunction(
              RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                      REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                  .withPersistedState("seen_count")
                  .withMaxRequestDuration(Duration.ofSeconds(15))
                  .withMaxNumBatchRequests(500))
          // Embedded function running inside the Flink job
          .withFunctionProvider(GREET, unused -> new MyFunction())
          .withEgressId(GREETINGS)
          .build(env);

As you can see, instead of binding functions, ingresses, and egresses through modules as you would with a typical Stateful Functions application, you bind them directly to the DataStream job using a StatefulFunctionDataStreamBuilder:

  • Remote functions are bound using the withRequestReplyRemoteFunction method. The remote function's specification, such as its service endpoint and connection settings, can be set using the provided RequestReplyFunctionBuilder.
  • Embedded functions are bound using withFunctionProvider.
  • Egress identifiers used by functions must be bound with the withEgressId method.
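The binding rules above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions, not the SDK: Job, Fn, Greeter, bindFunction, bindEgress, and deliver are all hypothetical names, and the choice to fail on an unbound function type or egress is a property of this sketch rather than a documented guarantee of the runtime. It shows a provider creating the function instance lazily and outputs being collected per bound egress.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Plain-Java model of binding functions and egresses to a job; not SDK code. */
public class BindingSketch {

    /** Hypothetical embedded function: receives the target id, the input, and an output sink. */
    interface Fn {
        void invoke(String id, Object input, List<String> egress);
    }

    /** Hypothetical greeter that counts how often each id has been seen. */
    static final class Greeter implements Fn {
        private final Map<String, Integer> seen = new HashMap<>();

        @Override
        public void invoke(String id, Object input, List<String> egress) {
            int count = seen.merge(id, 1, Integer::sum);
            egress.add("Hello " + input + " (seen " + count + ")");
        }
    }

    /** Hypothetical job registry: function types map to providers, egress ids to output lists. */
    static final class Job {
        private final Map<String, Supplier<Fn>> providers = new HashMap<>();
        private final Map<String, Fn> instances = new HashMap<>();
        private final Map<String, List<String>> egresses = new HashMap<>();

        Job bindFunction(String functionType, Supplier<Fn> provider) {
            providers.put(functionType, provider);
            return this;
        }

        Job bindEgress(String egressId) {
            egresses.put(egressId, new ArrayList<>());
            return this;
        }

        /** Delivers one input; in this sketch, unbound function types and egresses are errors. */
        void deliver(String functionType, String id, Object input, String egressId) {
            Supplier<Fn> provider = providers.get(functionType);
            if (provider == null) {
                throw new IllegalStateException("no function bound for " + functionType);
            }
            List<String> out = egresses.get(egressId);
            if (out == null) {
                throw new IllegalStateException("egress not bound: " + egressId);
            }
            // The provider is only called the first time the function type is needed.
            instances.computeIfAbsent(functionType, t -> provider.get()).invoke(id, input, out);
        }

        List<String> egress(String egressId) {
            return egresses.get(egressId);
        }
    }

    public static void main(String[] args) {
        Job job = new Job()
            .bindFunction("example/greet", Greeter::new)
            .bindEgress("example/greetings");
        job.deliver("example/greet", "alice", "alice", "example/greetings");
        job.deliver("example/greet", "alice", "alice", "example/greetings");
        System.out.println(job.egress("example/greetings"));
    }
}
```

The model keeps everything in memory and on one thread; it is meant to show the relationship between bindings, not how the runtime schedules or persists anything.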

Consuming a DataStream Egress

Finally, you can obtain an egress as a DataStream from the resulting StatefulFunctionEgressStreams:

  EgressIdentifier<String> GREETINGS = new EgressIdentifier<>("example", "greetings", String.class);

  StatefulFunctionEgressStreams egresses = ...

  DataStream<String> greetingsEgress = egresses.getDataStreamForEgressId(GREETINGS);

The obtained egress DataStream can be further processed as in a typical Flink streaming application.

Configuration

As in a typical Stateful Functions application, configuration specific to Stateful Functions is set through the flink-conf.yaml file, as explained here. You can also override the base settings for each individual job:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // Start from the base settings, then override them for this job only.
  StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
  statefunConfig.setGlobalConfiguration("someGlobalConfig", "foobar");
  statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);

  StatefulFunctionEgressStreams egresses =
      StatefulFunctionDataStreamBuilder.builder("example")
          ...
          .withConfiguration(statefunConfig)
          .build(env);

Attention: The setFlinkJobName method on StatefulFunctionsConfig has no effect when using this SDK. Define the job name as you normally would via Flink’s DataStream API.