在使用 Pulsar Functions 前,需要学习如何设置 Pulsar Functions worker,以及如何配置 Functions 运行时

Pulsar functions-worker is a logic component to run Pulsar Functions in cluster mode. Two options are available, and you can select either of the two options based on your requirements.

Note The --- Service Urls--- lines in the following diagrams represent Pulsar service URLs that Pulsar client and admin use to connect to a Pulsar cluster.

与 brokers 一起运行 Functions-worker

The following diagram illustrates the deployment of functions-workers running along with brokers.

assets/functions-worker-corun.png

To enable functions-worker running as part of a broker, you need to set functionsWorkerEnabled to true in the broker.conf file.

  1. functionsWorkerEnabled=true

When you set functionsWorkerEnabled to true, it means that you start functions-worker as part of a broker. You need to configure the conf/functions_worker.yml file to customize your functions_worker.

在与 broker 一起运行 Functions-worker 时,需要先配置 Functions-worker,再与 broker 一起启动。

配置 Functions-Worker 以与 brokers 一起运行

In this mode, since functions-worker is running as part of broker, most of the settings already inherit from your broker configuration (for example, configurationStore settings, authentication settings, and so on).

Pay attention to the following required settings when configuring functions-worker in this mode.

  • numFunctionPackageReplicas:存储 function 包的副本数。 默认值是 1,对独立部署很有用。 对于生产部署,为确保其高可用性,需设置为大于 2
  • pulsarFunctionsCluster:设置 Pulsar 集群名称 (与 clusterName 在 broker 配置中的设置相同)。

If authentication is enabled on the BookKeeper cluster, configure the following BookKeeper authentication settings.

  • bookkeeperClientAuthenticationPlugin:BookKeeper 客户端身份验证插件的名称。
  • bookkeeperClientAuthenticationParametersName:BookKeeper 客户端身份验证插件的参数名称。
  • bookkeeperClientAuthenticationParameters:BookKeeper 客户端身份验证插件的参数。

同时运行 Functions-worker 和 broker

Once you have configured the functions_worker.yml file, you can start or restart your broker.

And then you can use the following command to verify if functions-worker is running well.

  1. curl <broker-ip>:8080/admin/v2/worker/cluster

After entering the command above, a list of active function workers in the cluster is returned. The output is something similar as follows.

  1. [{"workerId":"<worker-id>","workerHostname":"<worker-hostname>","port":8080}]

单独运行 Functions-worker

This section illustrates how to run functions-worker as a separate process in separate machines.

assets/functions-worker-separated.png

Note In this mode, make sure functionsWorkerEnabled is set to false, so you won’t start functions-worker with brokers by mistake.

配置 Functions-Worker 以单独运行

To run function-worker separately, you have to configure the following parameters.

Worker 参数

  • workerId:类型为字符串。 在集群中是唯一的,用于标识 worker 计算机。
  • workerHostname:worker 计算机的主机名。
  • workerPort:worker 服务器的监听端口。 在未进行自定义时,请使用其默认值。
  • workerPortTls:worker 服务器监听的 TLS 端口。 在未进行自定义时,请使用其默认值。

Function 包参数

  • numFunctionPackageReplicas:存储 function 包的副本数。 默认值为 1

Function 元数据参数

  • pulsarServiceUrl:broker 集群的 Pulsar 服务 URL。
  • pulsarWebServiceUrl:broker 集群的 Pulsar 网络服务 URL。
  • pulsarFunctionsCluster:设置 Pulsar 集群名称 (与 clusterName 在 broker 配置中的设置相同)。

If authentication is enabled for your broker cluster, you should configure the authentication plugin and parameters for the functions worker to communicate with the brokers.

  • clientAuthenticationPlugin
  • clientAuthenticationParameters

安全设置

If you want to enable security on functions workers, you should:

Enable TLS transport encryption

To enable TLS transport encryption, configure the following settings.

  1. tlsEnabled: true
  2. tlsCertificateFilePath: /path/to/functions-worker.cert.pem
  3. tlsKeyFilePath: /path/to/functions-worker.key-pk8.pem
  4. tlsTrustCertsFilePath: /path/to/ca.cert.pem

For details on TLS encryption, refer to Transport Encryption using TLS.

启用身份验证提供程序

To enable authentication on Functions Worker, configure the following settings.

Note Substitute the providers list with the providers you want to enable.

  1. authenticationEnabled: true
  2. authenticationProviders: [ provider1, provider2 ]

For SASL Authentication provider, add saslJaasClientAllowedIds and saslJaasBrokerSectionName under properties if needed.

  1. properties:
  2. saslJaasClientAllowedIds: .*pulsar.*
  3. saslJaasBrokerSectionName: Broker

For Token Authentication prodivder, add necessary settings under properties if needed. 更多详细信息,请参阅 Token Authentication

  1. properties:
  2. tokenSecretKey: file://my/secret.key
  3. # If using public/private
  4. # tokenPublicKey: file:///path/to/public.key
启用授权提供程序

To enable authorization on Functions Worker, you need to configure authorizationEnabled and configurationStoreServers. The authentication provider connects to configurationStoreServers to receive namespace policies.

  1. authorizationEnabled: true
  2. configurationStoreServers: <configuration-store-servers>

You should also configure a list of superuser roles. The superuser roles are able to access any admin API. The following is a configuration example.

  1. superUserRoles:
  2. - role1
  3. - role2
  4. - role3

BookKeeper 身份验证

If authentication is enabled on the BookKeeper cluster, you should configure the BookKeeper authentication settings as follows:

  • bookkeeperClientAuthenticationPlugin:BookKeeper 客户端身份验证插件的名称。
  • bookkeeperClientAuthenticationParametersName:BookKeeper 客户端身份验证插件的参数名称。
  • bookkeeperClientAuthenticationParameters:BookKeeper 客户端身份验证插件的参数。

启动 Functions-worker

Once you have finished configuring the functions_worker.yml configuration file, you can use the following command to start a functions-worker:

  1. bin/pulsar functions-worker

为 Functions-workers 配置 Proxies

When you are running functions-worker in a separate cluster, the admin rest endpoints are split into two clusters. functions, function-worker, source and sink endpoints are now served by the functions-worker cluster, while all the other remaining endpoints are served by the broker cluster. Hence you need to configure your pulsar-admin to use the right service URL accordingly.

In order to address this inconvenience, you can start a proxy cluster for routing the admin rest requests accordingly. Hence you will have one central entry point for your admin service.

If you already have a proxy cluster, continue reading. If you haven’t setup a proxy cluster before, you can follow the instructions to start proxies.

assets/functions-worker-separated.png

To enable routing functions related admin requests to functions-worker in a proxy, you can edit the proxy.conf file to modify the following settings:

  1. functionWorkerWebServiceURL=<pulsar-functions-worker-web-service-url>
  2. functionWorkerWebServiceURLTLS=<pulsar-functions-worker-web-service-url>

对比与 Broker 一起运行和单独运行

As described above, you can run Function-worker with brokers, or run it separately. And it is more convenient to run functions-workers along with brokers. However, running functions-workers in a separate cluster provides better resource isolation for running functions in Process or Thread mode.

Use which mode for your cases, refer to the following guidelines to determine.

Use the Run-with-Broker mode in the following cases:

  • a)在 ProcessThread 模式下运行 functions,则不需要进行资源隔离;
  • b)在 Kubernetes 上配置 functions-worker 以运行 fucntions(Kubernetes 解决了资源隔离问题)。

Use the Run-separately mode in the following cases:

  • a) 没有 Kubernetes 集群;
  • b) 不想单独运行 functions 或 brokers。

故障排除

Error message: Namespace missing local cluster name in clusters list

  1. Failed to get partitioned topic metadata: org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: Namespace missing local cluster name in clusters list: local_cluster=xyz ns=public/functions clusters=[standalone]

The error message prompts when either of the cases occurs:

  • a) broker 是以 functionsWorkerEnabled=true 开始的,但是未在 conf/functions_worker.yaml 文件中将 pulsarFunctionsCluster 设置为正确的集群;
  • b) 当一个集群中的 brokers 运行良好,而另一个集群中的 brokers 运行有问题时,用 functionsWorkerEnabled=true 建立一个 Pulsar 集群的跨机房副本。

Workaround

If any of these cases happens, follow the instructions below to fix the problem:

  1. 获取 public/functions 命名空间的当前集群列表。
  1. bin/pulsar-admin namespaces get-clusters public/functions
  1. 检查集群是否在集群列表中。 如果集群不在列表中,则将其添加到列表中,并更新列表。
  1. bin/pulsar-admin namespaces set-clusters --cluster=<existing-clusters>,<new-cluster> public/functions
  1. conf/functions_worker.yml 文件中为 pulsarFunctionsCluster 设置正确的集群名称。