Debug Pulsar Functions

You can use the following methods to debug Pulsar Functions:

Captured stderr

Function startup information and captured stderr output are written to logs/functions/<tenant>/<namespace>/<function>/<function>-<instance>.log.

This is useful for debugging why a function fails to start.
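To make the log location concrete, the sketch below assembles the path from its four parts. The class name FunctionLogPath and its method are hypothetical helpers written for this page, not part of Pulsar, and the resulting path is relative, as in the pattern above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not a Pulsar API): builds the per-instance log path
// logs/functions/<tenant>/<namespace>/<function>/<function>-<instance>.log
final class FunctionLogPath {

    static Path of(String tenant, String namespace, String function, int instance) {
        // The file name repeats the function name and appends the instance ID.
        String fileName = function + "-" + instance + ".log";
        return Paths.get("logs", "functions", tenant, namespace, function, fileName);
    }

    public static void main(String[] args) {
        // Example values; replace them with those of your own function.
        System.out.println(of("public", "default", "exclamation", 0));
    }
}
```

With the path in hand, you can open or tail the file of the instance that fails to start.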

Use unit tests

A Pulsar Function is a function with inputs and outputs, so you can test it in the same way as you test any other function.

For example, if you have the following Pulsar Function:

  import java.util.function.Function;

  public class JavaNativeExclamationFunction implements Function<String, String> {
      @Override
      public String apply(String input) {
          return String.format("%s!", input);
      }
  }

You can write a simple unit test to test this Pulsar Function.

Tip

Pulsar uses TestNG for testing.

  import org.testng.Assert;
  import org.testng.annotations.Test;

  @Test
  public void testJavaNativeExclamationFunction() {
      JavaNativeExclamationFunction exclamation = new JavaNativeExclamationFunction();
      String output = exclamation.apply("foo");
      // TestNG argument order is (actual, expected).
      Assert.assertEquals(output, "foo!");
  }
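Because a native function is just a java.util.function.Function, the same check can also be run without a test framework. The following is a minimal sketch under that assumption: the ExclamationCheck class, its countFailures helper, and the extra input cases are made up for illustration, and the function is repeated inside the block only so that it compiles on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, framework-free check of the function shown above.
final class ExclamationCheck {

    // Same logic as JavaNativeExclamationFunction, repeated to keep the sketch self-contained.
    static final class JavaNativeExclamationFunction implements Function<String, String> {
        @Override
        public String apply(String input) {
            return String.format("%s!", input);
        }
    }

    // Applies fn to every input (map key) and counts results that differ from the expected value.
    static int countFailures(Function<String, String> fn, Map<String, String> cases) {
        int failures = 0;
        for (Map.Entry<String, String> c : cases.entrySet()) {
            String actual = fn.apply(c.getKey());
            if (!c.getValue().equals(actual)) {
                failures++;
                System.out.println("input=" + c.getKey() + " expected=" + c.getValue() + " actual=" + actual);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Map<String, String> cases = new LinkedHashMap<>();
        cases.put("foo", "foo!");
        cases.put("", "!");
        cases.put("a b", "a b!");
        System.out.println("failures: " + countFailures(new JavaNativeExclamationFunction(), cases));
    }
}
```

A table of input and expected pairs makes it cheap to add edge cases such as the empty string.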

The following Pulsar Function implements the org.apache.pulsar.functions.api.Function interface.

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public class ExclamationFunction implements Function<String, String> {
      @Override
      public String process(String input, Context context) {
          return String.format("%s!", input);
      }
  }

In this situation, you can write a unit test for this function as well. Remember to mock the Context parameter. The following is an example.

Tip

Pulsar uses TestNG for testing.

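  // Assumes Mockito provides mock(); the Context is not used by this function.
  import static org.mockito.Mockito.mock;

  import org.apache.pulsar.functions.api.Context;
  import org.testng.Assert;
  import org.testng.annotations.Test;

  @Test
  public void testExclamationFunction() {
      ExclamationFunction exclamation = new ExclamationFunction();
      String output = exclamation.process("foo", mock(Context.class));
      Assert.assertEquals(output, "foo!");
  }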

Debug with localrun mode

When you run a Pulsar Function in localrun mode, it launches an instance of the Function on your local machine as a thread.

In this mode, a Pulsar Function consumes real data from and produces real data to a Pulsar cluster, which mirrors how the function actually runs in a Pulsar cluster.

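Note
Currently, debugging with localrun mode is only supported by Pulsar Functions written in Java. You need Pulsar 2.4.0 or later to do the following. Even though localrun is available in versions earlier than Pulsar 2.4.0, those versions cannot debug with localrun mode programmatically or run functions as threads.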

You can launch your function in the following manner.

  FunctionConfig functionConfig = new FunctionConfig();
  functionConfig.setName(functionName);
  functionConfig.setInputs(Collections.singleton(sourceTopic));
  functionConfig.setClassName(ExclamationFunction.class.getName());
  functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
  functionConfig.setOutput(sinkTopic);
  LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
  localRunner.start(true);

This lets you debug functions in an IDE: set breakpoints and step through a function manually while it processes real data.

The following example illustrates how to programmatically launch a function in localrun mode.

  import java.util.Collections;
  import org.apache.pulsar.common.functions.FunctionConfig;
  import org.apache.pulsar.functions.LocalRunner;
  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public class ExclamationFunction implements Function<String, String> {
      @Override
      public String process(String s, Context context) throws Exception {
          return s + "!";
      }

      public static void main(String[] args) throws Exception {
          // Describe the function: name, input topic, class, runtime, and output topic.
          FunctionConfig functionConfig = new FunctionConfig();
          functionConfig.setName("exclamation");
          functionConfig.setInputs(Collections.singleton("input"));
          functionConfig.setClassName(ExclamationFunction.class.getName());
          functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
          functionConfig.setOutput("output");

          // Launch the function in localrun mode.
          LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
          localRunner.start(false);
      }
  }

To use localrun mode programmatically, add the following dependency.

  <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-local-runner</artifactId>
      <version>${pulsar.version}</version>
  </dependency>

For complete code samples, see here.

Note
Debugging with localrun mode for Pulsar Functions written in other languages will be supported soon.

Use log topic

In Pulsar Functions, you can generate log information defined in functions to a specified log topic. You can configure consumers to consume messages from a specified log topic to check the log information.

Pulsar Functions core programming model

Example

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  public class LoggingFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          // The logger comes from the function context.
          Logger LOG = context.getLogger();
          String messageId = new String(context.getMessageId());
          if (input.contains("danger")) {
              LOG.warn("A warning was received in message {}", messageId);
          } else {
              LOG.info("Message {} received\nContent: {}", messageId, input);
          }
          return null;
      }
  }

As shown in the example above, you can get the logger via context.getLogger() and assign it to an slf4j Logger variable named LOG, then use LOG to write whatever log information you need in the function. You also need to specify the topic to which the log information is produced.

Example

  $ bin/pulsar-admin functions create \
    --log-topic persistent://public/default/logging-function-logs \
    # Other function configs

Use Functions CLI

With Pulsar Functions CLI, you can debug Pulsar Functions with the following subcommands:

  • get
  • status
  • stats
  • list
  • trigger

Tip

For the complete list of Pulsar Functions CLI commands, see here.

get

Get information about a Pulsar Function.

Usage

  $ pulsar-admin functions get options

Options

Flag          Description
--fqfn        The fully qualified function name (FQFN) of a Pulsar Function.
--name        The name of a Pulsar Function.
--namespace   The namespace of a Pulsar Function.
--tenant      The tenant of a Pulsar Function.

Tip

--fqfn consists of --name, --namespace and --tenant, so you can specify either --fqfn alone or --name, --namespace and --tenant together.
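The relationship between the two forms can be sketched in code. The example below assumes the tenant/namespace/name layout seen in values such as public/default/ExclamationFunctio6; the Fqfn class is a hypothetical helper for illustration and is not part of the Pulsar client or admin API.

```java
// Hypothetical helper (not a Pulsar API): splits and joins a fully qualified function name.
final class Fqfn {
    final String tenant;
    final String namespace;
    final String name;

    Fqfn(String tenant, String namespace, String name) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.name = name;
    }

    // Parses "tenant/namespace/name"; rejects anything that does not have three non-empty parts.
    static Fqfn parse(String fqfn) {
        String[] parts = fqfn.split("/", -1);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/name but got: " + fqfn);
        }
        return new Fqfn(parts[0], parts[1], parts[2]);
    }

    // Joins the three parts back into the value you would pass to --fqfn.
    @Override
    public String toString() {
        return tenant + "/" + namespace + "/" + name;
    }

    public static void main(String[] args) {
        Fqfn f = Fqfn.parse("public/default/ExclamationFunctio6");
        System.out.println("--tenant " + f.tenant + " --namespace " + f.namespace + " --name " + f.name);
    }
}
```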

Example

You can specify --fqfn to get information about a Pulsar Function.

  $ ./bin/pulsar-admin functions get --fqfn public/default/ExclamationFunctio6

Optionally, you can specify --name, --namespace and --tenant to get information about a Pulsar Function.

  $ ./bin/pulsar-admin functions get \
    --tenant public \
    --namespace default \
    --name ExclamationFunctio6

As shown below, the get command shows input, output, runtime, and other information about the ExclamationFunctio6 function.

  {
    "tenant": "public",
    "namespace": "default",
    "name": "ExclamationFunctio6",
    "className": "org.example.test.ExclamationFunction",
    "inputSpecs": {
      "persistent://public/default/my-topic-1": {
        "isRegexPattern": false
      }
    },
    "output": "persistent://public/default/test-1",
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "userConfig": {},
    "runtime": "JAVA",
    "autoAck": true,
    "parallelism": 1
  }

status

Check the current status of a Pulsar Function.

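Usage

  $ pulsar-admin functions status options

Options

Flag          Description
--fqfn        The fully qualified function name (FQFN) of a Pulsar Function.
--instance-id The instance ID of a Pulsar Function. If --instance-id is not specified, the command covers all instances.
--name        The name of a Pulsar Function.
--namespace   The namespace of a Pulsar Function.
--tenant      The tenant of a Pulsar Function.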

Example

  $ ./bin/pulsar-admin functions status \
    --tenant public \
    --namespace default \
    --name ExclamationFunctio6

As shown below, the status command shows the number of instances, running instances, the instance running under the ExclamationFunctio6 function, received messages, successfully processed messages, system exceptions, the average latency and so on.

  {
    "numInstances" : 1,
    "numRunning" : 1,
    "instances" : [ {
      "instanceId" : 0,
      "status" : {
        "running" : true,
        "error" : "",
        "numRestarts" : 0,
        "numReceived" : 1,
        "numSuccessfullyProcessed" : 1,
        "numUserExceptions" : 0,
        "latestUserExceptions" : [ ],
        "numSystemExceptions" : 0,
        "latestSystemExceptions" : [ ],
        "averageLatency" : 0.8385,
        "lastInvocationTime" : 1557734137987,
        "workerId" : "c-standalone-fw-23ccc88ef29b-8080"
      }
    } ]
  }

stats

Get the current stats of a Pulsar Function.

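Usage

  $ pulsar-admin functions stats options

Options

Flag          Description
--fqfn        The fully qualified function name (FQFN) of a Pulsar Function.
--instance-id The instance ID of a Pulsar Function. If --instance-id is not specified, the command covers all instances.
--name        The name of a Pulsar Function.
--namespace   The namespace of a Pulsar Function.
--tenant      The tenant of a Pulsar Function.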

Example

  $ ./bin/pulsar-admin functions stats \
    --tenant public \
    --namespace default \
    --name ExclamationFunctio6

The output is shown as follows:

  {
    "receivedTotal" : 1,
    "processedSuccessfullyTotal" : 1,
    "systemExceptionsTotal" : 0,
    "userExceptionsTotal" : 0,
    "avgProcessLatency" : 0.8385,
    "1min" : {
      "receivedTotal" : 0,
      "processedSuccessfullyTotal" : 0,
      "systemExceptionsTotal" : 0,
      "userExceptionsTotal" : 0,
      "avgProcessLatency" : null
    },
    "lastInvocation" : 1557734137987,
    "instances" : [ {
      "instanceId" : 0,
      "metrics" : {
        "receivedTotal" : 1,
        "processedSuccessfullyTotal" : 1,
        "systemExceptionsTotal" : 0,
        "userExceptionsTotal" : 0,
        "avgProcessLatency" : 0.8385,
        "1min" : {
          "receivedTotal" : 0,
          "processedSuccessfullyTotal" : 0,
          "systemExceptionsTotal" : 0,
          "userExceptionsTotal" : 0,
          "avgProcessLatency" : null
        },
        "lastInvocation" : 1557734137987,
        "userMetrics" : { }
      }
    } ]
  }
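When debugging, the counters in this output are usually read together rather than one by one. The sketch below shows one possible way to summarize them once you have extracted the numbers. The parameter names follow the field names in the output above, while the FunctionStatsSummary class and its two methods are hypothetical and not part of Pulsar.

```java
// Hypothetical helper (not a Pulsar API): summarizes counters copied from the stats output.
final class FunctionStatsSummary {

    // Share of received messages that were processed successfully.
    // Returns NaN when nothing has been received yet, since the ratio is undefined.
    static double successRatio(long receivedTotal, long processedSuccessfullyTotal) {
        if (receivedTotal == 0) {
            return Double.NaN;
        }
        return (double) processedSuccessfullyTotal / receivedTotal;
    }

    // True if either exception counter is non-zero.
    static boolean hasExceptions(long systemExceptionsTotal, long userExceptionsTotal) {
        return systemExceptionsTotal > 0 || userExceptionsTotal > 0;
    }

    public static void main(String[] args) {
        // Values taken from the top-level fields of the sample output above.
        System.out.println("success ratio: " + successRatio(1, 1));
        System.out.println("has exceptions: " + hasExceptions(0, 0));
    }
}
```

A ratio below 1 combined with a non-zero userExceptionsTotal points at the function code, whereas a non-zero systemExceptionsTotal points at the runtime around it.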

list

List all Pulsar Functions running under a specific tenant and namespace.

Usage

  $ pulsar-admin functions list options

Options

Flag          Description
--namespace   The namespace of a Pulsar Function.
--tenant      The tenant of a Pulsar Function.

Example

  $ ./bin/pulsar-admin functions list \
    --tenant public \
    --namespace default

As shown below, the list command returns three functions running under the public tenant and the default namespace.

  ExclamationFunctio1
  ExclamationFunctio2
  ExclamationFunctio3

trigger

Trigger a specified Pulsar Function with a supplied value. This command simulates the execution process of a Pulsar Function and verifies it.

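Usage

  $ pulsar-admin functions trigger options

Options

Flag            Description
--fqfn          The fully qualified function name (FQFN) of a Pulsar Function.
--name          The name of a Pulsar Function.
--namespace     The namespace of a Pulsar Function.
--tenant        The tenant of a Pulsar Function.
--topic         The name of the topic from which the Pulsar Function consumes messages.
--trigger-file  The path to the file that contains the data used to trigger the Pulsar Function.
--trigger-value The value used to trigger the Pulsar Function.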

Example

  $ ./bin/pulsar-admin functions trigger \
    --tenant public \
    --namespace default \
    --name ExclamationFunctio6 \
    --topic persistent://public/default/my-topic-1 \
    --trigger-value "hello pulsar functions"

As shown below, the trigger command returns the following result:

  This is my function!

Note

You must specify the entire topic name when using the --topic option. Otherwise, the following error occurs.

  Function in trigger function has unidentified topic
  Reason: Function in trigger function has unidentified topic
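If you build the trigger command from a script or tool, a quick check of the topic name can catch this mistake before the command runs. The sketch below assumes that an entire topic name has the shape domain://tenant/namespace/topic with a domain of persistent or non-persistent, as in persistent://public/default/my-topic-1. The TopicNames class and its pattern are illustrative assumptions, not the validation that Pulsar itself performs.

```java
import java.util.regex.Pattern;

// Hypothetical pre-flight check (not a Pulsar API) for values passed to --topic.
final class TopicNames {

    // Assumed shape: (persistent|non-persistent)://<tenant>/<namespace>/<topic>
    private static final Pattern FULL_NAME =
            Pattern.compile("^(persistent|non-persistent)://[^/]+/[^/]+/.+$");

    static boolean isFullName(String topic) {
        return topic != null && FULL_NAME.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isFullName("persistent://public/default/my-topic-1")); // full name
        System.out.println(isFullName("my-topic-1"));                             // short name
    }
}
```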