This tutorial walks you through running a standalone Pulsar cluster on your machine and then running your first Pulsar Functions on that cluster. The first function will run in local run mode (outside your Pulsar cluster), while the second will run in cluster mode (inside your cluster).

In local run mode, a Pulsar Function communicates with your Pulsar cluster but runs outside of it.

Prerequisites

To follow this tutorial, you need Maven installed on your machine.

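Run a standalone Pulsar cluster

To run Pulsar Functions, we first need a Pulsar cluster running locally. The easiest way to get one is to run Pulsar in standalone mode. Follow these steps to start a standalone cluster: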

    $ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/apache-pulsar-2.6.1-bin.tar.gz
    $ tar xvfz apache-pulsar-2.6.1-bin.tar.gz
    $ cd apache-pulsar-2.6.1
    $ bin/pulsar standalone \
      --advertised-address 127.0.0.1

Running Pulsar in standalone mode automatically creates the public tenant and the default namespace. That tenant and namespace will be used throughout this tutorial.

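Run a Pulsar Function in local run mode

Let's start with a simple function that takes a string as input from a Pulsar topic, adds an exclamation point to the end of the string, and then publishes the new string to another Pulsar topic. Here's the code for the function: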

    import java.util.function.Function;

    public class ExclamationFunction implements Function<String, String> {
        @Override
        public String apply(String input) {
            // Append an exclamation mark to the incoming string.
            return String.format("%s!", input);
        }
    }

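The JAR file containing this function and several other functions (written in Java) is included in the binary distribution you downloaded (in the examples folder). To run the function in local mode, i.e. on our laptop but outside our Pulsar cluster: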

    $ bin/pulsar-admin functions localrun \
      --jar examples/api-examples.jar \
      --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
      --inputs persistent://public/default/exclamation-input \
      --output persistent://public/default/exclamation-output \
      --name exclamation

Multiple input topics allowed

In the example above, a single topic was specified using the --inputs flag. You can also specify multiple input topics as a comma-separated list using the same flag. Here’s an example:

--inputs topic1,topic2
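
As a rough illustration of how a comma-separated --inputs value breaks down into individual topics, here is a minimal Python sketch. The helper name split_inputs is hypothetical and is not part of any Pulsar tooling; it only mirrors the list syntax of the flag.

```python
def split_inputs(value):
    """Split a comma-separated --inputs value into a list of topic names.

    Hypothetical helper for illustration only; it is not a Pulsar API.
    """
    # Drop surrounding whitespace and ignore empty entries such as a trailing comma.
    return [topic.strip() for topic in value.split(",") if topic.strip()]


topics = split_inputs("topic1,topic2")
print(topics)
```

Each entry in the resulting list corresponds to one topic the function would read from.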

We can open up another shell and use the [`pulsar-client`](reference-cli-tools.md#pulsar-client) tool to listen for messages on the output topic:

    $ bin/pulsar-client consume persistent://public/default/exclamation-output \
      --subscription-name my-subscription \
      --num-messages 0

Setting the --num-messages flag to 0 means that the consumer will listen on the topic indefinitely (rather than only accepting a certain number of messages).

With a listener up and running, we can open up another shell and produce a message on the input topic that we specified:

    $ bin/pulsar-client produce persistent://public/default/exclamation-input \
      --num-produce 1 \
      --messages "Hello world"

In the output, you should see the following:

    ----- got message -----
    Hello world!

Success! As you can see, the message has been successfully processed by the exclamation function. To shut down the function, simply hit Ctrl+C.

Here’s what happened:

  • The Hello world message that we published to the input topic (persistent://public/default/exclamation-input) was passed to the exclamation function that we ran on our machine.
  • The exclamation function processed the message (providing a result of Hello world!) and published the result to the output topic (persistent://public/default/exclamation-output).
  • If our exclamation function hadn’t been running, Pulsar would have durably stored the message data published to the input topic in Apache BookKeeper until a consumer consumed and acknowledged the message.
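
The flow described in these bullets can be sketched with a toy in-memory model. This is only an illustration under loud assumptions: the topics dictionary, the run_once helper, and the Python exclamation function are hypothetical stand-ins, and nothing here talks to Pulsar or BookKeeper.

```python
from collections import deque

# Toy in-memory stand-ins for the two topics; real topics live in the Pulsar cluster.
topics = {
    "persistent://public/default/exclamation-input": deque(),
    "persistent://public/default/exclamation-output": deque(),
}


def exclamation(message):
    """Same logic as the Java ExclamationFunction: append an exclamation mark."""
    return "{}!".format(message)


def run_once(function, input_topic, output_topic):
    """Process every pending message on input_topic and publish the results."""
    pending = topics[input_topic]
    while pending:
        message = pending[0]  # read the oldest message without removing it
        topics[output_topic].append(function(message))
        pending.popleft()  # "acknowledge" only after the result is published


input_topic = "persistent://public/default/exclamation-input"
output_topic = "persistent://public/default/exclamation-output"

# The message is published while no function is running, so it simply waits.
topics[input_topic].append("Hello world")

# Once the function runs, the stored message is processed and acknowledged.
run_once(exclamation, input_topic, output_topic)
print(list(topics[output_topic]))
```

The point of the model is the ordering: a message stays on the input topic until the function has produced its result, which loosely mirrors how an unacknowledged message remains stored.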

Run a Pulsar Function in cluster mode

Local run mode is useful for development and experimentation, but if you want to use Pulsar Functions in a real Pulsar deployment, you’ll want to run them in cluster mode. In this mode, Pulsar Functions run inside your Pulsar cluster and are managed using the same pulsar-admin functions interface that we’ve been using thus far.

This command, for example, would deploy the same exclamation function we ran locally above in our Pulsar cluster (rather than outside it):

    $ bin/pulsar-admin functions create \
      --jar examples/api-examples.jar \
      --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
      --inputs persistent://public/default/exclamation-input \
      --output persistent://public/default/exclamation-output \
      --name exclamation

You should see Created successfully in the output. Now, let’s see a list of functions running in our cluster:

    $ bin/pulsar-admin functions list \
      --tenant public \
      --namespace default

We should see just the exclamation function listed there. We can also check the status of our deployed function using the getstatus command:

    $ bin/pulsar-admin functions getstatus \
      --tenant public \
      --namespace default \
      --name exclamation

You should see this JSON output:

    {
      "functionStatusList": [
        {
          "running": true,
          "instanceId": "0"
        }
      ]
    }

As we can see, (a) the instance is currently running and (b) there is one instance, with an ID of 0, running. We can get other information about the function (topics, tenant, namespace, etc.) using the get command instead of getstatus:

    $ bin/pulsar-admin functions get \
      --tenant public \
      --namespace default \
      --name exclamation

You should see this JSON output:

    {
      "tenant": "public",
      "namespace": "default",
      "name": "exclamation",
      "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
      "output": "persistent://public/default/exclamation-output",
      "autoAck": true,
      "inputs": [
        "persistent://public/default/exclamation-input"
      ],
      "parallelism": 1
    }

As we can see, the parallelism of the function is 1, meaning that only one instance of the function is running in our cluster. Let’s update our function to a parallelism of 3 using the update command:

    $ bin/pulsar-admin functions update \
      --jar examples/api-examples.jar \
      --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
      --inputs persistent://public/default/exclamation-input \
      --output persistent://public/default/exclamation-output \
      --tenant public \
      --namespace default \
      --name exclamation \
      --parallelism 3

You should see Updated successfully in the output. If you run the get command from above for the function, you can see that the parallelism has increased to 3, meaning that there are now three instances of the function running in our cluster:

    {
      "tenant": "public",
      "namespace": "default",
      "name": "exclamation",
      "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
      "output": "persistent://public/default/exclamation-output",
      "autoAck": true,
      "inputs": [
        "persistent://public/default/exclamation-input"
      ],
      "parallelism": 3
    }
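
If you want to check the parallelism from a script rather than by eye, the JSON shown above can be parsed with Python's standard json module. The sketch below is a minimal example under stated assumptions: the sample text is copied from the output above, and the helper name has_parallelism is hypothetical.

```python
import json

# Sample text copied from the `get` output shown above; in practice it would
# come from running the pulsar-admin command yourself.
get_output = """
{
  "tenant": "public",
  "namespace": "default",
  "name": "exclamation",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "output": "persistent://public/default/exclamation-output",
  "autoAck": true,
  "inputs": [
    "persistent://public/default/exclamation-input"
  ],
  "parallelism": 3
}
"""


def has_parallelism(raw_json, expected):
    """Return True if the function config reports the expected parallelism."""
    config = json.loads(raw_json)
    return config.get("parallelism") == expected


print(has_parallelism(get_output, 3))
```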

Finally, we can shut down our running function using the delete command:

    $ bin/pulsar-admin functions delete \
      --tenant public \
      --namespace default \
      --name exclamation

If you see Deleted successfully in the output, then you’ve successfully run, updated, and shut down a Pulsar Function running in cluster mode. Congrats! Now, let’s go even further and run a brand new function in the next section.

Writing and running a new function

In order to write and run the Python function below, you’ll need to install a few dependencies:

$ pip install pulsar-client

In the above examples, we ran and managed a pre-written Pulsar Function and saw how it worked. To really get our hands dirty, let's write and run our own function from scratch, using the Python API. This simple function will also take a string as input, but it will reverse the string and publish the reversed string to the specified topic.

First, create a new Python file:

    $ touch reverse.py

In that file, add the following:

    def process(input):
        # Reverse the incoming string using a slice with a step of -1.
        return input[::-1]

Here, the process method defines the processing logic of the Pulsar Function. It simply uses some Python slice magic to reverse each incoming string. Now, we can deploy the function using create:

    $ bin/pulsar-admin functions create \
      --py reverse.py \
      --classname reverse \
      --inputs persistent://public/default/backwards \
      --output persistent://public/default/forwards \
      --tenant public \
      --namespace default \
      --name reverse

If you see Created successfully, the function is ready to accept incoming messages. Because the function is running in cluster mode, we can trigger the function using the trigger command. This command will send a message that we specify to the function and also give us the function’s output. Here’s an example:

    $ bin/pulsar-admin functions trigger \
      --name reverse \
      --tenant public \
      --namespace default \
      --trigger-value "sdrawrof won si tub sdrawkcab saw gnirts sihT"

You should get this output:

    This string was backwards but is now forwards
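
The same transformation can also be checked locally in a plain Python interpreter, without a cluster. The sketch below simply repeats the body of reverse.py and feeds it the trigger value used above; it does not use any Pulsar API.

```python
def process(input):
    # Same body as reverse.py: a slice with step -1 walks the string backwards.
    return input[::-1]


trigger_value = "sdrawrof won si tub sdrawkcab saw gnirts sihT"
result = process(trigger_value)
print(result)

# Reversing twice returns the original string.
assert process(result) == trigger_value
```

Running a quick check like this before deploying can catch logic errors earlier than a round trip through the trigger command.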

Once again, success! We created a brand new Pulsar Function, deployed it in our Pulsar standalone cluster in cluster mode, and successfully triggered the function.

Packaging Python dependencies

For Python functions that require dependencies to be deployable to Pulsar worker instances offline, we need to package the artifacts as described below.

Client Requirements

The following programs must be installed on the client machine:

  • pip: required for getting Python dependencies
  • zip: required for building zip archives

Python Dependencies

A file named requirements.txt listing the required dependencies for the Python function is needed:

    sh==1.12.14

Place the Pulsar function in a folder called src.

Run the following command to gather the Python dependencies in a folder called deps:

    pip download \
      --only-binary :all: \
      --platform manylinux1_x86_64 \
      --python-version 27 \
      --implementation cp \
      --abi cp27m -r requirements.txt -d deps

Sample output

    Collecting sh==1.12.14 (from -r requirements.txt (line 1))
      Using cached https://files.pythonhosted.org/packages/4a/22/17b22ef5b049f12080f5815c41bf94de3c229217609e469001a8f80c1b3d/sh-1.12.14-py2.py3-none-any.whl
      Saved ./deps/sh-1.12.14-py2.py3-none-any.whl
    Successfully downloaded sh

Note: pulsar-client is not needed as a dependency, as it is already installed on the worker node.

Packaging

Create a destination folder with the desired packaging name (e.g. exclamation), copy the src and deps folders into it, and finally compress the folder into a zip archive.

Sample sequence

    $ cp -R deps exclamation/
    $ cp -R src exclamation/
    $ ls -la exclamation/
    total 7
    drwxr-xr-x 5 a.ahmed staff 160 Nov 6 17:51 .
    drwxr-xr-x 12 a.ahmed staff 384 Nov 6 17:52 ..
    drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 deps
    drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 src
    $ zip -r exclamation.zip exclamation

The archive exclamation.zip can be deployed as a function to a Pulsar worker. The worker does not need internet connectivity to download packages, as they are all included in the zip file.
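
If the zip program is not available, the archiving step could also be done with Python's standard library. The following is a minimal sketch, not part of the Pulsar tooling: the helper name package_function, the temporary directory, and the placeholder file names (including src/exclamation.py) are all assumptions made so that the example runs anywhere.

```python
import os
import shutil
import tempfile
import zipfile


def package_function(work_dir, name):
    """Zip work_dir/<name> into work_dir/<name>.zip with <name>/ as the top-level folder."""
    archive_base = os.path.join(work_dir, name)
    # root_dir/base_dir make the archive paths start with "<name>/",
    # similar to running `zip -r <name>.zip <name>` inside work_dir.
    return shutil.make_archive(archive_base, "zip", root_dir=work_dir, base_dir=name)


# Build a throwaway layout with placeholder files (hypothetical names).
work_dir = tempfile.mkdtemp()
layout = (("src", "exclamation.py"), ("deps", "sh-1.12.14-py2.py3-none-any.whl"))
for sub, filename in layout:
    folder = os.path.join(work_dir, "exclamation", sub)
    os.makedirs(folder)
    with open(os.path.join(folder, filename), "w") as handle:
        handle.write("placeholder\n")

archive_path = package_function(work_dir, "exclamation")
with zipfile.ZipFile(archive_path) as archive:
    names = sorted(archive.namelist())
print(names)
```

Listing the archive contents afterwards is a cheap way to confirm that both the src and deps folders ended up under the top-level exclamation folder.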