Procedures

Flink Table API & SQL lets users perform data manipulation and administrative tasks with procedures. Procedures can run Flink jobs with the provided StreamExecutionEnvironment, which makes them more powerful and flexible.

Implementation Guide

To call a procedure, it must be available in a catalog. To provide procedures in a catalog, you need to implement the procedure and then return it from the Catalog.getProcedure(ObjectPath procedurePath) method. The following steps guide you through implementing a procedure and providing it in a catalog.

Procedure Class

An implementation class must implement the interface org.apache.flink.table.procedures.Procedure.

The class must be declared public, not abstract, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed.
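The class rules above can be illustrated with plain reflection. The following is a minimal sketch, not Flink's own validation logic: ProcedureClassCheck and its nested sample classes are hypothetical names, and the check only inspects the class's own modifiers (it does not verify that enclosing classes are public as well).

```java
import java.lang.reflect.Modifier;

// Hypothetical helper, not part of Flink: mirrors the class rules stated above.
final class ProcedureClassCheck {

    // Returns true if the class is public, concrete, and neither a non-static
    // inner class nor a local or anonymous class.
    static boolean isInstantiable(Class<?> clazz) {
        int mods = clazz.getModifiers();
        boolean isPublic = Modifier.isPublic(mods);
        boolean isConcrete = !Modifier.isAbstract(mods) && !clazz.isInterface();
        boolean isNonStaticInner = clazz.isMemberClass() && !Modifier.isStatic(mods);
        boolean isLocalOrAnonymous = clazz.isLocalClass() || clazz.isAnonymousClass();
        return isPublic && isConcrete && !isNonStaticInner && !isLocalOrAnonymous;
    }

    // Accepted: a public static nested class can be created without an outer instance.
    public static class GoodProcedure {}

    // Rejected: a non-static inner class needs an enclosing instance.
    public class InnerProcedure {}

    // Rejected: an abstract class cannot be instantiated.
    public abstract static class AbstractProcedure {}
}
```

With this sketch, only GoodProcedure passes the check; InnerProcedure, AbstractProcedure, and any anonymous class are rejected.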

Call Methods

The interface does not declare any methods. You have to define a method named call that implements the logic of the procedure. Call methods must be declared public and take a well-defined set of arguments.

Please note:

  • The first parameter of the call method must always be a ProcedureContext, which provides the getExecutionEnvironment method for obtaining a StreamExecutionEnvironment to run a Flink job.
  • The return type must always be an array, such as int[] or String[].

More details can be found in the Javadoc of the class org.apache.flink.table.procedures.Procedure.

Regular JVM method calling semantics apply. Therefore, it is possible to:

  • implement overloaded methods such as call(ProcedureContext, Integer) and call(ProcedureContext, LocalDateTime)
  • use var-args such as call(ProcedureContext, Integer...)
  • use object inheritance such as call(ProcedureContext, Object) that takes both LocalDateTime and Integer
  • and combinations of the above such as call(ProcedureContext, Object...) that takes all kinds of arguments
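The plain Java side of these rules can be tried out without Flink. The sketch below is an illustration under stated assumptions: FakeContext is a hypothetical stand-in for ProcedureContext, and DescribeProcedure is a hypothetical class that does not implement the real Procedure interface. It only demonstrates how the Java compiler resolves overloads and var-args; how the planner matches a SQL call against the extracted signatures is not covered here.

```java
import java.time.LocalDateTime;

// Hypothetical stand-in for ProcedureContext so the sketch runs without Flink.
final class FakeContext {}

// Hypothetical class; a real procedure would implement Procedure.
final class DescribeProcedure {

    // Overload for a single Integer argument.
    public String[] call(FakeContext context, Integer n) {
        return new String[] {"integer:" + n};
    }

    // Overload for a single LocalDateTime argument.
    public String[] call(FakeContext context, LocalDateTime t) {
        return new String[] {"timestamp:" + t.getYear()};
    }

    // Var-args fallback that accepts any number of arguments of any type.
    public String[] call(FakeContext context, Object... args) {
        return new String[] {"varargs:" + args.length};
    }
}
```

Under these rules, call(ctx, 1) resolves to the Integer overload after boxing, call(ctx, LocalDateTime.of(2024, 1, 1, 0, 0)) resolves to the LocalDateTime overload, and call(ctx) or call(ctx, "a", 2) fall through to the var-args method.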

If you intend to implement procedures in Scala, please add the scala.annotation.varargs annotation in case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. java.lang.Integer instead of Int) to support NULL.

The following snippets show an example of an overloaded procedure:

Java

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.procedure.ProcedureContext;
    import org.apache.flink.table.procedures.Procedure;
    import org.apache.flink.util.CloseableIterator;

    // procedure with overloaded call methods
    public class GenerateSequenceProcedure implements Procedure {

        public long[] call(ProcedureContext context, int n) throws Exception {
            return generate(context.getExecutionEnvironment(), n);
        }

        public long[] call(ProcedureContext context, String n) throws Exception {
            return generate(context.getExecutionEnvironment(), Integer.parseInt(n));
        }

        // runs a Flink job that emits 0..n-1 and collects the result
        private long[] generate(StreamExecutionEnvironment env, int n) throws Exception {
            long[] sequenceN = new long[n];
            int i = 0;
            try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
                while (result.hasNext()) {
                    sequenceN[i++] = result.next();
                }
            }
            return sequenceN;
        }
    }

Scala

    import org.apache.flink.table.procedure.ProcedureContext
    import org.apache.flink.table.procedures.Procedure
    import scala.annotation.varargs

    // procedure with overloaded call methods
    class GenerateSequenceProcedure extends Procedure {

      def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
        Array(a + b)
      }

      def call(context: ProcedureContext, a: String, b: String): Array[Integer] = {
        Array(Integer.valueOf(a) + Integer.valueOf(b))
      }

      @varargs // generate var-args like Java
      def call(context: ProcedureContext, d: Double*): Array[Integer] = {
        Array(d.sum.toInt)
      }
    }

Type Inference

The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both procedure parameters and return types must be mapped to a data type.

From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a procedure.

The logic for validating input arguments and deriving data types for both the parameters and the result of a procedure is summarized under the term type inference.

Flink’s procedures implement an automatic type inference extraction that derives data types from the procedure’s class and its call methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with @DataTypeHint and @ProcedureHint. More examples on how to annotate procedures are shown below.

Note: although the return type of a call method must be an array type T[], a @DataTypeHint on the return type is expected to describe the component type T, not the array type.
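The relationship between the array return type and its component type can be seen with plain reflection. This is only a sketch of the idea, not the extraction code Flink uses: ReturnTypeInspection and its methods are hypothetical names, and the sample call method omits the ProcedureContext parameter so that it runs without Flink.

```java
import java.lang.reflect.Method;

// Hypothetical helper, not part of Flink.
final class ReturnTypeInspection {

    // Sample call-style method that returns an array, as procedures must.
    public static Long[] call(long a, long b) {
        return new Long[] {a + b};
    }

    // Finds the first method with the given name and returns the component
    // type of its array return type, i.e. the T in T[].
    static Class<?> resultElementType(Class<?> clazz, String methodName) {
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                Class<?> returnType = method.getReturnType();
                if (!returnType.isArray()) {
                    throw new IllegalArgumentException(
                            "Method must return an array: " + method);
                }
                return returnType.getComponentType();
            }
        }
        throw new IllegalArgumentException("No method named " + methodName);
    }
}
```

For the sample method, the component type is Long, which is the type a data type hint on the return value would describe.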

Automatic Type Inference

The automatic type inference inspects the procedure’s class and call methods to derive data types for the arguments and result of a procedure. @DataTypeHint and @ProcedureHint annotations support the automatic extraction.

For a full list of classes that can be implicitly mapped to a data type, please refer to the data type extraction section.

@DataTypeHint

In many scenarios, the automatic extraction needs inline support for individual parameters and return types of a procedure.

The following example shows how to use data type hints. More information can be found in the documentation of the annotation class.

Java

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.InputGroup;
    import org.apache.flink.table.procedure.ProcedureContext;
    import org.apache.flink.table.procedures.Procedure;
    import org.apache.flink.types.Row;

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import java.time.Instant;

    // procedure with overloaded call methods
    public static class OverloadedProcedure implements Procedure {

        // no hint required
        public Long[] call(ProcedureContext context, long a, long b) {
            return new Long[] {a + b};
        }

        // define the precision and scale of a decimal
        public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) {
            return new BigDecimal[] {BigDecimal.valueOf(a + b)};
        }

        // define a nested data type
        @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
        public Row[] call(ProcedureContext context, int i) {
            return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))};
        }

        // allow wildcard input and custom serialized output
        // (MyUtils is a user-provided helper that is not shown here)
        @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
        public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
            return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)};
        }
    }

Scala

    import org.apache.flink.table.annotation.DataTypeHint
    import org.apache.flink.table.annotation.InputGroup
    import org.apache.flink.table.procedure.ProcedureContext
    import org.apache.flink.table.procedures.Procedure
    import org.apache.flink.types.Row

    // procedure with overloaded call methods
    class OverloadedProcedure extends Procedure {

      // no hint required
      def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = {
        Array(a + b)
      }

      // define the precision and scale of a decimal
      @DataTypeHint("DECIMAL(12, 3)")
      def call(context: ProcedureContext, a: Double, b: Double): Array[java.math.BigDecimal] = {
        Array(java.math.BigDecimal.valueOf(a + b))
      }

      // define a nested data type
      @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
      def call(context: ProcedureContext, i: Integer): Array[Row] = {
        Array(Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i.longValue())))
      }

      // allow wildcard input and custom serialized output
      // (MyUtils is a user-provided helper that is not shown here)
      @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
      def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = {
        Array(MyUtils.serializeToByteBuffer(o))
      }
    }

@ProcedureHint

In some scenarios, it is desirable that one call method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded call methods have a common result type that should be declared only once.

The @ProcedureHint annotation can provide a mapping from argument data types to a result data type. It enables annotating entire procedure classes or call methods for input and result data types. One or more annotations can be declared on top of a class or individually for each call method for overloading procedure signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a procedure class are inherited by all call methods.

The following example shows how to use procedure hints. More information can be found in the documentation of the annotation class.

Java

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.ProcedureHint;
    import org.apache.flink.table.procedure.ProcedureContext;
    import org.apache.flink.table.procedures.Procedure;
    import org.apache.flink.types.Row;

    // procedure with overloaded call methods
    // but globally defined output type
    @ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
    public static class OverloadedProcedure implements Procedure {

        public Row[] call(ProcedureContext context, int a, int b) {
            return new Row[] {Row.of("Sum", a + b)};
        }

        // overloading of arguments is still possible
        public Row[] call(ProcedureContext context) {
            return new Row[] {Row.of("Empty args", -1)};
        }
    }

    // decouples the type inference from call methods,
    // the type inference is entirely determined by the procedure hints
    @ProcedureHint(
        input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
        output = @DataTypeHint("INT")
    )
    @ProcedureHint(
        input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
        output = @DataTypeHint("BIGINT")
    )
    @ProcedureHint(
        input = {},
        output = @DataTypeHint("BOOLEAN")
    )
    public static class OverloadedProcedure implements Procedure {

        // an implementer just needs to make sure that a method exists
        // that can be called by the JVM
        public Object[] call(ProcedureContext context, Object... o) {
            if (o.length == 0) {
                return new Object[] {false};
            }
            return new Object[] {o[0]};
        }
    }

Scala

    import org.apache.flink.table.annotation.DataTypeHint
    import org.apache.flink.table.annotation.ProcedureHint
    import org.apache.flink.table.procedure.ProcedureContext
    import org.apache.flink.table.procedures.Procedure
    import org.apache.flink.types.Row
    import scala.annotation.varargs

    // procedure with overloaded call methods
    // but globally defined output type
    @ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>"))
    class OverloadedFunction extends Procedure {

      def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = {
        Array(Row.of("Sum", Int.box(a + b)))
      }

      // overloading of arguments is still possible
      def call(context: ProcedureContext): Array[Row] = {
        Array(Row.of("Empty args", Int.box(-1)))
      }
    }

    // decouples the type inference from call methods,
    // the type inference is entirely determined by the procedure hints
    @ProcedureHint(
      input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")),
      output = new DataTypeHint("INT")
    )
    @ProcedureHint(
      input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")),
      output = new DataTypeHint("BIGINT")
    )
    @ProcedureHint(
      input = Array(),
      output = new DataTypeHint("BOOLEAN")
    )
    class OverloadedProcedure extends Procedure {

      // an implementer just needs to make sure that a method exists
      // that can be called by the JVM
      @varargs
      def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef] = {
        // return explicitly from both branches so the empty case is not discarded
        if (o.isEmpty) {
          Array(Boolean.box(false))
        } else {
          Array(o(0))
        }
      }
    }
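To make the idea of hint-declared signatures more concrete, the following sketch models the three hints of the hint-only example above as a lookup table from input types to an output type. It is a simplified, hypothetical model and not Flink code: HintSignatureTable and its methods are invented for illustration, type names are plain strings, and only exact matches are resolved (implicit casts or other planner behavior are not modeled).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model, not Flink code: each entry plays the role of one @ProcedureHint.
final class HintSignatureTable {

    private final Map<List<String>, String> outputByInput = new LinkedHashMap<>();

    // Registers one signature: a list of input type names mapped to an output type name.
    HintSignatureTable hint(String output, String... input) {
        outputByInput.put(Arrays.asList(input), output);
        return this;
    }

    // Returns the output type for the given argument types if a hint matches exactly.
    Optional<String> resolve(String... argumentTypes) {
        return Optional.ofNullable(outputByInput.get(Arrays.asList(argumentTypes)));
    }

    // The same three signatures as in the hint-only example.
    static HintSignatureTable example() {
        return new HintSignatureTable()
                .hint("INT", "INT", "INT")
                .hint("BIGINT", "BIGINT", "BIGINT")
                .hint("BOOLEAN");
    }
}
```

In this model, the single var-args call method never takes part in the lookup, which mirrors the point that the hints alone determine the accepted signatures while the method only has to be callable by the JVM.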

Return Procedure in Catalog

After implementing a procedure, the catalog can return it from the Catalog.getProcedure(ObjectPath procedurePath) method. The catalog is also expected to list all of its procedures in Catalog.listProcedures(String dbName). The following example shows how to do both.

Java

    import org.apache.flink.table.catalog.GenericInMemoryCatalog;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.exceptions.CatalogException;
    import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
    import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
    import org.apache.flink.table.procedures.Procedure;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // catalog with built-in procedures
    public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {

        // procedures provided by this catalog, keyed by database and procedure name
        private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>();

        static {
            PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
        }

        public CatalogWithBuiltInProcedure(String name) {
            super(name);
        }

        @Override
        public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
            if (!databaseExists(dbName)) {
                throw new DatabaseNotExistException(getName(), dbName);
            }
            return PROCEDURE_MAP.keySet().stream()
                    .filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName))
                    .map(ObjectPath::getObjectName)
                    .collect(Collectors.toList());
        }

        @Override
        public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
            if (PROCEDURE_MAP.containsKey(procedurePath)) {
                return PROCEDURE_MAP.get(procedurePath);
            } else {
                throw new ProcedureNotExistException(getName(), procedurePath);
            }
        }
    }

Scala

    import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
    import org.apache.flink.table.catalog.exceptions.{CatalogException, DatabaseNotExistException, ProcedureNotExistException}
    import org.apache.flink.table.procedures.Procedure

    import scala.collection.JavaConverters._

    // catalog with built-in procedures
    class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {

      val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
        ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure())

      @throws(classOf[DatabaseNotExistException])
      @throws(classOf[CatalogException])
      override def listProcedures(dbName: String): java.util.List[String] = {
        if (!databaseExists(dbName)) {
          throw new DatabaseNotExistException(getName, dbName)
        }
        // the Catalog interface expects a java.util.List, so convert the Scala list
        PROCEDURE_MAP.keySet
          .filter(procedurePath => procedurePath.getDatabaseName.equals(dbName))
          .map(procedurePath => procedurePath.getObjectName)
          .toList
          .asJava
      }

      @throws(classOf[ProcedureNotExistException])
      override def getProcedure(procedurePath: ObjectPath): Procedure = {
        if (PROCEDURE_MAP.contains(procedurePath)) {
          PROCEDURE_MAP(procedurePath)
        } else {
          throw new ProcedureNotExistException(getName, procedurePath)
        }
      }
    }

Examples

The following example shows how to provide a procedure in a catalog and call it with a CALL statement. See the Implementation Guide for more details.

Java

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.GenericInMemoryCatalog;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.exceptions.CatalogException;
    import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
    import org.apache.flink.table.procedure.ProcedureContext;
    import org.apache.flink.table.procedures.Procedure;
    import org.apache.flink.util.CloseableIterator;

    import java.util.HashMap;
    import java.util.Map;

    // first implement a procedure
    public class GenerateSequenceProcedure implements Procedure {
        public long[] call(ProcedureContext context, int n) throws Exception {
            long[] sequenceN = new long[n];
            int i = 0;
            // obtain the execution environment from the context to run the job
            try (CloseableIterator<Long> result =
                    context.getExecutionEnvironment().fromSequence(0, n - 1).executeAndCollect()) {
                while (result.hasNext()) {
                    sequenceN[i++] = result.next();
                }
            }
            return sequenceN;
        }
    }

    // then provide the procedure in a custom catalog
    public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {

        private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>();

        static {
            PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
        }

        public CatalogWithBuiltInProcedure(String name) {
            super(name);
        }

        // omit some methods
        // ...

        @Override
        public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
            if (PROCEDURE_MAP.containsKey(procedurePath)) {
                return PROCEDURE_MAP.get(procedurePath);
            } else {
                throw new ProcedureNotExistException(getName(), procedurePath);
            }
        }
    }

    TableEnvironment tEnv = TableEnvironment.create(...);

    // register the catalog
    tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog"));

    // call the procedure with a CALL statement
    tEnv.executeSql("call my_catalog.`system`.generate_n(5)");

Scala

    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
    import org.apache.flink.table.catalog.exceptions.{CatalogException, ProcedureNotExistException}
    import org.apache.flink.table.procedure.ProcedureContext
    import org.apache.flink.table.procedures.Procedure

    // first implement a procedure
    class GenerateSequenceProcedure extends Procedure {
      def call(context: ProcedureContext, n: Integer): Array[Long] = {
        val env = context.getExecutionEnvironment
        val sequenceN = new Array[Long](n)
        var i = 0
        val result = env.fromSequence(0, n - 1).executeAndCollect()
        try {
          result.forEachRemaining(r => {
            sequenceN(i) = r
            i = i + 1
          })
        } finally {
          // release the resources held by the collecting iterator
          result.close()
        }
        sequenceN
      }
    }

    // then provide the procedure in a custom catalog
    class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {

      val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
        ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure())

      // omit some methods
      // ...

      @throws(classOf[ProcedureNotExistException])
      override def getProcedure(procedurePath: ObjectPath): Procedure = {
        if (PROCEDURE_MAP.contains(procedurePath)) {
          PROCEDURE_MAP(procedurePath)
        } else {
          throw new ProcedureNotExistException(getName, procedurePath)
        }
      }
    }

    val tEnv = TableEnvironment.create(...)

    // register the catalog
    tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog"))

    // call the procedure with a CALL statement
    tEnv.executeSql("call my_catalog.`system`.generate_n(5)")