Window Functions Context

Java SDK 提供了对可用于窗口函数中的窗口上下文对象的访问。 该上下文对象为 Pulsar 窗口函数提供了各种各样的信息和功能,如下所示。

  • 规格说明

    • 和函数有关的所有的输入主题和输出主题名称。
    • 与该函数相关联的租户和命名空间。
    • Pulsar 窗口函数的名称、ID 和版本。
    • 运行窗口函数的 Pulsar 函数实例 ID。
    • 调用窗口函数的实例数量。
    • 输出 Schema 的内置类型或自定义类名称。
  • Logger

    • 窗口函数使用的 Logger 对象,可创建窗口函数的日志信息。
  • User config

    • 访问任意的用户配置信息。
  • 路由

    • Pulsar 的窗口函数支持路由功能。 Pulsar 的窗口函数可以通过 publish 接口向任意主题发送消息。
  • Metrics

    • 记录指标的接口。
  • State storage

规格说明

细则描述包含了函数的基本信息。

获取输入主题

通过方法 getInputTopics 获取所有输入主题的名称清单

此示例演示如何在 Java 窗口函数中获取所有输入主题的名称列表。

  1. public class GetInputTopicsWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. Collection<String> inputTopics = context.getInputTopics();
  5. System.out.println(inputTopics);
  6. return null;
  7. }
  8. }

获取输出主题

通过 getOutputTopic 方法获得消息发送的目标 主题名称

此示例演示如何在 Java 窗口函数中获取输出主题的名称。

  1. public class GetOutputTopicWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String outputTopic = context.getOutputTopic();
  5. System.out.println(outputTopic);
  6. return null;
  7. }
  8. }

获取租户

通过 getTenant 方法可以获取窗口函数关联的租户名称。

此示例演示如何在 Java 窗口函数中获取租户的名称。

  1. public class GetTenantWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String tenant = context.getTenant();
  5. System.out.println(tenant);
  6. return null;
  7. }
  8. }

获取命名空间

通过 getNamespace 方法获取窗口函数关联的命名空间名称。

此示例演示如何在 Java 窗口函数中获取命名空间名称。

  1. public class GetNamespaceWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String ns = context.getNamespace();
  5. System.out.println(ns);
  6. return null;
  7. }
  8. }

获取函数名称

通过 getFunctionName 方法获取窗口函数的名称。

此示例演示如何在 Java 窗口函数中获取函数的名称。

  1. public class GetNameOfWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String functionName = context.getFunctionName();
  5. System.out.println(functionName);
  6. return null;
  7. }
  8. }

获取函数 ID

通过 getFunctionId 方法获取窗口函数 ID。

此示例演示如何在 Java 窗口函数中获取函数的 ID。

  1. public class GetFunctionIDWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String functionID = context.getFunctionId();
  5. System.out.println(functionID);
  6. return null;
  7. }
  8. }

获取函数版本

通过 getFunctionVersion 方法获取窗口函数的版本。

此示例演示如何在 Java 窗口函数中获取函数的版本。

  1. public class GetVersionOfWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String functionVersion = context.getFunctionVersion();
  5. System.out.println(functionVersion);
  6. return null;
  7. }
  8. }

获取实例 ID

通过 getInstanceId 方法获取窗口函数的实例 ID。

此示例演示如何在 Java 窗口函数中获取实例 ID。

  1. public class GetInstanceIDWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. int instanceId = context.getInstanceId();
  5. System.out.println(instanceId);
  6. return null;
  7. }
  8. }

获取实例个数

通过 getNumInstances 方法获取某窗口函数调用生成的实例数。

此示例演示如何在 Java 窗口函数中获取该函数的实例数。

  1. public class GetNumInstancesWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. int numInstances = context.getNumInstances();
  5. System.out.println(numInstances);
  6. return null;
  7. }
  8. }

获取输出 Schema 类型

通过 getOutputSchemaType 方法获取输出 Schema 的内置类型或自定义类名称。

此示例演示如何在 Java 窗口函数中获取输出 Schema 的类型。

  1. public class GetOutputSchemaTypeWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  4. String schemaType = context.getOutputSchemaType();
  5. System.out.println(schemaType);
  6. return null;
  7. }
  8. }

Logger

使用 Java SDK 的 Pulsar 窗口函数可以通过 SLF4j Logger 对象去生成指定级别的日志。

以下代码示范了在 Java 方法中根据日志内容是否包含 danger 关键字来记录 WARNING 级别或 INFO 级别的日志。

  1. import java.util.Collection;
  2. import org.apache.pulsar.functions.api.Record;
  3. import org.apache.pulsar.functions.api.WindowContext;
  4. import org.apache.pulsar.functions.api.WindowFunction;
  5. import org.slf4j.Logger;
  6. public class LoggingWindowFunction implements WindowFunction<String, Void> {
  7. @Override
  8. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  9. Logger log = context.getLogger();
  10. for (Record<String> record : inputs) {
  11. log.info(record + "-window-log");
  12. }
  13. return null;
  14. }
  15. }

如果你需要你的函数来生成日志,请在创建或运行函数时指定一个日志主题。

  1. bin/pulsar-admin functions create \
  2. --jar my-functions.jar \
  3. --classname my.package.LoggingFunction \
  4. --log-topic persistent://public/default/logging-function-logs \
  5. # 其他函数配置

你可以通过 persistent://public/default/logging-function-logs 主题访问由 LoggingFunction 生成的所有日志。

Metrics

Pulsar 窗口函数可以发布可查询的任意指标到指标接口。

Note

如果 Pulsar 窗口函数使用了 JNI,则该函数将无法把指标或状态信息发布到 Pulsar。

你可以在遍历每条记录的时候使用上下文对象来记录指标信息。

以下代码示例了如何在 Java 函数中每次处理消息时为 process-count 键设置指标,同时为 elevens-count 键设置另外一个指标。

  1. import java.util.Collection;
  2. import org.apache.pulsar.functions.api.Record;
  3. import org.apache.pulsar.functions.api.WindowContext;
  4. import org.apache.pulsar.functions.api.WindowFunction;
  5. /**
  6. * 示例函数:跟踪每条消息发送时的事件时间。
  7. */
  8. public class UserMetricWindowFunction implements WindowFunction<String, Void> {
  9. @Override
  10. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  11. for (Record<String> record : inputs) {
  12. if (record.getEventTime().isPresent()) {
  13. context.recordMetric("消息事件时间:", record.getEventTime().get().doubleValue());
  14. }
  15. }
  16. return null;
  17. }
  18. }

User config

当你使用 SDK 运行或者更新函数时,你能够通过 --user-config 标志传递任意的 key/value 格式的参数。 key/value 参数必须使用 JSON 格式。

以下代码示例了如何给函数传递用户自定义的 key/value 参数。

  1. bin/pulsar-admin functions create \
  2. --name word-filter \
  3. --user-config '{"forbidden-word":"rosebud"}' \
  4. # 函数的其他配置

API

你可以使用以下 API 获取窗口函数的用户自定义信息。

getUserConfigMap

getUserConfigMap API:获取窗口函数所有用户定义的 key/value 配置。

  1. /**
  2. * 获取函数内所有用户自定义的 key/value 配置。
  3. *
  4. * @return 所有用户自定义的配置值映射
  5. */
  6. Map<String, Object> getUserConfigMap();

getUserConfigValue

getUserConfigValue API:获得一个用户自定义的 key/value。

  1. /**
  2. * 获取任意用户自定义配置的 key/value。
  3. *
  4. * @param key 参数的 key
  5. * @return Optional 对象:指定 key 对应的配置值。
  6. */
  7. Optional<Object> getUserConfigValue(String key);

getUserConfigValueOrDefault

getUserConfigValueOrdefault API:获取一个用户自定义的 key/value,若没有设置值,则返回默认值。

  1. /**
  2. * 获取任意用户自定义的 key/value,若未设置,则返回默认值。
  3. *
  4. * @param key
  5. * @param defaultValue
  6. * @return 对应 key 用户自定义的值或者是指定的默认值。
  7. */
  8. Object getUserConfigValueOrDefault(String key, Object defaultValue);

这个示例演示如何访问 Pulsar 窗口函数中配置的键值对。

Java SDK 上下文对象允许你通过命令行访问 Pulsar 窗口函数中配置的 key/value 值(以 JSON 形式)。

提示

Java 窗口函数,所有配置的键/值对,keyvalue 都是 String 类型。 要将值设置为其他类型,则需要对 String 类型反序列化。

以下代码示范如何在 Java 窗口函数中配置键/值对。

  1. bin/pulsar-admin functions create \
  2. --user-config '{"word-of-the-day":"verdure"}' \
  3. # 函数的其他配置

以下代码示范如何在窗口函数中访问值。

在每次调用本函数时,UserConfigFunction 方法都会去记录字符串 "The word of the day is verdure"(即在每次消息到达时)。 只有当函数使用一个新的配置值进行更新之后,用户配置项 word-of-the-day 的值才会被改变。更新函数值有多种方式,例如通过命令行或者 REST API。

  1. import org.apache.pulsar.functions.api.Context;
  2. import org.apache.pulsar.functions.api.Function;
  3. import org.slf4j.Logger;
  4. import java.util.Optional;
  5. public class UserConfigWindowFunction implements WindowFunction<String, String> {
  6. @Override
  7. public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
  8. Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
  9. if (whatToWrite.get() != null) {
  10. return (String)whatToWrite.get();
  11. } else {
  12. return "好像没有数据哟~";
  13. }
  14. }
  15. }

如果没有找到任何预先设置的值,你可以访问整个用户配置映射或设置默认值。

  1. // Get the whole config map
  2. Map<String, String> allConfigs = context.getUserConfigMap();
  3. // Get value or resort to default
  4. String wotd = context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");

路由

你可以使用 context.publish() 接口来发布尽可能多的结果。

在下述 Java 代码中,PublishFunction 类使用上下文对象的内置方法在函数中发布消息到 publishTopic 中。

  1. public class PublishWindowFunction implements WindowFunction<String, Void> {
  2. @Override
  3. public Void process(Collection<Record<String>> input, WindowContext context) throws Exception {
  4. String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
  5. String output = String.format("%s!", input);
  6. context.publish(publishTopic, output);
  7. return null;
  8. }
  9. }

State storage

Pulsar 窗口函数使用 Apache BookKeeper 作为状态存储接口。 在安装 Apache Pulsar 的时候(包含 Standalone 安装),都会同时包含部署 BookKeeper。

Apache Pulsar 集成了 Apache BookKeeper table service 用来存储函数的 state 信息。 例如, WordCount 函数可以通过 Pulsar Functions 的状态 API 将其 counters 状态存储到 BookKeeper table service 中。

状态是以键值对形式存储的,key 是一个字符串,value 是任意的二进制数据:counters 是以64位大端二进制数值存储的。 key 的有效范围仅在 Pulsar 函数内,该函数的所有实例都会共享同一个 key。

目前,Pulsar 窗口函数暴露了读取、更新和状态管理的 Java API。 当你使用 Java SDK 函数时,这些 API 都可以在 context 对象访问到。

Java API说明
incrCounter增加 key 引用的 counter 的值。
getCounter获取 key 引用的 counter 的值。
putState更新 key 的状态值。

你可以使用下述 API 访问、更新和管理 Java 窗口函数中的状态。

incrCounter

API incrCounter 用于增加 key 引用 counter 的值。

incrCounter 为应用提供了根据指定 key 增加指定 amount 的计数器功能。 If the key does not exist, a new key is created.

  1. /**
  2. * 增加指定 key 引用的 counter 的值
  3. * @参数 key - key 的名称
  4. * @参数 amount - 需要增加的值
  5. */
  6. void incrCounter(String key, long amount);

getCounter

API getCounter 用于获取 key 引用 counter 的值。

应用可以使用 API getCounter 根据 key 获取到 incrCounter 方法累加后的 counter 的值。

  1. /**
  2. * Retrieve the counter value for the key.
  3. *
  4. * @param key name of the key
  5. * @return the amount of the counter value for this key
  6. */
  7. long getCounter(String key);

除了API getCounter,Pulsar 还暴露了用于存储一般 key/value 状态的函数接口(putState)。

putState

API putState 用于更新 key 对应的值。

  1. /**
  2. * Update the state value for the key.
  3. *
  4. * @param key name of the key
  5. * @param value state value of the key
  6. */
  7. void putState(String key, ByteBuffer value);

以下代码演示了应用如何存储 Pulsar 窗口函数的状态。

WordCountWindowFunction 的逻辑是简单和直接的。

  1. 该函数首先使用正则表达式 \\. 将接收到的字符串拆分为多个单词。

  2. 对于每个 word,该函数将相应的 counter 递增1(通过 incrCounter(key, amount))。

  1. import org.apache.pulsar.functions.api.Context;
  2. import org.apache.pulsar.functions.api.Function;
  3. import java.util.Arrays;
  4. public class WordCountWindowFunction implements WindowFunction<String, Void> {
  5. @Override
  6. public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
  7. for (Record<String> input : inputs) {
  8. Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
  9. }
  10. return null;
  11. }
  12. }