Window Functions Context
Java SDK 提供了对可用于窗口函数中的窗口上下文对象的访问。 该上下文对象为 Pulsar 窗口函数提供了各种各样的信息和功能,如下所示。
-
- 和函数有关的所有的输入主题和输出主题名称。
- 与该函数相关联的租户和命名空间。
- Pulsar 窗口函数的名称、ID 和版本。
- 运行窗口函数的 Pulsar 函数实例 ID。
- 调用窗口函数的实例数量。
- 输出 Schema 的内置类型或自定义类名称。
-
- 窗口函数使用的 Logger 对象,可创建窗口函数的日志信息。
-
- 访问任意的用户配置信息。
-
- Pulsar 的窗口函数支持路由功能。 Pulsar 的窗口函数可以通过
publish
接口向任意主题发送消息。
- Pulsar 的窗口函数支持路由功能。 Pulsar 的窗口函数可以通过
-
- 记录指标的接口。
-
- 存储和检索状态的接口:状态存储。
规格说明
细则描述包含了函数的基本信息。
获取输入主题
通过方法 getInputTopics
获取所有输入主题的名称清单。
此示例演示如何在 Java 窗口函数中获取所有输入主题的名称列表。
public class GetInputTopicsWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
Collection<String> inputTopics = context.getInputTopics();
System.out.println(inputTopics);
return null;
}
}
获取输出主题
通过 getOutputTopic
方法获得消息发送的目标 主题名称。
此示例演示如何在 Java 窗口函数中获取输出主题的名称。
public class GetOutputTopicWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String outputTopic = context.getOutputTopic();
System.out.println(outputTopic);
return null;
}
}
获取租户
通过 getTenant
方法可以获取窗口函数关联的租户名称。
此示例演示如何在 Java 窗口函数中获取租户的名称。
public class GetTenantWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String tenant = context.getTenant();
System.out.println(tenant);
return null;
}
}
获取命名空间
通过 getNamespace
方法获取窗口函数关联的命名空间名称。
此示例演示如何在 Java 窗口函数中获取命名空间名称。
public class GetNamespaceWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String ns = context.getNamespace();
System.out.println(ns);
return null;
}
}
获取函数名称
通过 getFunctionName
方法获取窗口函数的名称。
此示例演示如何在 Java 窗口函数中获取函数的名称。
public class GetNameOfWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionName = context.getFunctionName();
System.out.println(functionName);
return null;
}
}
获取函数 ID
通过 getFunctionId
方法获取窗口函数 ID。
此示例演示如何在 Java 窗口函数中获取函数的 ID。
public class GetFunctionIDWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionID = context.getFunctionId();
System.out.println(functionID);
return null;
}
}
获取函数版本
通过 getFunctionVersion
方法获取窗口函数的版本。
此示例演示如何在 Java 窗口函数中获取函数的版本。
public class GetVersionOfWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionVersion = context.getFunctionVersion();
System.out.println(functionVersion);
return null;
}
}
获取实例 ID
通过 getInstanceId
方法获取窗口函数的实例 ID。
此示例演示如何在 Java 窗口函数中获取实例 ID。
public class GetInstanceIDWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
int instanceId = context.getInstanceId();
System.out.println(instanceId);
return null;
}
}
获取实例个数
通过 getNumInstances
方法获取某窗口函数调用生成的实例数。
此示例演示如何在 Java 窗口函数中获取该函数的实例数。
public class GetNumInstancesWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
int numInstances = context.getNumInstances();
System.out.println(numInstances);
return null;
}
}
获取输出 Schema 类型
通过 getOutputSchemaType
方法获取输出 Schema 的内置类型或自定义类名称。
此示例演示如何在 Java 窗口函数中获取输出 Schema 的类型。
public class GetOutputSchemaTypeWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String schemaType = context.getOutputSchemaType();
System.out.println(schemaType);
return null;
}
}
Logger
使用 Java SDK 的 Pulsar 窗口函数可以通过 SLF4j Logger 对象去生成指定级别的日志。
以下代码示范了在 Java 方法中根据日志内容是否包含 danger
关键字来记录 WARNING
级别或 INFO
级别的日志。
import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.slf4j.Logger;
public class LoggingWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
Logger log = context.getLogger();
for (Record<String> record : inputs) {
log.info(record + "-window-log");
}
return null;
}
}
如果你需要你的函数来生成日志,请在创建或运行函数时指定一个日志主题。
bin/pulsar-admin functions create \
--jar my-functions.jar \
--classname my.package.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs \
# 其他函数配置
你可以通过 persistent://public/default/logging-function-logs
主题访问由 LoggingFunction
生成的所有日志。
Metrics
Pulsar 窗口函数可以发布可查询的任意指标到指标接口。
Note
如果 Pulsar 窗口函数使用了 JNI,则该函数将无法把指标或状态信息发布到 Pulsar。
你可以在遍历每条记录的时候使用上下文对象来记录指标信息。
以下代码示例了如何在 Java 函数中每次处理消息时为 process-count
键设置指标,同时为 elevens-count
键设置另外一个指标。
import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
/**
* 示例函数:跟踪每条消息发送时的事件时间。
*/
public class UserMetricWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
for (Record<String> record : inputs) {
if (record.getEventTime().isPresent()) {
context.recordMetric("消息事件时间:", record.getEventTime().get().doubleValue());
}
}
return null;
}
}
User config
当你使用 SDK 运行或者更新函数时,你能够通过 --user-config
标志传递任意的 key/value 格式的参数。 key/value 参数必须使用 JSON 格式。
以下代码示例了如何给函数传递用户自定义的 key/value 参数。
bin/pulsar-admin functions create \
--name word-filter \
--user-config '{"forbidden-word":"rosebud"}' \
# 函数的其他配置
API
你可以使用以下 API 获取窗口函数的用户自定义信息。
getUserConfigMap
getUserConfigMap
API:获取窗口函数所有用户定义的 key/value 配置。
/**
* 获取函数内所有用户自定义的 key/value 配置。
*
* @return 所有用户自定义的配置值映射
*/
Map<String, Object> getUserConfigMap();
getUserConfigValue
getUserConfigValue
API:获得一个用户自定义的 key/value。
/**
* 获取任意用户自定义配置的 key/value。
*
* @param key 参数的 key
* @return Optional 对象:指定 key 对应的配置值。
*/
Optional<Object> getUserConfigValue(String key);
getUserConfigValueOrDefault
getUserConfigValueOrdefault
API:获取一个用户自定义的 key/value,若没有设置值,则返回默认值。
/**
* 获取任意用户自定义的 key/value,若未设置,则返回默认值。
*
* @param key
* @param defaultValue
* @return 对应 key 用户自定义的值或者是指定的默认值。
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);
这个示例演示如何访问 Pulsar 窗口函数中配置的键值对。
Java SDK 上下文对象允许你通过命令行访问 Pulsar 窗口函数中配置的 key/value 值(以 JSON 形式)。
提示
Java 窗口函数,所有配置的键/值对,
key
和value
都是String
类型。 要将值设置为其他类型,则需要对String
类型反序列化。
以下代码示范如何在 Java 窗口函数中配置键/值对。
bin/pulsar-admin functions create \
--user-config '{"word-of-the-day":"verdure"}' \
# 函数的其他配置
以下代码示范如何在窗口函数中访问值。
在每次调用本函数时,UserConfigFunction
方法都会去记录字符串 "The word of the day is verdure"
(即在每次消息到达时)。 只有当函数使用一个新的配置值进行更新之后,用户配置项 word-of-the-day
的值才会被改变。更新函数值有多种方式,例如通过命令行或者 REST API。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
import java.util.Optional;
public class UserConfigWindowFunction implements WindowFunction<String, String> {
@Override
public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
if (whatToWrite.get() != null) {
return (String)whatToWrite.get();
} else {
return "好像没有数据哟~";
}
}
}
如果没有找到任何预先设置的值,你可以访问整个用户配置映射或设置默认值。
// Get the whole config map
Map<String, String> allConfigs = context.getUserConfigMap();
// Get value or resort to default
String wotd = context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");
路由
你可以使用 context.publish()
接口来发布尽可能多的结果。
在下述 Java 代码中,PublishFunction
类使用上下文对象的内置方法在函数中发布消息到 publishTopic
中。
public class PublishWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> input, WindowContext context) throws Exception {
String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
String output = String.format("%s!", input);
context.publish(publishTopic, output);
return null;
}
}
State storage
Pulsar 窗口函数使用 Apache BookKeeper 作为状态存储接口。 在安装 Apache Pulsar 的时候(包含 Standalone 安装),都会同时包含部署 BookKeeper。
Apache Pulsar 集成了 Apache BookKeeper table service
用来存储函数的 state
信息。 例如, WordCount
函数可以通过 Pulsar Functions 的状态 API 将其 counters
状态存储到 BookKeeper table service 中。
状态是以键值对形式存储的,key 是一个字符串,value 是任意的二进制数据:counters 是以64位大端二进制数值存储的。 key 的有效范围仅在 Pulsar 函数内,该函数的所有实例都会共享同一个 key。
目前,Pulsar 窗口函数暴露了读取、更新和状态管理的 Java API。 当你使用 Java SDK 函数时,这些 API 都可以在 context 对象访问到。
Java API | 说明 |
---|---|
incrCounter | 增加 key 引用的 counter 的值。 |
getCounter | 获取 key 引用的 counter 的值。 |
putState | 更新 key 的状态值。 |
你可以使用下述 API 访问、更新和管理 Java 窗口函数中的状态。
incrCounter
API incrCounter
用于增加 key 引用 counter 的值。
incrCounter
为应用提供了根据指定 key
增加指定 amount
的计数器功能。 If the key
does not exist, a new key is created.
/**
* 增加指定 key 引用的 counter 的值
* @参数 key - key 的名称
* @参数 amount - 需要增加的值
*/
void incrCounter(String key, long amount);
getCounter
API getCounter
用于获取 key 引用 counter 的值。
应用可以使用 API getCounter
根据 key
获取到 incrCounter
方法累加后的 counter 的值。
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
除了API getCounter
,Pulsar 还暴露了用于存储一般 key/value 状态的函数接口(putState
)。
putState
API putState
用于更新 key 对应的值。
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
以下代码演示了应用如何存储 Pulsar 窗口函数的状态。
WordCountWindowFunction
的逻辑是简单和直接的。
该函数首先使用正则表达式
\\.
将接收到的字符串拆分为多个单词。对于每个
word
,该函数将相应的counter
递增1(通过incrCounter(key, amount)
)。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
for (Record<String> input : inputs) {
Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
}
return null;
}
}