要求

需要通过运行 Pulsar 集群来部署和管理 Pulsar Functions。 There are several options for this:

运行一个非 standalone 集群,需要获取集群服务的 URL。 如何获取集群服务的 URL 取决于如何部署 Pulsar 集群。

想要部署并触发 Python 的用户自定义 function,需要在所有运行 functions workers 的设备上安装 the pulsar python client

Command-line interface

Pulsar Functions 使用 pulsar-admin functions 接口进行部署和管理,通过 createcluster mode 下部署 functions;通过 trigger 使用 triggering functions,通过 list 列出已部署的 functions。

了解更多命令,请参阅 pulsar-admin functions

Default arguments

在管理 Pulsar Functions 时,需要指定关于 functions 的各种信息,包括租户、命名空间、输入主题、输出主题等。 但是,在不输入信息时,有些参数会使用默认值。 如下表所示。

Parameter默认值
Function name可以指定类名称(除了org、library 或相似的名称)。 For example, when you specify the flag —classname org.example.MyFunction, the function name is MyFunction.
Tenant来自输入 topic 的名称。 如果输入 topic 在 marketing 租户下,即 topic 名称形式为 persistent://marketing/{namespace}/{topicName},则租户为 marketing
Namespace来自输入 topic 的名称。 如果输入 topic 在 asia 命名空间中,在 marketing 租户下,即主题名称形式为 persistent://marketing/asia/{topicName},则命名空间为 asia
Output topic{input topic}-{function name}-output. 例如,一个 function 的输入 topic 名称是 incoming,function 名称为 exclamation,则输出 topic 名称为 incoming-exclamation-output
Subscription type对于 at-least-onceat-most-once 处理保证,默认使用 SHARED 模式;对于 effectively-once 保证,默认使用 FAILOVER 模式。
Processing guaranteesATLEAST_ONCE
Pulsar service URLpulsar://localhost:6650

Example of default arguments

create 命令为例。

  1. $ bin/pulsar-admin functions create \
  2. --jar my-pulsar-functions.jar \
  3. --classname org.example.MyFunction \
  4. --inputs my-function-input-topic1,my-function-input-topic2

此 function 具有默认值的参数包括:function 名称(MyFunction)、租户(public)、命名空间(default)、订阅类型(SHARED)、处理保证(ATLEAST_ONCE)、Pulsar 服务 URL (pulsar://localhost:6650)。

Local run mode

If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an AWS EC2 instance, and so on). 本地运行命令示例如下。

  1. $ bin/pulsar-admin functions localrun \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/input-1 \
  5. --output persistent://public/default/output-1

默认情况下,该 function 通过 broker 服务的 URL pulsar://localhost:6650 连接到在同一设备上运行的 Pulsar 集群。 如果想要在本地运行模式下运行一个 function ,并连接到非本地 Pulsar 集群,则可以通过 --brokerServiceUrl 标志来指定不同的 broker URL。 The following is an example.

  1. $ bin/pulsar-admin functions localrun \
  2. --broker-service-url pulsar://my-cluster-host:6650 \
  3. # Other function parameters

Cluster mode

When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your local environment. 您可以使用 create 命令在群集模式下运行函数。

  1. $ bin/pulsar-admin functions create \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/input-1 \
  5. --output persistent://public/default/output-1

在集群模式下更新 function

You can use the update command to update a Pulsar Function running in cluster mode. 以下命令用于更新在集群模式下创建的 function。

  1. $ bin/pulsar-admin functions update \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/new-input-topic \
  5. --output persistent://public/default/new-output-topic

Parallelism

Pulsar Functions run as processes or threads, which are called instances. 在运行 Pulsar Function 时,默认为单个实例。 使用一个本地运行命令只能运行 function 的单个实例。 想要运行多个实例则需要多次使用本地运行命令。

When you create a function, you can specify the parallelism of a function (the number of instances to run). You can set the parallelism factor using the --parallelism flag of the create command.

  1. $ bin/pulsar-admin functions create \
  2. --parallelism 3 \
  3. # Other function info

You can adjust the parallelism of an already created function using the update interface.

  1. $ bin/pulsar-admin functions update \
  2. --parallelism 5 \
  3. # Other function

通过 YAML ,使用 parallelism function 指定其配置。 配置示例如下。

  1. # function-config.yaml
  2. parallelism: 3
  3. inputs:
  4. - persistent://public/default/input-1
  5. output: persistent://public/default/output-1
  6. # other parameters

相关更新命令如下。

  1. $ bin/pulsar-admin functions update \
  2. --function-config-file function-config.yaml

Function instance resources

集群模式下运行 Pulsar Functions 时,可以指定资源分配给 function 的每个实例

ResourceSpecified asRuntimes
CPUThe number of coresKubernetes
RAMThe number of bytesProcess, Docker
Disk spaceThe number of bytesDocker

为一个 function 分配 8 个内核、8GB 内存、10GB 磁盘空间的 function 创建命令如下。

  1. $ bin/pulsar-admin functions create \
  2. --jar target/my-functions.jar \
  3. --classname org.example.functions.MyFunction \
  4. --cpu 8 \
  5. --ram 8589934592 \
  6. --disk 10737418240

Resources are per instance

应用于给定 Pulsar Function 的资源适用于此 function 的每个实例。 例如,对一个并行度为 5 的 function 使用 8GB 内存,则此 function 在应用的总内存为 40GB。 在资源计算时要考虑并行度(实例数量)。

Trigger Pulsar Functions

If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. 触发 function 意味着向其发送具有特定值的消息,并通过命令行获取其输出(如有输入)。

触发 function,即通过在某个输入主题上生成的消息来调用 function。 通过 pulsar-admin functions trigger 命令,可以在不使用 pulsar-client 工具或特定语言客户端库的条件下向 function 发送消息。

要学习如何触发 function,可以从 Python function 开始,Python function 会返回基于输入的简单字符串。

  1. # myfunc.py
  2. def process(input):
  3. return "This function has been triggered with a value of {0}".format(input)

可以在本地运行模式下运行 function。

  1. $ bin/pulsar-admin functions create \
  2. --tenant public \
  3. --namespace default \
  4. --name myfunc \
  5. --py myfunc.py \
  6. --classname myfunc \
  7. --inputs persistent://public/default/in \
  8. --output persistent://public/default/out

指定 consumer 以 pulsar-client consume 命令在输出 topic 上接收来自 myfunc function 的消息。

  1. $ bin/pulsar-client consume persistent://public/default/out \
  2. --subscription-name my-subscription
  3. --num-messages 0 # Listen indefinitely

然后可以触发 function。

  1. $ bin/pulsar-admin functions trigger \
  2. --tenant public \
  3. --namespace default \
  4. --name myfunc \
  5. --trigger-value "hello world"

监听输出 topic 的 consumer 会在日志中生成如下内容。

  1. ----- got message -----
  2. This function has been triggered with a value of hello world

主题信息非必需

trigger 命令中,只需指定 function 的基本信息(租户、命名空间、名称)。 触发 function 前无需了解其输入 topic。