Hands On! 101

本文通过非常具体且简单的步骤来学习流处理器的工作原理。为了简单起见,我们使用了一个自定义 Docker 镜像,其中包含测试需要的相关组件。

Requirements

以下教程需要以下软件组件:

  • Fluent Bit >= v1.2.0
  • Docker Engine (如果您的系统中已经安装了 Fluent Bit 二进制文件,则此项是非必须的)

另外,下载以下数据样本文件(130KB):

在命令行进行流处理

接下来的所有步骤,我们将从命令行运行 Fluent Bit,为简单起见,我们将使用官方的 Docker 镜像。

1. 查看 Fluent Bit 版本

  1. $ docker run -ti fluent/fluent-bit:1.4 /fluent-bit/bin/fluent-bit --version
  2. Fluent Bit v1.4.0

2. 解析样例日志文件

样本文件包含 JSON 记录。在此命令中,我们将添加解析器配置文件,并指示 tail 输入插件将内容解析为 json:

  1. $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
  2. fluent/fluent-bit:1.4 \
  3. /fluent-bit/bin/fluent-bit -R /fluent-bit/etc/parsers.conf \
  4. -i tail -p path=/sp-samples-1k.log \
  5. -p parser=json \
  6. -o stdout -f 1

上面的命令将把已解析的内容打印到标准输出。内容将打印与每个记录关联的 Tag 以及具有两个字段的数组: 记录时间戳和记录的键值对映射

  1. Fluent Bit v1.4.0
  2. * Copyright (C) 2019-2020 The Fluent Bit Authors
  3. * Copyright (C) 2015-2018 Treasure Data
  4. * Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
  5. * https://fluentbit.io
  6. [2019/05/08 13:34:16] [ info] [storage] initializing...
  7. [2019/05/08 13:34:16] [ info] [storage] in-memory
  8. [2019/05/08 13:34:16] [ info] [storage] normal synchronization mode, checksum disabled
  9. [2019/05/08 13:34:16] [ info] [engine] started (pid=1)
  10. [2019/05/08 13:34:16] [ info] [sp] stream processor started
  11. [0] tail.0: [1557322456.315513208, {"date"=>"22/abr/2019:12:43:51 -0600", "ip"=>"73.113.230.135", "word"=>"balsamine", "country"=>"Japan", "flag"=>false, "num"=>96}]
  12. [1] tail.0: [1557322456.315525280, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"242.212.128.227", "word"=>"inappendiculate", "country"=>"Chile", "flag"=>false, "num"=>15}]
  13. [2] tail.0: [1557322456.315532364, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"85.61.182.212", "word"=>"elicits", "country"=>"Argentina", "flag"=>true, "num"=>73}]
  14. [3] tail.0: [1557322456.315538969, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"124.192.66.23", "word"=>"Dwan", "country"=>"Germany", "flag"=>false, "num"=>67}]
  15. [4] tail.0: [1557322456.315545150, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"18.135.244.142", "word"=>"chesil", "country"=>"Argentina", "flag"=>true, "num"=>19}]
  16. [5] tail.0: [1557322456.315550927, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"132.113.203.169", "word"=>"fendered", "country"=>"United States", "flag"=>true, "num"=>53}]

到目前为止,还没有流处理,在步骤 #3 中,我们将开始进行一些基本查询。

3. 选择带有指定键的记录

如下命令通过 -T 选项引入一个流处理器(SP)查询,并将输出插件更改为 null,其目的是在标准输出接口中获得 流处理结果,避免混淆。

  1. $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
  2. fluent/fluent-bit:1.2 \
  3. /fluent-bit/bin/fluent-bit \
  4. -R /fluent-bit/etc/parsers.conf \
  5. -i tail \
  6. -p path=/sp-samples-1k.log \
  7. -p parser=json \
  8. -T "SELECT word, num FROM STREAM:tail.0 WHERE country='Chile';" \
  9. -o null -f 1

上面的查询旨在查询 country 键的值与 Chile 匹配的所有记录,并且对于每个匹配记录,仅使用 wordnum 键字段组成一条记录并输出:

  1. [0] [1557322913.263534, {"word"=>"Candide", "num"=>94}]
  2. [0] [1557322913.263581, {"word"=>"delightfulness", "num"=>99}]
  3. [0] [1557322913.263607, {"word"=>"effulges", "num"=>63}]
  4. [0] [1557322913.263690, {"word"=>"febres", "num"=>21}]
  5. [0] [1557322913.263706, {"word"=>"decasyllables", "num"=>76}]

4. 计算平均值

以下查询与上一步中的查询类似,但这次我们将使用 AVG() 的聚合函数来获取输入记录的平均值:

  1. $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
  2. fluent/fluent-bit:1.2 \
  3. /fluent-bit/bin/fluent-bit \
  4. -R /fluent-bit/etc/parsers.conf \
  5. -i tail \
  6. -p path=/sp-samples-1k.log \
  7. -p parser=json \
  8. -T "SELECT AVG(num) FROM STREAM:tail.0 WHERE country='Chile';" \
  9. -o null -f 1

输出:

  1. [0] [1557323573.940149, {"AVG(num)"=>61.230770}]
  2. [0] [1557323573.941890, {"AVG(num)"=>47.842106}]
  3. [0] [1557323573.943544, {"AVG(num)"=>40.647060}]
  4. [0] [1557323573.945086, {"AVG(num)"=>56.812500}]
  5. [0] [1557323573.945130, {"AVG(num)"=>99.000000}]

为什么我们得到了多个记录?答案是: 当 Fluent Bit 处理数据时,记录以数据块的方式输入,流处理器对数据块进行处理,输入插件提取了 5 个记录块,流处理器会独立地处理每个块的查询。要一次处理多个块,我们必须在窗口时间内对结果进行分组。

5. 对结果分组和窗口

结果分组旨在简化数据处理,且在定义的时间范围内使用时,我们可以完成很多事情。如下查询示例按 country 关键字将结果分组,并计算 num 值的平均值,处理窗口时间为 1 秒意味着: 处理 1 秒窗口时间内传入的所有数据块:

  1. $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
  2. fluent/fluent-bit:1.2 \
  3. /fluent-bit/bin/fluent-bit \
  4. -R /fluent-bit/etc/parsers.conf \
  5. -i tail \
  6. -p path=/sp-samples-1k.log \
  7. -p parser=json \
  8. -T "SELECT country, AVG(num) FROM STREAM:tail.0 \
  9. WINDOW TUMBLING (1 SECOND) \
  10. WHERE country='Chile' \
  11. GROUP BY country;" \
  12. -o null -f 1

输出:

  1. [0] [1557324239.003211, {"country"=>"Chile", "AVG(num)"=>53.164558}]

6. 接收流处理结果作为新的数据流

现在,我们看到了一个更真实的用例。将数据流处理结果发送到标准输出对于学习很有帮助,现在我们将指示流处理器将结果作为输入,成为 Fluent Bit 数据管道的一部分,并在其上附加一个 Tag。

可以使用 CREATE STREAM 语句完成此操作,该语句还将使用 sp-results 作为结果的标签。请注意,现在输出插件为 stdout,且带有匹配所有带有 sp-results 标签记录的参数:

  1. $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
  2. fluent/fluent-bit:1.2 \
  3. /fluent-bit/bin/fluent-bit \
  4. -R /fluent-bit/etc/parsers.conf \
  5. -i tail \
  6. -p path=/sp-samples-1k.log \
  7. -p parser=json \
  8. -T "CREATE STREAM results WITH (tag='sp-results') \
  9. AS \
  10. SELECT country, AVG(num) FROM STREAM:tail.0 \
  11. WINDOW TUMBLING (1 SECOND) \
  12. WHERE country='Chile' \
  13. GROUP BY country;" \
  14. -o stdout -m 'sp-results' -f 1

输出:

  1. [0] sp-results: [1557325032.000160100, {"country"=>"Chile", "AVG(num)"=>53.164558}]

F.A.Q

Where STREAM name comes from?

Fluent Bit 具有流的概念,且每个输入插件实例都有一个默认名称。您可以通过设置别名来覆盖默认名称。注意以下示例中 alias 参数和新的 stream 名称。

  1. $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
  2. fluent/fluent-bit:1.4 \
  3. /fluent-bit/bin/fluent-bit \
  4. -R /fluent-bit/etc/parsers.conf \
  5. -i tail \
  6. -p path=/sp-samples-1k.log \
  7. -p parser=json \
  8. -p alias=samples \
  9. -T "CREATE STREAM results WITH (tag='sp-results') \
  10. AS \
  11. SELECT country, AVG(num) FROM STREAM:samples \
  12. WINDOW TUMBLING (1 SECOND) \
  13. WHERE country='Chile' \
  14. GROUP BY country;" \
  15. -o stdout -m 'sp-results' -f 1