Deploy and manage functions worker

Before using Pulsar Functions, you need to learn how to set up the Pulsar Functions worker and how to configure the Functions runtime.

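Pulsar functions-worker is a logical component that runs Pulsar Functions in cluster mode. Two deployment options are available, and you can choose the one that fits your needs: running functions-worker with brokers, or running it separately.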

Note
The --- Service Urls--- lines in the following diagrams represent Pulsar service URLs that Pulsar client and admin use to connect to a Pulsar cluster.

Run functions-worker with brokers

The following diagram illustrates the deployment of functions-workers running along with brokers.

assets/functions-worker-corun.png

To enable functions-worker running as part of a broker, you need to set functionsWorkerEnabled to true in the broker.conf file.

  functionsWorkerEnabled=true

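If functionsWorkerEnabled is set to true, the functions-worker runs as part of the broker. You need to configure the conf/functions_worker.yml file to customize your functions-worker.

Before you run functions-worker with a broker, configure functions-worker first, and then start it together with the broker.

Configure functions-worker to run with brokers

In this mode, functions-worker runs as part of the broker, so most of its settings are already inherited from the broker configuration (for example, configuration store settings and authentication settings).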

Pay attention to the following required settings when configuring functions-worker in this mode.

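  • numFunctionPackageReplicas: The number of replicas used to store function packages. The default value is 1, which is suitable for standalone deployment. For production deployment, set it to a value greater than 2 to ensure high availability.
  • initializedDlogMetadata: Whether to initialize distributed log metadata at runtime. If it is set to true, make sure that the metadata has been initialized by the bin/pulsar initialize-cluster-metadata command.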

If authentication is enabled on the BookKeeper cluster, configure the following BookKeeper authentication settings.

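  • bookkeeperClientAuthenticationPlugin: The name of the BookKeeper client authentication plugin.
  • bookkeeperClientAuthenticationParametersName: The parameter name of the BookKeeper client authentication plugin.
  • bookkeeperClientAuthenticationParameters: The parameters of the BookKeeper client authentication plugin.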

Configure Stateful-Functions to run with broker

If you want to use Stateful-Functions related functions (for example, the putState() and queryState() interfaces), follow the steps below.

  1. Enable the streamStorage service in the BookKeeper.

    Currently, the service uses the NAR package, so you need to set the configuration in bookkeeper.conf.

    extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent

    After starting the bookie, use the following method to check whether the streamStorage service has started correctly.

    Input:

    telnet localhost 4181

    Output:

    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.

  2. Turn on this function in functions_worker.yml.

    ```text
    stateStorageServiceUrl: bk://<bk-service-url>:4181
    ```

    `bk-service-url` is the service URL pointing to the BookKeeper table service.
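
The telnet check and the stateStorageServiceUrl value from the steps above can also be scripted. The following is a minimal sketch, not part of Pulsar itself: the helper names state_storage_url and is_port_open are hypothetical, and the port 4181 is taken from the example above.

```python
import socket


def state_storage_url(host: str, port: int = 4181) -> str:
    """Build the stateStorageServiceUrl value (hypothetical helper)."""
    return f"bk://{host}:{port}"


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection succeeds, similar to the telnet check."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


if __name__ == "__main__":
    print(state_storage_url("localhost"))
    print("streamStorage reachable:", is_port_open("localhost", 4181))
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the table service is healthy.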

Start functions-worker with broker

Once you have configured the functions_worker.yml file, you can start or restart your broker.

Then use the following command to verify that functions-worker is running.

  curl <broker-ip>:8080/admin/v2/worker/cluster

After entering the command above, a list of active function workers in the cluster is returned. The output is similar to the following:

  [{"workerId":"<worker-id>","workerHostname":"<worker-hostname>","port":8080}]
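
If you want to check the returned list from a script, the JSON body can be parsed as shown in this minimal sketch. It assumes the response has the shape shown above; the function name parse_workers and the sample values are hypothetical.

```python
import json


def parse_workers(body: str) -> list[tuple[str, str, int]]:
    """Extract (workerId, workerHostname, port) from the response body."""
    workers = json.loads(body)
    return [(w["workerId"], w["workerHostname"], w["port"]) for w in workers]


# Sample body with made-up values, following the shape shown above.
sample = '[{"workerId":"worker-1","workerHostname":"host-1","port":8080}]'

if __name__ == "__main__":
    for worker_id, hostname, port in parse_workers(sample):
        print(f"{worker_id} at {hostname}:{port}")
```

An empty list means that no functions-worker has registered with the cluster.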

Run functions-worker separately

This section illustrates how to run functions-worker as a separate process on separate machines.

assets/functions-worker-separated.png

Note
In this mode, make sure functionsWorkerEnabled is set to false, so you won’t start functions-worker with brokers by mistake. When you access functions-worker to manage functions, the pulsar-admin CLI tool or any other client should use the workerHostname and workerPort that you set in Worker parameters to generate an --admin-url.
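
As an illustration of the note above, the --admin-url value can be derived from the two worker settings. This is a minimal sketch: the helper name build_admin_url is hypothetical, and the host and port values are examples only.

```python
def build_admin_url(worker_hostname: str, worker_port: int, tls: bool = False) -> str:
    """Combine workerHostname and workerPort into an --admin-url value."""
    # Use https only when the port you pass is the worker's TLS port.
    scheme = "https" if tls else "http"
    return f"{scheme}://{worker_hostname}:{worker_port}"


if __name__ == "__main__":
    # Example values; substitute the ones from your functions_worker.yml.
    print(build_admin_url("worker-1.example.com", 6750))
```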

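Configure functions-worker to run separately

To run functions-worker separately, you have to configure the following parameters.

Worker parameters

  • workerId: The type is string. It is unique across the cluster and identifies a worker machine.
  • workerHostname: The hostname of the worker machine.
  • workerPort: The port that the worker server listens on. Keep the default value if you don't customize it.
  • workerPortTls: The TLS port that the worker server listens on. Keep the default value if you don't customize it.

Function package parameter

  • numFunctionPackageReplicas: The number of replicas used to store function packages. The default value is 1.

Function metadata parameter

  • pulsarServiceUrl: The Pulsar service URL for your broker cluster.
  • pulsarWebServiceUrl: The Pulsar web service URL for your broker cluster.
  • pulsarFunctionsCluster: Set the value to your Pulsar cluster name (the same as the clusterName setting in the broker configuration).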

If authentication is enabled for your broker cluster, you should configure the authentication plugin and parameters for the functions worker to communicate with the brokers.

  • clientAuthenticationPlugin
  • clientAuthenticationParameters
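
Before starting a separate functions-worker, it can help to confirm that the parameters listed above are present. The following is a minimal sketch that works on a configuration already loaded into a dictionary; the key list REQUIRED_KEYS and the function missing_settings are illustrative assumptions, not a check that Pulsar performs.

```python
# Settings named in this section; workerPortTls and the client
# authentication settings are left out because they apply only
# when TLS or authentication is enabled.
REQUIRED_KEYS = (
    "workerId",
    "workerHostname",
    "workerPort",
    "numFunctionPackageReplicas",
    "pulsarServiceUrl",
    "pulsarWebServiceUrl",
    "pulsarFunctionsCluster",
)


def missing_settings(config: dict) -> list[str]:
    """Return the names of listed settings that are absent or empty."""
    return [key for key in REQUIRED_KEYS if config.get(key) in (None, "")]


if __name__ == "__main__":
    # Example values only.
    config = {"workerId": "worker-1", "workerHostname": "host-1"}
    print(missing_settings(config))
```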

Security settings

If you want to enable security on functions workers, you should:

Enable TLS transport encryption

To enable TLS transport encryption, configure the following settings.

  useTLS: true
  pulsarServiceUrl: pulsar+ssl://localhost:6651/
  pulsarWebServiceUrl: https://localhost:8443
  tlsEnabled: true
  tlsCertificateFilePath: /path/to/functions-worker.cert.pem
  tlsKeyFilePath: /path/to/functions-worker.key-pk8.pem
  tlsTrustCertsFilePath: /path/to/ca.cert.pem
  # The path to the trusted certificates that the Pulsar client needs to communicate with Pulsar brokers.
  brokerClientTrustCertsFilePath: /path/to/ca.cert.pem

For details on TLS encryption, refer to Transport Encryption using TLS.

Enable Authentication Provider

To enable authentication on functions worker, you need to configure the following settings.

Note
Substitute the providers list with the providers you want to enable.

  authenticationEnabled: true
  authenticationProviders: [ provider1, provider2 ]

For TLS Authentication provider, follow the example below to add the necessary settings. For more information, refer to TLS Authentication.

  brokerClientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationTls
  brokerClientAuthenticationParameters: tlsCertFile:/path/to/admin.cert.pem,tlsKeyFile:/path/to/admin.key-pk8.pem
  authenticationEnabled: true
  authenticationProviders: ['org.apache.pulsar.broker.authentication.AuthenticationProviderTls']

For SASL Authentication provider, add saslJaasClientAllowedIds and saslJaasBrokerSectionName under properties if needed.

  properties:
    saslJaasClientAllowedIds: .*pulsar.*
    saslJaasBrokerSectionName: Broker

For Token Authentication provider, add necessary settings for properties if needed. For more information, refer to Token Authentication. Note: key files must be DER-encoded.

  properties:
    tokenSecretKey: file://my/secret.key
    # If using public/private
    # tokenPublicKey: file:///path/to/public.key

Enable Authorization Provider

To enable authorization on functions worker, you need to configure authorizationEnabled, authorizationProvider, and configurationStoreServers. The authorization provider connects to configurationStoreServers to receive namespace policies.

  authorizationEnabled: true
  authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
  configurationStoreServers: <configuration-store-servers>

You should also configure a list of superuser roles. The superuser roles are able to access any admin API. The following is a configuration example.

  superUserRoles:
    - role1
    - role2
    - role3

Enable end-to-end encryption

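You can use the public and private key pair that the application configures to perform encryption. Only consumers with a valid key can decrypt the encrypted messages.

To enable end-to-end encryption on Functions Worker, specify --producer-config on the command line. For more information, refer to the end-to-end encryption documentation.

The relevant CryptoConfig settings are included in ProducerConfig. The configurable fields of CryptoConfig are as follows:

  public class CryptoConfig {
      private String cryptoKeyReaderClassName;
      private Map<String, Object> cryptoKeyReaderConfig;
      private String[] encryptionKeys;
      private ProducerCryptoFailureAction producerCryptoFailureAction;
      private ConsumerCryptoFailureAction consumerCryptoFailureAction;
  }

  • producerCryptoFailureAction: The action that the producer takes if it fails to encrypt the data. Available options are FAIL and SEND.
  • consumerCryptoFailureAction: The action that the consumer takes if it fails to decrypt the received data. Available options are FAIL, DISCARD, and CONSUME.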
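
To illustrate how these fields fit together, the following minimal sketch builds a JSON string that could be passed to --producer-config. It is a sketch under stated assumptions: the top-level key cryptoConfig is an assumed name for the CryptoConfig field inside ProducerConfig, and the key reader class com.example.MyKeyReader and the key name are hypothetical.

```python
import json

# Allowed values as listed in this section.
PRODUCER_ACTIONS = {"FAIL", "SEND"}
CONSUMER_ACTIONS = {"FAIL", "DISCARD", "CONSUME"}


def build_producer_config(
    reader_class: str,
    reader_config: dict,
    encryption_keys: list[str],
    producer_action: str = "FAIL",
    consumer_action: str = "FAIL",
) -> str:
    """Serialize the CryptoConfig fields into a producer config JSON string."""
    if producer_action not in PRODUCER_ACTIONS:
        raise ValueError(f"unknown producerCryptoFailureAction: {producer_action}")
    if consumer_action not in CONSUMER_ACTIONS:
        raise ValueError(f"unknown consumerCryptoFailureAction: {consumer_action}")
    crypto = {
        "cryptoKeyReaderClassName": reader_class,
        "cryptoKeyReaderConfig": reader_config,
        "encryptionKeys": encryption_keys,
        "producerCryptoFailureAction": producer_action,
        "consumerCryptoFailureAction": consumer_action,
    }
    # "cryptoConfig" is an assumed field name; verify it against your Pulsar version.
    return json.dumps({"cryptoConfig": crypto})


if __name__ == "__main__":
    print(build_producer_config("com.example.MyKeyReader", {}, ["my-app-key"]))
```

Validating the action names before serializing catches misspelled values before they reach the functions-worker.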

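BookKeeper Authentication

If authentication is enabled on the BookKeeper cluster, you need to configure the following BookKeeper authentication settings:

  • bookkeeperClientAuthenticationPlugin: The name of the BookKeeper client authentication plugin.
  • bookkeeperClientAuthenticationParametersName: The parameter name of the BookKeeper client authentication plugin.
  • bookkeeperClientAuthenticationParameters: The parameters of the BookKeeper client authentication plugin.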

Start functions-worker

Once you have finished configuring the functions_worker.yml file, you can start functions-worker in the background by using nohup with the pulsar-daemon CLI tool:

  bin/pulsar-daemon start functions-worker

You can also start functions-worker in the foreground by using the pulsar CLI tool:

  bin/pulsar functions-worker

Configure Proxies for functions-workers

When you are running functions-worker in a separate cluster, the admin REST endpoints are split into two clusters. The functions, function-worker, source, and sink endpoints are served by the functions-worker cluster, while all the remaining endpoints are served by the broker cluster. Hence you need to configure your pulsar-admin to use the right service URL accordingly.

To address this inconvenience, you can start a proxy cluster that routes the admin REST requests accordingly. This gives you one central entry point for your admin service.

If you already have a proxy cluster, continue reading. If you haven’t set up a proxy cluster before, you can follow the instructions to start proxies.

assets/functions-worker-separated.png

To enable routing functions related admin requests to functions-worker in a proxy, you can edit the proxy.conf file to modify the following settings:

  functionWorkerWebServiceURL=<pulsar-functions-worker-web-service-url>
  functionWorkerWebServiceURLTLS=<pulsar-functions-worker-web-service-url>
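
The endpoint split that the proxy resolves can be pictured with the following minimal sketch. It is an illustration only and not the proxy's actual routing code: the resource names in WORKER_RESOURCES are assumptions derived from the endpoint groups named in this section, and only /admin/v2/worker/cluster appears elsewhere in this document.

```python
# Assumed path segments for the functions, function-worker, source,
# and sink endpoint groups; verify them against your Pulsar version.
WORKER_RESOURCES = {"functions", "worker", "sources", "sinks"}


def target_cluster(path: str) -> str:
    """Return which cluster would serve an admin path of the form /admin/<version>/<resource>/..."""
    segments = [segment for segment in path.split("/") if segment]
    is_admin_path = len(segments) > 2 and segments[0] == "admin"
    resource = segments[2] if is_admin_path else ""
    return "functions-worker" if resource in WORKER_RESOURCES else "broker"


if __name__ == "__main__":
    for path in ("/admin/v2/worker/cluster", "/admin/v2/namespaces/public"):
        print(path, "->", target_cluster(path))
```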

Compare the Run-with-Broker and Run-separately modes

As described above, you can run functions-worker with brokers, or run it separately. Running functions-workers along with brokers is more convenient. However, running functions-workers in a separate cluster provides better resource isolation for running functions in Process or Thread mode.

Refer to the following guidelines to determine which mode suits your case.

Use the Run-with-Broker mode in the following cases:

  • a) resource isolation is not required when running functions in Process or Thread mode;
  • b) you configure functions-worker to run functions on Kubernetes (where Kubernetes addresses the resource isolation problem).

Use the Run-separately mode in the following cases:

  • a) you don't have a Kubernetes cluster;
  • b) you want to run functions and brokers separately.

Troubleshooting

Error message: Namespace missing local cluster name in clusters list

  Failed to get partitioned topic metadata: org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: Namespace missing local cluster name in clusters list: local_cluster=xyz ns=public/functions clusters=[standalone]

This error message appears when either of the following cases occurs:

  • a) a broker is started with functionsWorkerEnabled=true, but pulsarFunctionsCluster is not set to the correct cluster in the conf/functions_worker.yml file;
  • b) a geo-replicated Pulsar cluster is set up with functionsWorkerEnabled=true, and brokers in one cluster run well while brokers in the other cluster do not.

Workaround

If either of these cases happens, follow the instructions below to fix the problem:

  1. Disable Functions Worker by setting functionsWorkerEnabled=false, and restart brokers.

  2. Get the current clusters list of the public/functions namespace.

     bin/pulsar-admin namespaces get-clusters public/functions

  3. Check whether the cluster is in the clusters list. If it is not, add it to the list and update the clusters list.

     bin/pulsar-admin namespaces set-clusters --clusters <existing-clusters>,<new-cluster> public/functions

  4. After setting the cluster successfully, enable functions worker by setting functionsWorkerEnabled=true.

  5. Set the correct cluster name in pulsarFunctionsCluster in the conf/functions_worker.yml file, and restart brokers.
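
The value passed to --clusters when updating the clusters list is the existing list plus the missing local cluster. The following minimal sketch shows that merge; the function name clusters_argument is hypothetical, and the cluster names come from the error message above.

```python
def clusters_argument(existing: list[str], local_cluster: str) -> str:
    """Build the comma-separated --clusters value, adding the local cluster if missing."""
    merged = list(existing)
    if local_cluster not in merged:
        merged.append(local_cluster)
    return ",".join(merged)


if __name__ == "__main__":
    # Values from the error message: clusters=[standalone], local_cluster=xyz.
    print(clusters_argument(["standalone"], "xyz"))
```

If the local cluster is already in the list, the list is returned unchanged, so no update is needed.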