Develop Pulsar Functions

You learn how to develop Pulsar Functions with different APIs for Java, Python and Go.

可用的APIs

在 Java 和 Python 中,编写 Pulsar Functions 的方式有两种。 在 Go 中,可以使用 Pulsar Functions SDK。

Interface说明使用场景
语言原生接口不需要特定于 Pulsar 的库或特殊依赖(仅需要 Java/Python 的核心库)。不需要访问 context function 的 functions。
适用于 Java/Python/Go 的 Pulsar Function SDK特定于 Pulsar 的库,提供“本地”接口未提供的一系列功能。需要访问 context function 的 functions。

语言原生 function 没有外部依赖关系,该 function 为所有传入的字符串添加感叹号,并将结果发布到 topic。 语言原生 function示例如下。

Java

Python

  1. import java.util.function.Function;public class JavaNativeExclamationFunction implements Function<String, String> { @Override public String apply(String input) { return String.format("%s!", input); }}

查看完整代码,请点击这里

  1. def process(input): return "{}!".format(input)

查看完整代码,请点击这里

注意 可以在 Python2 或 Python3 中编写 Pulsar Functions。 但是,Pulsar 只将 python 作为解释器。

如果在仅支持 python3 的 Ubuntu 系统上运行 Pulsar Functions,则可能无法运行 functions。 在这种情况下,可以创建符号链接。 Your system will fail if you subsequently install any other package that depends on Python 2.x. A solution is under development in Issue 5518.

  1. sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10

使用 Pulsar Functions SDK 的示例如下。

Java

Python

Go

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;public class ExclamationFunction implements Function<String, String> { @Override public String process(String input, Context context) { return String.format("%s!", input); }}

查看完整代码,请点击这里

  1. from pulsar import Functionclass ExclamationFunction(Function): def __init__(self): pass def process(self, input, context): return input + '!'

查看完整代码,请点击这里

  1. package mainimport ( "context" "fmt" "github.com/apache/pulsar/pulsar-function-go/pf")func HandleRequest(ctx context.Context, in []byte) error{ fmt.Println(string(in) + "!") return nil}func main() { pf.Start(HandleRequest)}

For complete code, see here.

Schema registry

Pulsar has a built-in schema registry and is bundled with popular schema types, such as Avro, JSON and Protobuf. Pulsar Functions can leverage the existing schema information from input topics and derive the input type. Schema registry 也适用于输出 topic。

SerDe

SerDe stands for Serialization and Deserialization. Pulsar Functions 使用 SerDe 向 Pulsar topic 发布数据或使用其中的数据。 默认情况下,SerDe 的工作方式取决于特定 function 所使用的语言。

Java

Python

Go

When you write Pulsar Functions in Java, the following basic Java types are built in and supported by default: String, Double, Integer, Float, Long, Short, and Byte.

要自定义 Java 类型,需要实现以下接口。

  1. public interface SerDe<T> { T deserialize(byte[] input); byte[] serialize(T input);}

SerDe works in the following ways in Java Functions.

  • If the input and output topics have schema, Pulsar Functions use schema for SerDe.
  • If the input or output topics do not exist, Pulsar Functions adopt the following rules to determine SerDe:
    • If the schema type is specified, Pulsar Functions use the specified schema type.
    • If SerDe is specified, Pulsar Functions use the specified SerDe, and the schema type for input and output topics is Byte.
    • If neither the schema type nor SerDe is specified, Pulsar Functions use the built-in SerDe. For non-primitive schema type, the built-in SerDe serializes and deserializes objects in the JSON format.

在 Python 中,默认的 SerDe 是 identity,即该类型将被序列化为 producer function 返回的类型。

创建运行 function 时,可以指定 SerDe。

  1. $ bin/pulsar-admin functions create \ --tenant public \ --namespace default \ --name my_function \ --py my_function.py \ --classname my_function.MyFunction \ --custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}' \ --output-serde-classname Serde3 \ --output output-topic-1

此案例包含两个输入 topic:input-topic-1input-topic-2,它们分别映射到不同的 SerDe 类(映射必须指定为 JSON 字符串)。 The output topic, output-topic-1, uses the Serde3 class for SerDe. 目前,所有 Pulsar Functions 逻辑(包括处理函数和 SerDe 类)都必须包含在同一个 Python 文件中。

在将 Pulsar Functions 用于 Python 时,有三个 SerDe 可供选择:

  1. You can use the IdentitySerde, which leaves the data unchanged. The IdentitySerDe is the default. 在未显式指定 SerDe 的情况下创建或运行 function 即使用此选项。
  2. 可以使用 PickleSerDe,将 Python pickle 用于 SerDe。
  3. You can create a custom SerDe class by implementing the baseline SerDe class, which has just two methods: serialize for converting the object into bytes, and deserialize for converting bytes into an object of the required application-specific type.

下表显示了每个 SerDe 的使用情形。

SerDe optionWhen to use
IdentitySerde在使用简单类型(如:字符串、布尔值、整型)时。
PickleSerDe在使用复杂的、特定于某应用程序的类型,并且适合于 pickle 的 “best effort”方法时。
Custom SerDe在需要显示控制 SerDe 时,可能是出于性能或数据兼容性的考虑。

当前,该功能在 Go 中不可用。

示例

Imagine that you’re writing Pulsar Functions that are processing tweet objects, you can refer to the following example of Tweet class.

Java

Python

  1. public class Tweet { private String username; private String tweetContent; public Tweet(String username, String tweetContent) { this.username = username; this.tweetContent = tweetContent; } // Standard setters and getters}

To pass Tweet objects directly between Pulsar Functions, you need to provide a custom SerDe class. In the example below, Tweet objects are basically strings in which the username and tweet content are separated by a |.

  1. package com.example.serde;import org.apache.pulsar.functions.api.SerDe;import java.util.regex.Pattern;public class TweetSerde implements SerDe<Tweet> { public Tweet deserialize(byte[] input) { String s = new String(input); String[] fields = s.split(Pattern.quote("|")); return new Tweet(fields[0], fields[1]); } public byte[] serialize(Tweet input) { return "%s|%s".format(input.getUsername(), input.getTweetContent()).getBytes(); }}

To apply this customized SerDe to a particular Pulsar Function, you need to:

  • TweetTweetSerde 类打包到一个 JAR 中。
  • 部署 function 时,指定 JAR 和 SerDe 类名称的路径。

The following is an example of create operation.

  1. $ bin/pulsar-admin functions create \ --jar /path/to/your.jar \ --output-serde-classname com.example.serde.TweetSerde \ # Other function attributes

Custom SerDe classes must be packaged with your function JARs

Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. 需要在 function JAR 中包含 SerDe 类。 否则,Pulsar 将返回错误。

  1. class Tweet(object): def __init__(self, username, tweet_content): self.username = username self.tweet_content = tweet_content

In order to use this class in Pulsar Functions, you have two options:

  1. 可以指定 PickleSerDe,它将应用 pickle 库 SerDe.
  2. 可以自己创建 SerDe 类。 The following is an example.
  1. from pulsar import SerDeclass TweetSerDe(SerDe): def serialize(self, input): return bytes("{0}|{1}".format(input.username, input.tweet_content)) def deserialize(self, input_bytes): tweet_components = str(input_bytes).split('|') return Tweet(tweet_components[0], tweet_componentsp[1])

For complete code, see here.

In both languages, however, you can write custom SerDe logic for more complex, application-specific types.

Context

Java, Python and Go SDKs provide access to a context object that can be used by a function. 并提供了各种各样的信息和功能。

  • The name and ID of a Pulsar Function.
  • The message ID of each message. 每条 Pulsar 消息都会自动分配一个 ID。
  • The key, event time, properties and partition key of each message.
  • The name of the topic to which the message is sent.
  • The names of all input topics as well as the output topic associated with the function.
  • The name of the class used for SerDe.
  • The tenant and namespace associated with the function.
  • The ID of the Pulsar Functions instance running the function.
  • The version of the function.
  • The logger object used by the function, which can be used to create function log messages.
  • Access to arbitrary user configuration values supplied via the CLI.
  • An interface for recording metrics.
  • An interface for storing and retrieving state in state storage.
  • A function to publish new messages onto arbitrary topics.
  • A function to ack the message being processed (if auto-ack is disabled).
  • (Java) get Pulsar admin client.

Java

Python

Go

The Context interface provides a number of methods that you can use to access the function context. The various method signatures for the Context interface are listed as follows.

  1. public interface Context { Record<?> getCurrentRecord(); Collection<String> getInputTopics(); String getOutputTopic(); String getOutputSchemaType(); String getTenant(); String getNamespace(); String getFunctionName(); String getFunctionId(); String getInstanceId(); String getFunctionVersion(); Logger getLogger(); void incrCounter(String key, long amount); void incrCounterAsync(String key, long amount); long getCounter(String key); long getCounterAsync(String key); void putState(String key, ByteBuffer value); void putStateAsync(String key, ByteBuffer value); void deleteState(String key); ByteBuffer getState(String key); ByteBuffer getStateAsync(String key); Map<String, Object> getUserConfigMap(); Optional<Object> getUserConfigValue(String key); Object getUserConfigValueOrDefault(String key, Object defaultValue); void recordMetric(String metricName, double value); <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName); <O> CompletableFuture<Void> publish(String topicName, O object); <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException; <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException; PulsarAdmin getPulsarAdmin(); PulsarAdmin getPulsarAdmin(String clusterName);}

The following example uses several methods available via the Context object.

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import org.slf4j.Logger;import java.util.stream.Collectors;public class ContextFunction implements Function<String, Void> { public Void process(String input, Context context) { Logger LOG = context.getLogger(); String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", ")); String functionName = context.getFunctionName(); String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n", input, inputTopics); LOG.info(logMessage); String metricName = String.format("function-%s-messages-received", functionName); context.recordMetric(metricName, 1); return null; }}
  1. class ContextImpl(pulsar.Context): def get_message_id(self): ... def get_message_key(self): ... def get_message_eventtime(self): ... def get_message_properties(self): ... def get_current_message_topic_name(self): ... def get_partition_key(self): ... def get_function_name(self): ... def get_function_tenant(self): ... def get_function_namespace(self): ... def get_function_id(self): ... def get_instance_id(self): ... def get_function_version(self): ... def get_logger(self): ... def get_user_config_value(self, key): ... def get_user_config_map(self): ... def record_metric(self, metric_name, metric_value): ... def get_input_topics(self): ... def get_output_topic(self): ... def get_output_serde_class_name(self): ... def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None): ... def ack(self, msgid, topic): ... def get_and_reset_metrics(self): ... def reset_metrics(self): ... def get_metrics(self): ... def incr_counter(self, key, amount): ... def get_counter(self, key): ... def del_counter(self, key): ... def put_state(self, key, value): ... def get_state(self, key): ...
  1. func (c *FunctionContext) GetInstanceID() int { return c.instanceConf.instanceID}func (c *FunctionContext) GetInputTopics() []string { return c.inputTopics}func (c *FunctionContext) GetOutputTopic() string { return c.instanceConf.funcDetails.GetSink().Topic}func (c *FunctionContext) GetFuncTenant() string { return c.instanceConf.funcDetails.Tenant}func (c *FunctionContext) GetFuncName() string { return c.instanceConf.funcDetails.Name}func (c *FunctionContext) GetFuncNamespace() string { return c.instanceConf.funcDetails.Namespace}func (c *FunctionContext) GetFuncID() string { return c.instanceConf.funcID}func (c *FunctionContext) GetFuncVersion() string { return c.instanceConf.funcVersion}func (c *FunctionContext) GetUserConfValue(key string) interface{} { return c.userConfigs[key]}func (c *FunctionContext) GetUserConfMap() map[string]interface{} { return c.userConfigs}func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) { c.record = record}func (c *FunctionContext) GetCurrentRecord() pulsar.Message { return c.record}func (c *FunctionContext) NewOutputMessage(topic string) pulsar.Producer { return c.outputMessage(topic)}

The following example uses several methods available via the Context object.

  1. import ( "context" "fmt" "github.com/apache/pulsar/pulsar-function-go/pf")func contextFunc(ctx context.Context) { if fc, ok := pf.FromContext(ctx); ok { fmt.Printf("function ID is:%s, ", fc.GetFuncID()) fmt.Printf("function version is:%s\n", fc.GetFuncVersion()) }}

For complete code, see here.

User config

当你使用SDK 运行或者更新函数时,你能够通过--user-config参数传递任意的key/value格式的参数。 Key/values must be specified as JSON. The following function creation command passes a user configured key/value to a function.

  1. $ bin/pulsar-admin functions create \
  2. --name word-filter \
  3. # Other function configs
  4. --user-config '{"forbidden-word":"rosebud"}'

Java

Python

Go

The Java SDK Context object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.

  1. $ bin/pulsar-admin functions create \ # Other function configs --user-config '{"word-of-the-day":"verdure"}'

To access that value in a Java function:

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import org.slf4j.Logger;import java.util.Optional;public class UserConfigFunction implements Function<String, Void> { @Override public void apply(String input, Context context) { Logger LOG = context.getLogger(); Optional<String> wotd = context.getUserConfigValue("word-of-the-day"); if (wotd.isPresent()) { LOG.info("The word of the day is {}", wotd); } else { LOG.warn("No word of the day provided"); } return null; }}

The UserConfigFunction function will log the string "The word of the day is verdure" every time the function is invoked (which means every time a message arrives). The word-of-the-day user config will be changed only when the function is updated with a new config value via the command line.

You can also access the entire user config map or set a default value in case no value is present:

  1. // Get the whole config mapMap<String, String> allConfigs = context.getUserConfigMap();// Get value or resort to defaultString wotd = context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");

For all key/value pairs passed to Java functions, both the key and the value are String. 要将值设置为其他类型,则需要对 String 类型反序列化。

In Python function, you can access the configuration value like this.

  1. from pulsar import Functionclass WordFilter(Function): def process(self, context, input): forbidden_word = context.user_config()["forbidden-word"] # Don't publish the message if it contains the user-supplied # forbidden word if forbidden_word in input: pass # Otherwise publish the message else: return input

The Python SDK Context object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.

  1. $ bin/pulsar-admin functions create \ # Other function configs \ --user-config '{"word-of-the-day":"verdure"}'

To access that value in a Python function:

  1. from pulsar import Functionclass UserConfigFunction(Function): def process(self, input, context): logger = context.get_logger() wotd = context.get_user_config_value('word-of-the-day') if wotd is None: logger.warn('No word of the day provided') else: logger.info("The word of the day is {0}".format(wotd))

The Go SDK Context object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.

  1. $ bin/pulsar-admin functions create \ --go path/to/go/binary --user-config '{"word-of-the-day":"lackadaisical"}'

To access that value in a Go function:

  1. func contextFunc(ctx context.Context) { fc, ok := pf.FromContext(ctx) if !ok { logutil.Fatal("Function context is not defined") } wotd := fc.GetUserConfValue("word-of-the-day") if wotd == nil { logutil.Warn("The word of the day is empty") } else { logutil.Infof("The word of the day is %s", wotd.(string)) }}

Logger

Java

Python

Go

Pulsar Functions that use the Java SDK have access to an SLF4j Logger object that can be used to produce logs at the chosen log level. The following example logs either a WARNING- or INFO-level log based on whether the incoming string contains the word danger.

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import org.slf4j.Logger;public class LoggingFunction implements Function<String, Void> { @Override public void apply(String input, Context context) { Logger LOG = context.getLogger(); String messageId = new String(context.getMessageId()); if (input.contains("danger")) { LOG.warn("A warning was received in message {}", messageId); } else { LOG.info("Message {} received\nContent: {}", messageId, input); } return null; }}

If you want your function to produce logs, you need to specify a log topic when creating or running the function. The following is an example.

  1. $ bin/pulsar-admin functions create \ --jar my-functions.jar \ --classname my.package.LoggingFunction \ --log-topic persistent://public/default/logging-function-logs \ # Other function configs

All logs produced by LoggingFunction above can be accessed via the persistent://public/default/logging-function-logs topic.

Customize Function log level

Additionally, you can use the XML file, functions_log4j2.xml, to customize the function log level. To customize the function log level, create or update functions_log4j2.xml in your Pulsar conf directory (for example, /etc/pulsar/ on bare-metal, or /pulsar/conf on Kubernetes) to contain contents such as:

  1. <Configuration> <name>pulsar-functions-instance</name> <monitorInterval>30</monitorInterval> <Properties> <Property> <name>pulsar.log.appender</name> <value>RollingFile</value> </Property> <Property> <name>pulsar.log.level</name> <value>debug</value> </Property> <Property> <name>bk.log.level</name> <value>debug</value> </Property> </Properties> <Appenders> <Console> <name>Console</name> <target>SYSTEM_OUT</target> <PatternLayout> <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern> </PatternLayout> </Console> <RollingFile> <name>RollingFile</name> <fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log</fileName> <filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz</filePattern> <immediateFlush>true</immediateFlush> <PatternLayout> <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern> </PatternLayout> <Policies> <TimeBasedTriggeringPolicy> <interval>1</interval> <modulate>true</modulate> </TimeBasedTriggeringPolicy> <SizeBasedTriggeringPolicy> <size>1 GB</size> </SizeBasedTriggeringPolicy> <CronTriggeringPolicy> <schedule>0 0 0 * * ?</schedule> </CronTriggeringPolicy> </Policies> <DefaultRolloverStrategy> <Delete> <basePath>${sys:pulsar.function.log.dir}</basePath> <maxDepth>2</maxDepth> <IfFileName> <glob>*/${sys:pulsar.function.log.file}*log.gz</glob> </IfFileName> <IfLastModified> <age>30d</age> </IfLastModified> </Delete> </DefaultRolloverStrategy> </RollingFile> <RollingRandomAccessFile> <name>BkRollingFile</name> <fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk</fileName> <filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz</filePattern> <immediateFlush>true</immediateFlush> <PatternLayout> <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern> </PatternLayout> <Policies> <TimeBasedTriggeringPolicy> <interval>1</interval> <modulate>true</modulate> </TimeBasedTriggeringPolicy> <SizeBasedTriggeringPolicy> <size>1 GB</size> </SizeBasedTriggeringPolicy> <CronTriggeringPolicy> <schedule>0 0 0 * * ?</schedule> </CronTriggeringPolicy> </Policies> <DefaultRolloverStrategy> <Delete> <basePath>${sys:pulsar.function.log.dir}</basePath> <maxDepth>2</maxDepth> <IfFileName> <glob>*/${sys:pulsar.function.log.file}.bk*log.gz</glob> </IfFileName> <IfLastModified> <age>30d</age> </IfLastModified> </Delete> </DefaultRolloverStrategy> </RollingRandomAccessFile> </Appenders> <Loggers> <Logger> <name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name> <level>${sys:bk.log.level}</level> <additivity>false</additivity> <AppenderRef> <ref>BkRollingFile</ref> </AppenderRef> </Logger> <Root> <level>${sys:pulsar.log.level}</level> <AppenderRef> <ref>${sys:pulsar.log.appender}</ref> <level>${sys:pulsar.log.level}</level> </AppenderRef> </Root> </Loggers></Configuration>

The properties set like:

  1. <Property> <name>pulsar.log.level</name> <value>debug</value> </Property>

propagate to places where they are referenced, such as:

  1. <Root> <level>${sys:pulsar.log.level}</level> <AppenderRef> <ref>${sys:pulsar.log.appender}</ref> <level>${sys:pulsar.log.level}</level> </AppenderRef> </Root>

In the above example, debug level logging would be applied to ALL function logs. This may be more verbose than you desire. To be more selective, you can apply different log levels to different classes or modules. 例如:

  1. <Logger> <name>com.example.module</name> <level>info</level> <additivity>false</additivity> <AppenderRef> <ref>${sys:pulsar.log.appender}</ref> </AppenderRef> </Logger>

You can be more specific as well, such as applying a more verbose log level to a class in the module, such as:

  1. <Logger> <name>com.example.module.className</name> <level>debug</level> <additivity>false</additivity> <AppenderRef> <ref>Console</ref> </AppenderRef> </Logger>

Each <AppenderRef> entry allows you to output the log to a target specified in the definition of the Appender.

Additivity pertains to whether log messages will be duplicated if multiple Logger entries overlap. To disable additivity, specify

  1. <additivity>false</additivity>

as shown in examples above. Disabling additivity prevents duplication of log messages when one or more <Logger> entries contain classes or modules that overlap.

The <AppenderRef> is defined in the <Appenders> section, such as:

  1. <Console> <name>Console</name> <target>SYSTEM_OUT</target> <PatternLayout> <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern> </PatternLayout></Console>

Pulsar Functions that use the Python SDK have access to a logging object that can be used to produce logs at the chosen log level. The following example function that logs either a WARNING- or INFO-level log based on whether the incoming string contains the word danger.

  1. from pulsar import Functionclass LoggingFunction(Function): def process(self, input, context): logger = context.get_logger() msg_id = context.get_message_id() if 'danger' in input: logger.warn("A warning was received in message {0}".format(context.get_message_id())) else: logger.info("Message {0} received\nContent: {1}".format(msg_id, input))

If you want your function to produce logs on a Pulsar topic, you need to specify a log topic when creating or running the function. The following is an example.

  1. $ bin/pulsar-admin functions create \ --py logging_function.py \ --classname logging_function.LoggingFunction \ --log-topic logging-function-logs \ # Other function configs

All logs produced by LoggingFunction above can be accessed via the logging-function-logs topic. Additionally, you can specify the function log level through the broker XML file as described in Customize Function log level.

The following Go Function example shows different log levels based on the function input.

  1. import ( "context" "github.com/apache/pulsar/pulsar-function-go/pf" log "github.com/apache/pulsar/pulsar-function-go/logutil")func loggerFunc(ctx context.Context, input []byte) { if len(input) <= 100 { log.Infof("This input has a length of: %d", len(input)) } else { log.Warnf("This input is getting too long! It has {%d} characters", len(input)) }}func main() { pf.Start(loggerFunc)}

当你在 GO 函数里面使用logTopic相关函数时,需要导入github.com/apache/pulsar/pulsar-function-go/logutil包,这时你不必使用getLogger() 上下文对象。

Additionally, you can specify the function log level through the broker XML file, as described here: Customize Function log level

Pulsar admin

Pulsar Functions using the Java SDK has access to the Pulsar admin client, which allows the Pulsar admin client to manage API calls to current Pulsar clusters or external clusters (if external-pulsars is provided).

Java

Below is an example of how to use the Pulsar admin client exposed from the Function context.

  1. import org.apache.pulsar.client.admin.PulsarAdmin;import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;/** * In this particular example, for every input message, * the function resets the cursor of the current function's subscription to a * specified timestamp. */public class CursorManagementFunction implements Function<String, String> { @Override public String process(String input, Context context) throws Exception { PulsarAdmin adminClient = context.getPulsarAdmin(); if (adminClient != null) { String topic = context.getCurrentRecord().getTopicName().isPresent() ? context.getCurrentRecord().getTopicName().get() : null; String subName = context.getTenant() + "/" + context.getNamespace() + "/" + context.getFunctionName(); if (topic != null) { // 1578188166 below is a random-pick timestamp adminClient.topics().resetCursor(topic, subName, 1578188166); return "reset cursor successfully"; } } return null; }}

If you want your function to get access to the Pulsar admin client, you need to enable this feature by setting exposeAdminClientEnabled=true in the functions_worker.yml file. You can test whether this feature is enabled or not using the command pulsar-admin functions localrun with the flag --web-service-url.

  1. $ bin/pulsar-admin functions localrun \ --jar my-functions.jar \ --classname my.package.CursorManagementFunction \ --web-service-url http://pulsar-web-service:8080 \ # Other function configs

Metrics

Pulsar Functions allows you to deploy and manage processing functions that consume messages from and publish messages to Pulsar topics easily. It is important to ensure that the running functions are healthy at any time. Pulsar Functions can publish arbitrary metrics to the metrics interface which can be queried.

Note

如果 Pulsar Function 使用 Java 或 Python 的语言本机接口,则该 function 无法将度量和统计信息发布到 Pulsar。

You can monitor Pulsar Functions that have been deployed with the following methods:

  • 检查由 Pulsar 提供的 Metrics。

    Pulsar Functions expose the metrics that can be collected and used for monitoring the health of Java, Python, and Go functions. 你可以通过关注 监控 指南来检查这些指标。

    For the complete list of the function metrics, see here.

  • 设置并检查你的自定义 Metrics。

    In addition to the metrics provided by Pulsar, Pulsar allows you to customize metrics for Java and Python functions. Function workers 会自动将这些 Metrics 数据收集并发送到 Prometheus,你可以在 Grafana 中查看这些指标。

Here are examples of how to customize metrics for Java and Python functions.

Java

Python

Go

You can record metrics using the Context object on a per-key basis. For example, you can set a metric for the process-count key and a different metric for the elevens-count key every time the function processes a message.

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;public class MetricRecorderFunction implements Function<Integer, Void> { @Override public void apply(Integer input, Context context) { // Records the metric 1 every time a message arrives context.recordMetric("hit-count", 1); // Records the metric only if the arriving number equals 11 if (input == 11) { context.recordMetric("elevens-count", 1); } return null; }}

You can record metrics using the Context object on a per-key basis. For example, you can set a metric for the process-count key and a different metric for the elevens-count key every time the function processes a message. The following is an example.

  1. from pulsar import Functionclass MetricRecorderFunction(Function): def process(self, input, context): context.record_metric('hit-count', 1) if input == 11: context.record_metric('elevens-count', 1)

当前,该功能在 Go 中不可用。

安全

如果你想在函数里面启用安全机制,首先你需要在Functions Workers上启用安全机制。 了解更多详情,参考 安全设置

Pulsar Function 可以支持如下的 provider:

  • ClearTextSecretsProvider
  • EnvironmentBasedSecretsProvider

Pulsar Function 默认支持 ClearTextSecretsProvider。

At the same time, Pulsar Functions provides two interfaces, SecretsProvider and SecretsProviderConfigurator, allowing users to customize secret provider.

Java

Python

Go

你能够使用Context 对象获取到安全 provider。 如下所示:

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import org.slf4j.Logger;public class GetSecretProviderFunction implements Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { Logger LOG = context.getLogger(); String secretProvider = context.getSecret(input); if (!secretProvider.isEmpty()) { LOG.info("The secret provider is {}", secretProvider); } else { LOG.warn("No secret provider"); } return null; }}

你能够使用Context 对象获取到安全 provider。 如下所示:

  1. from pulsar import Functionclass GetSecretProviderFunction(Function): def process(self, input, context): logger = context.get_logger() secret_provider = context.get_secret(input) if secret_provider is None: logger.warn('No secret provider') else: logger.info("The secret provider is {0}".format(secret_provider))

当前,该功能在 Go 中不可用。

State storage

Pulsar Functions使用 Apache Bookerper 存储状态。 Pulsar installation, including the local standalone installation, includes deployment of BookKeeper bookies.

Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper table service to store the State for functions. For example, a WordCount function can store its counters state into BookKeeper table service via Pulsar Functions State API.

States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.

在 Pulsar Java Functions 中,你可以调用 Context 对象中的putStateputStateAsyncgetStategetStateAsyncincrCounterincrCounterAsyncgetCountergetCounterAsyncdeleteState等方法访问函数中的状态信息。 在 Pulsar Python Functions 中,可以调用 Context 对象中的 putStategetStateincrCountergetCounterdeleteState等方法访问函数中的状态信息。 You can also manage states using the querystate and putstate options to pulsar-admin functions.

Note
State storage is not available in Go.

API

Java

Python

Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the Context object when you are using Java SDK functions.

incrCounter

  1. /** * Increment the builtin distributed counter refered by key * @param key The name of the key * @param amount The amount to be incremented */ void incrCounter(String key, long amount);

incrCounter为应用提供了根据指定 key增加指定 的计数器功能。

incrCounterAsync

  1. /** * 内置的根据key 增加对应值的分布式计数器 * 不必等待增加操作完成即可返回 * * @param key The name of the key * @param amount The amount to be incremented */ CompletableFuture<Void> incrCounterAsync(String key, long amount);

incrCounterAsync为应用提供了异步的根据指定 key 增加指定 的计数器功能。

getCounter

  1. /** * Retrieve the counter value for the key. * * @param key name of the key * @return the amount of the counter value for this key */ long getCounter(String key);

应用可以使用getCounter,根据key获取到incrCounter方法累加后的计数器的值。

Except the counter API, Pulsar also exposes a general key/value API for functions to store general key/value state.

getCounterAsync

  1. /** * 根据 key 获取计数器的值, 但不必等待 * 操作完成即可返回 * * @param key name of the key * @return the amount of the counter value for this key */ CompletableFuture<Long> getCounterAsync(String key);

应用可以使用getCounter,根据 key 异步的获取到incrCounter 方法累加后的计数器的值。

putState

  1. /** * Update the state value for the key. * * @param key name of the key * @param value state value of the key */ void putState(String key, ByteBuffer value);

putStateAsync

  1. /** * 根据 Key 更新状态的值,但不需要等待操作完成即可返回 * * @param key name of the key * @param value state value of the key */ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);

应用能够使用putStateAsync方法异步的根据key去更新其状态。

getState

  1. /** * Retrieve the state value for the key. * * @param key name of the key * @return the state value for the key. */ ByteBuffer getState(String key);

getStateAsync

  1. /** * 获取到 key 的状态值, 但是不必等到操作完成即可返回。 * * @param key的名称 * @return key的状态值 */ CompletableFuture<ByteBuffer> getStateAsync(String key);

应用能够使用getStateAsync异步的根据key获取到其状态值。

deleteState

  1. /** * Delete the state value for the key. * * @param key name of the key */

Counters and binary values share the same keyspace, so this deletes either type.

Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the Context object when you are using Python SDK functions.

incr_counter

  1. def incr_counter(self, key, amount): """incr the counter of a given key in the managed state"""

Application can use incr_counter to change the counter of a given key by the given amount. If the key does not exist, a new key is created.

get_counter

  1. def get_counter(self, key): """get the counter of a given key in the managed state"""

Application can use get_counter to retrieve the counter of a given key mutated by incrCounter.

Except the counter API, Pulsar also exposes a general key/value API for functions to store general key/value state.

put_state

  1. def put_state(self, key, value): """update the value of a given key in the managed state"""

The key is a string, and the value is arbitrary binary data.

get_state

  1. def get_state(self, key): """get the value of a given key in the managed state"""

del_counter

  1. def del_counter(self, key): """delete the counter of a given key in the managed state"""

Counters and binary values share the same keyspace, so this deletes either type.

Query State

A Pulsar Function can use the State API for storing state into Pulsar’s state storage and retrieving state back from Pulsar’s state storage. Additionally Pulsar also provides CLI commands for querying its state.

  1. $ bin/pulsar-admin functions querystate \
  2. --tenant <tenant> \
  3. --namespace <namespace> \
  4. --name <function-name> \
  5. --state-storage-url <bookkeeper-service-url> \
  6. --key <state-key> \
  7. [---watch]

If --watch is specified, the CLI will watch the value of the provided state-key.

示例

Java

Python

WordCountFunction is a very good example demonstrating on how Application can easily store state in Pulsar Functions.

  1. import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import java.util.Arrays;public class WordCountFunction implements Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1)); return null; }}

The logic of this WordCount function is pretty simple and straightforward:

  1. The function first splits the received String into multiple words using regex \\..
  2. For each word, the function increments the corresponding counter by 1 (via incrCounter(key, amount)).
  1. from pulsar import Functionclass WordCount(Function): def process(self, item, context): for word in item.split(): context.incr_counter(word, 1)

The logic of this WordCount function is pretty simple and straightforward:

  1. 该 function 首先将接收到的字符串拆分为多个单词。
  2. 对于每个 word,该 function 将相应的 counter 递增1(通过 incr_counter(key, amount))。