Pulsar Functions 是轻量级计算机处理程序,具有如下特点:

  • 从一个或多个 Pulsar topic 中消费消息;
  • 将用户提供的处理逻辑应用于每条消息;
  • 将计算结果发布到另一个主题

这是一个基于java的例子(使用原生接口)

  1. import java.util.Function;
  2. public class ExclamationFunction implements Function<String, String> {
  3. @Override
  4. public String apply(String input) { return String.format("%s!", input); }
  5. }

这是一个基于Python实现的类似的函数(也使用了原生接口)

  1. def process(input):
  2. return "{0}!".format(input)

一条消息每次被发布到输入的主题时,函数都会被执行。 例如,如果一个函数监听在主题 tweet-stream上,在每次有消息发布到这个主题的时候函数都会运行。

Goals

Pulsar Functions背后的核心目标是使您能够轻松创建各种级别的复杂的的处理逻辑,而无需部署单独的类似系统(例如 Apache Storm, Apache Heron, Apache Flink, 等等) Pulsar Functions本质上是现成的计算基础设施,作为Pulsar消息系统的一部分,供你使用。 这个核心目标与一系列其他目标相关联:

Inspirations

  1. Pulsar Functions功能受到若干系统和模式的启发(并从中汲取线索):

Pulsar Functions能被这样描述

  • Lambda 样式的 functions
  • 专门设计的使用Pulsar来作为消息总线

Programming model

Pulsar Functions背后的核心编程模型非常简单:

  • Functions receive messages from one or more input topics. Every time a message is received, the function can do a variety of things:
    • 将某些处理逻辑应用到输入并写入到输出:
    • 写入日志到 日志主题 (可能用于调试目的)
    • 增量 计数器

Pulsar Functions core programming model

词数统计示例

如果你使用Pulsar Functions 执行经典的字词计数示例,那么它可能看起来像这样:

Pulsar Functions word count example

If you were writing the function in Java using the Pulsar Functions SDK for Java, you could write the function like below:

  1. package org.example.functions;
  2. import org.apache.pulsar.functions.api.Context;
  3. import org.apache.pulsar.functions.api.Function;
  4. import java.util.Arrays;
  5. public class WordCountFunction implements Function<String, Void> {
  6. // This function is invoked every time a message is published to the input topic
  7. @Override
  8. public Void process(String input, Context context) throws Exception {
  9. Arrays.asList(input.split(" ")).forEach(word -> {
  10. String counterKey = word.toLowerCase();
  11. context.incrCounter(counterKey, 1);
  12. });
  13. return null;
  14. }
  15. }

Next, you need to bundle and build the jar file to be deployed, the approaches can be found in “Creating an Uber JAR” and “Creating a NAR package”. Then deploy it in your Pulsar cluster using the command line like below:

  1. $ bin/pulsar-admin functions create \
  2. --jar target/my-jar-with-dependencies.jar \
  3. --classname org.example.functions.WordCountFunction \
  4. --tenant public \
  5. --namespace default \
  6. --name word-count \
  7. --inputs persistent://public/default/sentences \
  8. --output persistent://public/default/count

Content-based routing example

Pulsar Functions的使用案例有很多,下面展示一个更复杂的例子,该示例涉及基于内容的路由。

想象一个函数, 它将内容 (字符串) 作为输入, 并根据内容将它们发布到不同的主题中(例如fruits或者vegetables主题) 如果这个内容既不属于fruit也不属于vegetable主题,就会有一个警告被记录到日志主题中. 下面是一个可视化表示形式:

Pulsar Functions routing example

如果使用Python语言实现此路由功能, 它可能如下所示:

  1. from pulsar import Function
  2. class RoutingFunction(Function):
  3. def __init__(self):
  4. self.fruits_topic = "persistent://public/default/fruits"
  5. self.vegetables_topic = "persistent://public/default/vegetables"
  6. def is_fruit(item):
  7. return item in ["apple", "orange", "pear", "other fruits..."]
  8. def is_vegetable(item):
  9. return item in ["carrot", "lettuce", "radish", "other vegetables..."]
  10. def process(self, item, context):
  11. if self.is_fruit(item):
  12. context.publish(self.fruits_topic, item)
  13. elif self.is_vegetable(item):
  14. context.publish(self.vegetables_topic, item)
  15. else:
  16. warning = "The item {0} is neither a fruit nor a vegetable".format(item)
  17. context.get_logger().warn(warning)

Command-line interface

Pulsar Functions使用 pulsar-admin 工具进行管理 (主要是functions命令). 这里是一个示例命令,它将运行一个函数在localrun模式下:

  1. $ bin/pulsar-admin functions localrun \
  2. --inputs persistent://public/default/test_src \
  3. --output persistent://public/default/test_result \
  4. --jar examples/api-examples.jar \
  5. --classname org.apache.pulsar.functions.api.examples.ExclamationFunction

Fully Qualified Function Name (FQFN)

Each Pulsar Function has a Fully Qualified Function Name (FQFN) that consists of three elements: the function’s tenant, namespace, and function name. FQFN’s look like this:

  1. tenant/namespace/name

例如, FQFN使您能够创建具有相同名称的多个函数, 前提是它们位于不同的命名空间中。

Configuration

可以通过两种方式配置Pulsar Functions:

If you’re supplying a YAML configuration, you must specify a path to the file on the command line. Here’s an example:

  1. $ bin/pulsar-admin functions create \
  2. --function-config-file ./my-function.yaml

下面是一个my-function.yaml文件的示例:

  1. name: my-function
  2. tenant: public
  3. namespace: default
  4. jar: ./target/my-functions.jar
  5. className: org.example.pulsar.functions.MyFunction
  6. inputs:
  7. - persistent://public/default/test_src
  8. output: persistent://public/default/test_result

您还可以通过命令行工具指定一些函数属性,通过 yaml 配置一些其他的函数属性, 从而同时使用这两种配置方法。

Supported languages

Pulsar Functions当前支持Java and Python两种语言. 对其他语言的支持即将推出。

The Pulsar Functions API

使用Pulsar Functions api 您可以创建以下处理逻辑:

  • Type safe. Pulsar Functions can process raw bytes or more complex, application-specific types.
  • Based on SerDe (Serialization/Deserialization). A variety of types are supported “out of the box” but you can also create your own custom SerDe logic.

Function context

使用 Pulsar Functions SDK创建的每个 Pulsar 函数 都支持获取一个上下文对象:

  1. 有关该函数的各种信息,包括:
  1. 特殊功能,包括:
  • 生成 日志 到指定日志主题的能力
  • 生成 metircs的能力

Language-native functions

支持Java和Python的原生函数,即没有依赖的Pulsar Functions

原生函数的好处是除了Java/Python中现有的“开箱即用”的变量之外,它们无需其他依赖。缺点是它们不提供对函数 上下文的访问,这对于各种功能都是必要的,包括 logging user configuration等等。

The Pulsar Functions SDK

如果你想通过Pulsar Function来获取 上下文对象, 你可以使用 Pulsar Functions SDK, 支持 JavaPython两种语言。

Java

下面是一个使用有关其上下文的信息的 java 函数示例:

  1. import org.apache.pulsar.functions.api.Context;
  2. import org.apache.pulsar.functions.api.Function;
  3. import org.slf4j.Logger;
  4. public class ContextAwareFunction implements Function<String, Void> {
  5. @Override
  6. public Void process(String input, Context, context) {
  7. Logger LOG = context.getLogger();
  8. String functionTenant = context.getTenant();
  9. String functionNamespace = context.getNamespace();
  10. String functionName = context.getName();
  11. LOG.info("Function tenant/namespace/name: {}/{}/{}", functionTenant, functionNamespace, functionName);
  12. return null;
  13. }
  14. }

Python

下面是一个使用有关其上下文的信息的 java 函数示例:

  1. from pulsar import Function
  2. class ContextAwareFunction(Function):
  3. def process(self, input, context):
  4. log = context.get_logger()
  5. function_tenant = context.get_function_tenant()
  6. function_namespace = context.get_function_namespace()
  7. function_name = context.get_function_name()
  8. log.info("Function tenant/namespace/name: {0}/{1}/{2}".format(function_tenant, function_namespace, function_name))

部署

The Pulsar Functions feature was built to support a variety of deployment options. At the moment, there are two ways to run Pulsar Functions:

部署模式Description
Local run modeThe function runs in your local environment, for example on your laptop
Cluster mode函数在运行在你的Pulsar 集群上,与你的Pulsar brokers在相同的机器上

Local run mode

如果你使用 localrun 模式运行一Pulsar函数,它将在运行命令的机器上运行(这可能是你的笔记本电脑,AWS EC2 实例等)。 这里是一个localrun模式下运行的命令:

  1. $ bin/pulsar-admin functions localrun \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/input-1 \
  5. --output persistent://public/default/output-1

默认情况下,函数连接到同一机器上运行的 Pulsar 集群通过本地的broker服务,URL是 pulsar://localhost:6650。 如果您想使用localrun模式来运行一个函数,但连接到非本地Pulsar 集群,你可以通过 --brokerServiceUrl 标志来指定一个不同的broker URL。 Here’s an example:

  1. $ bin/pulsar-admin functions localrun \
  2. --broker-service-url pulsar://my-cluster-host:6650 \
  3. # Other function parameters

Cluster run mode

当你运行 Pulsar Function在集群模式下时, 函数代码将被上传到Pulsar broker上,并与代理broker一起运行,而不是在您的本地环境中运行。 您可以使用 create 命令在群集模式下运行函数。 Here’s an example:

  1. $ bin/pulsar-admin functions create \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/input-1 \
  5. --output persistent://public/default/output-1

此命令将上传 myfunc.py 到 Pulsar,它将使用代码启动一个 或更多 函数实例。

Parallelism

默认情况下,当您在集群模式创建并运行Pulsar函数时,只有一个实例运行。 但是, 您也可以并行运行多个实例。 您可以在创建函数时指定实例数, 也可以使用新的并行性因子更新现有的单实例函数。

例如, 此命令将创建并运行并行性为 5 (即5个实例) 的函数:

  1. $ bin/pulsar-admin functions create \
  2. --name parallel-fun \
  3. --tenant public \
  4. --namespace default \
  5. --py func.py \
  6. --classname func.ParallelFunction \
  7. --parallelism 5

Function instance resources

群集模式运行 pusar Functions 时, 可以指定资源分配给每个函数 实例:

ResourceSpecified as…Runtimes
CPUThe number of coresDocker (coming soon)
RAMThe number of bytesProcess, Docker
Disk spaceThe number of bytesDocker

下面是一个函数创建命令的示例, 它将分配8核、8GB内存和10GB的磁盘空间给一个函数:

  1. $ bin/pulsar-admin functions create \
  2. --jar target/my-functions.jar \
  3. --classname org.example.functions.MyFunction \
  4. --cpu 8 \
  5. --ram 8589934592 \
  6. --disk 10737418240

有关资源的详细信息, 请参阅 Deploying and Managing Pulsar Functions 文档。

Logging

使用 Pulsar Functions SDK创建的Pulsar Functions可以将日志发送到一个日志的主题,这个可以在你的函数配置里指定。 例如,使用下面的命令创建的函数将在 persistent://public/default/my-func-1-log 主题上生成所有日志:

  1. $ bin/pulsar-admin functions create \
  2. --name my-func-1 \
  3. --log-topic persistent://public/default/my-func-1-log \
  4. # Other configs

这个Java function的例子记录日志使用了不同的日志级别基于函数的输入:

  1. public class LoggerFunction implements Function<String, Void> {
  2. @Override
  3. public Void process(String input, Context context) {
  4. Logger LOG = context.getLogger();
  5. if (input.length() <= 100) {
  6. LOG.info("This string has a length of {}", input);
  7. } else {
  8. LOG.warn("This string is getting too long! It has {} characters", input);
  9. }
  10. }
  11. }

User configuration

Pulsar Functions可以通过命令行传递任意的key-values(key和values必须是字符串)。 这些key-values的设置通过调用函数的用户配置. 用户配置必须包括 JSON 字符串.

这是一个函数使用用户配置的额示例

  1. $ bin/pulsar-admin functions create \
  2. --user-config '{"key-1":"value-1","key-2","value-2"}' \
  3. # Other configs

这是一个在函数中获取配置信息的示例

  1. public class ConfigMapFunction implements Function<String, Void> {
  2. @Override
  3. public Void process(String input, Context context) {
  4. String val1 = context.getUserConfigValue("key1").get();
  5. String val2 = context.getUserConfigValue("key2").get();
  6. context.getLogger().info("The user-supplied values are {} and {}", val1, val2);
  7. return null;
  8. }
  9. }

Triggering Pulsar Functions

Pulsar Functions 运行在集群模式下可以使用触发器参考command line 通过触发器,您可以轻松地将特定值传递给函数,并获得函数的返回值,而无需担心创建客户端、向正确的输入主题发送消息等。 触发器非常有用绝不限于测试和调试目的。

Triggering a function is ultimately no different from invoking a function by producing a message on one of the function’s input topics. pulsar-admin functions trigger本质上是一种非常方便的机制用于向函数发送消息,而不需要使用pulsar-client工具或指定语言的客户端库。

让我我们看一个基于Python的函数示例(原生接口),它简单地反转字符串输入:

  1. def process(input):
  2. return input[::-1]

如果该函数在Pulsar集群中运行, 则可以像这样被触发:

  1. $ bin/pulsar-admin functions trigger \
  2. --tenant public \
  3. --namespace default \
  4. --name reverse-func \
  5. --trigger-value "snoitcnuf raslup ot emoclew"

这应该在控制台返回 welcome to pulsar functions

您也可以使用--triggerFile标志,用文件的内容触发Pulsar函数,而不是通过命令行工具传递字符串。

Processing guarantees

Pulsar 函数功能提供三种不同的信息语义,你可以应用于任何函数中:

Delivery semanticsDescription
At-most-once delivery发送给函数的每个消息最多会被处理一次
At-least-once delivery发送给函数的每条消息至少被处理一次
Effectively-once delivery发送函数的每条消息没精确的处理一次

例如, 此命令将在 群集模式 中运行一个函数, 并effectively-once保证有效性:

  1. $ bin/pulsar-admin functions create \
  2. --name my-effectively-once-function \
  3. --processing-guarantees EFFECTIVELY_ONCE \
  4. # Other function configs

Metrics

Pulsar Functions使用Pulsar Functions SDK可以发布meitrics到Pulsar 关于更多信息,请参阅 Metrics for Pulsar Functions.

State storage

Pulsar Functions使用 Apache Bookerper 存储状态。 所有Pulsar安装,包括local standalone安装,BookKeeper bookies的部署等都将状态存储到[Apache BookKeeper](https://bookkeeper. apache. org)。