Package Pulsar Functions

你可以在 Java ,Python 和 Go 中打包 Pulsar 函数。 在 Java 中打包窗口函数和 Java 函数打包一样。

注意
当前,Python 和 Go 不允许使用窗口函数。

前提条件

在运行 Pulsar 函数之前,你必须先启动 Pulsar。 你可以在 Docker 内独立运行 Pulsar 或者在 Kubernetes 内运行 Pulsar

验证 Docker 镜像是否启动,你可以使用命令docker ps

Java

在Java 内打包一个函数,需要完成如下步骤。

  1. 使用 pom 文件创建一个新的 maven 项目。 如下代码示例,假设你的包名称是mainClass

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0</modelVersion>
    6. <groupId>java-function</groupId>
    7. <artifactId>java-function</artifactId>
    8. <version>1.0-SNAPSHOT</version>
    9. <dependencies>
    10. <dependency>
    11. <groupId>org.apache.pulsar</groupId>
    12. <artifactId>pulsar-functions-api</artifactId>
    13. <version>2.6.0</version>
    14. </dependency>
    15. </dependencies>
    16. <build>
    17. <plugins>
    18. <plugin>
    19. <artifactId>maven-assembly-plugin</artifactId>
    20. <configuration>
    21. <appendAssemblyId>false</appendAssemblyId>
    22. <descriptorRefs>
    23. <descriptorRef>jar-with-dependencies</descriptorRef>
    24. </descriptorRefs>
    25. <archive>
    26. <manifest>
    27. <mainClass>org.example.test.ExclamationFunction</mainClass>
    28. </manifest>
    29. </archive>
    30. </configuration>
    31. <executions>
    32. <execution>
    33. <id>make-assembly</id>
    34. <phase>package</phase>
    35. <goals>
    36. <goal>assembly</goal>
    37. </goals>
    38. </execution>
    39. </executions>
    40. </plugin>
    41. <plugin>
    42. <groupId>org.apache.maven.plugins</groupId>
    43. <artifactId>maven-compiler-plugin</artifactId>
    44. <configuration>
    45. <source>8</source>
    46. <target>8</target>
    47. </configuration>
    48. </plugin>
    49. </plugins>
    50. </build>
    51. </project>
  2. 编写 Java 函数

    1. package org.example.test;
    2. import java.util.function.Function;
    3. public class ExclamationFunction implements Function<String, String> {
    4. @Override
    5. public String apply(String s) {
    6. return "This is my function!";
    7. }
    8. }

    对于包的应用,你可以使用以下接口之一:

    • Java 8 提供的 Function 接口: java.util.function.Function
    • Pulsar Function 提供的接口:org.apache.pulsar.functions.api.Function

    两个接口主要的不同是,接口org.apache.pulsar.functions.api.Function接口提供了 Context 接口。 当你编写一个函数,并想和它交互时,你能够通过 Context 获的关于 Pulsar Function 的各种各样的信息和功能。

    下面的例子使用了接口org.apache.pulsar.functions.api.Function提供的 Context 对象。

    1. package org.example.functions;
    2. import org.apache.pulsar.functions.api.Context;
    3. import org.apache.pulsar.functions.api.Function;
    4. import java.util.Arrays;
    5. public class WordCountFunction implements Function<String, Void> {
    6. // This function is invoked every time a message is published to the input topic
    7. @Override
    8. public Void process(String input, Context context) throws Exception {
    9. Arrays.asList(input.split(" ")).forEach(word -> {
    10. String counterKey = word.toLowerCase();
    11. context.incrCounter(counterKey, 1);
    12. });
    13. return null;
    14. }
    15. }
  3. Java 函数打包

    1. mvn package

    Java 函数打包完成后,会自动创建target目录。 打开target目录可以看到一个叫做java-function-1.0-SNAPSHOT.jar的 JAR 文件。

  4. 运行 Java 函数。

    (1) 把 jar 包拷贝到 Pulsar 镜像里面

    1. docker exec -it [CONTAINER ID] /bin/bash
    2. docker cp <path of java-function-1.0-SNAPSHOT.jar> CONTAINER ID:/pulsar

    (2) 使用如下命令运行 Java 函数。

    1. ./bin/pulsar-admin functions localrun \
    2. --classname org.example.test.ExclamationFunction \
    3. --jar java-function-1.0-SNAPSHOT.jar \
    4. --inputs persistent://public/default/my-topic-1 \
    5. --output persistent://public/default/test-1 \
    6. --tenant public \
    7. --namespace default \
    8. --name JavaFunction

    显示如下日志,表明 Java 函数运行成功。

    1. ...
    2. 07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    3. ...

Python

Python 函数支持以下三种格式:

  • 单个 Python 文件
  • ZIP 压缩文件
  • PIP

单个 Python 文件

Python 打包单个Python文件的函数,需要完成如下步骤。

  1. 写一个Python 函数。

    1. from pulsar import Function // import the Function module from Pulsar
    2. # The classic ExclamationFunction that appends an exclamation at the end
    3. # of the input
    4. class ExclamationFunction(Function):
    5. def __init__(self):
    6. pass
    7. def process(self, input, context):
    8. return input + '!'

    这个例子中,当你写Python 函数时,你必须去继承 Funciton 类,并且实现这个process()方法。

    process() 方法主要有两个参数:

    • input 表示你的输入。

    • context是由 Pulsar 函数暴露的一个接口。 基于提供的上下文对象,您可以获取 Python 函数中的属性。

  2. 安装Python 客户端。

    Python 函数的实现取决于 Python 客户端,所以在部署一个函数之前,你必须去安装对应的 Python 客户端版本。

    1. pip install python-client==2.6.0
  3. 运行 Python 函数

    (1) 拷贝 Python 函数到 Pulsar 镜像内。

    1. docker exec -it [CONTAINER ID] /bin/bash
    2. docker cp <path of Python function file> CONTAINER ID:/pulsar

    (2) 使用如下命令运行 Python 函数。

    1. ./bin/pulsar-admin functions localrun \
    2. --classname org.example.test.ExclamationFunction \
    3. --py <path of Python Function file> \
    4. --inputs persistent://public/default/my-topic-1 \
    5. --output persistent://public/default/test-1 \
    6. --tenant public \
    7. --namespace default \
    8. --name PythonFunction

    显示如下日志,表明 Java 函数运行成功。

    1. ...
    2. 07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    3. ...

ZIP 压缩文件

Python 使用ZIP压缩文件打包函数,需要完成如下步骤。

  1. 准备ZIP 压缩文件

    Python 函数打包成ZIP 压缩文件,需要包含一下内容。

    1. 假设 ZIP 文件的名称是 `func.zip`,那么解压后的目录结构应如下:
    2. "func/src"
    3. "func/requirements.txt"
    4. "func/deps"

    exclamation.zip为例。 这个例子内部的结构如下。

    1. .
    2. ├── deps
    3. └── sh-1.12.14-py2.py3-none-any.whl
    4. └── src
    5. └── exclamation.py
  2. 运行 Python 函数

    (1) 拷贝 ZIP 文件到 Pulsar 镜像里面。

    1. docker exec -it [CONTAINER ID] /bin/bash
    2. docker cp <path of ZIP file> CONTAINER ID:/pulsar

    (2) 使用如下命令运行 Python 函数。

    1. ./bin/pulsar-admin functions localrun \
    2. --classname exclamation \
    3. --py <path of ZIP file> \
    4. --inputs persistent://public/default/in-topic \
    5. --output persistent://public/default/out-topic \
    6. --tenant public \
    7. --namespace default \
    8. --name PythonFunction

    显示如下日志,表明 Java 函数运行成功。

    1. ...
    2. 07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    3. ...

PIP

PIP 方式仅在 Kubernetes 运行时支持。 Python 使用PIP方式打包函数,需要完成如下步骤。

  1. 配置functions_worker.yml文件。

    1. #### Kubernetes Runtime ####
    2. installUserCodeDependencies: true
  2. 编写 Python 函数。

    1. from pulsar import Function
    2. import js2xml
    3. # The classic ExclamationFunction that appends an exclamation at the end
    4. # of the input
    5. class ExclamationFunction(Function):
    6. def __init__(self):
    7. pass
    8. def process(self, input, context):
    9. // add your logic
    10. return input + '!'

    您可以引入额外的依赖关系。 当 Python 函数发现这个文件使用whl并且参数installUserCodeDependencies是打开的,那么系统会使用pip install命令安装Python 函数所需要的依赖。

  3. 生成whl文件。

    1. $ cd $PULSAR_HOME/pulsar-functions/scripts/python
    2. $ chmod +x generate.sh
    3. $ ./generate.sh <path of your Python Function> <path of the whl output dir> <the version of whl>
    4. # 例子: ./generate.sh /path/to/python /path/to/python/output 1.0.0

    执行后会在目录/path/to/python/output生成如下文件:

    1. -rw-r--r-- 1 root staff 1.8K 8 27 14:29 pulsarfunction-1.0.0-py2-none-any.whl
    2. -rw-r--r-- 1 root staff 1.4K 8 27 14:29 pulsarfunction-1.0.0.tar.gz
    3. -rw-r--r-- 1 root staff 0B 8 27 14:29 pulsarfunction.whl

Go

Go 打包函数,需要完成如下步骤。

  1. 编写 Go 函数。

    当前,Go 函数仅仅只能使用 SDK 实现,并且函数的接口是 SDK 暴露出来的。 在使用 Go 函数之前,你必须先引入 “github.com/apache/pulsar/pulsar-function-go/pf”包。

    1. import (
    2. "context"
    3. "fmt"
    4. "github.com/apache/pulsar/pulsar-function-go/pf"
    5. )
    6. func HandleRequest(ctx context.Context, input []byte) error {
    7. fmt.Println(string(input) + "!")
    8. return nil
    9. }
    10. func main() {
    11. pf.Start(HandleRequest)
    12. }

    你能够使用 Context 对象和 Go 函数交互。

    1. if fc, ok := pf.FromContext(ctx); ok {
    2. fmt.Printf("function ID is:%s, ", fc.GetFuncID())
    3. fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
    4. }

    写 Go 函数时,需要记得下面几点:

    • main()函数中,你仅仅需要将函数名称注册到Start(). 仅仅在函数Start()会收到函数名称。

    • Go 函数用到的 Go 反射机制,基于收到的函数名称,去校验参数列表和返回值列表是否正确。 参数列表和返回值列表必须是如下函数例子中的一个:

      1. func ()
      2. func () error
      3. func (input) error
      4. func () (output, error)
      5. func (input) (output, error)
      6. func (context.Context) error
      7. func (context.Context, input) error
      8. func (context.Context) (output, error)
      9. func (context.Context, input) (output, error)
  2. 构建 Go 函数。

    1. go build <your Go Function filename>.go
  3. 运行 Go 函数

    (1) 拷贝 Go 函数到 Pulsar 镜像里面。

    1. docker exec -it [CONTAINER ID] /bin/bash
    2. docker cp <your go function path> CONTAINER ID:/pulsar

    (2) 使用如下命令运行 Go 函数。

    1. ./bin/pulsar-admin functions localrun \
    2. --go [your go function path]
    3. --inputs [input topics] \
    4. --output [output topic] \
    5. --tenant [default:public] \
    6. --namespace [default:default] \
    7. --name [custom unique go function name]

    显示如下日志,表明 Java 函数运行成功。

    1. ...
    2. 07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    3. ...

集群模式部署函数

如果你想在集群模式下启动函数,你可以将上面命令的localrun替换为create。 显示如下日志,表明 Java 函数运行成功。

  1. "Created successfully"

更多关于参数的信息,如--classname--jar--py--go--inputs,可以运行命令./bin/pulsar-admin functions 或者点击查看here