SQL 手册

兼容提示: EMQ X v4.0 对规则引擎 SQL 语法做出较大调整,v3.x 升级用户请参照迁移指南进行适配。

SQL 语句

SQL 语句用于从原始数据中,根据条件筛选出字段,并进行预处理和转换,基本格式为:

  1. SELECT <字段名> FROM <触发事件> [WHERE <条件>]
  1. 注意事项
  2. FROM 子句后面的主题名需要用双引号(“”) 引起来。
  3. WHERE 子句后面接筛选条件,如果使用到字符串需要用单引号 (‘’) 引起来。
  4. SELECT 子句中,若使用 “.” 符号对 payload 进行嵌套选择,必须保证 payload 为 JSON 格式。

SQL 语句示例

  1. 从 topic 为 “t/a” 的消息中提取所有字段:

    1. SELECT * FROM "t/a"
  1. 从 topic 能够匹配到 ‘t/#’ 的消息中提取所有字段。注意这里使用了 ‘=~’ 操作符进行带通配符的 topic 匹配

    1. SELECT * FROM "t/#"
  2. 从 topic 能够匹配到 ‘t/#’ 的消息中提取 qos,username 和 clientid 字段

    1. SELECT qos, username, clientid FROM "t/#"
  3. 从任意 topic 的消息中提取 username 字段,并且筛选条件为 username = ‘u_emqx’

    1. SELECT username FROM "#" WHERE username='u_emqx'
  4. 从任意 topic 的消息的消息体(payload) 中提取 x 字段,并创建别名 x 以便在 WHERE 子句中使用。WHERE 子句限定条件为 x = 1。注意 payload 必须为 JSON 格式。举例:此 SQL 语句可以匹配到消息体 {"x": 1}, 但不能匹配到消息体 {"x": 2}

    1. SELECT payload as p, p.x as x FROM "#" WHERE x=1
  5. 类似于上面的 SQL 语句,但嵌套地提取消息体中的数据,此 SQL 语句可以匹配到消息体 {"x": {"y": 1}}

    1. SELECT payload as p, p.x.y as a FROM "#" WHERE a=1
  6. 在 clientid = ‘c1’ 尝试连接时,提取其来源 IP 地址和端口号

    1. SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
  7. 筛选所有订阅 ‘t/#’ 主题且订阅级别为 QoS1 的 clientid。注意这里用的是严格相等操作符 ‘=’,所以不会匹配主题为 ‘t’ 或 ‘t/+/a’ 的订阅请求

    1. SELECT clientid FROM "$events/session_subscribe" WHERE topic = 't/#' and qos = 1

事件主题

Message Publish ($events/message_publish)

字段类型示例值说明
idstringMQTT message id
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
payloadstring{“msg”: “hello”}消息内容, 如果是 JSON 格式将自动解码, 在 SQL 中使用 payload.x 获取对象信息
peerhoststringpeerhost
topicstringt/a当前 MQTT 主题, SQL 中可以使用通配符进行筛选.
Subscribe 与 Unsubscribe 请求中包含多个主题时, 这里只会获取到第一个, 如需获取全部请使用 topic_filters
qosinteger1消息 QoS 0,1,2 中枚举
flagsstringflags
headersstringheaders
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Message Delivered ($events/message_delivered)

字段类型示例值说明
eventstringdisconnect触发事件名称
idstringMQTT message id
from_clientidstringfrom_clientid
from_usernamestringfrom_username
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
payloadstring{“msg”: “hello”}消息内容, 如果是 JSON 格式将自动解码, 在 SQL 中使用 payload.x 获取对象信息
peerhoststringpeerhost
topicstringt/a当前 MQTT 主题, SQL 中可以使用通配符进行筛选.
Subscribe 与 Unsubscribe 请求中包含多个主题时, 这里只会获取到第一个, 如需获取全部请使用 topic_filters
qosinteger1消息 QoS 0,1,2 中枚举
flagsstringflags
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Message Acked ($events/message_acked)

字段类型示例值说明
eventstringdisconnect触发事件名称
idstringMQTT message id
from_clientidstringfrom_clientid
from_usernamestringfrom_username
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
payloadstring{“msg”: “hello”}消息内容, 如果是 JSON 格式将自动解码, 在 SQL 中使用 payload.x 获取对象信息
peerhoststringpeerhost
topicstringt/a当前 MQTT 主题, SQL 中可以使用通配符进行筛选.
Subscribe 与 Unsubscribe 请求中包含多个主题时, 这里只会获取到第一个, 如需获取全部请使用 topic_filters
qosinteger1消息 QoS 0,1,2 中枚举
flagsstringflags
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Message Dropped ($events/message_dropped)

字段类型示例值说明
eventstringdisconnect触发事件名称
idstringMQTT message id
reasonstringreason
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
payloadstring{“msg”: “hello”}消息内容, 如果是 JSON 格式将自动解码, 在 SQL 中使用 payload.x 获取对象信息
peerhoststringpeerhost
topicstringt/a当前 MQTT 主题, SQL 中可以使用通配符进行筛选.
Subscribe 与 Unsubscribe 请求中包含多个主题时, 这里只会获取到第一个, 如需获取全部请使用 topic_filters
qosinteger1消息 QoS 0,1,2 中枚举
flagsstringflags
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Client Connected ($events/client_connected)

字段类型示例值说明
eventstringdisconnect触发事件名称
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
mountpointstringundefined挂载点, 使用于桥接消息
peernamestring127.0.0.1:63412客户端网络地址
socknamestringsockname
proto_namestringproto_name
proto_verstring4当前协议版本
keepaliveinteger60当前客户端 keepalive
clean_startbooleanfalseClean Start
expiry_intervalstringexpiry_interval
is_bridgestring
connected_atinteger1576549961086连接毫秒级时间戳
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Client Disconnected ($events/client_disconnected)

字段类型示例值说明
eventstringdisconnect触发事件名称
reasonstringreason
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
peernamestring127.0.0.1:63412客户端网络地址
socknamestringsockname
disconnected_atstringdisconnected_at
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Session Subscribed ($events/session_subscribed)

字段类型示例值说明
eventstringdisconnect触发事件名称
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
peerhoststringpeerhost
topicstringt/a当前 MQTT 主题, SQL 中可以使用通配符进行筛选.
Subscribe 与 Unsubscribe 请求中包含多个主题时, 这里只会获取到第一个, 如需获取全部请使用 topic_filters
qosinteger1消息 QoS 0,1,2 中枚举
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称

Session Unsubscribed ($events/session_unsubscribed)

字段类型示例值说明
eventstringdisconnect触发事件名称
clientidstringclientid
usernamestringu_emqx当前客户端 MQTT username
peerhoststringpeerhost
topicstringt/a当前 MQTT 主题, SQL 中可以使用通配符进行筛选.
Subscribe 与 Unsubscribe 请求中包含多个主题时, 这里只会获取到第一个, 如需获取全部请使用 topic_filters
qosinteger1消息 QoS 0,1,2 中枚举
timestampinteger1576549961086当前毫秒级时间戳
nodestringemqx@127.0.0.1触发事件的节点名称