Redis Backend

TIP

Since EMQX 3.1, a rule engine is available as a replacement for plug-ins, and it is the recommended approach. See Save data to Redis for how to set up Redis persistence in the rule engine.

Config file: emqx_backend_redis.conf

Configure the Redis Server

Configure connection pools for multiple Redis servers:

    ## Redis Server
    backend.redis.pool1.server = 127.0.0.1:6379
    ## Redis Sentinel
    ## backend.redis.server = 127.0.0.1:26378
    ## Redis Sentinel cluster name
    ## backend.redis.sentinel = mymaster
    ## Redis pool size
    backend.redis.pool1.pool_size = 8
    ## Redis database
    backend.redis.pool1.database = 1
    ## Redis subscribe channel
    backend.redis.pool1.channel = mqtt_channel
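
The per-pool key layout above can be illustrated with a small parser. This is only a sketch for reading such a file outside EMQX, not how the plugin loads its configuration; the `parse_pools` helper, the `POOL_OPTIONS` set, and the `pool2` entries are hypothetical and added purely for illustration.

```python
# Options shown in the configuration above; other keys are ignored by this sketch.
POOL_OPTIONS = {"server", "pool_size", "database", "channel"}


def parse_pools(conf_text):
    """Group 'backend.redis.<pool>.<option> = <value>' lines by pool name."""
    pools = {}
    prefix = "backend.redis."
    for line in conf_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and commented-out settings
        key, _, value = line.partition("=")
        key = key.strip()
        if not key.startswith(prefix):
            continue
        pool, _, option = key[len(prefix):].partition(".")
        if option in POOL_OPTIONS:
            pools.setdefault(pool, {})[option] = value.strip()
    return pools


SAMPLE = """
## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379
backend.redis.pool1.pool_size = 8
backend.redis.pool1.database = 1
backend.redis.pool1.channel = mqtt_channel
## Hypothetical second pool, for illustration only
backend.redis.pool2.server = 127.0.0.1:6380
backend.redis.pool2.pool_size = 4
"""

pools = parse_pools(SAMPLE)
print(pools["pool1"]["server"])  # 127.0.0.1:6379
print(sorted(pools))             # ['pool1', 'pool2']
```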

Configure Persistence Hooks

    ## Message expiry in seconds; if =< 0, the default value is used
    backend.redis.msg.expired_after = 3600
    ## Client Connected Record
    backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    ## Subscribe Lookup Record
    backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    ## Client Disconnected Record
    backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    ## Lookup Unread Message for one, QoS > 0
    backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
    ## Lookup Unread Message for many, QoS > 0
    backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
    ## Lookup Retain Message
    backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    ## Store Publish Message, QoS > 0
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    ## Store Retain Message
    backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    ## Delete Retain Message
    backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    ## Store Ack for one
    backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
    ## Store Ack for many
    backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
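
Each hook line above pairs a key of the form `backend.redis.hook.<hook>.<n>` with a JSON value. The sketch below takes one such line apart to make that structure explicit; `parse_hook_line` is a hypothetical helper written for this illustration and is not part of EMQX.

```python
import json


def parse_hook_line(line):
    """Split one 'backend.redis.hook.<hook>.<n> = <json>' line into its parts."""
    key, _, raw = line.partition("=")
    key = key.strip()
    prefix = "backend.redis.hook."
    if not key.startswith(prefix):
        raise ValueError(f"not a hook line: {key!r}")
    # The last dotted component is the rule index; the rest is the hook name.
    hook, _, index = key[len(prefix):].rpartition(".")
    spec = json.loads(raw)
    return {
        "hook": hook,
        "index": int(index),
        "topic": spec.get("topic"),  # None when the rule has no topic filter
        "function": spec["action"].get("function"),
        "pool": spec["pool"],
    }


line = ('backend.redis.hook.session.subscribed.1 = '
        '{"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}')
rule = parse_hook_line(line)
print(rule["hook"], rule["index"], rule["topic"], rule["function"])
```

Rules without a `"topic"` entry, such as `client.connected.1`, come back with `topic` set to `None`.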

Description of Persistence Hooks

| hook | topic | action/function | Description |
| --- | --- | --- | --- |
| client.connected | | on_client_connected | Store client connected state |
| client.connected | | on_subscribe_lookup | Subscribe to topics |
| client.disconnected | | on_client_disconnected | Store the client disconnected state |
| session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one to one offline message |
| session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one to many offline message |
| session.subscribed | # | on_retain_lookup | Lookup retained message |
| message.publish | # | on_message_publish | Store the published messages |
| message.publish | # | on_message_retain | Store retained messages |
| message.publish | # | on_retain_delete | Delete retained messages |
| message.acked | queue/# | on_message_acked_for_queue | Process ACK of one to one messages |
| message.acked | pubsub/# | on_message_acked_for_pubsub | Process ACK of one to many messages |
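
The topic column uses MQTT topic filters such as `queue/#` and `pubsub/#`. As a rough illustration of which topics such a filter selects, here is a minimal matcher for standard MQTT wildcard semantics. It assumes the plugin follows those standard semantics, and it leaves out the special handling of topics that start with `$`; `topic_matches` is a hypothetical name.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with '+' and '#' wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: matches the rest, including nothing
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # No '#' seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("queue/#", "queue/device1"))   # True
print(topic_matches("pubsub/#", "queue/device1"))  # False
print(topic_matches("#", "any/topic"))             # True
```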

Redis Command Line Parameters

| hook | Parameter | Example (Fields separated exactly by one space) |
| --- | --- | --- |
| client.connected | clientid | SET conn:${clientid} clientid |
| client.disconnected | clientid | SET disconn:${clientid} clientid |
| session.subscribed | clientid, topic, qos | HSET sub:${clientid} topic qos |
| session.unsubscribed | clientid, topic | SET unsub:${clientid} topic |
| message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} msgid |
| message.acked | msgid, topic, clientid | HSET ack:${clientid} topic msgid |
| message.delivered | msgid, topic, clientid | HSET delivered:${clientid} topic msgid |

Configure ‘action’ with Redis Commands

The Redis backend supports raw ‘commands’ in ‘action’, e.g.:

    ## After a client connects to the EMQX server, execute a Redis command (multiple Redis commands are also supported)
    backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}
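
To see what such a command template looks like once a parameter is filled in, the sketch below expands `${...}` placeholders with Python's `string.Template`, which happens to use the same syntax, and then splits on single spaces. It only mimics the substitution for illustration: `render_command` is a hypothetical helper, and how the plugin treats the bare trailing `clientid` token is not stated here, so the sketch leaves it untouched.

```python
from string import Template


def render_command(template, **params):
    """Expand ${name} placeholders, then split into Redis command arguments."""
    rendered = Template(template).substitute(params)  # KeyError if a parameter is missing
    # Fields are separated by exactly one space, so a plain split is enough here.
    return rendered.split(" ")


print(render_command("SET conn:${clientid} clientid", clientid="test"))
# ['SET', 'conn:test', 'clientid']
```

A parameter value that itself contains a space would be split into several arguments by this sketch.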

Using Redis Hash for Devices’ Connection State

mqtt:client Hash for devices’ connection state:

    hmset
    key = mqtt:client:${clientid}
    value = {state:int, online_at:timestamp, offline_at:timestamp}
    hset
    key = mqtt:node:${node}
    field = ${clientid}
    value = ${ts}

Lookup devices’ connection state:

    HGETALL "mqtt:client:${clientId}"

E.g.: Client with ClientId ‘test’ goes online:

    HGETALL mqtt:client:test
    1) "state"
    2) "1"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "undefined"

Client with ClientId ‘test’ goes offline:

    HGETALL mqtt:client:test
    1) "state"
    2) "0"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "1481685924"
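
An application reading this hash receives the `HGETALL` reply as a flat list of alternating fields and values. The following sketch turns such a reply into a small status record. It assumes, based on the two examples above, that `state` is `"1"` while online and `"0"` once offline, and that `offline_at` holds the literal string `undefined` until the first disconnect; both helper names are hypothetical.

```python
def pairs_to_dict(flat):
    """Turn [field1, value1, field2, value2, ...] into a dict."""
    if len(flat) % 2:
        raise ValueError("expected an even number of items")
    return dict(zip(flat[0::2], flat[1::2]))


def connection_state(flat):
    """Interpret a flat HGETALL reply for an mqtt:client:<clientid> key."""
    record = pairs_to_dict(flat)
    offline_at = record.get("offline_at")
    return {
        "online": record.get("state") == "1",
        "online_at": int(record["online_at"]),
        # "undefined" means the client has not gone offline yet.
        "offline_at": None if offline_at in (None, "undefined") else int(offline_at),
    }


online_reply = ["state", "1", "online_at", "1481685802", "offline_at", "undefined"]
offline_reply = ["state", "0", "online_at", "1481685802", "offline_at", "1481685924"]
print(connection_state(online_reply))
print(connection_state(offline_reply))
```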

Using Redis Hash for Retained Messages

mqtt:retain Hash for retained messages:

    hmset
    key = mqtt:retain:${topic}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

Lookup retained message:

    HGETALL "mqtt:retain:${topic}"

Lookup retained messages with a topic of ‘topic’:

    HGETALL mqtt:retain:topic
    1) "id"
    2) "6P9NLcJ65VXBbC22sYb4"
    3) "from"
    4) "test"
    5) "qos"
    6) "1"
    7) "topic"
    8) "topic"
    9) "retain"
    10) "true"
    11) "payload"
    12) "Hello world!"
    13) "ts"
    14) "1481690659"

Using Redis Hash for Messages

mqtt:msg Hash for MQTT messages:

    hmset
    key = mqtt:msg:${msgid}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
    zadd
    key = mqtt:msg:${topic}
    field = 1
    value = ${msgid}
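
Written out as Redis command arguments, the two writes above might look like the following sketch. It reads `field = 1` as the sorted-set score and `value` as the member of the `zadd` call, which is an interpretation of the layout above rather than something the plugin documentation states. `store_message_commands` is a hypothetical helper that only builds argument lists and does not talk to Redis.

```python
MESSAGE_FIELDS = ["id", "from", "qos", "topic", "retain", "payload", "ts"]


def store_message_commands(msg):
    """Build the HMSET and ZADD argument lists for one MQTT message."""
    hmset = ["HMSET", f"mqtt:msg:{msg['id']}"]
    for name in MESSAGE_FIELDS:
        hmset += [name, str(msg[name])]
    # Assumed reading of the layout: score 1, member = message id.
    zadd = ["ZADD", f"mqtt:msg:{msg['topic']}", "1", str(msg["id"])]
    return [hmset, zadd]


message = {
    "id": "6P9NLcJ65VXBbC22sYb4", "from": "test", "qos": 1, "topic": "topic",
    "retain": 0, "payload": "Hello world!", "ts": 1481690659,
}
for command in store_message_commands(message):
    print(command)
```

Note that the message hash and the per-topic sorted set share the `mqtt:msg:` prefix, so a message id that equals a topic name would address the same key.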

Using Redis Set for Message Acknowledgements

mqtt:acked SET stores acknowledgements from the clients:

    set
    key = mqtt:acked:${clientid}:${topic}
    value = ${msgid}

Using Redis Hash for Subscription

mqtt:sub Hash for Subscriptions:

    hset
    key = mqtt:sub:${clientid}
    field = ${topic}
    value = ${qos}

A client subscribes to a topic:

    HSET mqtt:sub:${clientid} ${topic} ${qos}

A client with ClientId of ‘test’ subscribes to topic1 and topic2:

    HSET "mqtt:sub:test" "topic1" 1
    HSET "mqtt:sub:test" "topic2" 2

Lookup the subscribed topics of client with ClientId of ‘test’:

    HGETALL mqtt:sub:test
    1) "topic1"
    2) "1"
    3) "topic2"
    4) "2"

Redis SUB/UNSUB Publish

When a device subscribes to or unsubscribes from topics, the EMQX broker publishes an event to Redis:

    PUBLISH
    channel = "mqtt_channel"
    message = {type: string, topic: string, clientid: string, qos: int}
    * type: [subscribe/unsubscribe]

Client with ClientId ‘test’ subscribes to ‘topic0’:

    PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"

Client with ClientId ‘test’ unsubscribes from ‘test_topic0’:

    PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
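
A consumer listening on the channel receives these events as JSON strings. The sketch below decodes the two example payloads shown above. It assumes that `qos` may arrive as a string, as in the subscribe example, and may be absent, as in the unsubscribe example; `decode_event` is a hypothetical helper, and the Redis subscription itself is left out.

```python
import json


def decode_event(raw):
    """Decode one subscribe/unsubscribe event published on the Redis channel."""
    event = json.loads(raw)
    if event.get("type") not in ("subscribe", "unsubscribe"):
        raise ValueError(f"unexpected event type: {event.get('type')!r}")
    qos = event.get("qos")
    return {
        "type": event["type"],
        "topic": event["topic"],
        "clientid": event["clientid"],
        # qos is sent as a string in the subscribe example and omitted on unsubscribe.
        "qos": None if qos is None else int(qos),
    }


subscribe = '{"type": "subscribe", "topic": "topic0", "clientid": "test", "qos": "0"}'
unsubscribe = '{"type": "unsubscribe", "topic": "test_topic0", "clientid": "test"}'
print(decode_event(subscribe))
print(decode_event(unsubscribe))
```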

Enable Redis Backend

    ./bin/emqx_ctl plugins load emqx_backend_redis