Configure Functions runtime

Pulsar Functions can run in one of the following runtimes:

  • Thread: the functions worker invokes functions in threads.
  • Process: the functions worker invokes functions in processes that it forks.
  • Kubernetes: the functions worker submits functions as Kubernetes StatefulSets.

note

Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.

The differences between the thread and process modes are:

  • Thread mode: a function runs in the same Java virtual machine (JVM) as the functions worker.
  • Process mode: a function runs on the same machine as the functions worker, but in a separate process.

Configure thread runtime

Configuring the thread runtime is straightforward: in most cases, you do not need to configure anything. You can customize the thread group name with the following settings:

```yaml
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
  threadGroupName: "Your Function Container Group"
```

The thread runtime supports only Java functions.

Configure process runtime

When you enable the process runtime, you do not need to configure anything else. You can optionally adjust the following settings:

```yaml
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
  # the directory for storing the function logs
  logDirectory:
  # change the jar location only when you put the Java instance jar in a different location
  javaInstanceJarLocation:
  # change the Python instance location only when you put the Python instance in a different location
  pythonInstanceLocation:
  # the directory for extra function dependencies
  extraFunctionDependenciesDir:
```

Process runtime is supported in Java, Python, and Go functions.

Configure Kubernetes runtime

The Kubernetes runtime works by having the functions worker generate Kubernetes manifests and apply them. If you run the functions worker on Kubernetes, it can use the service account associated with the pod that the functions worker is running in. Otherwise, you can configure it to communicate with a Kubernetes cluster.

The manifests generated by the functions worker include a StatefulSet, a Service (used to communicate with the pods), and a Secret for auth credentials (when applicable). The number of replicas in the StatefulSet is determined by the parallelism of the function, so by default the StatefulSet has a single pod. When a pod starts, it downloads the function payload through the functions worker REST API. The pod's container image is configurable, but it must include the functions runtime.

The Kubernetes runtime supports secrets, so you can create a Kubernetes secret and expose it as an environment variable in the pod. The Kubernetes runtime is also extensible: you can implement classes to customize how Kubernetes manifests are generated, how auth data is passed to pods, and how secrets are integrated.

tip

For the rules of translating Pulsar object names into Kubernetes resource labels, see here.

Basic configuration

To configure the Kubernetes runtime, uncomment the kubernetesContainerFactory settings in the functions_worker.yml file. The following is an example.

```yaml
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
  # the URI of the Kubernetes cluster; leave it empty to use the Kubernetes settings of the functions worker
  k8Uri:
  # the Kubernetes namespace to run the function instances in; `default` if left empty
  jobNamespace:
  # the Kubernetes pod name to run the function instances; if left empty, it is set to
  # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>`
  jobName:
  # the Docker image to run function instances; `apachepulsar/pulsar` by default
  pulsarDockerImageName:
  # the Docker images to run function instances, according to the configurations provided by users;
  # `apachepulsar/pulsar` by default. For example:
  # functionDockerImages:
  #   JAVA: JAVA_IMAGE_NAME
  #   PYTHON: PYTHON_IMAGE_NAME
  #   GO: GO_IMAGE_NAME
  functionDockerImages:
  # the image pull policy for the image used to run function instances; `IfNotPresent` by default
  imagePullPolicy: IfNotPresent
  # the root directory of the Pulsar home directory in `pulsarDockerImageName`; `/pulsar` by default.
  # If you use your own image in `pulsarDockerImageName`, set this accordingly.
  pulsarRootDir:
  # the admin CLI tool to use, such as `/bin/pulsar-admin` or `/bin/pulsarctl`;
  # `/bin/pulsar-admin` by default. To use `pulsarctl`, set this accordingly.
  configAdminCLI:
  # takes effect only if `k8Uri` is null. If your functions worker runs as a Kubernetes pod, set this
  # to true so that the functions worker submits functions to the same Kubernetes cluster it runs in.
  # Set it to false if your functions worker does not run as a Kubernetes pod.
  submittingInsidePod: false
  # the Pulsar service URL that functions use to connect to Pulsar;
  # if not set, the Pulsar service URL configured in the worker service is used
  pulsarServiceUrl:
  # the Pulsar admin URL that functions use to connect to Pulsar;
  # if not set, the Pulsar admin URL configured in the worker service is used
  pulsarAdminUrl:
  # whether to install user code dependencies (applies to Python packages)
  installUserCodeDependencies:
  # the repository that functions use to download Python dependencies
  pythonDependencyRepository:
  # the repository that functions use to download extra Python dependencies
  pythonExtraDependencyRepository:
  # the custom labels that the functions worker uses to select the nodes for pods
  customLabels:
  # the expected metrics collection interval, in seconds
  expectedMetricsCollectionInterval: 30
  # if defined, the Kubernetes runtime periodically checks this ConfigMap and
  # applies any changes to the Kubernetes-specific settings
  changeConfigMap:
  # the namespace for storing the change ConfigMap
  changeConfigMapNamespace:
  # the ratio between the CPU request and the CPU limit for a function/source/sink:
  # cpuRequest = userRequestCpu / cpuOverCommitRatio
  cpuOverCommitRatio: 1.0
  # the ratio between the memory request and the memory limit for a function/source/sink:
  # memoryRequest = userRequestMemory / memoryOverCommitRatio
  memoryOverCommitRatio: 1.0
  # the port inside the function pod that the worker uses to communicate with the pod
  grpcPort: 9093
  # the port inside the function pod on which Prometheus metrics are exposed
  metricsPort: 9094
  # the directory inside the function pod where NAR packages are extracted
  narExtractionDirectory:
  # the classpath where function instance files are stored
  functionInstanceClassPath:
  # upload the built-in sources/sinks to BookKeeper; true by default
  uploadBuiltinSinksSources: true
  # the directory for dropping extra function dependencies;
  # if it is not an absolute path, it is relative to `pulsarRootDir`
  extraFunctionDependenciesDir:
  # additional memory padding, in percent, added on top of the memory requested by the function, per instance
  percentMemoryPadding: 10
  # the duration (in seconds) before the StatefulSet is deleted after a function stops or restarts.
  # The value must be a non-negative integer; 0 deletes the StatefulSet immediately. Default: 5.
  gracePeriodSeconds: 5
```
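The comments in the configuration above give formulas for the CPU and memory requests, and percentMemoryPadding adds headroom on top of the requested memory. The following Java sketch works through that arithmetic. The class and method names are hypothetical (this is not Pulsar code), and the order in which the padding and the memory overcommit ratio are combined is an assumption, not something this page specifies.

```java
// Hypothetical sketch (not Pulsar code) of the arithmetic described by
// cpuOverCommitRatio, memoryOverCommitRatio, and percentMemoryPadding.
// Assumption: padding is added to the requested memory before the
// memory overcommit ratio is applied.
final class ResourceSketch {

    // cpuRequest = userRequestCpu / cpuOverCommitRatio
    static double cpuRequest(double userRequestCpu, double cpuOverCommitRatio) {
        requirePositive(cpuOverCommitRatio, "cpuOverCommitRatio");
        return userRequestCpu / cpuOverCommitRatio;
    }

    // Requested memory plus percentMemoryPadding percent of it, rounded to whole bytes.
    static long paddedMemory(long userRequestMemoryBytes, int percentMemoryPadding) {
        long padding = Math.round(userRequestMemoryBytes * (percentMemoryPadding / 100.0));
        return userRequestMemoryBytes + padding;
    }

    // memoryRequest = memory / memoryOverCommitRatio, truncated to whole bytes.
    static long memoryRequest(long memoryBytes, double memoryOverCommitRatio) {
        requirePositive(memoryOverCommitRatio, "memoryOverCommitRatio");
        return (long) (memoryBytes / memoryOverCommitRatio);
    }

    private static void requirePositive(double ratio, String name) {
        if (!(ratio > 0)) {
            throw new IllegalArgumentException(name + " must be positive");
        }
    }

    public static void main(String[] args) {
        long requested = 1L << 30; // 1 GiB requested by the function
        long padded = paddedMemory(requested, 10);
        System.out.println("cpu request for 2 cores, ratio 2.0: " + cpuRequest(2.0, 2.0));
        System.out.println("memory with 10% padding: " + padded);
        System.out.println("memory request, ratio 1.0: " + memoryRequest(padded, 1.0));
    }
}
```

With the default ratio of 1.0, the request equals the user-specified value. A ratio above 1.0 makes the request smaller than the user-specified value, which lets Kubernetes schedule more pods per node at the cost of possible resource contention.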

If you run the functions worker embedded in a broker on Kubernetes, you can use the default settings.
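When jobName is left empty, the jobName comment in the configuration above says the pod name defaults to pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>. The following Java sketch only illustrates that documented pattern; the class and method are hypothetical, and the sketch does not apply Pulsar's rules for translating object names into valid Kubernetes names (see the naming tip earlier on this page).

```java
import java.util.UUID;

// Hypothetical sketch of the default pod name documented for `jobName`:
// pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>.
// It does NOT sanitize names for Kubernetes; Pulsar applies its own translation rules.
final class DefaultJobName {

    static String defaultJobName(String tenant, String namespace, String functionName) {
        // The first 8 characters of a random UUID are lowercase hex digits.
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return String.join("-", "pf", tenant, namespace, functionName, suffix);
    }

    public static void main(String[] args) {
        // Prints pf-public-default-exclamation- followed by 8 random hex characters.
        System.out.println(defaultJobName("public", "default", "exclamation"));
    }
}
```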

Run standalone functions worker on Kubernetes

If you run the functions worker standalone (that is, not embedded) on Kubernetes, you need to set pulsarServiceUrl to the URL of the broker and pulsarAdminUrl to the URL of the functions worker.

For example, suppose both the Pulsar brokers and the functions workers run in the pulsar Kubernetes namespace, the brokers have a service called broker, and the functions worker has a service called func-worker. The settings are as follows:

```yaml
pulsarServiceUrl: pulsar://broker.pulsar:6650 # or pulsar+ssl://broker.pulsar:6651 if using TLS
pulsarAdminUrl: http://func-worker.pulsar:8080 # or https://func-worker.pulsar:8443 if using TLS
```
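Both URLs follow the in-cluster Kubernetes DNS form <service>.<namespace>, with the ports used in the example above (6650/6651 for the broker, 8080/8443 for the functions worker). The following Java sketch is a hypothetical helper, not part of Pulsar, that builds the two values from the service names and a TLS flag.

```java
// Hypothetical helper (not part of Pulsar) that builds the two URLs a standalone
// functions worker on Kubernetes needs, from in-cluster Service DNS names of the
// form <service>.<namespace>. The ports are the ones used in the example above.
final class WorkerUrls {

    static String pulsarServiceUrl(String brokerService, String k8sNamespace, boolean tls) {
        String host = brokerService + "." + k8sNamespace;
        return tls ? "pulsar+ssl://" + host + ":6651" : "pulsar://" + host + ":6650";
    }

    static String pulsarAdminUrl(String workerService, String k8sNamespace, boolean tls) {
        String host = workerService + "." + k8sNamespace;
        return tls ? "https://" + host + ":8443" : "http://" + host + ":8080";
    }

    public static void main(String[] args) {
        boolean tls = args.length > 0 && Boolean.parseBoolean(args[0]);
        System.out.println("pulsarServiceUrl: " + pulsarServiceUrl("broker", "pulsar", tls));
        System.out.println("pulsarAdminUrl: " + pulsarAdminUrl("func-worker", "pulsar", tls));
    }
}
```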

Run RBAC in Kubernetes clusters

If you run RBAC in your Kubernetes cluster, make sure that the service account you use for running functions workers (or brokers, if functions workers run along with brokers) has permissions on the following Kubernetes APIs.

  • services
  • configmaps
  • pods
  • apps.statefulsets

The following is sufficient:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: functions-worker
rules:
- apiGroups: [""]
  resources:
  - services
  - configmaps
  - pods
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - statefulsets
  verbs:
  - '*'
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: functions-worker
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: functions-worker
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: functions-worker
subjects:
- kind: ServiceAccount
  name: functions-worker
  namespace: pulsar # set to the namespace where the functions-worker ServiceAccount is created
```

If the service account is not configured properly, an error message similar to the following is displayed:

```
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
    at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
    at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
    at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
    at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
    at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
    at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
    at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
    at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]
```

Integrate Kubernetes secrets

To distribute secrets safely, Pulsar Functions can reference Kubernetes secrets. To enable this, set secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator.

You can create a secret in the namespace where your functions are deployed. For example, suppose you deploy functions to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field named password, which you want to mount in the pod as an environment variable called DATABASE_PASSWORD. The following functions configuration references that secret and mounts the value as an environment variable in the pod.

```yaml
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"
secrets:
  # the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
```
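Each entry under secrets in the configuration above maps to an environment variable in the pod. The following Java sketch renders that mapping as the standard Kubernetes container env entry (valueFrom.secretKeyRef), assuming path is the name of the Kubernetes Secret and key is the field inside it. The class is hypothetical and is not the KubernetesSecretsProviderConfigurator implementation; it only shows what the resulting pod spec fragment looks like.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch (not the KubernetesSecretsProviderConfigurator code):
// render a function's `secrets` section as the Kubernetes container `env`
// entries that expose each secret field as an environment variable.
final class SecretsToEnv {

    // One entry of the `secrets` section. Assumption: `path` is the name of the
    // Kubernetes Secret and `key` is the field inside that Secret.
    static final class SecretRef {
        final String path;
        final String key;

        SecretRef(String path, String key) {
            this.path = path;
            this.key = key;
        }
    }

    static String toEnvYaml(Map<String, SecretRef> secrets) {
        StringBuilder yaml = new StringBuilder("env:\n");
        for (Map.Entry<String, SecretRef> e : secrets.entrySet()) {
            yaml.append("- name: ").append(e.getKey()).append('\n')
                .append("  valueFrom:\n")
                .append("    secretKeyRef:\n")
                .append("      name: ").append(e.getValue().path).append('\n')
                .append("      key: ").append(e.getValue().key).append('\n');
        }
        return yaml.toString();
    }

    public static void main(String[] args) {
        Map<String, SecretRef> secrets = new LinkedHashMap<>();
        secrets.put("DATABASE_PASSWORD", new SecretRef("database-creds", "password"));
        System.out.print(toEnvYaml(secrets));
    }
}
```

Running main prints one env entry named DATABASE_PASSWORD whose value comes from the password field of the database-creds Secret. Because secretKeyRef resolves Secrets in the pod's own namespace, the Secret must exist in the namespace where the function runs.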

Enable token authentication

When you enable authentication for your Pulsar cluster, you need a mechanism for the pod running your function to authenticate with the broker.

The org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface provides support for any authentication mechanism. Use functionAuthProviderClassName in the functions_worker.yml file to specify the fully qualified class name of your implementation.

Pulsar includes an implementation of this interface for token authentication, which also distributes the certificate authority. The configuration is as follows:

```yaml
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
```

For token authentication, the functions worker captures the token that is used to deploy (or update) the function. The token is saved as a secret and mounted into the pod.

For custom authentication or TLS, you need to implement this interface or use an alternative mechanism to provide authentication. If you use token authentication and TLS encryption to secure communication with the cluster, Pulsar passes your certificate authority (CA) to the client, so that the client can authenticate the cluster and trust the cluster's certificate signed by your CA.

note

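If the token you use when deploying a function has an expiration time, the copy that the functions worker saves as a secret for the function expires at the same time, after which the function can no longer use it to authenticate with the broker.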
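Pulsar tokens are JSON Web Tokens (JWTs), and a token created with an expiry time carries the standard exp claim (seconds since the epoch). Before deploying a function, you may want to check whether and when its token expires. The following Java sketch is a hypothetical helper, not a Pulsar API: it decodes the token payload and reads exp without verifying the signature, so treat it only as an inspection aid.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-deployment check (not part of Pulsar): read the standard
// JWT `exp` claim, in seconds since the epoch, without verifying the signature.
final class TokenExpiry {

    // A minimal regex instead of a JSON parser keeps the sketch dependency-free.
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    static OptionalLong expiresAt(String jwt) {
        String[] parts = jwt.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        // The payload is base64url-encoded; the URL decoder accepts missing padding.
        byte[] json = Base64.getUrlDecoder().decode(parts[1]);
        Matcher m = EXP.matcher(new String(json, StandardCharsets.UTF_8));
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java TokenExpiry <token>");
            return;
        }
        OptionalLong exp = expiresAt(args[0]);
        if (exp.isPresent()) {
            System.out.println("token expires at " + Instant.ofEpochSecond(exp.getAsLong()));
        } else {
            System.out.println("token has no exp claim");
        }
    }
}
```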

Run clusters with authentication

When you run a functions worker in a standalone process (that is, not embedded in the broker) in a cluster with authentication, you must configure your functions worker to interact with the broker and to authenticate incoming requests. This means that you need to configure the properties that the broker requires for authentication or authorization.

For example, if you use token authentication, you need to configure the following properties in the functions_worker.yml file.

```yaml
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
configurationMetadataStoreUrl: zk:zookeeper-cluster:2181 # auth requires a connection to ZooKeeper
authenticationProviders:
  - "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
authorizationEnabled: true
authenticationEnabled: true
superUserRoles:
  - superuser
  - proxy
properties:
  tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token, key file must be DER-encoded
  tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens, key file must be DER-encoded
```

note

You must configure both sides: authentication and authorization on the functions worker, so that it can authenticate incoming requests, and client authentication, so that the functions worker can communicate with the broker.

Customize Kubernetes runtime

The Kubernetes integration enables you to implement a class that customizes how manifests are generated. To use it, set runtimeCustomizerClassName in the functions_worker.yml file to the fully qualified class name of your implementation, which must implement the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface.

The functions (and sinks/sources) API provides a flag, customRuntimeOptions, which is passed to this interface.

To initialize the KubernetesManifestCustomizer, you can provide runtimeCustomizerConfig in the functions_worker.yml file. runtimeCustomizerConfig is passed to the public void initialize(Map<String, Object> config) method of the interface. runtimeCustomizerConfig differs from customRuntimeOptions in that runtimeCustomizerConfig is the same across all functions, whereas customRuntimeOptions is set per function. If you provide both runtimeCustomizerConfig and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of KubernetesManifestCustomizer.

Pulsar includes a built-in implementation. To use it, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer. When initialized with runtimeCustomizerConfig, the built-in implementation lets you pass a JSON document as customRuntimeOptions with certain properties that determine how the manifests are generated. If both runtimeCustomizerConfig and customRuntimeOptions are provided, BasicKubernetesManifestCustomizer uses customRuntimeOptions to override runtimeCustomizerConfig wherever the two conflict.

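Below is an example of customRuntimeOptions. The // comments are explanatory only; remove them from the actual value, because JSON does not allow comments.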

```json
{
  "jobName": "jobname", // the k8s pod name to run this function instance
  "jobNamespace": "namespace", // the k8s namespace to run this function in
  "extraLabels": { // extra labels to attach to the statefulSet, service, and pods
    "extraLabel": "value"
  },
  "extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
    "extraAnnotation": "value"
  },
  "nodeSelectorLabels": { // node selector labels to add on to the pod spec
    "customLabel": "value"
  },
  "tolerations": [ // tolerations to add to the pod spec
    {
      "key": "custom-key",
      "value": "value",
      "effect": "NoSchedule"
    }
  ],
  "resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
    "requests": {
      "cpu": 1,
      "memory": "4G"
    },
    "limits": {
      "cpu": 2,
      "memory": "8G"
    }
  }
}
```
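The override rule described above for BasicKubernetesManifestCustomizer (customRuntimeOptions wins over runtimeCustomizerConfig on conflicts) can be illustrated with a small map merge. The following Java sketch is hypothetical: it assumes nested objects such as extraLabels are merged key by key and that any other per-function value replaces the worker-wide one. The actual merge rules of BasicKubernetesManifestCustomizer may differ, and the sample values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of "customRuntimeOptions override runtimeCustomizerConfig
// on conflicts". Assumption: nested objects such as extraLabels are merged key by
// key, and any other per-function value replaces the worker-wide value.
// BasicKubernetesManifestCustomizer's actual merge rules may differ.
final class RuntimeOptionsMerge {

    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> workerConfig,
                                     Map<String, Object> functionOptions) {
        Map<String, Object> merged = new LinkedHashMap<>(workerConfig);
        for (Map.Entry<String, Object> e : functionOptions.entrySet()) {
            Object base = merged.get(e.getKey());
            Object override = e.getValue();
            if (base instanceof Map && override instanceof Map) {
                // Both sides are objects: merge recursively, per-function keys win.
                merged.put(e.getKey(),
                        merge((Map<String, Object>) base, (Map<String, Object>) override));
            } else {
                // Scalars, lists, or type mismatches: the per-function value wins.
                merged.put(e.getKey(), override);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical worker-wide runtimeCustomizerConfig.
        Map<String, Object> workerConfig = new LinkedHashMap<>();
        workerConfig.put("jobNamespace", "functions");
        workerConfig.put("extraLabels", Map.of("team", "data"));

        // Hypothetical per-function customRuntimeOptions.
        Map<String, Object> functionOptions = new LinkedHashMap<>();
        functionOptions.put("jobNamespace", "namespace");
        functionOptions.put("extraLabels", Map.of("extraLabel", "value"));

        System.out.println(merge(workerConfig, functionOptions));
    }
}
```

In this sketch the function runs in the namespace from customRuntimeOptions and carries both labels. A custom KubernetesManifestCustomizer is free to choose a different policy, for example rejecting per-function overrides of jobNamespace.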

Run clusters with geo-replication

If you run multiple clusters tied together with geo-replication, it is important to use a different function namespace for each cluster. Otherwise, the functions workers of all clusters share a namespace and may schedule functions across clusters.

For example, if you have two clusters, east-1 and west-1, you can configure the functions workers for east-1 and west-1 respectively as follows.

For east-1:

```yaml
pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1
```

For west-1:

```yaml
pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1
```

This ensures the two different Functions Workers use distinct sets of topics for their internal coordination.

Configure standalone functions worker

When you configure a standalone functions worker, you need to configure the properties that the broker requires, especially if you use TLS, so that the functions worker can communicate with the broker.

You need to configure the following required properties.

```yaml
workerPort: 8080
workerPortTls: 8443 # when using TLS
tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # when using TLS
tlsKeyFilePath: /etc/pulsar/tls/tls.key # when using TLS
tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # when using TLS
pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ when using TLS
pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ when using TLS
useTls: true # when using TLS, critical!
```