MQTT动作

该操作用于将输出消息发布到 MQTT 服务器中。

属性名称 是否可选 说明
server MQTT 服务器地址,例如 tcp://127.0.0.1:1883
topic MQTT 主题,例如 analysis/result , 也可设置为动态属性,例如 $.col, 将会把结果中的 col 列的值作为主题
clientId MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid
protocolVersion MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。
qos 消息转发的服务质量
username 连接用户名
password 连接密码
certificationPath 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 kuiperd 命令的路径。比如,如果你在 /var/kuiper 中运行 bin/kuiperd ,那么父目录为 /var/kuiper; 如果运行从 /var/kuiper/bin 中运行./kuiperd,那么父目录为 /var/kuiper/bin
privateKeyPath 私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 certificationPath 类似。
rootCaPath 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 certificationPath 类似。
insecureSkipVerify 如果 InsecureSkipVerify 设置为 true, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为false。配置项只能用于TLS连接。
retained 如果 retained 设置为 true,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 false
connectionSelector 重用到 MQTT Broker 的连接,详细信息,请参考

以下为使用 SAS 连接到 Azure IoT Hub 的样例。

  1. {
  2. "mqtt": {
  3. "server": "ssl://xyz.azure-devices.net:8883",
  4. "topic": "devices/demo_001/messages/events/",
  5. "protocolVersion": "3.1.1",
  6. "qos": 1,
  7. "clientId": "demo_001",
  8. "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
  9. "password": "SharedAccessSignature sr=*******************",
  10. "retained": false
  11. }
  12. }

以下为使用证书和私钥连接到 AWS IoT的另一个样例。

  1. {
  2. "mqtt": {
  3. "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
  4. "topic": "devices/result",
  5. "qos": 1,
  6. "clientId": "demo_001",
  7. "certificationPath": "keys/d3807d9fa5-certificate.pem",
  8. "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
  9. "retained": false
  10. }
  11. }

动态主题

若结果数据中包含主题内容,可以将其作为主题属性,从而实现动态主题的需求。假设 SQL 选出的数据包含 mytopic, 则可以使用数据模板的语法将其设置为 topic 属性的值,如下所示:

  1. {
  2. "mqtt": {
  3. "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
  4. "topic": "{{.mytopic}}",
  5. "qos": 1,
  6. "clientId": "demo_001",
  7. "certificationPath": "keys/d3807d9fa5-certificate.pem",
  8. "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
  9. "retained": false
  10. }
  11. }