Manage Functions

Pulsar Functions 是轻量级计算流程,具有以下特点:

  • 从一个或多个 Pulsar topic 中消费信息
  • 将用户提供的处理逻辑应用于每条信息
  • 将计算结果发布到另一个主题

可以通过以下方法来管理 Functions。

Method说明
Admin CLI(命令行界面)pulsar-admin 工具里的 functions 命令。
REST APIadmin REST API 中的 /admin/v3/functions 端点。
Java Admin APIJava APIPulsarAdmin 对象的 functions 方法。

Function 资源

可以对 functions 执行以下操作。

创建 function

可以在集群模式下使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 创建 Pulsar function(在 Pulsar 集群上部署)。

Admin CLI

REST API

Java Admin API

使用 create 子命令。

示例

  1. $ pulsar-admin functions create \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \ --inputs test-input-topic \ --output persistent://public/default/test-output-topic \ --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \ --jar /examples/api-examples.jar

POST /admin/v3/functions/:tenant/:namespace/:functionName?version=2.8.0

  1. FunctionConfig functionConfig = new FunctionConfig();functionConfig.setTenant(tenant);functionConfig.setNamespace(namespace);functionConfig.setName(functionName);functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);functionConfig.setParallelism(1);functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);functionConfig.setTopicsPattern(sourceTopicPattern);functionConfig.setSubName(subscriptionName);functionConfig.setAutoAck(true);functionConfig.setOutput(sinkTopic);admin.functions().createFunction(functionConfig, fileName);

更新 function

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来更新已经部署到 Pulsar 集群的 Pulsar function。

Admin CLI

REST Admin API

Java Admin API

使用 update 子命令。

示例

  1. $ pulsar-admin functions update \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \ --output persistent://public/default/update-output-topic \ # other options

PUT /admin/v3/functions/:tenant/:namespace/:functionName?version=2.8.0

  1. FunctionConfig functionConfig = new FunctionConfig();functionConfig.setTenant(tenant);functionConfig.setNamespace(namespace);functionConfig.setName(functionName);functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);functionConfig.setParallelism(1);functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");UpdateOptions updateOptions = new UpdateOptions();updateOptions.setUpdateAuthData(updateAuthData);admin.functions().updateFunction(functionConfig, userCodeFile, updateOptions);

启动 function 的单个实例

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来启动已停止的 function 实例( 通过 instance-id)。

Admin CLI

REST API

Java Admin API

使用 start 子命令。

  1. $ pulsar-admin functions start \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \ --instance-id 1

POST /admin/v3/functions/:tenant/:namespace/:functionName/:instanceId/start?version=2.8.0

  1. admin.functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));

启动 function 的所有实例

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API来启动所有已停止的 function 实例。

Admin CLI

REST API

Java

使用 start 子命令。

示例

  1. $ pulsar-admin functions start \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \

POST /admin/v3/functions/:tenant/:namespace/:functionName/start?version=2.8.0

  1. admin.functions().startFunction(tenant, namespace, functionName);

停止 function 的单个实例

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API来停止一个 function 实例(通过 instance-id)。

Admin CLI

REST API

Java Admin API

使用 stop 子命令。

示例

  1. $ pulsar-admin functions stop \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \ --instance-id 1

POST /admin/v3/functions/:tenant/:namespace/:functionName/:instanceId/stop?version=2.8.0

  1. admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));

停止 function 的所有实例

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来停止 function 的所有实例。

Admin CLI

REST API

Java Admin API

使用 stop 子命令。

示例

  1. $ pulsar-admin functions stop \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \

POST /admin/v3/functions/:tenant/:namespace/:functionName/stop?version=2.8.0

  1. admin.functions().stopFunction(tenant, namespace, functionName);

重启 function 的单个实例

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来重启单个 function 实例(通过 instance-id)。

Admin CLI

REST API

Java Admin API

使用 restart 子命令。

示例

  1. $ pulsar-admin functions restart \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \ --instance-id 1

POST /admin/v3/functions/:tenant/:namespace/:functionName/:instanceId/restart?version=2.8.0

  1. admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));

重启 function 的所有实例

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 重启 function 的所有实例。

Admin CLI

REST API

Java Admin API

使用 restart 子命令。

示例

  1. $ pulsar-admin functions restart \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \

POST /admin/v3/functions/:tenant/:namespace/:functionName/restart?version=2.8.0

  1. admin.functions().restartFunction(tenant, namespace, functionName);

Function 列表

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 列出指定租户和命名空间下正在运行的所有 Pulsar function。

Admin CLI

REST API

Java Admin API

使用 list子命令。

示例

  1. $ pulsar-admin functions list \ --tenant public \ --namespace default

GET /admin/v3/functions/:tenant/:namespace?version=2.8.0

  1. admin.functions().getFunctions(tenant, namespace);

删除 function

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 删除在 Pulsar 集群上运行的 Pulsar function。

Admin CLI

REST API

Java Admin API

使用 delete 子命令。

示例

  1. $ pulsar-admin functions delete \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)

DELETE /admin/v3/functions/:tenant/:namespace/:functionName?version=2.8.0

  1. admin.functions().deleteFunction(tenant, namespace, functionName);

获取 function 的信息

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来获取某个在集群模式下运行的 Pulsar function 的信息。

Admin CLI

REST API

Java Admin API

使用 get子命令。

示例

  1. $ pulsar-admin functions get \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)

GET /admin/v3/functions/:tenant/:namespace/:functionName?version=2.8.0

  1. admin.functions().getFunction(tenant, namespace, functionName);

获取 function 单个实例的状态

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过 instance-id 来获取单个 function 实例的当前状态。

Admin CLI

REST API

Java Admin API

使用 status 子命令。

示例

  1. $ pulsar-admin functions status \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)\ --instance-id 1

GET /admin/v3/functions/:tenant/:namespace/:functionName/:instanceId/status?version=2.8.0

  1. admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId));

获取 function 所有实例的状态

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取所有 function 实例的当前状态。

Admin CLI

REST API

Java Admin API

使用 status 子命令。

示例

  1. $ pulsar-admin functions status \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)

GET /admin/v3/functions/:tenant/:namespace/:functionName/status?version=2.8.0

  1. admin.functions().getFunctionStatus(tenant, namespace, functionName);

获取 function 单个实例的数据

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过 instance-id 获取单个 function 实例的当前数据。

Admin CLI

REST API

Java Admin API

使用 stats 子命令。

示例

  1. $ pulsar-admin functions stats \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)\ --instance-id 1

GET /admin/v3/functions/:tenant/:namespace/:functionName/:instanceId/stats?version=2.8.0

  1. admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId));

获取 function 所有实例的数据

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取所有 function 实例的当前数据。

Admin CLI

REST API

Java Admin API

使用 stats 子命令。

示例

  1. $ pulsar-admin functions stats \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)

GET /admin/v3/functions/:tenant/:namespace/:functionName/stats?version=2.8.0

  1. admin.functions().getFunctionStats(tenant, namespace, functionName);

触发 function

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过触发值触发指定的 Pulsar function。

Admin CLI

REST API

Java Admin API

使用 trigger 子命令。

示例

  1. $ pulsar-admin functions trigger \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)\ --topic(输入 topic 的名称)\ --trigger-value \"hello pulsar\" # or --trigger-file(触发文件的路径)

POST /admin/v3/functions/:tenant/:namespace/:functionName/trigger?version=2.8.0

  1. admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);

为 function 关联状态

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 关联 Pulsar function 的状态。

Admin CLI

REST API

Java Admin API

使用 putstate 子命令。

示例

  1. $ pulsar-admin functions putstate \ --tenant public \ --namespace default \ --namePulsar Functions 的名称)\ --state "{\"key\":\"pulsar\", \"stringValue\":\"hello pulsar\"}"

POST /admin/v3/functions/:tenant/:namespace/:functionName/state/:key?version=2.8.0

  1. TypeReference<FunctionState> typeRef = new TypeReference<FunctionState>() {};FunctionState stateRepr = ObjectMapperFactory.getThreadLocal().readValue(state, typeRef);admin.functions().putFunctionState(tenant, namespace, functionName, stateRepr);

获取与 function 关联的状态

可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取与 Pulsar function 关联的状态。

Admin CLI

REST API

Java Admin CLI

使用 querystate 子命令。

示例

  1. $ pulsar-admin functions querystate \ --tenant public \ --namespace default \ --name (Pulsar Functions 的名称) \ --key (状态的键值)

GET /admin/v3/functions/:tenant/:namespace/:functionName/state/:key?version=2.8.0

  1. admin.functions().getFunctionState(tenant, namespace, functionName, key);