Configure Functions runtime

Pulsar Functions support the following methods to run functions.

  • Thread: Invoke functions in threads in Functions Worker.
  • Process: Invoke functions in processes forked by Functions Worker.
  • Kubernetes: Submit functions as Kubernetes StatefulSets by Functions Worker.

Note

Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.

The thread and process modes differ as follows:

  • Thread mode: when a function runs in thread mode, it runs in the same Java virtual machine (JVM) as the Functions Worker.
  • Process mode: when a function runs in process mode, it runs in a separate process on the same machine as the Functions Worker.
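The distinction between the two modes can be illustrated with a general operating-system analogy. The following is a minimal Python sketch, not Pulsar code: a thread reports the same process ID as its parent, while a child process reports a different one. The helper names `pid_seen_by_thread` and `pid_seen_by_child_process` are hypothetical and exist only for this illustration.

```python
import os
import subprocess
import sys
import threading


def pid_seen_by_thread() -> int:
    """Run os.getpid() on a new thread and return the value it observed."""
    seen = []
    worker = threading.Thread(target=lambda: seen.append(os.getpid()))
    worker.start()
    worker.join()
    return seen[0]


def pid_seen_by_child_process() -> int:
    """Run os.getpid() in a separate interpreter process and return its value."""
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getpid())"],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(result.stdout.strip())


if __name__ == "__main__":
    parent = os.getpid()
    # A thread lives inside the parent process, so the IDs match.
    print("thread shares the parent process:", pid_seen_by_thread() == parent)
    # A child process is isolated from the parent, so the IDs differ.
    print("child process is separate:", pid_seen_by_child_process() != parent)
```

In the same way, a function in thread mode shares the worker's process and memory, whereas a function in process mode is isolated from the worker but still runs on the same host.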

Configure thread runtime

Thread runtime is easy to configure. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:

    threadContainerFactory:
      threadGroupName: "Your Function Container Group"

Thread runtime is supported only for Java functions.

Configure process runtime

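When you enable Process runtime, you do not need to configure anything. The following settings are available if you want to customize it:

    processContainerFactory:
      # the directory for storing the function logs
      logDirectory:
      # the location of the Java instance jar; change it only if the jar is stored in a different location
      javaInstanceJarLocation:
      # the location of the Python instance; change it only if the Python instance is stored in a different location
      pythonInstanceLocation:
      # the path to the directory for extra function dependencies
      extraFunctionDependenciesDir: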

Process runtime is supported in Java, Python, and Go functions.

Configure Kubernetes runtime

Kubernetes runtime is easy to configure. You only need to uncomment the kubernetesContainerFactory settings in the functions_worker.yml file. The following is an example.

    kubernetesContainerFactory:
      # the URI of the kubernetes cluster. if it is left empty, the kubernetes settings in the function worker are used
      k8Uri:
      # the kubernetes namespace to run the function instances in. if it is left empty, the `default` namespace is used
      jobNamespace:
      # the docker image to run function instances. by default it is `apachepulsar/pulsar`
      pulsarDockerImageName:
      # the root directory of the pulsar home directory in `pulsarDockerImageName`. by default it is `/pulsar`.
      # if you are using your own image in `pulsarDockerImageName`, set this setting accordingly
      pulsarRootDir:
      # this setting only takes effect if `k8Uri` is set to null. if your function worker is running as a k8s pod,
      # setting this to true lets the function worker submit functions to the same k8s cluster the function worker
      # is running in. set this to false if your function worker is not running as a k8s pod.
      submittingInsidePod: false
      # the pulsar service url that pulsar functions should use to connect to pulsar
      # if it is not set, the pulsar service url configured in the worker service is used
      pulsarServiceUrl:
      # the pulsar admin url that pulsar functions should use to connect to pulsar
      # if it is not set, the pulsar admin url configured in the worker service is used
      pulsarAdminUrl:
      # the custom labels that the function worker uses to select the nodes for pods
      customLabels:
      # the directory for dropping extra function dependencies
      # if it is not an absolute path, it is relative to `pulsarRootDir`
      extraFunctionDependenciesDir:
      # additional memory padding, as a percentage, added on top of the memory requested by the function on a per-instance basis
      percentMemoryPadding: 10
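To make the effect of percentMemoryPadding concrete, the arithmetic can be sketched as follows. This is a minimal illustration, assuming the padding is simply the given percentage of the requested memory, rounded to the nearest byte. The function name `padded_memory_bytes` is hypothetical, and the worker may apply further adjustments that are not modeled here.

```python
def padded_memory_bytes(requested_bytes: int, percent_memory_padding: int = 10) -> int:
    """Return the requested memory plus a percentage-based padding.

    Illustrative only: assumes padding = requested * percent / 100,
    rounded to the nearest byte.
    """
    padding = round(requested_bytes * percent_memory_padding / 100)
    return requested_bytes + padding


if __name__ == "__main__":
    one_gib = 1024 ** 3
    # With the default of 10, a 1 GiB request grows by roughly 102 MiB.
    print(padded_memory_bytes(one_gib))
```

Under these assumptions, a function that requests 1,000 bytes with percentMemoryPadding set to 10 would be given 1,100 bytes.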

If you already run a Pulsar cluster on Kubernetes, you can keep the settings unchanged in most cases.

However, if you enable RBAC when deploying your Pulsar cluster, make sure the service account you use for running Functions Workers (or brokers, if Functions Workers run along with brokers) has permissions on the following Kubernetes APIs:

  • services
  • configmaps
  • pods
  • apps.statefulsets

Otherwise, you cannot create any functions. The following is an example of the error message.

    22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
    io.kubernetes.client.ApiException: Forbidden
        at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
        at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
        at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
        at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
        at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
        at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]

If this happens, grant the required permissions to the service account used for running Functions Workers. In the following example, the service account functions-worker is granted permissions to access the Kubernetes resources services, configmaps, pods, and apps.statefulsets.

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: functions-worker
    rules:
    - apiGroups: [""]
      resources:
      - services
      - configmaps
      - pods
      verbs:
      - '*'
    - apiGroups:
      - apps
      resources:
      - statefulsets
      verbs:
      - '*'
    ---
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: functions-worker
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: functions-worker
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: functions-worker
    subjects:
    # a ServiceAccount subject in a ClusterRoleBinding also needs a `namespace` field;
    # set it to the namespace of the service account
    - kind: ServiceAccount
      name: functions-worker
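After applying a manifest like the one above, it can be useful to verify the permissions before creating functions. The following Python sketch only builds `kubectl auth can-i` command lines for the four resources; it does not run them. It assumes that kubectl is installed and that the service account functions-worker lives in a namespace called pulsar, which is an illustrative value borrowed from the error message. The helper name `can_i_commands` is hypothetical.

```python
import shlex

# Resources the Functions Worker needs; "statefulsets.apps" is kubectl's
# resource.group notation for StatefulSets in the "apps" API group.
RESOURCES = ["services", "configmaps", "pods", "statefulsets.apps"]


def can_i_commands(namespace: str, service_account: str, verb: str = "create"):
    """Build one `kubectl auth can-i` command per required resource."""
    # Impersonate the service account by using its fully qualified user name.
    user = f"system:serviceaccount:{namespace}:{service_account}"
    return [
        ["kubectl", "auth", "can-i", verb, resource,
         "--namespace", namespace, "--as", user]
        for resource in RESOURCES
    ]


if __name__ == "__main__":
    # "pulsar" is an assumed namespace; replace it with your own.
    for command in can_i_commands("pulsar", "functions-worker"):
        print(shlex.join(command))
```

Each printed command should answer yes when you run it against the cluster. An answer of no for any resource points to the same missing permission that produces the Forbidden error.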

Kubernetes CustomRuntimeOptions

The functions (and sinks/sources) API provides a flag, customRuntimeOptions, which you can use to pass options to the runtime and customize how it operates.

In the case of Kubernetes, this is passed to an instance of org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer. You can implement this interface yourself, which allows a high degree of customization over how the Kubernetes manifests are generated. The implementation is injected by setting its class name in the runtimeCustomizerClassName property in the functions_worker.yml file.

To use the basic implementation, set the runtimeCustomizerClassName property to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer. This implementation takes the following customRuntimeOptions:

    {
      "jobNamespace": "namespace", // the k8s namespace to run this function in
      "extraLabels": { // extra labels to attach to the statefulSet, service, and pods
        "extraLabel": "value"
      },
      "extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
        "extraAnnotation": "value"
      },
      "nodeSelectorLabels": { // node selector labels to add on to the pod spec
        "customLabel": "value"
      },
      "tolerations": [ // tolerations to add to the pod spec
        {
          "key": "custom-key",
          "value": "value",
          "effect": "NoSchedule"
        }
      ],
      "resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
        "requests": {
          "cpu": 1,
          "memory": "4G"
        },
        "limits": {
          "cpu": 2,
          "memory": "8G"
        }
      }
    }
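The `//` comments in the example above are annotations for the reader. Standard JSON does not allow comments, so the options are safest to pass without them. The following Python sketch builds a subset of the options as a dictionary and serializes it to a compact, comment-free JSON string. The names `OPTIONS` and `encode_runtime_options` are hypothetical, and the values are the placeholder values from the example.

```python
import json

# A subset of the options shown above, with the placeholder values kept as-is.
OPTIONS = {
    "jobNamespace": "namespace",
    "nodeSelectorLabels": {"customLabel": "value"},
    "tolerations": [
        {"key": "custom-key", "value": "value", "effect": "NoSchedule"}
    ],
    "resourceRequirements": {
        "requests": {"cpu": 1, "memory": "4G"},
        "limits": {"cpu": 2, "memory": "8G"},
    },
}


def encode_runtime_options(options: dict) -> str:
    """Serialize the options to a compact JSON string without comments."""
    return json.dumps(options, separators=(",", ":"))


if __name__ == "__main__":
    print(encode_runtime_options(OPTIONS))
```

You can then supply the resulting string as the customRuntimeOptions value when you create the function, for example through a `--custom-runtime-options` flag of `pulsar-admin functions create`, assuming your pulsar-admin version exposes that flag.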