Pulsar Functions State Storage (Developer Preview)

Since the Pulsar 2.1.0 release, Pulsar integrates with the Apache BookKeeper table service to store state for functions. For example, a WordCount function can store the state of its counters in BookKeeper's table service via the Pulsar Functions State API.

API

Java API

Pulsar Functions currently expose the following APIs for mutating and accessing state. These APIs are available in the Context object when you use the Java SDK.

incrCounter

    /**
     * Increment the built-in distributed counter referred to by key.
     *
     * @param key The name of the key
     * @param amount The amount to be incremented
     */
    void incrCounter(String key, long amount);

An application can use incrCounter to change the counter of a given key by the given amount.

getCounter

    /**
     * Retrieve the counter value for the key.
     *
     * @param key name of the key
     * @return the amount of the counter value for this key
     */
    long getCounter(String key);

An application can use getCounter to retrieve the value of the counter for a given key, as mutated by incrCounter.
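To make the counter semantics concrete, here is a minimal sketch that runs without a Pulsar cluster. CounterStore and InMemoryCounterStore are hypothetical names introduced only for this illustration: they mirror the two method signatures above with a local map and say nothing about how Pulsar or BookKeeper implement counters. The sketch also assumes that a key which was never incremented reads as 0, which is a property of this stand-in rather than a documented guarantee.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the two counter methods on the Context object.
interface CounterStore {
    void incrCounter(String key, long amount);
    long getCounter(String key);
}

// In-memory illustration only; real function state lives in the table service.
class InMemoryCounterStore implements CounterStore {
    private final Map<String, Long> counters = new HashMap<>();

    @Override
    public void incrCounter(String key, long amount) {
        // Add the amount to the current value, starting from 0 for a new key.
        counters.merge(key, amount, Long::sum);
    }

    @Override
    public long getCounter(String key) {
        // Assumption of this sketch: an unseen key reads as 0.
        return counters.getOrDefault(key, 0L);
    }
}

class CounterDemo {
    public static void main(String[] args) {
        CounterStore store = new InMemoryCounterStore();
        store.incrCounter("hello", 1);
        store.incrCounter("hello", 4);
        System.out.println(store.getCounter("hello"));   // prints 5
        System.out.println(store.getCounter("missing")); // prints 0
    }
}
```

Inside a real function the same two calls would be made on the Context argument instead of on the stand-in.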

Besides the counter API, Pulsar also exposes a general key/value API that functions can use to store arbitrary key/value state.

putState

    /**
     * Update the state value for the key.
     *
     * @param key name of the key
     * @param value state value of the key
     */
    void putState(String key, ByteBuffer value);

getState

    /**
     * Retrieve the state value for the key.
     *
     * @param key name of the key
     * @return the state value for the key.
     */
    ByteBuffer getState(String key);
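Because putState and getState exchange raw ByteBuffer values, the function itself decides how values are encoded. The sketch below shows one possible encoding for String values using UTF-8. StateCodec, encode, and decode are hypothetical helper names chosen for this illustration; they are not part of the Pulsar API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of the Pulsar API.
class StateCodec {
    // Wrap the UTF-8 bytes of a string so it can be passed to putState.
    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Decode a buffer, such as one returned by getState, back into a string.
    static String decode(ByteBuffer buffer) {
        // duplicate() leaves the caller's position and limit untouched.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer stored = encode("last-seen: 42");
        System.out.println(decode(stored)); // prints "last-seen: 42"
    }
}
```

Inside a function this could be used as context.putState(key, StateCodec.encode(value)) and StateCodec.decode(context.getState(key)). Handling of a key with no stored value is left out, since this section does not specify what getState returns in that case.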

Python API

State is not currently supported in the Python SDK.

Query State

A Pulsar Function can use the State API to store state in Pulsar's state storage and retrieve it later. Additionally, Pulsar provides a CLI command for querying that state.

    $ bin/pulsar-admin functions querystate \
        --tenant <tenant> \
        --namespace <namespace> \
        --name <function-name> \
        --state-storage-url <bookkeeper-service-url> \
        --key <state-key> \
        [--watch]

If --watch is specified, the CLI will watch the value of the provided state-key.

Example

Java Example

WordCountFunction is a good example of how an application can easily store state in Pulsar Functions.

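    import java.util.Arrays;
    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    public class WordCountFunction implements Function<String, Void> {
        @Override
        public Void process(String input, Context context) throws Exception {
            // Split on literal periods and increment one counter per segment.
            Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
            return null;
        }
    }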

The logic of this WordCount function is straightforward:

  • The function first splits the received String using the regex \. (written "\\." in Java source), which matches a literal period; each resulting segment is treated as a word.
  • For each word, the function increments the corresponding counter by 1 (via incrCounter(key, amount)).
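To see which counter keys the function would produce for a given input, the split-and-count step can be reproduced locally. This is a sketch under assumptions: WordCountSketch and count are hypothetical names, and a plain HashMap stands in for the function's counter state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical local reproduction of the split-and-count step.
class WordCountSketch {
    // Count segments of the input using the same split as WordCountFunction.
    static Map<String, Long> count(String input) {
        Map<String, Long> counters = new HashMap<>();
        // "\\." is the regex \. and matches a literal period.
        Arrays.asList(input.split("\\."))
              .forEach(word -> counters.merge(word, 1L, Long::sum));
        return counters;
    }

    public static void main(String[] args) {
        Map<String, Long> counters = count("hello.world.hello");
        System.out.println(counters.get("hello")); // prints 2
        System.out.println(counters.get("world")); // prints 1
    }
}
```

With this regex, whitespace is not a delimiter: an input such as "hello world" yields the single key "hello world".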

Python Example

State is not currently supported in the Python SDK.