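Pulsar Functions provides an easy-to-use API that developers can use to create and manage processing logic for the Apache Pulsar messaging system. With Pulsar Functions, you can write functions of any level of complexity in Java or Python and run them alongside a Pulsar cluster, without needing to run a separate stream processing engine.

For a more in-depth introduction to Pulsar Functions, see the Pulsar Functions overview.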

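Core programming model

Pulsar Functions provide a wide range of functionality but are based on a very simple programming model. You can think of Pulsar Functions as lightweight processes that:

  • consume messages from one or more Pulsar topics, and then
  • apply user-defined processing logic to each incoming message. The processing logic can be anything you want, including:
    • producing the resulting, processed message on another Pulsar topic, or
    • doing something else with the message, such as writing results to an external database.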

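You could use Pulsar Functions, for example, to set up the following processing chain:

  • A Python function listens on the raw_sentences topic and "sanitizes" incoming strings (removing extraneous whitespace and converting all characters to lowercase), then publishes the results to the sanitized-sentences topic.
  • A Java function listens on the sanitized-sentences topic, counts the number of times each word appears within a specified time window, and publishes the results to the results topic.
  • Finally, a Python function listens on the results topic and writes the results to a MySQL table.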
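The first two stages of such a chain can be prototyped without a Pulsar cluster. The following is a minimal local sketch, not Pulsar code: the names sanitize and count_words are hypothetical, plain lists stand in for topics, and time windows are ignored.

```python
from collections import Counter


def sanitize(sentence):
    # Mirrors the first stage: trim whitespace and convert to lowercase.
    return sentence.strip().lower()


def count_words(sentences):
    # Mirrors the second stage, ignoring time windows for simplicity.
    counts = Counter()
    for sentence in sentences:
        counts.update(sentence.split())
    return counts


# Plain lists stand in for the raw and sanitized topics.
raw = ["  Hello World ", "HELLO again  "]
sanitized = [sanitize(s) for s in raw]
results = count_words(sanitized)
```

In a real deployment each stage would be a separate function, wired together only by the topic names given at deployment time.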

Example function

Here’s an example "input sanitizer" function written in Python and stored in a sanitizer.py file:

  def clean_string(s):
      # Trim surrounding whitespace and convert to lowercase
      return s.strip().lower()

  def process(input):
      return clean_string(input)

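Some things to note about this Pulsar Function:

  • There is no client, producer, or consumer object involved. All message "plumbing" is already taken care of for you, so you only need to focus on the processing logic.
  • The function logic itself specifies no topics, subscription types, tenants, or namespaces. Topics are specified at deployment time instead. This means that you can reuse Pulsar Functions across topics, tenants, and namespaces without hard-coding those attributes.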

Example deployment

Deploying Pulsar Functions is handled by the pulsar-admin CLI tool, in particular the functions command. Here’s an example command that runs the sanitizer function above in local run mode:

  # --py          The Python file with the function's code
  # --classname   The class or function holding the processing logic
  # --tenant      The function's tenant (derived from the topic name by default)
  # --namespace   The function's namespace (derived from the topic name by default)
  # --name        The name of the function (the class name by default)
  # --inputs      The input topic(s) for the function
  # --output      The output topic for the function
  # --log-topic   The topic to which all function logs are published
  $ bin/pulsar-admin functions localrun \
    --py sanitizer.py \
    --classname sanitizer \
    --tenant public \
    --namespace default \
    --name sanitizer-function \
    --inputs dirty-strings-in \
    --output clean-strings-out \
    --log-topic sanitizer-logs

For instructions on running functions in your Pulsar cluster, see the Deploying Pulsar Functions guide.

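Available APIs

In both Java and Python, you have two options for writing Pulsar Functions:

| Interface | Description | Use cases |
|---|---|---|
| Language-native interface | No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python) | Functions that don’t require access to the function’s context |
| Pulsar Function SDK for Java/Python | Pulsar-specific libraries that provide a range of functionality not provided by the language-native interfaces | Functions that require access to the function’s context |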

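In Python, for example, this language-native function, which adds an exclamation point to every incoming string and publishes the resulting string to a topic, has no external dependencies:

  def process(input):
      return "{}!".format(input)

This function, however, uses the Pulsar Functions SDK for Python:

  from pulsar import Function

  class DisplayFunctionName(Function):
      def process(self, input, context):
          # get_function_name() is the method listed under "Python context object"
          function_name = context.get_function_name()
          return "The function processing this message has the name {0}".format(function_name)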

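Functions, messages, and message types

Pulsar Functions can take byte arrays as input and produce byte arrays as output. However, in languages that support typed interfaces (just Java at the moment), you can also write typed functions. In that case, there are two ways to bind messages to types:

  • Schema registry
  • SerDe

Schema registry

Pulsar has a built-in schema registry and comes bundled with a variety of popular schema types (Avro, JSON, and Protobuf). Pulsar Functions can use the schema information already attached to input topics to derive the input type. The same applies to output topics.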

SerDe

SerDe stands for Serialization and Deserialization. All Pulsar Functions use SerDe for message handling. How SerDe works by default depends on the language you’re using for a particular function:

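  • In Python, the default SerDe is identity, meaning that the type is serialized as whatever type the producing function returns.
  • In Java, a number of commonly used types (String, Integer, etc.) are supported by default.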

In both languages, however, you can write your own custom SerDe logic for more complex, application-specific types. See the docs for Java and Python for language-specific instructions.
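As a rough illustration of what custom SerDe logic involves, here is a minimal sketch of a JSON-based SerDe. The class name JsonSerDe is hypothetical, and it is written as a standalone class so that it runs without Pulsar installed; in a real function it would presumably extend the SDK’s SerDe base class.

```python
import json


class JsonSerDe:
    """Hypothetical SerDe that encodes dict-like values as UTF-8 JSON."""

    def serialize(self, input):
        # Object -> bytes, which is what gets published to a topic.
        return json.dumps(input, sort_keys=True).encode("utf-8")

    def deserialize(self, input_bytes):
        # Bytes -> object, which is what the function receives.
        return json.loads(input_bytes.decode("utf-8"))


serde = JsonSerDe()
payload = serde.serialize({"user": "alice", "likes": 3})
restored = serde.deserialize(payload)
```

The key property is that deserialize(serialize(x)) returns a value equal to x for every type the function handles.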

Context

Both the Java and Python SDKs provide access to a context object that can be used by the function. This context object provides a wide variety of information and functionality to the function:

  • The name and ID of the Pulsar Function
  • The message ID of each message. Each Pulsar message is automatically assigned an ID.
  • The name of the topic on which the message was sent
  • The names of all input topics as well as the output topic associated with the function
  • The name of the class used for SerDe
  • The tenant and namespace associated with the function
  • The ID of the Pulsar Functions instance running the function
  • The version of the function
  • The logger object used by the function, which can be used to create function log messages
  • Access to arbitrary user config values supplied via the CLI
  • An interface for recording metrics
  • An interface for storing and retrieving state in state storage

User config

When you run or update Pulsar Functions created using the SDK, you can pass arbitrary key/values to them via the command line with the --user-config flag. Key/values must be specified as JSON. Here’s an example of a function creation command that passes a user config key/value to a function:

  # Other function configs are omitted for brevity
  $ bin/pulsar-admin functions create \
    --name word-filter \
    --user-config '{"forbidden-word":"rosebud"}'

If the function were a Python function, that config value could be accessed like this:

  from pulsar import Function

  class WordFilter(Function):
      def process(self, input, context):
          forbidden_word = context.get_user_config_value("forbidden-word")
          # Don't publish the message if it contains the user-supplied
          # forbidden word
          if forbidden_word in input:
              pass
          # Otherwise publish the message
          else:
              return input
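Because the user config must be valid JSON, it can be safer to generate the --user-config argument than to write it by hand. The sketch below is one way to do that with the Python standard library; only flags that appear in the example command above are used, and the variable names are illustrative.

```python
import json
import shlex

user_config = {"forbidden-word": "rosebud"}

# Compact JSON keeps the flag value short; shlex.quote makes it shell-safe.
config_json = json.dumps(user_config, separators=(",", ":"))
command = " ".join([
    "bin/pulsar-admin", "functions", "create",
    "--name", "word-filter",
    "--user-config", shlex.quote(config_json),
])
```

Printing command gives a line that can be pasted into a shell; the other function configs would still need to be added.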

Functions for Java

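Writing Pulsar Functions in Java involves implementing one of two interfaces: the java.util.function.Function interface (for Java native functions) or the Function interface from the Pulsar Functions Java SDK (org.apache.pulsar.functions.api.Function).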

Getting started

In order to write Pulsar Functions in Java, you’ll need to install the proper dependencies and package your function as a JAR.

Dependencies

How you get started writing Pulsar Functions in Java depends on which API you’re using:

  • If you’re writing a Java native function, you won’t need any external dependencies.

  • If you’re writing a Java SDK function, you’ll need to import the pulsar-functions-api library.

    Here’s an example for a Maven pom.xml configuration file:

    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-api</artifactId>
      <version>2.1.1-incubating</version>
    </dependency>

    Here’s an example for a Gradle build.gradle configuration file:

    dependencies {
        compile group: 'org.apache.pulsar', name: 'pulsar-functions-api', version: '2.1.1-incubating'
    }

Packaging

Whether you’re writing Java Pulsar Functions using the native Java java.util.function.Function interface or using the Java SDK, you’ll need to package your function(s) as a “fat” JAR.

Starter repo

If you’d like to get up and running quickly, you can use this repo, which contains the necessary Maven configuration to build a fat JAR as well as some example functions.

Java native functions

If your function doesn’t require access to its context, you can create a Pulsar Function by implementing the java.util.function.Function interface, which has this very simple, single-method signature:

  public interface Function<I, O> {
      O apply(I input);
  }

Here’s an example function that takes a string as its input, adds an exclamation point to the end of the string, and then publishes the resulting string:

  import java.util.function.Function;

  public class ExclamationFunction implements Function<String, String> {
      @Override
      public String apply(String input) {
          return String.format("%s!", input);
      }
  }

In general, you should use native functions when you don’t need access to the function’s context. If you do need access to the function’s context, then we recommend using the Pulsar Functions Java SDK.

Java native examples

There is one example Java native function in this folder :

Java SDK functions

To get started developing Pulsar Functions using the Java SDK, you’ll need to add a dependency on the pulsar-functions-api artifact to your project. Instructions can be found above.

An easy way to get up and running with Pulsar Functions in Java is to clone the pulsar-functions-java-starter repo and follow the instructions there.

Java SDK examples

There are several example Java SDK functions in this folder:

| Function name | Description |
|---|---|
| ContextFunction | Illustrates context-specific functionality like logging and metrics |
| WordCountFunction | Illustrates usage of Pulsar Function state-storage |
| ExclamationFunction | A basic string manipulation function for the Java SDK |
| LoggingFunction | A function that shows how logging works for Java |
| PublishFunction | Publishes results to a topic specified in the function’s user config (rather than on the function’s output topic) |
| UserConfigFunction | A function that consumes user-supplied configuration values |
| UserMetricFunction | A function that records metrics |
| VoidFunction | A simple void function |

Java context object

The Context interface provides a number of methods that you can use to access the function’s context. The various method signatures for the Context interface are listed below:

  public interface Context {
      Record<?> getCurrentRecord();
      Collection<String> getInputTopics();
      String getOutputTopic();
      String getOutputSchemaType();
      String getTenant();
      String getNamespace();
      String getFunctionName();
      String getFunctionId();
      String getInstanceId();
      String getFunctionVersion();
      Logger getLogger();
      void incrCounter(String key, long amount);
      long getCounter(String key);
      void putState(String key, ByteBuffer value);
      ByteBuffer getState(String key);
      Map<String, Object> getUserConfigMap();
      Optional<Object> getUserConfigValue(String key);
      Object getUserConfigValueOrDefault(String key, Object defaultValue);
      void recordMetric(String metricName, double value);
      <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
      <O> CompletableFuture<Void> publish(String topicName, O object);
  }

Here’s an example function that uses several methods available via the Context object:

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  import java.util.stream.Collectors;

  public class ContextFunction implements Function<String, Void> {
      public Void process(String input, Context context) {
          Logger LOG = context.getLogger();
          String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
          String functionName = context.getFunctionName();

          String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n",
                  input,
                  inputTopics);
          LOG.info(logMessage);

          String metricName = String.format("function-%s-messages-received", functionName);
          context.recordMetric(metricName, 1);

          return null;
      }
  }

Void functions

Pulsar Functions can publish results to an output topic, but this isn’t required. You can also have functions that simply produce a log, write results to a database, etc. Here’s a function that writes a simple log every time a message is received:

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  public class LogFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          Logger LOG = context.getLogger();
          LOG.info("The following message was received: {}", input);
          return null;
      }
  }

When using Java functions in which the output type is Void, the function must always return null.

Java SerDe

Pulsar Functions use SerDe when publishing data to and consuming data from Pulsar topics. When you’re writing Pulsar Functions in Java, the following basic Java types are built in and supported by default:

  • String
  • Double
  • Integer
  • Float
  • Long
  • Short
  • Byte

These types are built in. For any other type, you need to provide a custom SerDe class that implements this interface:

  public interface SerDe<T> {
      T deserialize(byte[] input);
      byte[] serialize(T input);
  }

Java SerDe example

Imagine that you’re writing Pulsar Functions in Java that are processing tweet objects. Here’s a simple example Tweet class:

  public class Tweet {
      private String username;
      private String tweetContent;

      public Tweet(String username, String tweetContent) {
          this.username = username;
          this.tweetContent = tweetContent;
      }

      // Standard setters and getters
  }

In order to be able to pass Tweet objects directly between Pulsar Functions, you’ll need to provide a custom SerDe class. In the example below, Tweet objects are basically strings in which the username and tweet content are separated by a |.

  package com.example.serde;

  import org.apache.pulsar.functions.api.SerDe;

  import java.nio.charset.StandardCharsets;
  import java.util.regex.Pattern;

  public class TweetSerde implements SerDe<Tweet> {
      @Override
      public Tweet deserialize(byte[] input) {
          String s = new String(input, StandardCharsets.UTF_8);
          // A limit of 2 keeps any "|" inside the tweet content intact
          String[] fields = s.split(Pattern.quote("|"), 2);
          return new Tweet(fields[0], fields[1]);
      }

      @Override
      public byte[] serialize(Tweet input) {
          // String.format is static, so the format string goes first
          return String.format("%s|%s", input.getUsername(), input.getTweetContent())
                  .getBytes(StandardCharsets.UTF_8);
      }
  }

To apply this custom SerDe to a particular Pulsar Function, you would need to:

  • Package the Tweet and TweetSerde classes into a JAR
  • Specify a path to the JAR and SerDe class name when deploying the function

Here’s an example create operation:

  # Other function attributes are omitted for brevity
  $ bin/pulsar-admin functions create \
    --jar /path/to/your.jar \
    --output-serde-classname com.example.serde.TweetSerde

Custom SerDe classes must be packaged with your function JARs

Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. That means that you’ll need to always include your SerDe classes in your function JARs. If not, Pulsar will return an error.

Java logging

Pulsar Functions that use the Java SDK have access to an SLF4J Logger object that can be used to produce logs at the chosen log level. Here’s a simple example function that logs either a WARN- or INFO-level message based on whether the incoming string contains the word danger:

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  public class LoggingFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          Logger LOG = context.getLogger();
          // Assumes an SDK version whose Context exposes getMessageId()
          String messageId = new String(context.getMessageId());

          if (input.contains("danger")) {
              LOG.warn("A warning was received in message {}", messageId);
          } else {
              LOG.info("Message {} received\nContent: {}", messageId, input);
          }

          return null;
      }
  }

If you want your function to produce logs, you need to specify a log topic when creating or running the function. Here’s an example:

  # Other function configs are omitted for brevity
  $ bin/pulsar-admin functions create \
    --jar my-functions.jar \
    --classname my.package.LoggingFunction \
    --log-topic persistent://public/default/logging-function-logs

Now, all logs produced by the LoggingFunction above can be accessed via the persistent://public/default/logging-function-logs topic.

Java user config

The Java SDK’s Context object enables you to access key/value pairs provided to the Pulsar Function via the command line (as JSON). Here’s an example function creation command that passes a key/value pair:

  # Other function configs are omitted for brevity
  $ bin/pulsar-admin functions create \
    --user-config '{"word-of-the-day":"verdure"}'

To access that value in a Java function:

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  import java.util.Optional;

  public class UserConfigFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          Logger LOG = context.getLogger();
          // getUserConfigValue returns Optional<Object> per the Context interface
          Optional<Object> wotd = context.getUserConfigValue("word-of-the-day");
          if (wotd.isPresent()) {
              LOG.info("The word of the day is {}", wotd.get());
          } else {
              LOG.warn("No word of the day provided");
          }
          return null;
      }
  }

The UserConfigFunction function will log the string "The word of the day is verdure" every time the function is invoked (i.e. every time a message arrives). The word-of-the-day user config will be changed only when the function is updated with a new config value via the command line.

You can also access the entire user config map or set a default value in case no value is present:

  // Get the whole config map
  Map<String, Object> allConfigs = context.getUserConfigMap();
  // Get value or resort to default (the method returns Object, hence the cast)
  String wotd = (String) context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");

For all key/value pairs passed to Java Pulsar Functions, both the key and the value are Strings. If you’d like the value to be of a different type, you will need to deserialize from the String type.

Java metrics

You can record metrics using the Context object on a per-key basis. You can, for example, set a metric for the key hit-count and a different metric for the key elevens-count every time the function processes a message. Here’s an example:

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public class MetricRecorderFunction implements Function<Integer, Void> {
      @Override
      public Void process(Integer input, Context context) {
          // Records the metric 1 every time a message arrives
          context.recordMetric("hit-count", 1);

          // Records the metric only if the arriving number equals 11
          if (input == 11) {
              context.recordMetric("elevens-count", 1);
          }

          return null;
      }
  }

For instructions on reading and using metrics, see the Monitoring guide.

Functions for Python

Writing Pulsar Functions in Python entails implementing one of two things:

  • A process function that takes an input (message data from the function’s input topic(s)), applies some kind of logic to it, and either returns an object (to be published to the function’s output topic) or passes and thus doesn’t produce a message
  • A Function class that has a process method that provides a message input to process and a context object

Getting started

Regardless of which deployment mode you’re using, the pulsar-client Python library has to be installed on any machine that’s running Pulsar Functions written in Python.

That could be your local machine for local run mode or a machine running a Pulsar broker for cluster mode. To install the library using pip:

  $ pip install pulsar-client

Packaging

At the moment, the code for Pulsar Functions written in Python must be contained within a single Python file. In the future, Pulsar Functions may support other packaging formats, such as Python EXecutables (PEXes).

Python native functions

If your function doesn’t require access to its context, you can create a Pulsar Function by implementing a process function, which provides a single input object that you can process however you wish. Here’s an example function that takes a string as its input, adds an exclamation point at the end of the string, and then publishes the resulting string:

  def process(input):
      return "{0}!".format(input)

In general, you should use native functions when you don’t need access to the function’s context. If you do need access to the function’s context, then we recommend using the Pulsar Functions Python SDK.
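Because a native function is plain Python, it can be exercised locally before deployment. The sketch below assumes, as described at the start of this section, that a function which returns nothing produces no message. The run_locally helper is hypothetical and only stands in for the Pulsar Functions runtime.

```python
def process(input):
    # Returning None is treated here as "publish nothing".
    if not input.strip():
        return None
    return "{0}!".format(input)


def run_locally(func, messages):
    # Hypothetical stand-in for the runtime: call the function once per
    # message and keep only the results that would be published.
    outputs = []
    for message in messages:
        result = func(message)
        if result is not None:
            outputs.append(result)
    return outputs


published = run_locally(process, ["hello", "   ", "world"])
```

Here the blank message is dropped and the other two are published with an exclamation point appended.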

Python native examples

There is one example Python native function in this folder :

Python SDK functions

To get started developing Pulsar Functions using the Python SDK, you’ll need to install the pulsar-client library using the instructions above.

Python SDK examples

There are several example Python functions in this folder:

| Function file | Description |
|---|---|
| exclamation_function.py | Adds an exclamation point at the end of each incoming string |
| logging_function.py | Logs each incoming message |
| thumbnailer.py | Takes image data as input and outputs a 128x128 thumbnail of each image |

Python context object

The Context class provides a number of methods that you can use to access the function’s context. The various methods for the Context class are listed below:

| Method | What it provides |
|---|---|
| get_message_id | The message ID of the message being processed |
| get_current_message_topic_name | The topic of the message currently being processed |
| get_function_tenant | The tenant under which the current Pulsar Function runs |
| get_function_namespace | The namespace under which the current Pulsar Function runs |
| get_function_name | The name of the current Pulsar Function |
| get_function_id | The ID of the current Pulsar Function |
| get_instance_id | The ID of the current Pulsar Functions instance |
| get_function_version | The version of the current Pulsar Function |
| get_logger | A logger object that can be used for logging |
| get_user_config_value | Returns the value of a user-defined config (or None if the config doesn’t exist) |
| get_user_config_map | Returns the entire user-defined config as a dict |
| record_metric | Records a per-key metric |
| publish | Publishes a message to the specified Pulsar topic |
| get_output_serde_class_name | The name of the output SerDe class |
| ack | Acks the message being processed to Pulsar |
Python SerDe

Pulsar Functions use SerDe when publishing data to and consuming data from Pulsar topics (this is true of both native functions and SDK functions). You can specify the SerDe when creating or running functions. Here’s an example:

  $ bin/pulsar-admin functions create \
    --tenant public \
    --namespace default \
    --name my_function \
    --py my_function.py \
    --classname my_function.MyFunction \
    --custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}' \
    --output-serde-classname Serde3 \
    --output output-topic-1

In this case, there are two input topics, input-topic-1 and input-topic-2, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, output-topic-1, uses the Serde3 class for SerDe. At the moment, all Pulsar Function logic, including the processing function and SerDe classes, must be contained within a single Python file.

When using Pulsar Functions for Python, you essentially have three SerDe options:

  1. You can use the IdentitySerDe, which leaves the data unchanged. The IdentitySerDe is the default. Creating or running a function without explicitly specifying SerDe will mean that this option is used.
  2. You can use the PickleSerDe, which uses Python’s pickle for SerDe.
  3. You can create a custom SerDe class by implementing the baseline SerDe class, which has just two methods: serialize for converting the object into bytes, and deserialize for converting bytes into an object of the required application-specific type.

The table below shows when you should use each SerDe:

| SerDe option | When to use |
|---|---|
| IdentitySerDe | When you’re working with simple types like strings, Booleans, integers, and the like |
| PickleSerDe | When you’re working with complex, application-specific types and are comfortable with pickle’s “best effort” approach |
| Custom SerDe | When you require explicit control over SerDe, potentially for performance or data compatibility purposes |
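To give a feel for what the pickle-based option does under the hood, the sketch below round-trips a value through Python’s pickle module directly. It does not use the Pulsar SerDe classes, and the record contents are made up for illustration.

```python
import datetime
import pickle

# A value with types (set, date) that a plain JSON encoding cannot represent.
record = {
    "username": "alice",
    "tags": {"pulsar", "functions"},
    "posted": datetime.date(2018, 10, 1),
}

# pickle.dumps yields bytes suitable for a message payload;
# pickle.loads rebuilds an equivalent object on the consuming side.
payload = pickle.dumps(record)
restored = pickle.loads(payload)
```

Note that unpickling executes code embedded in the payload, so pickle is only appropriate when every producer on the topic is trusted.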

Python SerDe example

Imagine that you’re writing Pulsar Functions in Python that are processing tweet objects. Here’s a simple Tweet class:

  class Tweet(object):
      def __init__(self, username, tweet_content):
          self.username = username
          self.tweet_content = tweet_content

In order to use this class in Pulsar Functions, you’d have two options:

  1. You could specify PickleSerDe, which would apply the pickle library’s SerDe
  2. You could create your own SerDe class. Here’s a simple example:

  from pulsar import SerDe

  class TweetSerDe(SerDe):
      def serialize(self, input):
          # Encode the Tweet as "username|tweet_content" in UTF-8 bytes
          return "{0}|{1}".format(input.username, input.tweet_content).encode("utf-8")

      def deserialize(self, input_bytes):
          # Split only on the first "|" so the content may itself contain "|"
          tweet_components = input_bytes.decode("utf-8").split("|", 1)
          return Tweet(tweet_components[0], tweet_components[1])

Python logging

Pulsar Functions that use the Python SDK have access to a logging object that can be used to produce logs at the chosen log level. Here’s a simple example function that logs either a WARNING- or INFO-level log based on whether the incoming string contains the word danger:

  from pulsar import Function

  class LoggingFunction(Function):
      def process(self, input, context):
          logger = context.get_logger()
          msg_id = context.get_message_id()
          if 'danger' in input:
              logger.warn("A warning was received in message {0}".format(msg_id))
          else:
              logger.info("Message {0} received\nContent: {1}".format(msg_id, input))

If you want your function to produce logs on a Pulsar topic, you need to specify a log topic when creating or running the function. Here’s an example:

  # Other function configs are omitted for brevity
  $ bin/pulsar-admin functions create \
    --py logging_function.py \
    --classname logging_function.LoggingFunction \
    --log-topic logging-function-logs

Now, all logs produced by the LoggingFunction above can be accessed via the logging-function-logs topic.

Python user config

The Python SDK’s Context object enables you to access key/value pairs provided to the Pulsar Function via the command line (as JSON). Here’s an example function creation command that passes a key/value pair:

  # Other function configs are omitted for brevity
  $ bin/pulsar-admin functions create \
    --user-config '{"word-of-the-day":"verdure"}'

To access that value in a Python function:

  from pulsar import Function

  class UserConfigFunction(Function):
      def process(self, input, context):
          logger = context.get_logger()
          wotd = context.get_user_config_value('word-of-the-day')
          if wotd is None:
              logger.warn('No word of the day provided')
          else:
              logger.info("The word of the day is {0}".format(wotd))

Python metrics

You can record metrics using the Context object on a per-key basis. You can, for example, set a metric for the key hit-count and a different metric for the key elevens-count every time the function processes a message. Here’s an example:

  from pulsar import Function

  class MetricRecorderFunction(Function):
      def process(self, input, context):
          # Records the metric 1 every time a message arrives
          context.record_metric('hit-count', 1)

          # Records the metric only if the arriving number equals 11
          if input == 11:
              context.record_metric('elevens-count', 1)