下载和安装

通过 https://github.com/emqx/kuiper/releases下载安装快速入门 - 图1 (opens new window)https://www.emqx.cn/downloads#kuiper下载安装快速入门 - 图2 (opens new window) 获取安装包.

zip、tar.gz 压缩包

解压缩 kuiper

  1. $ unzip kuiper-$VERISON-$OS-$ARCH.zip
  2. or
  3. $ tar -xzf kuiper-$VERISON-$OS-$ARCH.zip

运行 bin/kuiperd 以启动 kuiper 服务器

  1. $ bin/kuiperd

您应该会看到一条成功的消息:Serving Rule server on port 20498

kuiper 的目录结构如下:

  1. kuiper_installed_dir
  2. bin
  3. kuiperd
  4. kuiper
  5. etc
  6. mqtt_source.yaml
  7. ...
  8. data
  9. ...
  10. plugins
  11. ...
  12. log
  13. ...

deb、rpm 安装包

使用相关命令安装 kuiper

  1. $ sudo dpkg -i kuiper_$VERSION_$ARCH.deb
  2. or
  3. $ sudo rpm -ivh kuiper-$VERSION-1.el7.rpm

运行 kuiperd 以启动 kuiper 服务器

  1. $ sudo kuiperd

您应该会看到一条成功的消息:Serving Rule server on port 20498

kuiper 也支持 systemctl 启动

  1. $ sudo systemctl start kuiper

kuiper 的目录结构如下:

  1. /usr/lib/kuiper/bin
  2. kuiperd
  3. kuiper
  4. /etc/kuiper
  5. mqtt_source.yaml
  6. ...
  7. /var/lib/kuiper/data
  8. ...
  9. /var/lib/kuiper/plugins
  10. ...
  11. /var/log/kuiper
  12. ...

运行的第一个规则流

Kuiper 规则由一个 SQL 和多个操作组成。 Kuiper SQL 是一种易于使用的类 SQL 语言,用于确定规则流的逻辑。 通过命令行提供规则,规则流将在规则引擎中创建并连续运行。用户之后可以通过命令行管理规则。

Kuiper 具有许多用于复杂分析的内置函数和扩展,您可以访问 Kuiper SQL 参考获取有关语法和其功能的更多信息。

让我们考虑一个示例场景:我们正在通过 MQTT 服务从传感器接收温度和湿度记录,并且当温度在一个时间窗口中大于30摄氏度时,我们希望发出警报。 我们可以使用以下几个步骤为上述场景编写 Kuiper 规则。

先决条件

我们假设已经有一个 MQTT 消息服务器作为 Kuiper 服务器的数据源。 如果您没有,建议使用 EMQ X。 请按照 EMQ Broker 安装指南下载安装快速入门 - 图3 (opens new window)设置 MQTT 消息服务器。

定义输入流

流需要具有一个名称和一个架构,以定义每个传入事件应包含的数据。 对于这种情况,我们将使用 MQTT 源应对温度事件。 输入流可以通过 SQL 语言定义。

我们创建一个名为 demo 的流,该流使用 DATASOURCE 属性中指定的 MQTT demo 主题。

  1. $ bin/kuiper create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="demo")'

MQTT 源将通过tcp://localhost:1883连接到 MQTT 消息服务器,如果您的 MQTT 消息服务器位于别的位置,请在etc/mqtt_source.yaml中进行指定。 您可以通过修改如下配置文件来更改配置。

  1. default:
  2. qos: 1
  3. sharedsubscription: true
  4. servers: [tcp://127.0.0.1:1883]

您可以使用kuiper show streams 命令来查看是否创建了 demo 流。

通过查询工具测试流

现在已经创建了流,可以通过 kuiper query 命令对其进行测试。键入kuiper query后,显示 kuiper提示符。

  1. $ bin/kuiper query
  2. kuiper >

kuiper提示符下,您可以键入 SQL 并根据流验证 SQL。

  1. kuiper > select count(*), avg(humidity) as avg_hum, max(humidity) as max_hum from demo where temperature > 30 group by TUMBLINGWINDOW(ss, 5);
  2. query is submit successfully.

现在,如果有任何数据发布到位于tcp://127.0.0.1:1883的 MQTT 服务器,那么它打印如下消息。

  1. kuiper > [{"avg_hum":41,"count":4,"max_hum":91}]
  2. [{"avg_hum":62,"count":5,"max_hum":96}]
  3. [{"avg_hum":36,"count":3,"max_hum":63}]
  4. [{"avg_hum":48,"count":3,"max_hum":71}]
  5. [{"avg_hum":40,"count":3,"max_hum":69}]
  6. [{"avg_hum":44,"count":4,"max_hum":57}]
  7. [{"avg_hum":42,"count":3,"max_hum":74}]
  8. [{"avg_hum":53,"count":3,"max_hum":81}]
  9. ...

您可以按 ctrl + c 键中断查询,如果检测到客户端与查询断开连接,服务器将终止流传输。 以下是服务器上的日志打印。

  1. ...
  2. time="2019-09-09T21:46:54+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
  3. time="2019-09-09T21:46:54+08:00" level=info msg="stop the query."
  4. ...

编写规则

作为规则的一部分,我们需要指定以下内容:

  • 规则名称:规则的 ID。 它必须是唯一的
  • sql:针对规则运行的查询
  • 动作:规则的输出动作

我们可以运行 kuiper rule 命令来创建规则并在文件中指定规则定义

  1. $ bin/kuiper create rule ruleDemo -f myRule

myRule文件的内容。 对于在1分钟内滚动时间窗口中的平均温度大于30的事件,它将打印到日志中。

  1. {
  2. "sql": "SELECT temperature from demo where temperature > 30",
  3. "actions": [{
  4. "log": {}
  5. }]
  6. }

您应该在流日志中看到一条成功的消息rule ruleDemo created。 现在,规则已经建立并开始运行。

测试规则

现在,规则引擎已准备就绪,可以接收来自 MQTT demo 主题的事件。 要对其进行测试,只需使用 MQTT 客户端将消息发布到 demo 主题即可。 该消息应为 json 格式,如下所示:

  1. {"temperature":31.2, "humidity": 77}

检查位于“ log/stream.log”的流日志,您会看到已过滤的数据被打印出来。 另外,如果您发送以下消息,则它不符合 SQL 条件,并且该消息将被过滤。

  1. {"temperature":29, "humidity": 80}

管理规则

您可以使用命令行暂停规则一段时间,然后重新启动规则和其他管理工作。 规则名称是规则的标识符。 查看规则管理 CLI 以了解详细信息

  1. $ bin/kuiper stop rule ruleDemo

请参考以下主题,以获取有关使用 Kuiper 的指导。