存储过程

Flink 允许用户在 Table API 和 SQL 中调用存储过程来完成一些特定任务,比如处理数据,数据管理类任务等。存储过程可以通过 StreamExecutionEnvironment 来运行 Flink 作业,这使得存储过程更加强大和灵活。

开发指南


为了调用一个存储过程,需要确保一个 Catalog 可以提供这个存储过程。为了让一个 Catalog 提供存储过程,你首先需要实现一个存储过程,然后在方法 Catalog.getProcedure(ObjectPath procedurePath) 返回这个存储过程。 下面的步骤将展示如何实现一个存储过程并让一个 Catalog 提供这个存储过程。

存储过程类

存储过程的实现类必须实现接口 org.apache.flink.table.procedures.Procedure

该实现类必须声明为 public, 而不是 abstract, 并且可以被全局访问。不允许使用非静态内部类或匿名类。

Call 方法

存储过程的接口不提供任何方法,存储过程的实现类必须有名为 call 的方法,在该方法里面可以实现存储过程实际的逻辑。call 方法必须被声明为 public, 并且带有一组定义明确的参数。

请注意:

  • call 方法的第一个参数总是应该为 ProcedureContext,该参数提供了方法 getExecutionEnvironment() 来得到当前的 StreamExecutionEnvironment。通过 StreamExecutionEnvironment 可以运行一个 Flink 作业;
  • call 方法的返回类型应该永远都是一个数组类型,比如 int[]String[],等等;

更多的细节请参考类 org.apache.flink.table.procedures.Procedure 的 Java 文档。

常规的 JVM 方法调用语义是适用的,因此可以:

  • 实现重载的方法,例如 call(ProcedureContext, Integer) and call(ProcedureContext, LocalDateTime)
  • 使用变长参数,例如 call(ProcedureContext, Integer...)
  • 使用对象继承,例如 call(ProcedureContext, Object) 可接受 LocalDateTimeInteger 作为参数;
  • 也可组合使用,例如 call(ProcedureContext, Object...) 可接受所有类型的参数;

如果你希望用 Scala 来实现一个存储过程,对应可变长参数的情况,请添加 scala.annotation.varargs。另外,推荐使用装箱的基本类型(比如,使用 java.lang.Integer 而不是 Int)来支持 NULL

下面的代码片段展示来一个重载存储过程的例子:

Java

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.table.procedure.ProcedureContext;
  3. import org.apache.flink.table.procedures.Procedure;
  4. // 有多个重载 call 方法的存储过程
  5. public class GenerateSequenceProcedure implements Procedure {
  6. public long[] call(ProcedureContext context, int n) {
  7. return generate(context.getExecutionEnvironment(), n);
  8. }
  9. public long[] call(ProcedureContext context, String n) {
  10. return generate(context.getExecutionEnvironment(), Integer.parseInt(n));
  11. }
  12. private long[] generate(StreamExecutionEnvironment env, int n) throws Exception {
  13. long[] sequenceN = new long[n];
  14. int i = 0;
  15. try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
  16. while (result.hasNext()) {
  17. sequenceN[i++] = result.next();
  18. }
  19. }
  20. return sequenceN;
  21. }
  22. }

Scala

  1. import org.apache.flink.table.procedure.ProcedureContext
  2. import org.apache.flink.table.procedures.Procedure
  3. import scala.annotation.varargs
  4. // 有多个重载 call 方法的存储过程
  5. class GenerateSequenceProcedure extends Procedure {
  6. def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
  7. Array(a + b)
  8. }
  9. def call(context: ProcedureContext, a: String, b: String): Array[Integer] = {
  10. Array(Integer.valueOf(a) + Integer.valueOf(b))
  11. }
  12. @varargs // 类似 Java 的变长参数
  13. def call(context: ProcedureContext, d: Double*): Array[Integer] = {
  14. Array(d.sum.toInt)
  15. }
  16. }

类型推导

Table(类似于 SQL 标准)是一种强类型的 API。 因此,存储过程的参数和返回类型都必须映射到 data type

从逻辑角度看,Planner 需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用存储过程时需要知道如何将内部数据结构表示为 JVM 对象。

术语 类型推导 概括了意在验证输入值、推导出参数/返回值数据类型的逻辑。

Flink 存储过程实现了自动的类型推导提取,通过反射从存储过程的类及其 call 方法中推导数据类型。如果这种隐式的反射提取方法不成功,则可以通过使用 @DataTypeHint@ProcedureHint 注解相关参数、类或方法来支持提取存储过程的参数和返回类型,下面展示了有关如何注解存储过程的例子。

需要注意的是虽然存储过程的 call 方法必须返回数组类型 T[],但是如果用 @DataTypeHint 来注解返回类型,实际上注解的是该数组的元素的类型,即 T

自动类型推导

自动类型推导会检查存储过程的类和 call 方法,推导出存储过程参数和结果的数据类型, @DataTypeHint@ProcedurenHint 注解支持自动类型推导。

有关可以隐式映射到数据类型的类的完整列表, 请参阅data type extraction section

@DataTypeHint

在许多情况下,需要支持以 内联 方式自动提取出存储过程参数、返回值的类型。

以下例子展示了如何使用 @DataTypeHint,详情可参考该注解类的文档。

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.InputGroup;
  3. import org.apache.flink.table.procedure.ProcedureContext
  4. import org.apache.flink.table.procedures.Procedure;
  5. import org.apache.flink.types.Row;
  6. // 有多个重载 call 方法的存储过程
  7. public static class OverloadedProcedure implements Procedure {
  8. // 不需要 hint
  9. public Long[] call(ProcedureContext context, long a, long b) {
  10. return new Long[] {a + b};
  11. }
  12. // 定义 decimal 的精度和小数位
  13. public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) {
  14. return new BigDecimal[] {BigDecimal.valueOf(a + b)};
  15. }
  16. // 定义嵌套数据类型
  17. @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  18. public Row[] call(ProcedureContext context, int i) {
  19. return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))};
  20. }
  21. // 允许任意类型的输入,并输出序列化定制后的值
  22. @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  23. public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
  24. return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)};
  25. }
  26. }

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.InputGroup
  3. import org.apache.flink.table.functions.ScalarFunction
  4. import org.apache.flink.types.Row
  5. import scala.annotation.varargs
  6. import org.apache.flink.table.annotation.DataTypeHint
  7. import org.apache.flink.table.annotation.InputGroup
  8. import org.apache.flink.table.procedure.ProcedureContext
  9. import org.apache.flink.table.procedures.Procedure
  10. import org.apache.flink.types.Row
  11. // 有多个重载 call 方法的存储过程
  12. class OverloadedProcedure extends Procedure {
  13. // 不需要 hint
  14. def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = {
  15. Array(a + b)
  16. }
  17. // 定义 decimal 的精度和小数位
  18. @DataTypeHint("DECIMAL(12, 3)")
  19. def call(context: ProcedureContext, a: Double, b: Double): Array[BigDecimal] = {
  20. Array(BigDecimal.valueOf(a + b))
  21. }
  22. // 定义嵌套数据类型
  23. @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  24. def call(context: ProcedureContext, i: Integer): Array[Row] = {
  25. Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))
  26. }
  27. // 允许任意类型的输入,并输出序列化定制后的值
  28. @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
  29. def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = {
  30. Array[MyUtils.serializeToByteBuffer(o)]
  31. }
  32. }

@ProcedureHint

有时我们希望一种 call 方法可以同时处理多种数据类型,有时又要求对重载的多个 call 方法仅声明一次通用的返回类型。

@ProcedureHint 注解可以提供从入参数据类型到返回数据类型的映射,它可以在整个存储过程类或 call 方法上注解输入和返回的数据类型。可以在类顶部声明一个或多个注解,也可以为类的所有 call 方法分别声明一个或多个注解。所有的 hint 参数都是可选的,如果未定义参数,则使用默认的基于反射的类型提取。在函数类顶部定义的 hint 参数被所有 call 方法继承。

以下例子展示了如何使用 @ProcedureHint,详情可参考该注解类的文档。

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.ProcedureHint;
  3. import org.apache.flink.table.procedure.ProcedureContext;
  4. import org.apache.flink.table.procedures.Procedure;
  5. import org.apache.flink.types.Row;
  6. // 为存储过程类的所有 call 方法指定同一个返回类型
  7. @ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
  8. public static class OverloadedProcedure implements Procedure {
  9. public Row[] call(ProcedureContext context, int a, int b) {
  10. return new Row[] {Row.of("Sum", a + b)};
  11. }
  12. // 仍然可以重载 call 方法
  13. public Row[] call(ProcedureContext context) {
  14. return new Row[] {Row.of("Empty args", -1)};
  15. }
  16. }
  17. // 解耦类型推导与 call 方法,类型推导完全取决于 ProcedureHint
  18. @ProcedureHint(
  19. input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  20. output = @DataTypeHint("INT")
  21. )
  22. @ProcedureHint(
  23. input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
  24. output = @DataTypeHint("BIGINT")
  25. )
  26. @ProcedureHint(
  27. input = {},
  28. output = @DataTypeHint("BOOLEAN")
  29. )
  30. public static class OverloadedProcedure implements Procedure {
  31. // 一个 call 方法的实现,确保 call 方法存在于存储过程类中,可以被 JVM 调用
  32. public Object[] call(ProcedureContext context, Object... o) {
  33. if (o.length == 0) {
  34. return new Object[] {false};
  35. }
  36. return new Object[] {o[0]};
  37. }
  38. }

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.ProcedureHint
  3. import org.apache.flink.table.procedure.ProcedureContext
  4. import org.apache.flink.table.procedures.Procedure
  5. import org.apache.flink.types.Row
  6. import scala.annotation.varargs
  7. // 为存储过程类的所有 call 方法指定同一个返回类型
  8. @ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>"))
  9. class OverloadedFunction extends Procedure {
  10. def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = {
  11. Array(Row.of("Sum", Int.box(a + b)))
  12. }
  13. // 仍然可以重载 call 方法
  14. def call(context: ProcedureContext): Array[Row] = {
  15. Array(Row.of("Empty args", Int.box(-1)))
  16. }
  17. }
  18. // 解耦类型推导与 call 方法,类型推导完全取决于 ProcedureHint
  19. @ProcedureHint(
  20. input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")),
  21. output = new DataTypeHint("INT")
  22. )
  23. @ProcedureHint(
  24. input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")),
  25. output = new DataTypeHint("BIGINT")
  26. )
  27. @ProcedureHint(
  28. input = Array(),
  29. output = new DataTypeHint("BOOLEAN")
  30. )
  31. class OverloadedProcedure extends Procedure {
  32. // 一个 call 方法的实现,确保 call 方法存在于存储过程类中,可以被 JVM 调用
  33. @varargs
  34. def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef]= {
  35. if (o.length == 0) {
  36. Array(Boolean.box(false))
  37. }
  38. Array(o(0))
  39. }
  40. }

在 Catalog 中返回存储过程

在实现了一个存储过程后,Catalog 可以通过方法 Catalog.getProcedure(ObjectPath procedurePath) 来返回该存储过程,下面的例子展示了如何在 Catalog 中返回存储过程。 另外也可以在 Catalog.listProcedures(String dbName) 方法中列出所有的存储过程。

Java

  1. import org.apache.flink.table.catalog.Catalog;
  2. import org.apache.flink.table.catalog.GenericInMemoryCatalog;
  3. import org.apache.flink.table.catalog.ObjectPath;
  4. import org.apache.flink.table.catalog.exceptions.CatalogException;
  5. import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
  6. import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
  7. import org.apache.flink.table.procedure.ProcedureContext;
  8. import org.apache.flink.table.procedures.Procedure;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. // 有内置 procedure 的 Catalog
  12. public class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
  13. static {
  14. PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
  15. }
  16. public CatalogWithBuiltInProcedure(String name) {
  17. super(name);
  18. }
  19. @Override
  20. public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
  21. return PROCEDURE_MAP.keySet().stream().filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName))
  22. .map(ObjectPath::getObjectName).collect(Collectors.toList());
  23. }
  24. @Override
  25. public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
  26. if (PROCEDURE_MAP.containsKey(procedurePath)) {
  27. return PROCEDURE_MAP.get(procedurePath);
  28. } else {
  29. throw new ProcedureNotExistException(getName(), procedurePath);
  30. }
  31. }
  32. }

Scala

  1. import org.apache.flink.table.catalog.GenericInMemoryCatalog;
  2. import org.apache.flink.table.catalog.ObjectPath;
  3. import org.apache.flink.table.catalog.exceptions.CatalogException;
  4. import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
  5. import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
  6. import org.apache.flink.table.procedures.Procedure;
  7. // 有内置 procedure 的 Catalog
  8. class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {
  9. val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
  10. ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
  11. @throws(classOf[DatabaseNotExistException])
  12. @throws(classOf[CatalogException])
  13. override def listProcedures(dbName: String): List[String] = {
  14. if (!databaseExists(dbName)) {
  15. throw new DatabaseNotExistException(getName, dbName);
  16. }
  17. PROCEDURE_MAP.keySet.filter(procedurePath => procedurePath.getDatabaseName.equals(dbName))
  18. .map(procedurePath => procedurePath.getObjectName).toList
  19. }
  20. @throws(classOf[ProcedureNotExistException])
  21. override def getProcedure(procedurePath: ObjectPath): Procedure = {
  22. if (PROCEDURE_MAP.contains(procedurePath)) {
  23. PROCEDURE_MAP(procedurePath);
  24. } else {
  25. throw new ProcedureNotExistException(getName, procedurePath)
  26. }
  27. }
  28. }

例子

下面的例子展示了如何在一个 Catalog 中提供一个存储过程并且通过 CALL 语句来调用这个存储过程。详情可参考开发指南

Java

  1. import org.apache.flink.table.catalog.GenericInMemoryCatalog;
  2. import org.apache.flink.table.catalog.ObjectPath;
  3. import org.apache.flink.table.catalog.exceptions.CatalogException;
  4. import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
  5. import org.apache.flink.table.procedure.ProcedureContext;
  6. import org.apache.flink.table.procedures.Procedure;
  7. // 首先实现一个存储过程
  8. public static class GenerateSequenceProcedure implements Procedure {
  9. public long[] call(ProcedureContext context, int n) {
  10. long[] sequenceN = new long[n];
  11. int i = 0;
  12. try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
  13. while (result.hasNext()) {
  14. sequenceN[i++] = result.next();
  15. }
  16. }
  17. return sequenceN;
  18. }
  19. }
  20. // 自定义一个 Catalog,并返回该存储过程
  21. public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
  22. static {
  23. PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
  24. }
  25. // 省略一些方法
  26. // ...
  27. @Override
  28. public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
  29. if (PROCEDURE_MAP.containsKey(procedurePath)) {
  30. return PROCEDURE_MAP.get(procedurePath);
  31. } else {
  32. throw new ProcedureNotExistException(getName(), procedurePath);
  33. }
  34. }
  35. }
  36. TableEnvironment tEnv = TableEnvironment.create(...);
  37. // 注册这个 Catalog
  38. tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure());
  39. // 通过 Call 语句调用该存储过程
  40. tEnv.executeSql("call my_catalog.`system`.generate_n(5)");

Scala

  1. import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
  2. import org.apache.flink.table.catalog.exceptions.{CatalogException, ProcedureNotExistException}
  3. import org.apache.flink.table.procedure.ProcedureContext
  4. import org.apache.flink.table.procedures.Procedure
  5. // 首先实现一个存储过程
  6. class GenerateSequenceProcedure extends Procedure {
  7. def call(context: ProcedureContext, n: Integer): Array[Long] = {
  8. val env = context.getExecutionEnvironment
  9. val sequenceN = Array[Long]
  10. var i = 0;
  11. env.fromSequence(0, n - 1).executeAndCollect()
  12. .forEachRemaining(r => {
  13. sequenceN(i) = r
  14. i = i + 1
  15. })
  16. sequenceN;
  17. }
  18. }
  19. // 然后在一个自定义的 catalog 返回该 procedure
  20. class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {
  21. val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](ObjectPath.fromString("system.generate_n"),
  22. new GenerateSequenceProcedure());
  23. // 省略一些方法
  24. // ...
  25. @throws(classOf[ProcedureNotExistException])
  26. override def getProcedure(procedurePath: ObjectPath): Procedure = {
  27. if (PROCEDURE_MAP.contains(procedurePath)) {
  28. PROCEDURE_MAP(procedurePath);
  29. } else {
  30. throw new ProcedureNotExistException(getName, procedurePath)
  31. }
  32. }
  33. }
  34. TableEnvironment tEnv = TableEnvironment.create(...)
  35. // 注册该 catalog
  36. tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure())
  37. // 通过 Call 语句调用该存储过程
  38. tEnv.executeSql("call my_catalog.`system`.generate_n(5)")