SDK for Flink DataStream Integration
This SDK may be used if you want your Stateful Functions application to consume events from, or output events to, Flink DataStreams. With this SDK, you can combine pipelines written with the Flink DataStream API or with higher-level libraries (such as the Table API, CEP, and essentially anything else that produces a DataStream) with the programming constructs provided by Stateful Functions to build complex streaming applications.
To use this, add the Flink DataStream Integration SDK as a dependency to your application:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-datastream</artifactId>
    <version>2.2.0</version>
</dependency>
SDK Overview
The following sections cover the important parts of getting started with the SDK. For full, working code, please take a look at this example.
Preparing a DataStream Ingress
All DataStreams used as ingresses must contain stream elements of type RoutableMessage. Each RoutableMessage carries information about the target function's address alongside the input event payload. You can use the RoutableMessageBuilder to transform your DataStreams:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> names = env.addSource(...);

DataStream<RoutableMessage> namesIngress =
    names.map(name ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(new FunctionType("example", "greet"), name)
            .withMessageBody(name)
            .build()
    );
In the above example, we transformed a DataStream<String> into a DataStream<RoutableMessage> by mapping each element of the original stream to a RoutableMessage, with each element targeted at the function type (example:greet).
Binding Functions, Ingresses, and Egresses
Once you have transformed your stream ingresses, you may start binding functions to consume the stream events, as well as binding DataStream egresses for the functions to produce their outputs to:
FunctionType GREET = new FunctionType("example", "greet");
FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
EgressIdentifier<String> GREETINGS =
    new EgressIdentifier<>("example", "greetings", String.class);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> names = env.addSource(...);
DataStream<RoutableMessage> namesIngress =
    names.map(name ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(GREET, name)
            .withMessageBody(name)
            .build()
    );

StatefulFunctionEgressStreams egresses =
    StatefulFunctionDataStreamBuilder.builder("example")
        .withDataStreamAsIngress(namesIngress)
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                    REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                .withPersistedState("seen_count")
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500))
        .withFunctionProvider(GREET, unused -> new MyFunction())
        .withEgressId(GREETINGS)
        .build(env);
As you can see, instead of binding functions, ingresses, and egresses through modules as you would with a typical Stateful Functions application, you bind them directly to the DataStream job using a StatefulFunctionDataStreamBuilder:

- Remote functions are bound using the withRequestReplyRemoteFunction method. The specification of the remote function, such as its service endpoint and various connection configurations, can be set using the provided RequestReplyFunctionBuilder.
- Embedded functions are bound using withFunctionProvider.
- Egress identifiers used by functions need to be bound with the withEgressId method.
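The embedded function bound above via withFunctionProvider could look like the following minimal sketch. The class name MyFunction is taken from the example above; the greeting logic itself is an assumption for illustration:

```java
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.EgressIdentifier;
import org.apache.flink.statefun.sdk.StatefulFunction;

// A minimal sketch of the embedded function bound with withFunctionProvider.
// The greeting text is an assumption for illustration.
public class MyFunction implements StatefulFunction {

    // Must match the egress identifier bound with withEgressId.
    private static final EgressIdentifier<String> GREETINGS =
        new EgressIdentifier<>("example", "greetings", String.class);

    @Override
    public void invoke(Context context, Object input) {
        // Turn each incoming name into a greeting and send it to the egress.
        context.send(GREETINGS, "Hello " + input + "!");
    }
}
```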
Consuming a DataStream Egress
Finally, you can obtain an egress as a DataStream from the resulting StatefulFunctionEgressStreams:
EgressIdentifier<String> GREETINGS = new EgressIdentifier<>("example", "greetings", String.class);
StatefulFunctionEgressStreams egresses = ...
DataStream<String> greetingsEgress = egresses.getDataStreamForEgressId(GREETINGS);
The obtained egress DataStream
can be further processed as in a typical Flink streaming application.
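For instance, the egress stream can be fed into regular DataStream operators; the uppercasing step below is just an illustrative assumption:

```java
// The egress behaves like any other DataStream, so regular operators apply.
DataStream<String> greetingsEgress = egresses.getDataStreamForEgressId(GREETINGS);

greetingsEgress
    .map(String::toUpperCase) // illustrative transformation
    .print();                 // or any regular Flink sink
```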
Configuration
Like a typical Stateful Functions application, configuration specific to Stateful Functions is set through the flink-conf.yaml
file, as explained here. You can also overwrite the base settings for each individual job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
statefunConfig.setGlobalConfiguration("someGlobalConfig", "foobar");
statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
StatefulFunctionEgressStreams egresses =
StatefulFunctionDataStreamBuilder.builder("example")
...
.withConfiguration(statefunConfig)
.build(env);
Attention: The setFlinkJobName method on StatefulFunctionsConfig has no effect when using this SDK. You need to define the job name as you normally would via Flink's DataStream API.
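Concretely, the job name would be passed to execute() as in any Flink DataStream job (the name below is a hypothetical example):

```java
// The job name is set via the regular DataStream API,
// not via StatefulFunctionsConfig.setFlinkJobName.
env.execute("my-statefun-example");
```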