Flink Connectors

The source-sink I/O module lets you plug in existing or custom Flink connectors that are not already integrated into a dedicated I/O module. For details on how to build a custom connector, see the official Apache Flink documentation.

Dependency

To use a custom Flink connector, include the following dependency in your pom.xml.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-io</artifactId>
      <version>2.1.0</version>
      <scope>provided</scope>
    </dependency>

Source Spec

A source function spec creates an ingress from a Flink source function.

    package org.apache.flink.statefun.docs.io.flink;

    import java.util.Map;
    import org.apache.flink.statefun.docs.models.User;
    import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
    import org.apache.flink.statefun.sdk.io.IngressIdentifier;
    import org.apache.flink.statefun.sdk.io.IngressSpec;
    import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

    public class ModuleWithSourceSpec implements StatefulFunctionModule {

      @Override
      public void configure(Map<String, String> globalConfiguration, Binder binder) {
        // Identifies the ingress by message type, namespace, and name.
        IngressIdentifier<User> id = new IngressIdentifier<>(User.class, "example", "users");
        // FlinkSource is not defined in this example; it stands for a Flink source function.
        IngressSpec<User> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
        binder.bindIngress(spec);
      }
    }
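
The `FlinkSource` class used in the module is not defined in this section. As a rough sketch of what such a source function might look like, the following uses small stand-in interfaces that mirror the shape of Flink's `SourceFunction` and its `SourceContext` (`run`, `cancel`, `collect`), so that it compiles without Flink on the classpath. The names `SourceSketch`, `CountingUserSource`, and `drain`, and the use of `String` in place of `User`, are assumptions made for illustration; in a real module you would implement `org.apache.flink.streaming.api.functions.source.SourceFunction` directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: stand-ins that mirror the shape of Flink's source function API. */
final class SourceSketch {

  /** Stand-in for Flink's SourceContext (assumption: only collect is modeled). */
  interface SourceContext<T> {
    void collect(T element);
  }

  /** Stand-in for Flink's SourceFunction. */
  interface SourceFunction<T> {
    void run(SourceContext<T> ctx) throws Exception;

    void cancel();
  }

  /** Hypothetical bounded source that emits "user-0", "user-1", and so on. */
  static final class CountingUserSource implements SourceFunction<String> {
    private final int limit;
    // Volatile because cancel() may be called from a different thread than run().
    private volatile boolean running = true;

    CountingUserSource(int limit) {
      this.limit = limit;
    }

    @Override
    public void run(SourceContext<String> ctx) {
      for (int i = 0; i < limit && running; i++) {
        ctx.collect("user-" + i);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }

  /** Runs the source until it returns and gathers everything it emitted. */
  static List<String> drain(SourceFunction<String> source) {
    List<String> out = new ArrayList<>();
    try {
      source.run(out::add);
    } catch (Exception e) {
      throw new IllegalStateException("source failed", e);
    }
    return out;
  }
}
```

The `running` flag follows the common pattern for source functions: `run` loops until the data is exhausted or `cancel` flips the flag, which lets the runtime stop the source cooperatively.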

Sink Spec

A sink function spec creates an egress from a Flink sink function.

    package org.apache.flink.statefun.docs.io.flink;

    import java.util.Map;
    import org.apache.flink.statefun.docs.models.User;
    import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
    import org.apache.flink.statefun.sdk.io.EgressIdentifier;
    import org.apache.flink.statefun.sdk.io.EgressSpec;
    import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

    public class ModuleWithSinkSpec implements StatefulFunctionModule {

      @Override
      public void configure(Map<String, String> globalConfiguration, Binder binder) {
        // Identifies the egress by namespace, name, and message type.
        EgressIdentifier<User> id = new EgressIdentifier<>("example", "user", User.class);
        // FlinkSink is not defined in this example; it stands for a Flink sink function.
        EgressSpec<User> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
        binder.bindEgress(spec);
      }
    }
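
The `FlinkSink` class is likewise left undefined here. A minimal sketch of a sink function, under the same assumptions as before, might look like the following. The `SinkFunction` interface below is a simplified stand-in for Flink's `org.apache.flink.streaming.api.functions.sink.SinkFunction` (only a one-argument `invoke` is modeled), and `SinkSketch`, `BufferingUserSink`, and the `"egress:"` prefix are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: a stand-in that mirrors the shape of Flink's sink function API. */
final class SinkSketch {

  /** Stand-in for Flink's SinkFunction (assumption: only invoke(value) is modeled). */
  interface SinkFunction<T> {
    void invoke(T value) throws Exception;
  }

  /** Hypothetical sink that buffers one formatted line per record in memory. */
  static final class BufferingUserSink implements SinkFunction<String> {
    private final List<String> lines = new ArrayList<>();

    @Override
    public void invoke(String user) {
      // A real sink would write to an external system here.
      lines.add("egress:" + user);
    }

    /** Returns a copy of the lines written so far. */
    List<String> lines() {
      return new ArrayList<>(lines);
    }
  }
}
```

The sink receives one record per `invoke` call, so any batching or connection handling would have to be managed inside the sink itself.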