Use filesystem offloader with Pulsar

本章将一步一步的指导你完成在 Pulsar 中安装和配置filesystem offloader。

安装

本节介绍如何安装 filesystem offloader。

前提条件

  • Pulsar: 2.4.2 或更高版本

步骤

下列示例将使用 Pulsar 2.5.1。

  1. 使用下列方式下载 Pulsar 压缩包:

    • Apache 镜像下载 Pulsar tar 包。

    • 下载页面下载 Pulsar tar 包。

    • 使用 wget 命令下载 Pulsar tar 包。

      1. wget https://archive.apache.org/dist/pulsar/pulsar-2.5.1/apache-pulsar-2.5.1-bin.tar.gz
  2. 下载并解压 Pulsar offloaders 程序包。

    1. wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz
    2. tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz

    Note

    • 如果在一个裸机集群中运行 Pulsar,请确保offloaders安装包已经在 broker 的所有 Pulsar 目录下解压缩。

    • 如果在 Docker 中运行 Pulsar 或使用 Docker 镜像(如 K8S 和 DCOS)部署 Pulsar,则可以使用 apachepulsar/pulsar-all 镜像。 apachepulsar/pulsar-all 镜像已经绑定了分层存储 offloader。

  3. 将 Pulsar offloaders 作为offloaders复制到Pulsar目录中。

    1. mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders
    2. ls offloaders

    输出

    1. tiered-storage-file-system-2.5.1.nar
    2. tiered-storage-jcloud-2.5.1.nar

    Note

    • 如果在一个裸机集群中运行 Pulsar,请确保offloaders安装包已经在 broker 的所有 Pulsar 目录下解压缩。

    • 如果在 Docker 中运行 Pulsar 或使用 Docker 镜像(如 K8S 和 DCOS)部署 Pulsar,则可以使用 apachepulsar/pulsar-all 镜像。 apachepulsar/pulsar-all 镜像已经绑定了分层存储 offloader。

Configuration

Note

在将数据从 BookKeeper 卸载到 filesystem 之前,你需要配置 filesystem offloader 驱动程序的一些属性。

此外,你还可以配置 filesystem offloader 的自动触发和手动触发机制。

配置 filesystem offloader driver

您可以在 broker.confstandalone.conf 配置文件中配置 filesystem offloader 驱动程序。

HDFS

NFS

  • 以下为必选 配置。

    参数| 描述| 示例值 |—-|—-|—- managedLedgerOffloadDriver | Offloader 驱动程序名称,不区分大小写。 | filesystem fileSystemURI | 连接地址,访问默认 Hadoop 分布式文件系统的 URI。 | hdfs://127.0.0.1:9000 offloadersDirectory | Offloader directory | offloaders fileSystemProfilePath | Hadoop profile path. The configuration file is stored in the Hadoop profile path. 它包含了一些 Hadoop 性能调整的配置。 | ../conf/filesystem_offload_core_site.xml

  • 以下为可选 配置。

    参数| 描述| 示例值 |—-|—-|—- managedLedgerMinLedgerRolloverTimeMinutes| topic 的 ledger 切换的最短时间。

    Note: it is not recommended to set this parameter in the production environment.|2 offloadersDirectory | Offloader directory | offloaders managedLedgerMaxEntriesPerLedger|Maximum number of entries to append to a ledger before triggering a rollover.

    Note: it is not recommended to set this parameter in the production environment.|5000

  • 以下为必选 配置。

    参数| 描述| 示例值 |—-|—-|—- managedLedgerOffloadDriver | Offloader 驱动程序名称,不区分大小写。 | filesystem fileSystemProfilePath | NFS profile path. The configuration file is stored in the NFS profile path. 它包含用于性能调整的各种设置。 | ../conf/filesystem_offload_core_site.xml

  • 以下为可选 配置。

    参数| 描述| 示例值 |—-|—-|—- managedLedgerMinLedgerRolloverTimeMinutes| topic 的 ledger 切换的最短时间。

    注意: 不建议在生产环境中设置此参数。|2 managedLedgerMaxEntriesPerLedger|在触发滚动之前添加到 ledger 的最大条目数。

    注意: 不建议在生产环境中设置此参数。|5000

自动运行 filesystem offloader

您可以将命名空间策略配置为在达到阈值后自动卸载数据。 该阈值基于一个 topic 在 Pulsar 集群的数据存储大小而定。 一旦 topic 到达阈值,就自动触发卸载操作。

阈值|动作 |—-|—- | > 0 | 如果 topic 存储达到其阈值,它就会触发卸载操作。 = 0| 它使 broker 竭尽全力地 offload 数据。 < 0 | 它将禁用自动卸载操作。

Automatic offload runs when a new segment is added to a topic log. 如果在命名空间上设置了阈值,但向该 topic 生产的消息很少,filesystem offloader 将会停止工作,直至当前 segment 达到饱和。

通过命令行工具(CLI)可以设置阈值大小,比如 pulsar-admin。

示例

该示例使用 pulsar-admin 将 filesystem offloader 的阈值大小设置为 10 MB。

  1. pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace

提示

关于 pulsar-admin namespaces set-offload-threshold options 命令的更多信息,包括标志、描述、默认值和速记方法,见这里

手动运行 filesystem offloader

对于每一个 topic,使用下列方法之一可以手动触发 filesystem offloader:

  • 使用 REST 端点。

  • 使用命令行工具(例如 pulsar-admin)。

要想用命令行工具触发 filesystem offloader,需要指定一个 topic 应该保留在 Pulsar 集群中的最大数据量(阈值)。 如果 Pulsar 集群上 topic 的数据大小超过了这个阈值,那么该 topic 下的 segment 就会被陆续卸载到 filesystem 上,直到不再超过这个阈值。 优先卸载旧的 segment。

示例

  • 此示例使用 pulsar-admin 手动运行 filesystem offloader。

    1. pulsar-admin topics offload --size-threshold 10M persistent://my-tenant/my-namespace/topic1

    输出

    1. Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1

    提示

    关于pulsar-admin topics offload-status options命令的更多信息,包括标志、描述、默认值和速记方法,见这里

  • 该示例使用 pulsar-admin 检查 filesystem offloader 的状态。

    1. pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1

    输出

    1. Offload is currently running

    要等待 filesystem 完成任务,请添加 -w 参数。

    1. pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1

    输出

    1. Offload was a success

    如果在卸载操作中出现错误,该错误会将会在 pulsar-admin topics offload-status 命令中抛出。

    1. pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1

    输出

    1. Error in offload
    2. null
    3. Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=

    提示

    关于pulsar-admin topics offload-status options命令的更多信息,包括标志、描述、默认值和速记方法,见这里

教程

本节提供有关如何使用 filesystem offloader 将数据从 Pulsar 移动到 Hadoop 分布式文件系统(HDFS)或网络文件系统(NFS)的分步说明。

HDFS

NFS

要将数据从 Pulsar 移动到 HDFS,请按照以下步骤进行。

第 1 步:准备 HDFS 环境

本教程使用 Hadoop 3.2.1设置一个 Hadoop 单个节点集群。

提示

有关如何设置 Hadoop 单节点集群的详细信息,请参见此处

  1. 下载并解压缩 Hadoop 3.2.1。

    1. wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz tar -zxvf hadoop-3.2.1.tar.gz -C $HADOOP_HOME
  2. 配置 Hadoop。

    1. # $HADOOP_HOME/etc/hadoop/core-site.xml<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property></configuration># $HADOOP_HOME/etc/hadoop/hdfs-site.xml<configuration> <property> <name>dfs.replication</name> <value>1</value> </property></configuration>
  3. 设置无密码 ssh。

    1. # 现在检查无需密码是否可以使用 ssh 连接到 localhost:$ ssh localhost# 如果在无密码的情况下无法 ssh 连接到 localhost,请执行以下命令$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys$ chmod 0600 ~/.ssh/authorized_keys
  4. 启动 HDFS。

    1. # 不要重复执行该命令,重复执行会导致数据节点的 clusterId 与 namenode 不一致$HADOOP_HOME/bin/hadoop namenode -format$HADOOP_HOME/sbin/start-dfs.sh
  5. 导航到 HDFS 网站

    您可以查看 概述 页面。

    Filesystem offloader - 图1

    1. 在顶部导航栏,单击 数据节点 查看数据节点信息。

      Filesystem offloader - 图2

    2. 单击 HTTP 地址 获取更多关于 localhost:9866 的详细信息。

      如下文所示, 所用容量 的大小是 4 KB,它是初始值。

      Filesystem offloader - 图3

第 2 步: 安装 filesystem offloader

更多详细信息,请参阅 安装

第 3 步: 配置 filesystem offloader

正如 配置 部分所指明的那样,需要先配置 filesystem offloader 驱动程序的一些属性,然后才能使用。 本教程假设您已按以下方式配置 filesystem offloader 驱动程序,并在单机(Standalone)模式下运行 Pulsar。

conf/standalone.conf 文件中设置以下配置。

  1. managedLedgerOffloadDriver=filesystemfileSystemURI=hdfs://127.0.0.1:9000fileSystemProfilePath=../conf/filesystem_offload_core_site.xml

Note

为测试目的,您可以设置以下两个配置来加速 ledger 切换, 但不建议在生产环境中设置它们。

  1. managedLedgerMinLedgerRolloverTimeMinutes=1managedLedgerMaxEntriesPerLedger=100

Note

在本节中,假定您已启用 NFS 服务并设置 NFS 服务的共享路径。 在本节中, /Users/test 被用作 NFS 服务的共享路径。

要将数据卸载到 NFS,请执行以下步骤。

第 1 步: 安装 filesystem offloader

更多详细信息,请参阅 安装

第 2 步:将 NFS 置于本地文件系统

此示例将挂载 /Users/pulsar_nfs/Users/test

  1. mount -e 192.168.0.103:/Users/test/Users/pulsar_nfs

第 3 步: 配置 filesystem offloader 驱动程序

正如 配置 部分所指明的那样,需要先配置 filesystem offloader 驱动程序的一些属性,然后才能使用。 本教程假设您已按以下方式配置 filesystem offloader 驱动程序,并在单机(Standalone)模式下运行 Pulsar。

  1. conf/standalone.conf 文件中设置以下配置。

    1. managedLedgerOffloadDriver=filesystemfileSystemProfilePath=../conf/filesystem_offload_core_site.xml
  2. 修改 filesystem_offload_core_site.xml,如下所示。

    1. <property> <name>fs.defaultFS</name> <value>file:///</value></property><property> <name>hadoop.tmp.dir</name> <value>file:///Users/pulsar_nfs</value></property><property> <name>io.file.buffer.size</name> <value>4096</value></property><property> <name>io.seqfile.compress.blocksize</name> <value>1000000</value></property><property> <name>io.seqfile.compression.type</name> <value>BLOCK</value></property><property> <name>io.map.index.interval</name> <value>128</value></property>

第 4 步:将数据从 BookKeeper 卸载到文件系统

在下载 Pulsar tar 包的仓库中执行以下命令。 例如, ~/path/to/apache-pulsar-2.5.1

  1. 启动 Pulsar 单机模式。

    1. bin/pulsar standalone -a 127.0.0.1
  2. 为了确保生成的数据不会立即被删除,建议设置 保留策略, 可以是 大小 限制或 时间 限制。 为保留策略设定的值越大,数据保留的时间越长。

    1. bin/pulsar-admin namespaces set-retention public/default --size 100M --time 2d

    提示

    关于 pulsarctl namespaces set-retention options 命令的更多信息,包括参数、描述、默认值和速记方法,参见这里

  3. 使用 pulsar-client 生成数据。

    1. bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
  4. 卸载操作在触发 ledger 切换后开始。 为了确保卸载数据成功,建议您等待下,直到触发多个 ledger 切换。 在这种情况下,可能需要等待一会。 您可以使用 pulsarctl 检查 ledger 状态。

    1. bin/pulsar-admin topics stats-internal public/default/fs-test

    输出

    The data of the ledger 696 is not offloaded.

    1. {
    2. "version": 1,
    3. "creationDate": "2020-06-16T21:46:25.807+08:00",
    4. "modificationDate": "2020-06-16T21:46:25.821+08:00",
    5. "ledgers": [
    6. {
    7. "ledgerId": 696,
    8. "isOffloaded": false
    9. }
    10. ],
    11. "cursors": {}
    12. }
  5. 请稍等,然后向 topic 发送更多消息。

    1. bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
  6. 使用 pulsarctl 检查 ledger 状态。

    1. bin/pulsar-admin topics stats-internal public/default/fs-test

    输出

    Ledger 696 的数据未卸载。

    1. {
    2. "version": 2,
    3. "creationDate": "2020-06-16T21:46:25.807+08:00",
    4. "modificationDate": "2020-06-16T21:48:52.288+08:00",
    5. "ledgers": [
    6. {
    7. "ledgerId": 696,
    8. "entries": 1001,
    9. "size": 81695,
    10. "isOffloaded": false
    11. },
    12. {
    13. "ledgerId": 697,
    14. "isOffloaded": false
    15. }
    16. ],
    17. "cursors": {}
    18. }
  7. 使用 pulsarctl 手动触发卸载操作。

    1. bin/pulsar-admin topics offload -s 0 public/default/fs-test

    输出

    ledger 中的数据先于 ledge 697 已卸载。

    1. # offload info, the ledgers before 697 will be offloaded
    2. Offload triggered for persistent://public/default/fs-test3 for messages before 697:0:-1
  8. 使用 pulsarctl 检查 ledger 状态。

    1. bin/pulsar-admin topics stats-internal public/default/fs-test

    输出

    ledger 696 的数据已卸载。

    1. {
    2. "version": 4,
    3. "creationDate": "2020-06-16T21:46:25.807+08:00",
    4. "modificationDate": "2020-06-16T21:52:13.25+08:00",
    5. "ledgers": [
    6. {
    7. "ledgerId": 696,
    8. "entries": 1001,
    9. "size": 81695,
    10. "isOffloaded": true
    11. },
    12. {
    13. "ledgerId": 697,
    14. "isOffloaded": false
    15. }
    16. ],
    17. "cursors": {}
    18. }

    并且 所用容量 已经从 4 KB 变成 116.46 KB。

    Filesystem offloader - 图4