At the moment, there are two deployment modes available for Pulsar Functions:

| Mode | Description |
| --- | --- |
| Local run mode | The function runs in your local environment, for example on your laptop. |
| Cluster mode | The function runs inside of your Pulsar cluster, on the same machines as your Pulsar brokers. |

Contributing new deployment modes

The Pulsar Functions feature was designed, however, with extensibility in mind. Other deployment options will be available in the future. If you’d like to add a new deployment option, we recommend getting in touch with the Pulsar developer community at dev@pulsar.apache.org.

Requirements

In order to deploy and manage Pulsar Functions, you need to have a Pulsar cluster running. There are several options for this:

If you’re running a non-standalone cluster, you’ll need to obtain the service URL for the cluster. How you obtain the service URL will depend on how you deployed your Pulsar cluster.

If you’re going to deploy and trigger Python user-defined functions, you should install the Pulsar Python client first.

Command-line interface

Pulsar Functions are deployed and managed using the pulsar-admin functions interface, which contains commands such as create for deploying functions in cluster mode, trigger for triggering functions, list for listing deployed functions, and several others.

Fully Qualified Function Name (FQFN)

Each Pulsar Function has a Fully Qualified Function Name (FQFN) that consists of three elements: the function’s tenant, namespace, and function name. FQFNs look like this:

  tenant/namespace/name

FQFNs enable you, for example, to create multiple functions with the same name, provided that they’re in different namespaces.
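As a rough illustration of the three-part structure, the sketch below splits an FQFN into its elements and shows that two functions with the same name in different namespaces have distinct FQFNs. The `FQFN` type, the `parse_fqfn` helper, and the `staging` namespace are hypothetical names chosen for this example; they are not part of Pulsar.

```python
from typing import NamedTuple


class FQFN(NamedTuple):
    """The three elements of a Fully Qualified Function Name."""
    tenant: str
    namespace: str
    name: str

    def __str__(self) -> str:
        return f"{self.tenant}/{self.namespace}/{self.name}"


def parse_fqfn(value: str) -> FQFN:
    """Split 'tenant/namespace/name' into its parts, rejecting malformed input."""
    parts = value.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/name, got {value!r}")
    return FQFN(*parts)


# Same function name, different namespaces: the FQFNs are distinct.
# ("staging" is a made-up namespace used only for illustration.)
a = parse_fqfn("public/default/myfunc")
b = parse_fqfn("public/staging/myfunc")
print(a.name == b.name, a == b)
```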

Default arguments

When managing Pulsar Functions, you’ll need to specify a variety of information about those functions, including tenant, namespace, input and output topics, etc. There are some parameters, however, that have default values that will be supplied if omitted. The table below lists the defaults:

| Parameter | Default |
| --- | --- |
| Function name | Whichever value is specified for the class name (minus org, library, etc.). The flag --classname org.example.MyFunction, for example, would give the function a name of MyFunction. |
| Tenant | Derived from the input topics’ names. If the input topics are under the marketing tenant (i.e. the topic names have the form persistent://marketing/{namespace}/{topicName}), then the tenant will be marketing. |
| Namespace | Derived from the input topics’ names. If the input topics are under the asia namespace under the marketing tenant (i.e. the topic names have the form persistent://marketing/asia/{topicName}), then the namespace will be asia. |
| Output topic | {input topic}-{function name}-output. A function with an input topic name of incoming and a function name of exclamation, for example, would have an output topic of incoming-exclamation-output. |
| Subscription type | For at-least-once and at-most-once processing guarantees, SHARED is applied by default; for effectively-once guarantees, FAILOVER is applied. |
| Processing guarantees | ATLEAST_ONCE |
| Pulsar service URL | pulsar://localhost:6650 |

Example use of defaults

Take this create command:

  $ bin/pulsar-admin functions create \
    --jar my-pulsar-functions.jar \
    --classname org.example.MyFunction \
    --inputs my-function-input-topic1,my-function-input-topic2

The created function would have default values supplied for the function name (MyFunction), tenant (public), namespace (default), subscription type (SHARED), processing guarantees (ATLEAST_ONCE), and Pulsar service URL (pulsar://localhost:6650).
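The naming rules for these defaults can be sketched in a few lines of Python. This is only an illustration of the rules as described here, not Pulsar’s actual implementation; the helper names and the `orders` topic are hypothetical, and the sketch assumes that a short topic name with no `persistent://` prefix belongs to the public tenant and default namespace.

```python
from typing import Tuple


def default_function_name(classname: str) -> str:
    """Class name minus its package prefix: org.example.MyFunction -> MyFunction."""
    return classname.rsplit(".", 1)[-1]


def tenant_and_namespace(topic: str) -> Tuple[str, str]:
    """Read the tenant and namespace from a topic name.

    Fully qualified names look like persistent://tenant/namespace/topic.
    Short names such as my-topic are assumed to live in public/default.
    """
    if "://" not in topic:
        return "public", "default"
    _, rest = topic.split("://", 1)
    tenant, namespace, _ = rest.split("/", 2)
    return tenant, namespace


def default_output_topic(input_topic: str, function_name: str) -> str:
    """Apply the {input topic}-{function name}-output pattern."""
    return f"{input_topic}-{function_name}-output"


print(default_function_name("org.example.MyFunction"))
print(tenant_and_namespace("persistent://marketing/asia/orders"))
print(default_output_topic("incoming", "exclamation"))
```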

Local run mode

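If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the command (this could be your laptop, an AWS EC2 instance, etc.). Here’s an example localrun command: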

  $ bin/pulsar-admin functions localrun \
    --py myfunc.py \
    --classname myfunc.SomeFunction \
    --inputs persistent://public/default/input-1 \
    --output persistent://public/default/output-1

By default, the function will connect to a Pulsar cluster running on the same machine, via a local broker service URL of pulsar://localhost:6650. If you’d like to use local run mode to run a function but connect it to a non-local Pulsar cluster, you can specify a different broker URL using the --broker-service-url flag. Here’s an example:

  $ bin/pulsar-admin functions localrun \
    --broker-service-url pulsar://my-cluster-host:6650 \
    # Other function parameters

Cluster mode

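When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker in your cluster and runs alongside the broker rather than in your local environment. You can run a function in cluster mode using the create command. Here’s an example: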

  $ bin/pulsar-admin functions create \
    --py myfunc.py \
    --classname myfunc.SomeFunction \
    --inputs persistent://public/default/input-1 \
    --output persistent://public/default/output-1
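If you script deployments rather than typing commands by hand, the same create command could be assembled as an argument list in Python. The sketch below is one possible approach, not an official client: `build_create_command` is a hypothetical helper, it uses only the flags shown in the examples on this page, and it assumes the script runs from the Pulsar installation directory so that bin/pulsar-admin resolves.

```python
import shlex
from typing import List


def build_create_command(py_file: str, classname: str,
                         inputs: List[str], output: str) -> List[str]:
    """Assemble the argument list for `pulsar-admin functions create`."""
    return [
        "bin/pulsar-admin", "functions", "create",
        "--py", py_file,
        "--classname", classname,
        "--inputs", ",".join(inputs),  # multiple input topics are comma-separated
        "--output", output,
    ]


cmd = build_create_command(
    "myfunc.py",
    "myfunc.SomeFunction",
    ["persistent://public/default/input-1"],
    "persistent://public/default/output-1",
)

# Print the command for inspection instead of running it.
print(shlex.join(cmd))
```

To actually deploy, the list could be passed to `subprocess.run(cmd, check=True)`. Keeping the arguments as a list avoids shell quoting problems with topic names.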

Updating cluster mode functions

You can use the update command to update a Pulsar Function running in cluster mode. This command, for example, would update the function created in the section above:

  $ bin/pulsar-admin functions update \
    --py myfunc.py \
    --classname myfunc.SomeFunction \
    --inputs persistent://public/default/new-input-topic \
    --output persistent://public/default/new-output-topic

Parallelism

Pulsar Functions run as processes called instances. When you run a Pulsar Function, it runs as a single instance by default (and in local run mode you can only run a single instance of a function).

You can also specify the parallelism of a function, i.e. the number of instances to run, when you create the function. You can set the parallelism factor using the --parallelism flag of the create command. Here’s an example:

  $ bin/pulsar-admin functions create \
    --parallelism 3 \
    # Other function info

You can adjust the parallelism of an already created function using the update interface.

  $ bin/pulsar-admin functions update \
    --parallelism 5 \
    # Other function

If you’re specifying a function’s configuration via YAML, use the parallelism parameter. Here’s an example config file:

  # function-config.yaml
  parallelism: 3
  inputs:
  - persistent://public/default/input-1
  output: persistent://public/default/output-1
  # other parameters

And here’s the corresponding update command:

  $ bin/pulsar-admin functions update \
    --function-config-file function-config.yaml

Function instance resources

When you run Pulsar Functions in cluster mode, you can specify the resources that are assigned to each function instance:

| Resource | Specified as… | Runtimes |
| --- | --- | --- |
| CPU | The number of cores | Docker (coming soon) |
| RAM | The number of bytes | Process, Docker |
| Disk space | The number of bytes | Docker |

Here’s an example function creation command that allocates 8 cores, 8 GB of RAM, and 10 GB of disk space to a function:

  $ bin/pulsar-admin functions create \
    --jar target/my-functions.jar \
    --classname org.example.functions.MyFunction \
    --cpu 8 \
    --ram 8589934592 \
    --disk 10737418240

Resources are per instance

The resources that you apply to a given Pulsar Function are applied to each instance of the function. If you apply 8 GB of RAM to a function with a parallelism of 5, for example, then you are applying 40 GB of RAM total for the function. You should always factor parallelism (i.e. the number of instances) into your resource calculations.
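The arithmetic can be checked with a short Python sketch. It assumes, as the byte values in the create command suggest, that 1 GB here means 1024³ bytes; `total_resources` is a hypothetical helper written for this example.

```python
GIB = 1024 ** 3  # bytes per GB as used here; --ram and --disk take byte counts


def total_resources(cpu: float, ram_bytes: int, disk_bytes: int,
                    parallelism: int) -> dict:
    """Multiply per-instance resources by the number of instances."""
    return {
        "cpu": cpu * parallelism,
        "ram_bytes": ram_bytes * parallelism,
        "disk_bytes": disk_bytes * parallelism,
    }


per_instance_ram = 8 * GIB    # 8589934592, the --ram value for 8 GB
per_instance_disk = 10 * GIB  # 10737418240, the --disk value for 10 GB

total = total_resources(8, per_instance_ram, per_instance_disk, parallelism=5)
print(total["ram_bytes"] // GIB)  # 40
```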

Triggering Pulsar Functions

If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. Triggering a function means that you send a message with a specific value to the function and get the function’s output (if any) via the command line.

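Triggering a function is ultimately no different from invoking a function by producing a message on one of the function’s input topics. The pulsar-admin functions trigger command is essentially a convenient mechanism for sending messages to functions without needing to use the pulsar-client tool or a language-specific client library.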

To show an example of function triggering, let’s start with a simple Python function that returns a simple string based on the input:

  # myfunc.py
  def process(input):
      # Return a string that echoes the value the function received
      return "This function has been triggered with a value of {0}".format(input)

Let’s run that function in cluster mode:

  $ bin/pulsar-admin functions create \
    --tenant public \
    --namespace default \
    --name myfunc \
    --py myfunc.py \
    --classname myfunc \
    --inputs persistent://public/default/in \
    --output persistent://public/default/out

Now let’s make a consumer listen on the output topic for messages coming from the myfunc function using the pulsar-client consume command:

  $ bin/pulsar-client consume persistent://public/default/out \
    --subscription-name my-subscription \
    --num-messages 0 # Listen indefinitely

Now let’s trigger that function:

  $ bin/pulsar-admin functions trigger \
    --tenant public \
    --namespace default \
    --name myfunc \
    --trigger-value "hello world"

The consumer listening on the output topic should then produce this in its logs:

  ----- got message -----
  This function has been triggered with a value of hello world
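Before deploying, you can sanity-check the function’s logic without a cluster by calling it directly in Python. This is only a local approximation: it exercises the function body and involves no Pulsar topics, brokers, or serialization.

```python
def process(input):
    # Same body as the function in myfunc.py
    return "This function has been triggered with a value of {0}".format(input)


# Calling the function directly with the value that the trigger command sends.
result = process("hello world")
print(result)
```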

Topic info not required

In the trigger command above, you may have noticed that you only need to specify basic information about the function (tenant, namespace, and name). To trigger the function, you didn’t need to know the function’s input topic(s).