SQL 数据源和字段

规则的 SQL 语句可以处理的数据源有:MQTT 消息、MQTT 事件,或者是数据桥接。

SQL 语句使用 FROM 来指定数据源,在 SELECTWHERE 子句中可以引用相应的字段。 数据源类型不同,可以使用的字段也不同。

数据桥接

规则使用 $bridges/ 开头的主题来表示数据桥接的消息或事件。 格式为:$bridges/<type>:<name>

其中 <type>:<name> 部分是数据桥接的 ID,<type> 是数据桥接的类型,<name> 是数据桥接的名字。 比如 $bridges/mqtt:my_mqtt_bridge

MQTT 桥接事件 (“$bridges/mqtt:*“)

当该 MQTT 桥接从远程 MQTT Broker 接收到消息时触发规则

字段解释
idMQTT 消息 ID
server远程 MQTT Broker 的地址,例如 “broker.emqx.io:1883”
payloadMQTT 消息体
topicMQTT 主题
qosMQTT 消息的 QoS
dupMQTT 消息的 DUP Flag
retainMQTT 消息的 Retain Flag
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
message_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)

示例

  1. SELECT
  2. *
  3. FROM
  4. "$bridges/mqtt:my_mqtt_bridge"

输出:

  1. {
  2. "id": "0005E27C1D24E44FF440000017520000",
  3. "server": "broker.emqx.io:1883",
  4. "payload": "hello",
  5. "topic": "t/a",
  6. "qos": 1,
  7. "dup": false,
  8. "retain": false,
  9. "pub_props": {
  10. "Message-Expiry-Interval": 30,
  11. "Payload-Format-Indicator": 0,
  12. "User-Property": {
  13. "foo": "bar"
  14. },
  15. "User-Property-Pairs": [
  16. {
  17. "key": "foo"
  18. },
  19. {
  20. "value": "bar"
  21. }
  22. ]
  23. },
  24. "message_received_at": 1645002753259,
  25. }

MQTT 消息

规则的 SQL 语句可以处理消息发布。 在一个规则语句中,用户可以用 FROM 子句指定一个或者多个主题, 当任何消息发布到指定的主题时都会触发该规则。

字段解释
idMQTT 消息 ID
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. *
  3. FROM
  4. "t/#"

输出

  1. {
  2. "clientid": "c_emqx",
  3. "event": "message.publish",
  4. "event_type": "message_publish",
  5. "flags": {},
  6. "id": "0005E27C1D24E44FF440000017520000",
  7. "metadata": {
  8. "rule_id": "sql_tester:099ddfa9c466d1ca"
  9. },
  10. "node": "emqx@127.0.0.1",
  11. "payload": "abc",
  12. "peerhost": "192.168.0.10",
  13. "pub_props": {
  14. "Message-Expiry-Interval": 30,
  15. "Payload-Format-Indicator": 0,
  16. "User-Property": {
  17. "foo": "bar"
  18. },
  19. "User-Property-Pairs": [
  20. {
  21. "key": "foo"
  22. },
  23. {
  24. "value": "bar"
  25. }
  26. ]
  27. },
  28. "publish_received_at": 1656397576334,
  29. "qos": 1,
  30. "timestamp": 1656397576334,
  31. "topic": "t/a",
  32. "username": "u_emqx"
  33. }

MQTT 事件

规则的 SQL 语句既可以处理消息(消息发布),也可以处理事件(客户端上下线、客户端订阅等)。对于消息,FROM 子句后面直接跟主题名;对于事件,FROM 子句后面跟事件主题。

事件消息的主题以 "$events/" 开头,比如 "$events/client_connected", "$events/session_subscribed"。 如果想让 emqx 将事件消息发布出来,可以在 emqx_rule_engine.conf 文件中配置。

FROM 子句可用的事件主题

事件主题名释义
$events/message_delivered消息投递
$events/message_acked消息确认
$events/message_dropped消息在转发的过程中被丢弃
$events/delivery_dropped消息在投递的过程中被丢弃
$events/client_connected连接完成
$events/client_disconnected连接断开
$events/client_connack连接确认
$events/client_check_authz_complete鉴权完成
$events/session_subscribed订阅
$events/session_unsubscribed取消订阅

消息投递事件 (“$events/message_delivered”)

当消息被放入底层socket时触发规则。

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. from_clientid,
  3. from_username,
  4. topic,
  5. qos,
  6. node,
  7. timestamp
  8. FROM
  9. "$events/message_delivered"

输出

  1. {
  2. "topic": "t/a",
  3. "timestamp": 1645002753259,
  4. "qos": 1,
  5. "node": "emqx@127.0.0.1",
  6. "from_username": "u_emqx_1",
  7. "from_clientid": "c_emqx_1"
  8. }

消息确认事件 (“$events/message_acked”)

当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发。

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
puback_propsPUBACK Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. from_clientid,
  3. from_username,
  4. topic,
  5. qos,
  6. node,
  7. timestamp
  8. FROM
  9. "$events/message_acked"

输出

  1. {
  2. "topic": "t/a",
  3. "timestamp": 1645002965664,
  4. "qos": 1,
  5. "node": "emqx@127.0.0.1",
  6. "from_username": "u_emqx_1",
  7. "from_clientid": "c_emqx_1"
  8. }

消息在转发的过程中被丢弃事件 (“$events/message_dropped”)

当一条消息无任何订阅者时触发规则。

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
no_subscribers:没有订阅者
receive_maximum_exceeded: awaiting_rel 队列已满
packet_identifier_inuse: 消息 ID 已被使用
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. reason,
  3. topic,
  4. qos,
  5. node,
  6. timestamp
  7. FROM
  8. "$events/message_dropped"

输出

  1. {
  2. "topic": "t/a",
  3. "timestamp": 1645003103004,
  4. "reason": "no_subscribers",
  5. "qos": 1,
  6. "node": "emqx@127.0.0.1"
  7. }

消息在投递的过程中被丢弃事件 (“$events/delivery_dropped”)

当订阅者的消息队列已满时触发规则。

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
queue_full:消息队列已满(QoS>0)
no_local:不允许客户端接收自己发布的消息
expired:消息或者会话过期
qos0_msg:QoS 0 的消息因为消息队列已满被丢弃
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. from_clientid,
  3. from_username,
  4. reason,
  5. topic,
  6. qos
  7. FROM "$events/delivery_dropped"

输出

  1. {
  2. "topic": "t/a",
  3. "reason": "queue_full",
  4. "qos": 1,
  5. "from_username": "u_emqx_1",
  6. "from_clientid": "c_emqx_1"
  7. }

终端连接成功事件 (“$events/client_connected”)

当终端连接成功时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
mountpoint主题挂载点(主题前缀)
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
is_bridge是否为 MQTT bridge 连接
connected_at终端连接完成时间 (单位:毫秒)
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. clientid,
  3. username,
  4. keepalive,
  5. is_bridge
  6. FROM
  7. "$events/client_connected"

输出

  1. {
  2. "username": "u_emqx",
  3. "keepalive": 60,
  4. "is_bridge": false,
  5. "clientid": "c_emqx"
  6. }

终端连接断开事件 (“$events/client_disconnected”)

当终端连接断开时触发规则。

字段解释
reason终端连接断开原因:
normal:客户端主动断开
kicked:服务端踢出,通过 REST API
keepalive_timeout:keepalive 超时
not_authorized:认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端
tcp_closed:对端关闭了网络连接
discarded: another client connected with the same ClientID and set clean_start = true
takenover: another client connected with the same ClientID and set clean_start = false
internal_error:畸形报文或其他未知错误
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
disconnected_at终端连接断开时间 (单位:毫秒)
disconn_propsDISCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. clientid,
  3. username,
  4. reason,
  5. disconnected_at,
  6. node
  7. FROM
  8. "$events/client_disconnected"

输出

  1. {
  2. "username": "u_emqx",
  3. "reason": "normal",
  4. "node": "emqx@127.0.0.1",
  5. "disconnected_at": 1645003578536,
  6. "clientid": "c_emqx"
  7. }

连接确认事件 (“$events/client_connack”)

当服务端向客户端发送CONNACK报文时触发规则,reason_code 包含各种错误原因代码。

字段解释
reason_code各种原因代码
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

MQTT v5.0 协议将返回码重命名为原因码,增加了一个原因码来指示更多类型的错误(Reason code and ACK - MQTT 5.0 new featuresSQL 数据源和字段 - 图1 (opens new window))。 因此reason_code 在MQTT v3.1.1与MQTT v5.0中有很大的不同。

MQTT v3.1.1

reason_code描述
connection_accepted已接受连接
unacceptable_protocol_version服务器不支持客户端请求的 MQTT 协议
client_identifier_not_valid客户端 ID 是正确的 UTF-8 字符串,但服务器不允许
server_unavaliable网络连接已建立,但 MQTT 服务不可用
malformed_username_or_password用户名或密码中的数据格式错误
unauthorized_client客户端连接未授权

MQTT v5.0

reason_code描述
success连接成功
unspecified_error未指定的错误
malformed_packet畸形数据包
protocol_error协议错误
implementation_specific_error实现特定错误
unsupported_protocol_version不支持的协议版本
client_identifier_not_valid客户端标识符无效
bad_username_or_password错误的用户名或密码
not_authorized未经授权
server_unavailable服务器无法使用
server_busy服务器繁忙
banned禁止访问
bad_authentication_method错误的身份验证方法
topic_name_invalid主题名称无效
packet_too_large数据包太大
quota_exceeded超出配额
retain_not_supported不支持的retain
qos_not_supported不支持的qos
use_another_server使用另一台服务器
server_moved服务器迁移了
connection_rate_exceeded超出连接速率

示例

  1. SELECT
  2. clientid,
  3. username,
  4. reason_code,
  5. node
  6. FROM
  7. "$events/client_connack"

输出

  1. {
  2. "username": "u_emqx",
  3. "reason_code": "success",
  4. "node": "emqx@127.0.0.1",
  5. "connected_at": 1645003578536,
  6. "clientid": "c_emqx"
  7. }

鉴权完成事件 (“$events/client_check_authz_complete”)

当客户端鉴权结束时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
actionpublish or subscribe,发布或者订阅事件
resultallow or deny,鉴权完成
authz_source认证源
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

  1. SELECT
  2. clientid,
  3. username,
  4. topic,
  5. action,
  6. result,
  7. authz_source,
  8. node
  9. FROM
  10. "$events/client_check_authz_complete"

输出

  1. {
  2. "username": "u_emqx",
  3. "topic": "t/a",
  4. "action": "publish",
  5. "result": "allow",
  6. "authz_source": "cache",
  7. "node": "emqx@127.0.0.1",
  8. "clientid": "c_emqx"
  9. }

终端订阅成功事件 (“$events/session_subscribed”)

当终端订阅成功时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
sub_propsSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. clientid,
  3. username,
  4. topic,
  5. qos
  6. FROM
  7. "$events/session_subscribed"

输出

  1. {
  2. "username": "u_emqx",
  3. "topic": "t/a",
  4. "qos": 1,
  5. "clientid": "c_emqx"
  6. }

取消终端订阅成功事件 (“$events/session_unsubscribed”)

当取消终端订阅成功时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
unsub_propsUNSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

  1. SELECT
  2. clientid,
  3. username,
  4. topic,
  5. qos
  6. FROM
  7. "$events/session_unsubscribed"

输出

  1. {
  2. "username": "u_emqx",
  3. "topic": "t/a",
  4. "qos": 1,
  5. "clientid": "c_emqx"
  6. }