Configure Functions runtime

You can use the following methods to run functions.

  • Thread: The functions worker invokes functions in threads.
  • Process: The functions worker invokes functions in processes that it forks.
  • Kubernetes: The functions worker submits functions as Kubernetes StatefulSets.

Note
Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.

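The differences between the thread and process modes are:

  • Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) as the functions worker.
  • Process mode: when a function runs in process mode, it runs on the same machine as the functions worker, but in a separate process (a separate JVM for Java functions).

Configure Thread runtime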

It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:

    functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
    functionRuntimeFactoryConfigs:
      threadGroupName: "Your Function Container Group"

Thread runtime is only supported for Java functions.

Configure Process runtime

When you enable Process runtime, you do not need to configure anything.

    functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
    functionRuntimeFactoryConfigs:
      # the directory for storing the function logs
      logDirectory:
      # change the jar location only when you put the Java instance jar in a different location
      javaInstanceJarLocation:
      # change the Python instance location only when you put the Python instance files in a different location
      pythonInstanceLocation:
      # change the extra dependencies location
      extraFunctionDependenciesDir:

Process runtime is supported in Java, Python, and Go functions.

Configure Kubernetes runtime

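The Kubernetes runtime works by having the functions worker generate Kubernetes manifests and apply them. If you run the functions worker on Kubernetes, you can use the serviceAccount associated with the pod that the functions worker runs in. Otherwise, you can configure the functions worker to communicate with a Kubernetes cluster.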

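The manifests generated by the functions worker include a StatefulSet, a Service (used to communicate with the pods), and a Secret for authentication credentials (when applicable). The StatefulSet runs one pod per function instance, so the number of pods equals the function's "parallelism" (a single pod by default). When a pod starts, it downloads the function payload through the functions worker REST API. The pod's container image is configurable, but it must contain the functions runtime.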
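To make the relationship between parallelism and pod count concrete, here is a minimal Python sketch that builds a bare StatefulSet skeleton as a dict. It is not the manifest the functions worker actually generates (that one also carries the container command, ports, resources, and so on); the helper name `statefulset_skeleton`, the `app` label key, and the example name and image are hypothetical placeholders.

```python
def statefulset_skeleton(name: str, namespace: str, image: str, parallelism: int) -> dict:
    """Return a minimal apps/v1 StatefulSet whose replica count follows the function's parallelism.

    Hypothetical illustration only: the real manifest generated by Pulsar contains much more.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    labels = {"app": name}  # hypothetical label used to match the pods to the StatefulSet
    return {
        "apiVersion": "apps/v1",
        "kind": "StatefulSet",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "replicas": parallelism,  # one pod per function instance
            "serviceName": name,      # the Service that fronts the pods
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {"containers": [{"name": name, "image": image}]},
            },
        },
    }


if __name__ == "__main__":
    manifest = statefulset_skeleton("pf-mytenant-mynamespace-myfunction", "default", "apachepulsar/pulsar", 3)
    print(manifest["spec"]["replicas"])  # 3
```

Raising the function's parallelism would, under this model, only change `spec.replicas`; each replica is one function instance.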

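The Kubernetes runtime supports secrets, so you can create a Kubernetes secret and expose it as an environment variable in the pod. The Kubernetes runtime is also extensible: you can implement classes to customize how Kubernetes manifests are generated, how authentication data is passed to pods, and how secrets are integrated.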

Basic configuration

It is easy to configure the Kubernetes runtime: you can just uncomment the kubernetesContainerFactory settings in the functions_worker.yml file. The following is an example.

    functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
    functionRuntimeFactoryConfigs:
      # URI of the Kubernetes cluster; leave it empty to use the Kubernetes settings of the functions worker
      k8Uri:
      # the Kubernetes namespace to run the function instances in; if this setting is left empty, it is `default`
      jobNamespace:
      # the Kubernetes pod name to run the function instances; if this setting is left empty, it is set to
      # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>`
      jobName:
      # the docker image to run function instances. By default it is `apachepulsar/pulsar`
      pulsarDockerImageName:
      # the docker images to run function instances, according to the different configurations provided by users.
      # By default it is `apachepulsar/pulsar`
      # e.g:
      # functionDockerImages:
      #   JAVA: JAVA_IMAGE_NAME
      #   PYTHON: PYTHON_IMAGE_NAME
      #   GO: GO_IMAGE_NAME
      functionDockerImages:
      # the image pull policy for the image used to run function instances. By default it is `IfNotPresent`
      imagePullPolicy: IfNotPresent
      # the root directory of the Pulsar home directory in `pulsarDockerImageName`. By default it is `/pulsar`.
      # If you use your own image in `pulsarDockerImageName`, set this setting accordingly
      pulsarRootDir:
      # the admin CLI tool to use, such as `/bin/pulsar-admin` or `/bin/pulsarctl`.
      # By default it is `/bin/pulsar-admin`. If you want to use `pulsarctl`, set this setting accordingly
      configAdminCLI:
      # this setting only takes effect if `k8Uri` is set to null. If your functions worker runs as a k8s pod,
      # set this to true to let the functions worker submit functions to the same k8s cluster it runs in.
      # Set this to false if your functions worker is not running as a k8s pod.
      submittingInsidePod: false
      # the Pulsar service URL that Pulsar functions use to connect to Pulsar;
      # if it is not set, the Pulsar service URL configured in the worker service is used
      pulsarServiceUrl:
      # the Pulsar admin URL that Pulsar functions use to connect to Pulsar;
      # if it is not set, the Pulsar admin URL configured in the worker service is used
      pulsarAdminUrl:
      # whether to install user code dependencies (applies to Python packages)
      installUserCodeDependencies:
      # the repository that Pulsar functions use to download Python dependencies
      pythonDependencyRepository:
      # the repository that Pulsar functions use to download extra Python dependencies
      pythonExtraDependencyRepository:
      # the custom labels that the functions worker uses to select the nodes for pods
      customLabels:
      # the expected metrics collection interval, in seconds
      expectedMetricsCollectionInterval: 30
      # if this configMap is defined, the Kubernetes runtime periodically checks it
      # and applies any changes to the Kubernetes-specific settings
      changeConfigMap:
      # the namespace for storing the change configMap
      changeConfigMapNamespace:
      # the ratio of CPU limit to CPU request for a function/source/sink.
      # The formula for the CPU request is cpuRequest = userRequestCpu / cpuOverCommitRatio
      cpuOverCommitRatio: 1.0
      # the ratio of memory limit to memory request for a function/source/sink.
      # The formula for the memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio
      memoryOverCommitRatio: 1.0
      # the port inside the function pod that the worker uses to communicate with the pod
      grpcPort: 9093
      # the port inside the function pod on which Prometheus metrics are exposed
      metricsPort: 9094
      # the directory inside the function pod where NAR packages are extracted
      narExtractionDirectory:
      # the classpath where function instance files are stored
      functionInstanceClassPath:
      # the directory for dropping extra function dependencies;
      # if it is not an absolute path, it is relative to `pulsarRootDir`
      extraFunctionDependenciesDir:
      # additional memory padding, in percent, added on top of the memory requested by the function, per instance
      percentMemoryPadding: 10
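The over-commit ratios and the memory padding combine into the final pod requests and limits. The Python sketch below works through that arithmetic using the formulas given in the comments (cpuRequest = userRequestCpu / cpuOverCommitRatio, memoryRequest = userRequestMemory / memoryOverCommitRatio). The order of operations is an assumption: it applies `percentMemoryPadding` first, uses the padded memory as the limit, and derives the request from the padded value. The helper `compute_resources` and the `PodResources` type are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PodResources:
    cpu_request: float
    cpu_limit: float
    memory_request_bytes: int
    memory_limit_bytes: int


def compute_resources(user_cpu: float, user_memory_bytes: int,
                      cpu_over_commit_ratio: float = 1.0,
                      memory_over_commit_ratio: float = 1.0,
                      percent_memory_padding: int = 10) -> PodResources:
    """Derive pod requests/limits from the user's resource request (hypothetical sketch)."""
    # Assumption: padding is added first, and the padded value becomes the memory limit.
    padded_memory = user_memory_bytes + round(user_memory_bytes * percent_memory_padding / 100)
    return PodResources(
        cpu_request=user_cpu / cpu_over_commit_ratio,  # cpuRequest = userRequestCpu / cpuOverCommitRatio
        cpu_limit=user_cpu,
        memory_request_bytes=int(padded_memory / memory_over_commit_ratio),
        memory_limit_bytes=padded_memory,
    )


if __name__ == "__main__":
    # 2 CPUs and 1 GB requested, both ratios set to 2.0, default 10% padding
    print(compute_resources(2.0, 1_000_000_000, 2.0, 2.0))
```

With both ratios at the default 1.0, requests equal limits; raising a ratio lowers only the request, which lets the scheduler pack more pods per node.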

If you run the functions worker embedded in a broker within a Kubernetes environment, you can use the default settings.

Run standalone functions worker on Kubernetes

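If you run the functions worker standalone (that is, not embedded in a broker) on Kubernetes, you need to configure pulsarServiceUrl to be the URL of the broker and pulsarAdminUrl to be the URL of the functions worker.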

For example, both Pulsar brokers and the functions worker run in the `pulsar` K8s namespace. The brokers have a Service called `broker`, and the functions worker has a Service called `func-worker`. The settings are as follows:

    pulsarServiceUrl: pulsar://broker.pulsar:6650 # or pulsar+ssl://broker.pulsar:6651 if you use TLS
    pulsarAdminUrl: http://func-worker.pulsar:8080 # or https://func-worker.pulsar:8443 if you use TLS
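Switching to TLS changes both the scheme and the port of each URL. The Python sketch below captures that switch for the example's naming; it assumes the broker host name `broker`, the worker Service `func-worker`, and the default ports 6650/6651 and 8080/8443 used above. `worker_urls` is a hypothetical helper for generating the two settings, not part of Pulsar.

```python
def worker_urls(namespace: str = "pulsar", use_tls: bool = False) -> dict:
    """Build pulsarServiceUrl / pulsarAdminUrl for a standalone functions worker on Kubernetes.

    Assumes the Service names and default ports from the example configuration.
    """
    if use_tls:
        return {
            "pulsarServiceUrl": f"pulsar+ssl://broker.{namespace}:6651",
            "pulsarAdminUrl": f"https://func-worker.{namespace}:8443",
        }
    return {
        "pulsarServiceUrl": f"pulsar://broker.{namespace}:6650",
        "pulsarAdminUrl": f"http://func-worker.{namespace}:8080",
    }


if __name__ == "__main__":
    # Emit the two settings in the same key: value form as the worker configuration file
    for key, value in worker_urls(use_tls=True).items():
        print(f"{key}: {value}")
```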

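Run RBAC in Kubernetes clusters

If you run RBAC in your Kubernetes cluster, make sure that the service account used to run the functions worker (or the broker, if the functions worker runs inside the broker) has permissions on the following Kubernetes APIs.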

  • services
  • configmaps
  • pods
  • apps.statefulsets

The following is a complete example:

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: functions-worker
    rules:
    - apiGroups: [""]
      resources:
      - services
      - configmaps
      - pods
      verbs:
      - '*'
    - apiGroups:
      - apps
      resources:
      - statefulsets
      verbs:
      - '*'
    ---
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: functions-worker
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: functions-worker
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: functions-worker
    subjects:
    - kind: ServiceAccount
      name: functions-worker
      namespace: pulsar # the namespace where the functions worker (or broker) runs
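Before deploying, you may want to check a ClusterRole against the four resources listed above. The hypothetical Python helper below (`missing_permissions` is not a Pulsar or Kubernetes API) takes a ClusterRole already parsed into a dict, for example the `kind: ClusterRole` document returned by PyYAML's `yaml.safe_load_all`, and reports which required (apiGroup, resource) pairs no rule covers. It honors `*` wildcards but deliberately ignores verbs, so it is only a first-pass check.

```python
# (apiGroup, resource) pairs the functions worker needs, per the list above; "" is the core API group
REQUIRED_PERMISSIONS = {
    ("", "services"),
    ("", "configmaps"),
    ("", "pods"),
    ("apps", "statefulsets"),
}


def _covers(rule: dict, group: str, resource: str) -> bool:
    """True if a single RBAC rule names this group and resource (or uses a wildcard)."""
    groups = rule.get("apiGroups", [])
    resources = rule.get("resources", [])
    return (group in groups or "*" in groups) and (resource in resources or "*" in resources)


def missing_permissions(cluster_role: dict) -> set:
    """Return the required (apiGroup, resource) pairs not covered by any rule; verbs are not checked."""
    rules = cluster_role.get("rules", [])
    return {
        (group, resource)
        for group, resource in REQUIRED_PERMISSIONS
        if not any(_covers(rule, group, resource) for rule in rules)
    }
```

An empty result means every required resource appears in some rule; a non-empty result lists exactly what to add.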

If the service account is not properly configured, an error message similar to the following is displayed:

    22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
    io.kubernetes.client.ApiException: Forbidden
        at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
        at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
        at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
        at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
        at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
        at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]

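Integrate Kubernetes secrets

To distribute secrets safely, Pulsar Functions can reference Kubernetes secrets. To enable this, set secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator.

You can create a secret in the Kubernetes namespace where your functions are deployed. For example, suppose you deploy functions to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field named password, which you want to pass into the pod as an environment variable named DATABASE_PASSWORD. The following function configuration references that secret and mounts its value as an environment variable in the pod.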

    tenant: "mytenant"
    namespace: "mynamespace"
    name: "myfunction"
    topicName: "persistent://mytenant/mynamespace/myfuncinput"
    className: "com.company.pulsar.myfunction"
    secrets:
      # the `password` field of the `database-creds` secret is mounted as an environment variable named `DATABASE_PASSWORD`
      DATABASE_PASSWORD:
        path: "database-creds"
        key: "password"
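Each entry in the `secrets` map above pairs an environment-variable name with a secret name (`path`) and a field inside that secret (`key`). In plain Kubernetes terms, that corresponds to a container `env` entry using `valueFrom.secretKeyRef`. The Python sketch below shows this mapping under the assumption that this is how the secrets provider configurator wires the value into the pod; `secrets_to_env` is a hypothetical helper, not Pulsar's code.

```python
def secrets_to_env(secrets: dict) -> list:
    """Translate a function's `secrets` map into Kubernetes container `env` entries.

    Each value is expected to look like {"path": <secret name>, "key": <field in the secret>}.
    """
    env = []
    for env_name, ref in secrets.items():
        env.append({
            "name": env_name,  # environment variable visible inside the function pod
            "valueFrom": {"secretKeyRef": {"name": ref["path"], "key": ref["key"]}},
        })
    return env


if __name__ == "__main__":
    print(secrets_to_env({"DATABASE_PASSWORD": {"path": "database-creds", "key": "password"}}))
```

Because the value is resolved by Kubernetes at pod start, the secret itself never appears in the function configuration; only its name and field do.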

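Enable token authentication

When you enable authentication for your Pulsar cluster, you need a mechanism for the pod running your function to authenticate with the broker.

The org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface provides support for any authentication mechanism. The functionAuthProviderClassName setting in the functions_worker.yml file lets you specify your own implementation.

Pulsar includes an implementation of this interface for token authentication, and distributes the certificate authority through the same implementation. The configuration is as follows:

    functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider

For token authentication, the functions worker captures the token that is used to deploy (or update) the function. The token is saved as a secret and mounted into the pod.

For custom authentication or TLS, you need to implement this interface or use an alternative mechanism to provide authentication. If you use token authentication and TLS encryption to secure the communication with the cluster, Pulsar passes your certificate authority (CA) to the client, so the client obtains what it needs to authenticate the cluster and trusts the certificate signed for the cluster.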

Note
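If the token you use to deploy a function has an expiration time, the token mounted into the function's pod expires at the same time, after which the function can no longer use it to authenticate with the broker.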

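Enable authentication

When you run a standalone functions worker in a cluster with authentication enabled, you must configure the functions worker to pass authentication information when it interacts with the broker. Therefore, you must configure the authentication and authorization options that the broker requires.

For example, if you use token authentication, you need to configure the following properties in the functions_worker.yml file.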

    clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
    clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
    configurationStoreServers: zookeeper-cluster:2181 # auth requires a connection to zookeeper
    authenticationProviders:
    - "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
    authorizationEnabled: true
    authenticationEnabled: true
    superUserRoles:
    - superuser
    - proxy
    properties:
      tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token, key file must be DER-encoded
      tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens, key file must be DER-encoded
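In this example, clientAuthenticationParameters points at the token through a `file://` URL. The Python sketch below mimics how such a parameter can be resolved into the raw token. It assumes the two forms that Pulsar's token authentication plugin accepts, an inline `token:<jwt>` value and a `file://` path, and passes any other value through unchanged; `resolve_token` is a hypothetical helper, not part of the Pulsar client.

```python
from pathlib import Path
from urllib.parse import urlparse


def resolve_token(auth_params: str) -> str:
    """Resolve a token-style auth parameter into the raw token string (hypothetical sketch).

    Accepts "token:<jwt>" or "file:///path/to/token.txt"; any other value is returned as-is.
    """
    if auth_params.startswith("token:"):
        return auth_params[len("token:"):]
    if auth_params.startswith("file://"):
        # file:///etc/pulsar/token/admin-token.txt -> /etc/pulsar/token/admin-token.txt
        path = urlparse(auth_params).path
        return Path(path).read_text().strip()  # drop the trailing newline most token files have
    return auth_params
```

Keeping the token in a file rather than inline means the configuration file can be shared or committed without exposing the credential.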

Note
You must configure both the authentication and authorization settings that let the functions worker authenticate incoming requests, and the client settings that let the functions worker authenticate itself when it communicates with the broker.

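Customize Kubernetes runtime

The Kubernetes integration enables you to implement a class and customize how manifests are generated. You can specify the fully qualified class name in the runtimeCustomizerClassName setting of the functions_worker.yml file. The class must implement the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface.

The functions (and sources/sinks) API provides a flag, customRuntimeOptions, which is passed to this interface.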

To initialize the KubernetesManifestCustomizer, you can provide runtimeCustomizerConfig in the functions_worker.yml file. runtimeCustomizerConfig is passed to the public void initialize(Map<String, Object> config) method of the interface. runtimeCustomizerConfig differs from customRuntimeOptions in that runtimeCustomizerConfig is the same across all functions. If you provide both runtimeCustomizerConfig and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of KubernetesManifestCustomizer.

Pulsar includes a built-in implementation. To use the basic functionality, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer. This built-in implementation, initialized with runtimeCustomizerConfig, enables you to pass a JSON document as customRuntimeOptions whose properties augment how the manifests are generated. If both runtimeCustomizerConfig and customRuntimeOptions are provided and they conflict, BasicKubernetesManifestCustomizer uses the values from customRuntimeOptions.

Below is an example of customRuntimeOptions. The fields are:

  • jobName: the k8s pod name to run this function instance
  • jobNamespace: the k8s namespace to run this function in
  • extraLabels: extra labels to attach to the StatefulSet, Service, and pods
  • extraAnnotations: extra annotations to attach to the StatefulSet, Service, and pods
  • nodeSelectorLabels: node selector labels to add to the pod spec
  • tolerations: tolerations to add to the pod spec
  • resourceRequirements: CPU and memory requests and limits; define the values as described in https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container

    {
      "jobName": "jobname",
      "jobNamespace": "namespace",
      "extraLabels": {
        "extraLabel": "value"
      },
      "extraAnnotations": {
        "extraAnnotation": "value"
      },
      "nodeSelectorLabels": {
        "customLabel": "value"
      },
      "tolerations": [
        {
          "key": "custom-key",
          "value": "value",
          "effect": "NoSchedule"
        }
      ],
      "resourceRequirements": {
        "requests": {
          "cpu": 1,
          "memory": "4G"
        },
        "limits": {
          "cpu": 2,
          "memory": "8G"
        }
      }
    }
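The text above only states that customRuntimeOptions wins when it conflicts with runtimeCustomizerConfig. The Python sketch below illustrates one plausible merge policy, not necessarily the one BasicKubernetesManifestCustomizer implements: top-level maps such as extraLabels are merged key by key with the per-function value winning, and every other value (including lists such as tolerations) is replaced wholesale. `merge_runtime_opts` is a hypothetical helper; check the Pulsar source for the exact rules.

```python
import copy
import json


def merge_runtime_opts(worker_config: dict, custom_runtime_options: str) -> dict:
    """Merge worker-wide runtimeCustomizerConfig with a function's customRuntimeOptions JSON.

    Assumed policy: per-function values win; top-level maps are merged one level deep.
    """
    merged = copy.deepcopy(worker_config)  # never mutate the shared worker-wide config
    overrides = json.loads(custom_runtime_options) if custom_runtime_options else {}
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}  # key-by-key, per-function entries win
        else:
            merged[key] = value  # scalars and lists are replaced outright
    return merged


if __name__ == "__main__":
    worker = {"jobNamespace": "functions", "extraLabels": {"team": "data", "env": "prod"}}
    print(merge_runtime_opts(worker, '{"jobNamespace": "pulsar-func", "extraLabels": {"env": "staging"}}'))
```

Note that with a one-level merge, a per-function resourceRequirements.requests map replaces the worker-wide requests map entirely rather than merging cpu and memory individually.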

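Run clusters with geo-replication

If you run multiple clusters tied together with geo-replication, each cluster must use a different functions namespace. Otherwise, the functions share one namespace and may be scheduled across clusters.

For example, assume that you have two clusters: east-1 and west-1. You can configure the functions workers of the two clusters as follows.

In east-1:

    pulsarFunctionsCluster: east-1
    pulsarFunctionsNamespace: public/functions-east-1

In west-1:

    pulsarFunctionsCluster: west-1
    pulsarFunctionsNamespace: public/functions-west-1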

This ensures that the two functions workers use distinct topics for their internal scheduling.
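With more than two clusters it is easy to reuse a namespace by accident. The hypothetical Python check below takes a map from cluster name to that cluster's functions worker settings and raises if two clusters share a pulsarFunctionsNamespace, which is the situation the rule above warns against. The helper name and input shape are assumptions for illustration.

```python
def check_distinct_namespaces(settings: dict) -> None:
    """Raise ValueError if two clusters share a pulsarFunctionsNamespace.

    `settings` maps a cluster name to its functions worker settings (hypothetical input shape).
    """
    seen = {}  # namespace -> first cluster that uses it
    for cluster, conf in settings.items():
        ns = conf["pulsarFunctionsNamespace"]
        if ns in seen:
            raise ValueError(f"clusters {seen[ns]!r} and {cluster!r} share functions namespace {ns!r}")
        seen[ns] = cluster


if __name__ == "__main__":
    check_distinct_namespaces({
        "east-1": {"pulsarFunctionsCluster": "east-1", "pulsarFunctionsNamespace": "public/functions-east-1"},
        "west-1": {"pulsarFunctionsCluster": "west-1", "pulsarFunctionsNamespace": "public/functions-west-1"},
    })
    print("namespaces are distinct")
```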

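Configure standalone functions worker

When you configure a standalone functions worker, you need to configure the properties that the broker requires, especially if you use TLS, so that the functions worker can communicate with the broker.

You need to configure the following required properties.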

    workerPort: 8080
    workerPortTls: 8443 # when using TLS
    tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # when using TLS
    tlsKeyFilePath: /etc/pulsar/tls/tls.key # when using TLS
    tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # when using TLS
    pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ when using TLS
    pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ when using TLS
    useTls: true # when using TLS