Data Sources and Fields

Rules in EMQX can process data from various data sources, including MQTT Messages, MQTT Events, and Data Bridges.

As discussed in the Rule Engine Syntax section, you can use the FROM clause to specify the data source, and the corresponding fields can be referenced in the SELECT and WHERE clauses. This section introduces the fields for MQTT Messages, MQTT Events, and Data Bridges.

MQTT Message

You can use EMQX rules to handle published messages. In this case, specify the message topics with the FROM clause.

For example, the following statement selects the fields payload.msg (renamed to msg with the AS clause), clientid, username, payload, topic, and qos from any message published to a topic that matches the filter t/#.

Example:

  SELECT
    payload.msg as msg,
    clientid,
    username,
    payload,
    topic,
    qos
  FROM
    "t/#"

Output:

  {
    "username": "u_emqx",
    "topic": "t/a",
    "qos": 1,
    "payload": "{\"msg\":\"hello\"}",
    "msg": "hello",
    "clientid": "c_emqx"
  }
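
In the output above, payload is still a JSON string, while msg holds the value read from inside it. The Python sketch below imitates that extraction outside EMQX to make the relationship concrete. The helper name select_msg is hypothetical and not part of EMQX, and the sketch assumes the payload is a JSON-encoded object.

```python
import json


def select_msg(message: dict) -> dict:
    """Imitate `SELECT payload.msg as msg, clientid, ...` for one message.

    Hypothetical helper for illustration only; EMQX performs this step
    inside the rule engine.
    """
    # The payload arrives as a string, so decode it to reach nested fields.
    decoded = json.loads(message["payload"])
    selected = ("clientid", "username", "payload", "topic", "qos")
    result = {key: message[key] for key in selected}
    result["msg"] = decoded["msg"]
    return result


message = {
    "clientid": "c_emqx",
    "username": "u_emqx",
    "payload": '{"msg":"hello"}',
    "topic": "t/a",
    "qos": 1,
}
print(select_msg(message)["msg"])  # prints: hello
```

The original payload string is passed through unchanged, which is why both payload and msg appear in the result.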

Refer to the table below for fields that can be selected from the received MQTT messages:

Field | Explanation
--- | ---
id | MQTT message ID
clientid | Client ID of the publisher
username | Username of the publisher
payload | MQTT payload
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
flags | Flags
headers | Internal data related to the message processing
pub_props | PUBLISH Properties (MQTT 5.0 clients only)
timestamp | Timestamp (unit: ms)
publish_received_at | Time when PUBLISH message reaches EMQX (unit: ms)
node | Node where the event is triggered
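
The FROM clause in the example above uses the topic filter t/#, where # is the MQTT multi-level wildcard. As a rough illustration of which topics such a filter selects, here is a simplified Python matcher. The function topic_matches is a hypothetical helper, not EMQX code, and it deliberately ignores special cases such as topics beginning with $ and filter validation.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic filter matching (illustrative sketch only)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below.
            return True
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # Neither a single-level wildcard nor a literal match.
            return False
    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("t/#", "t/a"))    # prints: True
print(topic_matches("t/#", "x/a"))    # prints: False
```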

MQTT Events

You can use EMQX rules to handle event notifications, such as clients going online or offline and client subscriptions. In this case, use the FROM clause to specify the event topics. Event topics start with "$events/", for example "$events/client_connected".

TIP

By default, clients are unable to subscribe directly to MQTT event messages. This tutorial will guide you through the process of using rules to subscribe to these messages. To enable direct subscriptions for MQTT event messages with your clients, you must first add the following configuration items to the emqx.conf file. After saving the modifications, restart EMQX to apply the new settings.

  sys_topics {
    sys_event_messages {
      client_connected = true,
      client_disconnected = true,
      client_subscribed = true,
      client_unsubscribed = true
    }
  }

See the table below for the supported event topic list.

Event Topic List

Event topic name | Explanation
--- | ---
$events/message_delivered | Message delivery
$events/message_acked | Message acknowledged
$events/message_dropped | Message dropped when routing
$events/delivery_dropped | Message dropped when delivering
$events/client_connected | Connection complete
$events/client_disconnected | Disconnect
$events/client_connack | Connection acknowledged
$events/client_check_authz_complete | Authorization check complete
$events/session_subscribed | Subscribe
$events/session_unsubscribed | Unsubscribe
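
When rule statements are generated or checked by tooling, it can help to test a FROM source against the list above before using it. The following Python sketch shows one way to do that. The names EVENT_PREFIX, SUPPORTED_EVENTS, and event_name are hypothetical and exist only for this illustration.

```python
from typing import Optional

EVENT_PREFIX = "$events/"

# Event names copied from the event topic list above.
SUPPORTED_EVENTS = frozenset({
    "message_delivered",
    "message_acked",
    "message_dropped",
    "delivery_dropped",
    "client_connected",
    "client_disconnected",
    "client_connack",
    "client_check_authz_complete",
    "session_subscribed",
    "session_unsubscribed",
})


def event_name(source: str) -> Optional[str]:
    """Return the event name if `source` is a listed event topic, else None."""
    if not source.startswith(EVENT_PREFIX):
        return None
    name = source[len(EVENT_PREFIX):]
    return name if name in SUPPORTED_EVENTS else None


print(event_name("$events/client_connected"))  # prints: client_connected
print(event_name("t/a"))                       # prints: None
```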

Message Delivery Event (“$events/message_delivered”)

This event topic can be used to trigger a rule when a message is delivered to a client.

For example, to extract the following fields from the "$events/message_delivered" event topic: client ID and username of the publisher, message topic, message QoS, the EMQX node where the event was triggered, and the time when the event was triggered, you can use the statement below:

Example:

  SELECT
    from_clientid,
    from_username,
    topic,
    qos,
    node,
    timestamp
  FROM
    "$events/message_delivered"

Output:

  {
    "topic": "t/a",
    "timestamp": 1645002753259,
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "from_username": "u_emqx_1",
    "from_clientid": "c_emqx_1"
  }

Below are detailed explanations of each field.

Field | Explanation
--- | ---
id | MQTT message ID
from_clientid | Client ID of the publisher
from_username | Username of the publisher
clientid | Client ID of the subscriber
username | Username of the subscriber
payload | MQTT payload
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
flags | Flags
pub_props | PUBLISH Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
publish_received_at | Time when PUBLISH message reaches EMQX (unit: ms)
node | EMQX node where the event is triggered
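
The timestamp and publish_received_at fields are integers in milliseconds. The sample values in this section are consistent with Unix epoch time, so the Python sketch below assumes they count milliseconds since 1970-01-01 UTC. That interpretation is an assumption, and ms_to_utc is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: event timestamps are milliseconds since the Unix epoch (UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_utc(ms: int) -> datetime:
    """Convert a millisecond timestamp to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding.
    return EPOCH + timedelta(milliseconds=ms)


print(ms_to_utc(1645002753259).isoformat())
# prints: 2022-02-16T09:12:33.259000+00:00
```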

Message Acknowledged Event (“$events/message_acked”)

This event topic can be used to trigger a rule when the message delivery is acknowledged.

TIP

Only available for QoS 1 and QoS 2 messages.

For example, to extract the following fields from the "$events/message_acked" event topic: client ID and username of the publisher, message topic, message QoS, the EMQX node where the event was triggered, and the time when the event was triggered, you can use the statement below:

Example:

  SELECT
    from_clientid,
    from_username,
    topic,
    qos,
    node,
    timestamp
  FROM
    "$events/message_acked"

Output:

  {
    "topic": "t/a",
    "timestamp": 1645002965664,
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "from_username": "u_emqx_1",
    "from_clientid": "c_emqx_1"
  }

Below are detailed explanations of each field.

Field | Explanation
--- | ---
id | MQTT message ID
from_clientid | Client ID of the publisher
from_username | Username of the publisher
clientid | Client ID of the subscriber
username | Username of the subscriber
payload | MQTT payload
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
flags | Flags
pub_props | PUBLISH Properties (MQTT 5.0 clients only)
puback_props | PUBACK Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
publish_received_at | Time when PUBLISH message reaches EMQX (unit: ms)
node | EMQX node where the event is triggered

Message Dropped When Routing Event (“$events/message_dropped”)

This event topic can be used to trigger a rule when a message is dropped during routing.

For example, to extract the following fields from the "$events/message_dropped" event topic: drop reason, message topic, message QoS, the EMQX node where the event was triggered, and the time when the event was triggered, you can use the statement below:

Example:

  SELECT
    reason,
    topic,
    qos,
    node,
    timestamp
  FROM
    "$events/message_dropped"

Output:

  {
    "topic": "t/a",
    "timestamp": 1645003103004,
    "reason": "no_subscribers",
    "qos": 1,
    "node": "emqx@127.0.0.1"
  }

Below are detailed explanations of each field.

Field | Explanation
--- | ---
id | MQTT message ID
reason | Drop reason (possible values are listed after this table)
clientid | Client ID of the publisher
username | Username of the publisher
payload | MQTT payload
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
flags | Flags
pub_props | PUBLISH Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
publish_received_at | Time when PUBLISH message reaches EMQX (unit: ms)
node | EMQX node where the event is triggered

Possible values of reason:

  • no_subscribers: No client subscribes to the topic.
  • receive_maximum_exceeded: The awaiting_rel queue is full.
  • packet_identifier_inuse: A QoS 2 message was received with a packet ID that has not been released.

Message Dropped When Delivering Event (“$events/delivery_dropped”)

This event topic can be used to trigger a rule when a message is dropped during delivery.

For example, to extract data from the "$events/delivery_dropped" event topic that includes the following data fields: publisher ID and username, drop reason, message topic and QoS, you can use the statement below:

Example:

  SELECT
    from_clientid,
    from_username,
    reason,
    topic,
    qos
  FROM
    "$events/delivery_dropped"

Output:

  {
    "topic": "t/a",
    "reason": "queue_full",
    "qos": 1,
    "from_username": "u_emqx_1",
    "from_clientid": "c_emqx_1"
  }

Below are detailed explanations of each field.

Field | Explanation
--- | ---
id | MQTT message ID
reason | Drop reason (possible values are listed after this table)
from_clientid | Client ID of the publisher
from_username | Username of the publisher
clientid | Client ID of the subscriber
username | Username of the subscriber
payload | MQTT payload
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
flags | Flags
pub_props | PUBLISH Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
publish_received_at | Time when PUBLISH message reaches EMQX (unit: ms)
node | EMQX node where the event is triggered

Possible values of reason:

  • queue_full: Message (QoS>0) queue is full.
  • no_local: Clients are not allowed to receive messages published by themselves.
  • expired: The message or the session expired.
  • qos0_msg: Message (QoS 0) queue is full.
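
The two drop events use different sets of reason values. A downstream consumer that receives rule output from both may want to turn a reason into a readable description. The Python sketch below does this with a plain lookup table built from the reasons documented in these two sections. DROP_REASONS and describe_drop are hypothetical names, and the fallback text for unlisted reasons is an assumption of this sketch.

```python
# Reason values and descriptions taken from the two drop-event sections.
DROP_REASONS = {
    "$events/message_dropped": {
        "no_subscribers": "No clients subscribe to the topic",
        "receive_maximum_exceeded": "awaiting_rel queue is full",
        "packet_identifier_inuse": "QoS 2 message received with unreleased packet ID",
    },
    "$events/delivery_dropped": {
        "queue_full": "Message (QoS>0) queue is full",
        "no_local": "Clients may not receive messages published by themselves",
        "expired": "Message or session expired",
        "qos0_msg": "Message (QoS 0) queue is full",
    },
}


def describe_drop(event_topic: str, reason: str) -> str:
    """Look up a description for a drop reason; fall back for unknown input."""
    return DROP_REASONS.get(event_topic, {}).get(reason, "unknown reason")


print(describe_drop("$events/delivery_dropped", "queue_full"))
# prints: Message (QoS>0) queue is full
```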

Connection Complete Event (“$events/client_connected”)

This event topic can be used to trigger a rule when a client is connected successfully.

For example, to extract data from the "$events/client_connected" event topic that includes the following data fields: client ID and username, keepalive interval, and whether the connected MQTT client is acting as a bridge, you can use the statement below:

Example:

  SELECT
    clientid,
    username,
    keepalive,
    is_bridge
  FROM
    "$events/client_connected"

Output:

  {
    "username": "u_emqx",
    "keepalive": 60,
    "is_bridge": false,
    "clientid": "c_emqx"
  }

Refer to the table below for fields that can be selected from this event:

Field | Explanation
--- | ---
clientid | Client ID
username | Client username
mountpoint | Mountpoint for bridging messages
peername | IP address and port of the client
sockname | IP address and port that EMQX listens on
proto_name | Protocol name
proto_ver | Protocol version
keepalive | MQTT keepalive interval
clean_start | MQTT clean_start
expiry_interval | MQTT session expiration time
is_bridge | Whether the client is acting as a bridge
connected_at | Client connection completion time (unit: ms)
conn_props | CONNECT Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
node | EMQX node where the event is triggered

Disconnect Event (“$events/client_disconnected”)

This event topic can be used to trigger a rule when a client is disconnected.

For example, you can use the statement below to extract data from the "$events/client_disconnected" event topic that includes the following data fields: client ID, username, disconnect reason, disconnect time, and EMQX node where the event is triggered.

Example:

  SELECT
    clientid,
    username,
    reason,
    disconnected_at,
    node
  FROM
    "$events/client_disconnected"

Output:

  {
    "username": "u_emqx",
    "reason": "normal",
    "node": "emqx@127.0.0.1",
    "disconnected_at": 1645003578536,
    "clientid": "c_emqx"
  }

Refer to the table below for fields that can be extracted.

Field | Explanation
--- | ---
reason | Disconnect reason (possible values are listed after this table)
clientid | Client ID
username | Client username
peername | IP address and port of the client
sockname | IP address and port that EMQX listens on
disconnected_at | Client disconnection completion time (unit: ms)
disconn_props | DISCONNECT Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
node | EMQX node where the event is triggered

Possible values of reason:

  • normal: The client disconnected intentionally.
  • kicked: EMQX forcibly removed the client through the REST API.
  • keepalive_timeout: The specified keepalive period expired.
  • not_authorized: Authorization failed.
  • tcp_closed: The peer closed the network connection.
  • discarded: Another client (with clean_start set to true) connected with the same client ID, causing the previous connection to be dropped.
  • takenover: Another client (with clean_start set to false) connected with the same client ID, taking over the previous connection.
  • internal_error: An error occurred due to an improperly formatted message or other unknown issues.
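
The discarded and takenover reasons differ only in the clean_start setting of the newly connecting client that uses the same client ID. The short Python sketch below restates that rule as code. The function displaced_reason is a hypothetical helper for illustration and does not reflect how EMQX implements session handling internally.

```python
def displaced_reason(new_client_clean_start: bool) -> str:
    """Reason reported for the old connection when a new client connects
    with the same client ID, following the descriptions in this section."""
    # clean_start = true  -> the previous connection is dropped ("discarded").
    # clean_start = false -> the previous connection is taken over ("takenover").
    return "discarded" if new_client_clean_start else "takenover"


print(displaced_reason(True))   # prints: discarded
print(displaced_reason(False))  # prints: takenover
```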

Connection Acknowledged Event (“$events/client_connack”)

This event topic can be used to trigger a rule when EMQX sends a CONNACK packet to the client.

Example:

  SELECT
    clientid,
    username,
    reason_code,
    node
  FROM
    "$events/client_connack"

Output:

  {
    "username": "u_emqx",
    "reason_code": "success",
    "node": "emqx@127.0.0.1",
    "clientid": "c_emqx"
  }

Refer to the table below for fields that can be extracted:

Field | Explanation
--- | ---
reason_code | Reason code*
clientid | Client ID
username | Client username
peername | IP address and port of the client
sockname | IP address and port that EMQX listens on
proto_name | Protocol name
proto_ver | Protocol version
keepalive | MQTT keepalive interval
clean_start | MQTT clean_start
expiry_interval | MQTT session expiration time
conn_props | CONNECT Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
node | EMQX node where the event is triggered

* The MQTT v5.0 protocol renames the return code to a reason code and adds reason codes to indicate more types of errors (see Reason code and ACK - MQTT 5.0 new features).

The reason codes for MQTT v3.1.1 and MQTT v5.0 are listed below.

MQTT v3.1.1:

Reason Code | Description
--- | ---
connection_accepted | Connection accepted
unacceptable_protocol_version | EMQX does not support the MQTT protocol version requested by the client.
client_identifier_not_valid | The client ID is not allowed by EMQX.
server_unavaliable | Network connection has been established, but MQTT service is unavailable.
malformed_username_or_password | Username or password is in the wrong data format.
unauthorized_client | Client connection is not authorized.

MQTT v5.0:

Reason Code | Description
--- | ---
success | Connection successful
unspecified_error | Unknown error
malformed_packet | Packet malformed
protocol_error | Protocol issue
implementation_specific_error | Specific implementation error
unsupported_protocol_version | Unsupported protocol version
client_identifier_not_valid | Invalid client ID
bad_username_or_password | Invalid username/password
not_authorized | Unauthorized
server_unavailable | Server unavailable
server_busy | Server busy
banned | Banned
bad_authentication_method | Invalid authentication method
topic_name_invalid | Invalid topic name
packet_too_large | Packet too big
quota_exceeded | Quota exceeded
retain_not_supported | Retain message feature unsupported
qos_not_supported | Unsupported QoS level
use_another_server | Use different broker
server_moved | Broker relocated
connection_rate_exceeded | Connection rate limit reached
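
Because the two protocol versions name a successful connection differently (connection_accepted and success), code that consumes the output of a "$events/client_connack" rule may need to accept both. The Python sketch below shows one way to do so. SUCCESS_CODES and connack_succeeded are hypothetical names, and the sketch assumes the rule output contains a reason_code field as in the example above.

```python
# Success codes from the reason code tables above (one per protocol version).
SUCCESS_CODES = frozenset({"connection_accepted", "success"})


def connack_succeeded(event: dict) -> bool:
    """Return True if the CONNACK event reports a successful connection."""
    # A missing reason_code is treated as a failure in this sketch.
    return event.get("reason_code") in SUCCESS_CODES


event = {
    "username": "u_emqx",
    "reason_code": "success",
    "node": "emqx@127.0.0.1",
    "clientid": "c_emqx",
}
print(connack_succeeded(event))  # prints: True
```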

Authorization Check Complete Event (“$events/client_check_authz_complete”)

This event topic can be used to trigger a rule when the authorization check for the client is complete.

Example:

  SELECT
    clientid,
    username,
    topic,
    action,
    result,
    authz_source,
    node
  FROM
    "$events/client_check_authz_complete"

Output:

  {
    "username": "u_emqx",
    "topic": "t/a",
    "action": "publish",
    "result": "allow",
    "authz_source": "cache",
    "node": "emqx@127.0.0.1",
    "clientid": "c_emqx"
  }

Refer to the table below for fields that can be extracted.

Field | Explanation
--- | ---
clientid | Client ID
username | Username
peerhost | Client IP address
topic | MQTT topic
action | Publish or subscribe action
result | Access control check result
authz_source | The authorization source
timestamp | Timestamp (unit: ms)
node | EMQX node where the event is triggered

Subscribe Event (“$events/session_subscribed”)

This event topic can be used to trigger a rule when the client subscribes successfully.

Example:

  SELECT
    clientid,
    username,
    topic,
    qos
  FROM
    "$events/session_subscribed"

Output:

  {
    "username": "u_emqx",
    "topic": "t/a",
    "qos": 1,
    "clientid": "c_emqx"
  }

Refer to the table below for fields that can be extracted.

Field | Explanation
--- | ---
clientid | Client ID
username | Client username
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
sub_props | SUBSCRIBE Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
node | EMQX node where the event is triggered

Unsubscribe Event (“$events/session_unsubscribed”)

This event topic can be used to trigger a rule when the client unsubscribes successfully.

Example:

  SELECT
    clientid,
    username,
    topic,
    qos
  FROM
    "$events/session_unsubscribed"

Output:

  {
    "username": "u_emqx",
    "topic": "t/a",
    "qos": 1,
    "clientid": "c_emqx"
  }

Refer to the table below for fields that can be extracted.

Field | Explanation
--- | ---
clientid | Client ID
username | Client username
peerhost | Client IP address
topic | MQTT topic
qos | QoS level
unsub_props | UNSUBSCRIBE Properties (MQTT 5.0 clients only)
timestamp | Event trigger time (unit: ms)
node | EMQX node where the event is triggered

Data Bridges

Rules use topics prefixed by $bridges/ to represent messages or events triggered by a data bridge. The format is:

$bridges/<type>:<name>

Where

  • <type>:<name> is the bridge ID,
  • <type> is the bridge type,
  • <name> is the bridge name.
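
To make the format above concrete, the following Python sketch splits a bridge topic into its type and name. BridgeId and parse_bridge_topic are hypothetical names introduced for this illustration. The sketch assumes the type contains no colon and treats everything after the first colon as the name.

```python
from typing import NamedTuple


class BridgeId(NamedTuple):
    bridge_type: str
    name: str


def parse_bridge_topic(source: str) -> BridgeId:
    """Split "$bridges/<type>:<name>" into its type and name parts."""
    prefix = "$bridges/"
    if not source.startswith(prefix):
        raise ValueError(f"not a bridge topic: {source!r}")
    # Assumption: the first colon separates the type from the name.
    bridge_type, separator, name = source[len(prefix):].partition(":")
    if not separator or not bridge_type or not name:
        raise ValueError(f"expected $bridges/<type>:<name>, got {source!r}")
    return BridgeId(bridge_type, name)


print(parse_bridge_topic("$bridges/mqtt:my_mqtt_bridge"))
# prints: BridgeId(bridge_type='mqtt', name='my_mqtt_bridge')
```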

For example, MQTT bridge events can be referred to in the format "$bridges/mqtt:*". To set a rule for all messages sent by the MQTT data bridge named my_mqtt_bridge, you can use the statement below:

Example:

  SELECT
    *
  FROM
    "$bridges/mqtt:my_mqtt_bridge"

Output:

  {
    "id": "0005E27C1D24E44FF440000017520000",
    "server": "broker.emqx.io:1883",
    "payload": "hello",
    "topic": "t/a",
    "qos": 1,
    "dup": false,
    "retain": false,
    "pub_props": {
      "Message-Expiry-Interval": 30,
      "Payload-Format-Indicator": 0,
      "User-Property": {
        "foo": "bar"
      },
      "User-Property-Pairs": [
        {
          "key": "foo"
        },
        {
          "value": "bar"
        }
      ]
    },
    "message_received_at": 1645002753259
  }

The fields in the returned output are explained below:

Field | Explanation
--- | ---
id | MQTT message ID
server | Server name of the remote MQTT broker, such as "broker.emqx.io:1883"
payload | MQTT payload
topic | MQTT topic
qos | MQTT QoS
dup | MQTT DUP flag
retain | MQTT Retain flag
pub_props | PUBLISH Properties (MQTT 5.0 clients only)
message_received_at | Timestamp when the message is received (unit: ms)