Tutorials

Write a function for word count

Note

The following example is a stateful function. Function state is disabled by default. See Enable stateful functions for instructions.

  1. Write the function in Java using the SDK for Java.

    package org.example.functions;

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    import java.util.Arrays;

    public class WordCountFunction implements Function<String, Void> {
        // This function is invoked every time a message is published to the input topic
        @Override
        public Void process(String input, Context context) throws Exception {
            Arrays.asList(input.split(" ")).forEach(word -> {
                String counterKey = word.toLowerCase();
                context.incrCounter(counterKey, 1);
            });
            return null;
        }
    }
  2. Bundle and build the JAR file, and then deploy it in your Pulsar cluster using the pulsar-admin command.

    bin/pulsar-admin functions create \
      --jar target/my-jar-with-dependencies.jar \
      --classname org.example.functions.WordCountFunction \
      --tenant public \
      --namespace default \
      --name word-count \
      --inputs persistent://public/default/sentences \
      --output persistent://public/default/count
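
To see what the counter state would hold after a few messages, the counting logic can be exercised locally without a cluster. The following is a minimal sketch in Python, not the Java function itself: `FakeContext` is a hypothetical in-memory stand-in for the function context, and `word_count` is an illustrative name that mirrors the body of `process` in the Java example.

```python
from collections import defaultdict


class FakeContext:
    """Hypothetical in-memory stand-in for the function context (not a Pulsar class)."""

    def __init__(self):
        self.counters = defaultdict(int)

    def incr_counter(self, key, amount):
        # Mirrors the idea of context.incrCounter(key, amount) in the Java example
        self.counters[key] += amount

    def get_counter(self, key):
        return self.counters[key]


def word_count(sentence, context):
    """Split a sentence on single spaces and count each lowercased word."""
    for word in sentence.split(" "):
        context.incr_counter(word.lower(), 1)


context = FakeContext()
for sentence in ["Hello world", "hello Pulsar"]:
    word_count(sentence, context)

print(dict(context.counters))  # {'hello': 2, 'world': 1, 'pulsar': 1}
```

Because the split is on a single space, consecutive spaces in a sentence would produce an empty-string key. Splitting on runs of whitespace avoids this.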

Write a function for content-based routing

  1. Write the function in Python using the SDK for Python.

    from pulsar import Function

    class RoutingFunction(Function):
        def __init__(self):
            self.fruits_topic = "persistent://public/default/fruits"
            self.vegetables_topic = "persistent://public/default/vegetables"

        def is_fruit(self, item):
            return item in [b"apple", b"orange", b"pear", b"other fruits..."]

        def is_vegetable(self, item):
            return item in [b"carrot", b"lettuce", b"radish", b"other vegetables..."]

        def process(self, item, context):
            # Route each item to a topic based on its content
            if self.is_fruit(item):
                context.publish(self.fruits_topic, item)
            elif self.is_vegetable(item):
                context.publish(self.vegetables_topic, item)
            else:
                warning = "The item {0} is neither a fruit nor a vegetable".format(item)
                context.get_logger().warning(warning)
  2. Assuming this code is stored in ~/router.py, deploy it in your Pulsar cluster using the pulsar-admin command.

    bin/pulsar-admin functions create \
      --py ~/router.py \
      --classname router.RoutingFunction \
      --tenant public \
      --namespace default \
      --name route-fruit-veg \
      --inputs persistent://public/default/basket-items
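
Before deploying, the routing decisions can be checked locally. The sketch below is one possible way to do that, under stated assumptions: `FakeContext` and `RoutingSketch` are hypothetical names, `RoutingSketch` repeats the routing logic of `RoutingFunction` without importing `pulsar`, and the fake context only records what would have been published.

```python
import logging


class FakeContext:
    """Hypothetical stand-in that records publishes instead of sending them."""

    def __init__(self):
        self.published = []

    def publish(self, topic, message):
        self.published.append((topic, message))

    def get_logger(self):
        return logging.getLogger("routing-sketch")


class RoutingSketch:
    """Same routing logic as RoutingFunction, without the pulsar dependency."""

    fruits_topic = "persistent://public/default/fruits"
    vegetables_topic = "persistent://public/default/vegetables"

    def process(self, item, context):
        if item in [b"apple", b"orange", b"pear"]:
            context.publish(self.fruits_topic, item)
        elif item in [b"carrot", b"lettuce", b"radish"]:
            context.publish(self.vegetables_topic, item)
        else:
            context.get_logger().warning(
                "The item %r is neither a fruit nor a vegetable", item
            )


context = FakeContext()
router = RoutingSketch()
for item in [b"apple", b"carrot", b"rock"]:
    router.process(item, context)

# Only the fruit and the vegetable are recorded; b"rock" is logged as a warning
for topic, message in context.published:
    print(topic, message)
```

Note that the deploy command sets no `--output` topic: the function publishes to its destination topics itself through the context.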

Write a window function for word count

Note

Currently, window functions are only available in Java.

This example demonstrates how to use the language-native interface to write a window function in Java.

Each input message is a sentence, which is split into words, and each word is counted. The built-in counter state keeps track of the word counts in a persistent and consistent manner.

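    import java.util.Arrays;

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    public class WordCountFunction implements Function<String, Void> {
        @Override
        public Void process(String input, Context context) {
            // Split the sentence on whitespace and increment the counter for each word
            Arrays.asList(input.split("\\s+")).forEach(word -> context.incrCounter(word, 1));
            return null;
        }
    }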
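
A window function operates on a batch of messages rather than on one message at a time. As a rough illustration of that idea only, the sketch below groups sentences into count-based, non-overlapping (tumbling) windows and counts words per window. It is plain Python, not a Pulsar function, since window functions are Java-only as noted above. The names `tumbling_windows` and `count_words` and the window length of 2 are hypothetical choices for this example.

```python
from collections import Counter


def tumbling_windows(messages, window_length):
    """Yield consecutive, non-overlapping batches of up to window_length messages."""
    for start in range(0, len(messages), window_length):
        yield messages[start:start + window_length]


def count_words(window):
    """Count the words across every sentence in one window."""
    counts = Counter()
    for sentence in window:
        counts.update(sentence.split())
    return counts


sentences = ["a b", "b c", "c d", "d e", "e"]
window_counts = [count_words(w) for w in tumbling_windows(sentences, 2)]
for counts in window_counts:
    print(dict(counts))
```

In this sketch the last window holds a single leftover sentence. A real windowing runtime may instead wait until a window is full before invoking the function.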