Pulsar Functions overview

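Pulsar Functions are lightweight compute processes that:

  • consume messages from one or more Pulsar topics;
  • apply user-supplied processing logic to each message;
  • publish the results of the computation to another topic.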
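Conceptually, a function instance runs a consume, process, publish loop. The minimal sketch below models that loop in plain Python, assuming a dict of lists stands in for Pulsar topics; `run_function` and the in-memory topic store are hypothetical illustrations, not part of any Pulsar SDK.

```python
from typing import Callable, Dict, List, Optional


def run_function(
    topics: Dict[str, List[str]],
    inputs: List[str],
    output: str,
    fn: Callable[[str], Optional[str]],
) -> None:
    """Apply fn to every message on the input topics and publish the results.

    Topics are modelled as plain lists in a dict; a real function instance
    would use Pulsar consumers and producers instead.
    """
    for topic in inputs:
        for message in topics.get(topic, []):
            result = fn(message)
            # In this sketch, returning None means "nothing to publish".
            if result is not None:
                topics.setdefault(output, []).append(result)


topics = {"in-a": ["hello"], "in-b": ["world"]}
run_function(topics, ["in-a", "in-b"], "out", lambda s: s + "!")
print(topics["out"])  # ['hello!', 'world!']
```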

Goals

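The core goal behind Pulsar Functions is to enable you to easily create processing logic of any level of complexity without deploying a separate neighboring system (such as Apache Storm, Apache Heron, or Apache Flink). Pulsar Functions are the computing infrastructure of the Pulsar messaging system. The core goal is tied to a series of other goals:

  • Developer productivity (familiar language-native interfaces and the Pulsar Functions SDK)
  • Easy troubleshooting
  • Operational simplicity (no need for an external processing system)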

Inspirations

Pulsar Functions are inspired by (and take cues from) several systems and paradigms. In short, Pulsar Functions can be described as Lambda-style functions that are specifically designed to use Pulsar as a message bus.

Programming model

Pulsar Functions provide a wide range of functionality, and the core programming model is simple. Functions receive messages from one or more input topics. Each time a message is received, the function will complete the following tasks.

  • Apply some processing logic to the input and write the result to an output topic
  • Write logs to a log topic (potentially for debugging purposes)
  • Increment a counter
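The three tasks above map onto a small set of context operations. The sketch below uses a hypothetical in-memory `SketchContext` (not the SDK's real `Context` class) to show how one invocation might write output, write a log entry to a log topic, and increment a counter.

```python
from collections import defaultdict
from typing import Dict, List


class SketchContext:
    """Hypothetical stand-in for a function context: output, logs, counters."""

    def __init__(self, output_topic: str, log_topic: str) -> None:
        self.output_topic = output_topic
        self.log_topic = log_topic
        self.topics: Dict[str, List[str]] = defaultdict(list)
        self.counters: Dict[str, int] = defaultdict(int)

    def publish(self, topic: str, message: str) -> None:
        self.topics[topic].append(message)

    def log(self, message: str) -> None:
        # Log entries are ordinary messages on a dedicated log topic.
        self.topics[self.log_topic].append(message)

    def incr_counter(self, key: str, amount: int = 1) -> None:
        self.counters[key] += amount


def process(message: str, ctx: SketchContext) -> None:
    ctx.publish(ctx.output_topic, message.upper())  # apply logic, write output
    ctx.log(f"processed {message!r}")               # write a log entry
    ctx.incr_counter("processed")                   # increment a counter


ctx = SketchContext("out", "logs")
for m in ["a", "b"]:
    process(m, ctx)
print(ctx.topics["out"], ctx.counters["processed"])  # ['A', 'B'] 2
```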

[Figure: Pulsar Functions core programming model]

You can use Pulsar Functions to set up the following processing chain:

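  • A Python function listens for the raw-sentences topic, "sanitizes" incoming strings (removing extraneous whitespace and converting all characters to lowercase), and publishes the results to a sanitized-sentences topic.
  • A Java function listens for the sanitized-sentences topic, counts the number of times each word appears within a specified time window, and publishes the results to a results topic.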
  • Finally, a Python function listens for the results topic and writes the results to a MySQL table.
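The first two stages of this chain can be sketched as pure functions, which keeps the logic testable without a cluster. This is a sketch under stated assumptions: the (timestamp, sentence) input shape, the tumbling-window choice, and the names `sanitize` and `count_words_per_window` are hypothetical, and in the chain described above the counting stage is written in Java rather than Python.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def sanitize(sentence: str) -> str:
    """Stage 1: collapse extraneous whitespace and lowercase the text."""
    return " ".join(sentence.split()).lower()


def count_words_per_window(
    events: Iterable[Tuple[int, str]], window_seconds: int
) -> Dict[int, Counter]:
    """Stage 2: count words in fixed (tumbling) windows keyed by window start.

    Each event is (timestamp_in_seconds, sanitized_sentence).
    """
    windows: Dict[int, Counter] = {}
    for ts, sentence in events:
        start = ts - ts % window_seconds
        windows.setdefault(start, Counter()).update(sentence.split())
    return windows


raw = [(0, "  Hello   World "), (5, "hello PULSAR"), (12, "World")]
clean = [(ts, sanitize(s)) for ts, s in raw]
print(count_words_per_window(clean, 10))
```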

Word count example

If you implement the classic word count example using Pulsar Functions, it looks something like this:

[Figure: Pulsar Functions word count example]

To implement this as a Java function with the Pulsar Functions SDK for Java, you can write the following:

  package org.example.functions;

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  import java.util.Arrays;

  public class WordCountFunction implements Function<String, Void> {
      // This function is invoked every time a message is published to the input topic
      @Override
      public Void process(String input, Context context) throws Exception {
          // Split the sentence on spaces and increment a counter for each lowercased word
          Arrays.asList(input.split(" ")).forEach(word -> {
              String counterKey = word.toLowerCase();
              context.incrCounter(counterKey, 1);
          });
          // Returning null means nothing is published to the output topic
          return null;
      }
  }

Compile the code above into a deployable JAR file, then deploy it to a Pulsar cluster from the command line as follows:

  $ bin/pulsar-admin functions create \
    --jar target/my-jar-with-dependencies.jar \
    --classname org.example.functions.WordCountFunction \
    --tenant public \
    --namespace default \
    --name word-count \
    --inputs persistent://public/default/sentences \
    --output persistent://public/default/count

Content-based routing example

Pulsar Functions can be used in many scenarios. The following is a more sophisticated example that involves content-based routing.

For example, a function takes items (strings) as input and publishes them to either a fruits or a vegetables topic, depending on the item. If an item is neither a fruit nor a vegetable, the function logs a warning to a log topic instead. The following is a visual representation.

[Figure: Pulsar Functions routing example]

If you implement this routing functionality in Python, it looks something like this:

  from pulsar import Function

  class RoutingFunction(Function):
      def __init__(self):
          self.fruits_topic = "persistent://public/default/fruits"
          self.vegetables_topic = "persistent://public/default/vegetables"

      @staticmethod
      def is_fruit(item):
          return item in [b"apple", b"orange", b"pear", b"other fruits..."]

      @staticmethod
      def is_vegetable(item):
          return item in [b"carrot", b"lettuce", b"radish", b"other vegetables..."]

      def process(self, item, context):
          # Route each item to the matching topic; otherwise log a warning
          if self.is_fruit(item):
              context.publish(self.fruits_topic, item)
          elif self.is_vegetable(item):
              context.publish(self.vegetables_topic, item)
          else:
              warning = "The item {0} is neither a fruit nor a vegetable".format(item)
              context.get_logger().warning(warning)

Save the code above to ~/router.py, then deploy it to a Pulsar cluster from the command line as follows:

  $ bin/pulsar-admin functions create \
    --py ~/router.py \
    --classname router.RoutingFunction \
    --tenant public \
    --namespace default \
    --name route-fruit-veg \
    --inputs persistent://public/default/basket-items
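Because the routing decision is plain Python, it can be exercised without a running cluster. The sketch below re-implements the same decision with a hypothetical `FakeContext` test double that records publishes and warnings; it does not import the `pulsar` package, and the item lists are shortened to the concrete items named in the function above.

```python
from typing import List, Tuple

FRUITS_TOPIC = "persistent://public/default/fruits"
VEGETABLES_TOPIC = "persistent://public/default/vegetables"


class FakeContext:
    """Hypothetical test double recording what the function publishes and logs."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, bytes]] = []
        self.warnings: List[str] = []

    def publish(self, topic: str, item: bytes) -> None:
        self.published.append((topic, item))

    def warn(self, message: str) -> None:
        self.warnings.append(message)


def route(item: bytes, context: FakeContext) -> None:
    """Same decision as RoutingFunction.process, without the SDK base class."""
    if item in (b"apple", b"orange", b"pear"):
        context.publish(FRUITS_TOPIC, item)
    elif item in (b"carrot", b"lettuce", b"radish"):
        context.publish(VEGETABLES_TOPIC, item)
    else:
        context.warn("The item {0} is neither a fruit nor a vegetable".format(item))


ctx = FakeContext()
for item in [b"apple", b"carrot", b"rock"]:
    route(item, ctx)
print(ctx.published)
print(ctx.warnings)
```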

Functions, messages and message types

Pulsar Functions take byte arrays as input and produce byte arrays as output. However, in languages that support typed interfaces (such as Java), you can write typed functions and bind messages to types.
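The idea behind binding bytes to types can be sketched as a serializer/deserializer pair wrapped around typed logic, so the runtime only ever sees bytes. `JsonSerDe`, `bytes_function`, and `add_total` are hypothetical names, and JSON is an assumed wire format chosen for illustration; the real SDKs provide their own serialization hooks.

```python
import json
from typing import Any, Callable


class JsonSerDe:
    """Hypothetical serializer/deserializer: bytes <-> Python objects via JSON."""

    @staticmethod
    def deserialize(data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))

    @staticmethod
    def serialize(obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")


def bytes_function(typed_fn: Callable[[Any], Any]) -> Callable[[bytes], bytes]:
    """Wrap typed logic so that callers only exchange raw bytes."""
    def wrapper(raw: bytes) -> bytes:
        return JsonSerDe.serialize(typed_fn(JsonSerDe.deserialize(raw)))
    return wrapper


def add_total(order: dict) -> dict:
    # Typed logic: works on a dict and never touches raw bytes.
    return {**order, "total": order["price"] * order["qty"]}


fn = bytes_function(add_total)
print(fn(b'{"price": 3, "qty": 4}'))  # b'{"price": 3, "qty": 4, "total": 12}'
```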

Fully Qualified Function Name (FQFN)

Each Pulsar Function has a Fully Qualified Function Name (FQFN) that consists of three elements: the function tenant, namespace, and function name. An FQFN looks like this:

  tenant/namespace/name

FQFNs enable you to create multiple functions with the same name provided that they are in different namespaces.
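A small sketch of the FQFN structure shows why two functions can share a name: the identifier includes the namespace, so the full names differ. `FQFN` and `parse_fqfn` are hypothetical helpers for illustration, not SDK APIs.

```python
from typing import NamedTuple


class FQFN(NamedTuple):
    tenant: str
    namespace: str
    name: str

    def __str__(self) -> str:
        return f"{self.tenant}/{self.namespace}/{self.name}"


def parse_fqfn(value: str) -> FQFN:
    """Split 'tenant/namespace/name' into its three non-empty parts."""
    parts = value.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/name, got {value!r}")
    return FQFN(*parts)


a = parse_fqfn("public/default/word-count")
b = FQFN("public", "analytics", "word-count")
# Same function name, different namespaces, so the FQFNs differ.
print(str(a), str(b), a == b)  # public/default/word-count public/analytics/word-count False
```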

Supported languages

Currently, you can write Pulsar Functions in Java, Python, and Go. For details, refer to Develop Pulsar Functions.

Processing guarantees

Pulsar Functions provide three different messaging semantics that you can apply to any function.

Delivery semantics | Description
--- | ---
At-most-once delivery | Each message sent to the function is processed at most once: it may be processed, or it may not be processed at all (hence "at most").
At-least-once delivery | Each message sent to the function is processed at least once and may be processed more than once (hence "at least").
Effectively-once delivery | Each message sent to the function has exactly one output associated with it.
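To make the table concrete, the sketch below simulates a single crash while a function handles one message. At-most-once acknowledges before processing, so the in-flight message is lost; at-least-once acknowledges after processing, so the message is redelivered and written twice. Effectively-once is modelled here as at-least-once plus a sink that ignores message ids it has already written; this is a simplification for illustration, not a description of how Pulsar implements the guarantee internally. `deliver` is a hypothetical helper.

```python
from typing import List, Set

SEMANTICS = ("ATMOST_ONCE", "ATLEAST_ONCE", "EFFECTIVELY_ONCE")


def deliver(messages: List[str], semantics: str, crash_at: int) -> List[str]:
    """Simulate one crash-and-restart and return what reaches the sink.

    The instance crashes once while handling index `crash_at`. After the
    restart, delivery resumes from the first unacknowledged message.
    """
    if semantics not in SEMANTICS:
        raise ValueError(f"unknown semantics: {semantics}")
    sink: List[str] = []
    written: Set[int] = set()  # message ids already written to the sink
    acked = 0                  # every index below this one is acknowledged
    crashed = False
    i = 0
    while i < len(messages):
        if semantics == "ATMOST_ONCE":
            acked = i + 1                      # acknowledge before processing
            if i == crash_at and not crashed:  # crash before the write
                crashed = True
                i = acked                      # resume after the ack: message lost
                continue
            sink.append(messages[i])
        else:
            # Process first, acknowledge afterwards.
            if semantics == "ATLEAST_ONCE" or i not in written:
                sink.append(messages[i])       # effectively-once skips duplicates
                written.add(i)
            if i == crash_at and not crashed:  # crash before the ack
                crashed = True
                i = acked                      # resume at the unacked message
                continue
            acked = i + 1
        i += 1
    return sink


for s in SEMANTICS:
    print(s, deliver(["m0", "m1", "m2"], s, crash_at=1))
# ATMOST_ONCE ['m0', 'm2']
# ATLEAST_ONCE ['m0', 'm1', 'm1', 'm2']
# EFFECTIVELY_ONCE ['m0', 'm1', 'm2']
```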

Apply processing guarantees to a function

You can set the processing guarantees for a Pulsar Function when you create it. The following pulsar-admin functions create command creates a function with effectively-once guarantees, so that each message is processed effectively once.

  $ bin/pulsar-admin functions create \
    --name my-effectively-once-function \
    --processing-guarantees EFFECTIVELY_ONCE \
    # Other function configs

The --processing-guarantees flag accepts the following values:

  • ATMOST_ONCE
  • ATLEAST_ONCE
  • EFFECTIVELY_ONCE

By default, Pulsar Functions provide at-least-once delivery guarantees. If you create a function without supplying a value for the --processing-guarantees flag, the function provides at-least-once guarantees.

Update the processing guarantees of a function

You can change the processing guarantees applied to a function using the update command. The following is an example.

  $ bin/pulsar-admin functions update \
    --processing-guarantees ATMOST_ONCE \
    # Other function configs
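When scripting these commands, it can help to validate the guarantee value before invoking the CLI. The sketch below builds the argument list for the create and update commands shown above; `functions_command` is a hypothetical helper, and it fills in only `--name` and `--processing-guarantees`, leaving the other function configs to the caller.

```python
from typing import List, Optional

VALID_GUARANTEES = {"ATMOST_ONCE", "ATLEAST_ONCE", "EFFECTIVELY_ONCE"}


def functions_command(
    action: str, name: str, guarantees: Optional[str] = None
) -> List[str]:
    """Build argv for `bin/pulsar-admin functions create|update`."""
    if action not in ("create", "update"):
        raise ValueError(f"unsupported action: {action}")
    argv = ["bin/pulsar-admin", "functions", action, "--name", name]
    if guarantees is not None:
        if guarantees not in VALID_GUARANTEES:
            raise ValueError(f"invalid processing guarantee: {guarantees}")
        argv += ["--processing-guarantees", guarantees]
    # On create, omitting the flag keeps the at-least-once default.
    return argv


print(" ".join(functions_command("create", "my-effectively-once-function", "EFFECTIVELY_ONCE")))
# bin/pulsar-admin functions create --name my-effectively-once-function --processing-guarantees EFFECTIVELY_ONCE
```

The resulting list can be passed to `subprocess.run` as-is, which avoids shell quoting issues.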