Use metrics to monitor functions

To ensure that running functions are healthy at any time, you can configure functions to publish arbitrary metrics to the metrics interface that can be queried.

Use metrics to monitor functions - 图1note

Using the language-native interface for Java or Python is not able to publish metrics and stats to Pulsar.

You can use both built-in metrics and customized metrics to monitor functions.

  • Use the built-in function metrics. Pulsar Functions expose the metrics that can be collected and used for monitoring the health of Java, Python, and Go functions. You can check the metrics by following the monitoring guide.
  • Set your customized metrics. In addition to the built-in metrics, Pulsar allows you to customize metrics for Java and Python functions. Function workers collect user-defined metrics to Prometheus automatically and you can check them in Grafana.

Here is an example of how to customize metrics for Java, Python and Go functions by using the Context object on a per-key basis. For example, you can set a metric for the process-count key and set another one for the elevens-count key every time the function processes a message.

  • Java
  • Python
  • Go
  1. import org.apache.pulsar.functions.api.Context;
  2. import org.apache.pulsar.functions.api.Function;
  3. public class MetricRecorderFunction implements Function<Integer, Void> {
  4. @Override
  5. public void apply(Integer input, Context context) {
  6. // Records the metric 1 every time a message arrives
  7. context.recordMetric("hit-count", 1);
  8. // Records the metric only if the arriving number equals 11
  9. if (input == 11) {
  10. context.recordMetric("elevens-count", 1);
  11. }
  12. return null;
  13. }
  14. }
  1. from pulsar import Function
  2. class MetricRecorderFunction(Function):
  3. def process(self, input, context):
  4. context.record_metric('hit-count', 1)
  5. if input == 11:
  6. context.record_metric('elevens-count', 1)
  1. func metricRecorderFunction(ctx context.Context, in []byte) error {
  2. inputstr := string(in)
  3. fctx, ok := pf.FromContext(ctx)
  4. if !ok {
  5. return errors.New("get Go Functions Context error")
  6. }
  7. fctx.RecordMetric("hit-count", 1)
  8. if inputstr == "eleven" {
  9. fctx.RecordMetric("elevens-count", 1)
  10. }
  11. return nil
  12. }