Redis Data Storage

Configuration file: emqx_backend_redis.conf

Configuring the Redis Server

Connection pools for multiple Redis servers can be configured:

    ## Redis deployment type: single | sentinel | cluster
    backend.redis.pool1.type = single
    ## Redis server address list
    backend.redis.pool1.server = 127.0.0.1:6379
    ## Sentinel name, used in Redis sentinel mode
    ## backend.redis.pool1.sentinel = mymaster
    ## Redis connection pool size
    backend.redis.pool1.pool_size = 8
    ## Redis database number
    backend.redis.pool1.database = 0
    ## Redis password
    ## backend.redis.pool1.password =
    ## Name of the Redis channel to subscribe to
    backend.redis.pool1.channel = mqtt_channel
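The settings above are keyed by pool name, so a quick way to check a configuration is to group the lines per pool. The following is a minimal sketch, not part of EMQ X: `parse_pool_config` is a hypothetical helper, and the `pool2` entry in the sample is an assumed second pool that simply follows the same naming pattern as `pool1`.

```python
def parse_pool_config(text, prefix="backend.redis."):
    """Group `backend.redis.<pool>.<option> = <value>` lines by pool name."""
    pools = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and `##` comment lines.
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if not key.startswith(prefix):
            continue
        pool, _, option = key[len(prefix):].partition(".")
        # Hook rules share the prefix but are not pool settings.
        if pool == "hook" or not option:
            continue
        pools.setdefault(pool, {})[option] = value.strip()
    return pools


SAMPLE = """
backend.redis.pool1.type = single
backend.redis.pool1.server = 127.0.0.1:6379
## backend.redis.pool1.password =
backend.redis.pool1.pool_size = 8
backend.redis.pool1.database = 0
backend.redis.pool1.channel = mqtt_channel
## pool2 is a hypothetical second pool, shown only to illustrate grouping.
backend.redis.pool2.type = single
backend.redis.pool2.server = 127.0.0.1:6380
"""

pools = parse_pool_config(SAMPLE)
for name, options in sorted(pools.items()):
    print(name, options)
```

Commented-out options such as the password are skipped, so the result reflects only the settings that are actually active.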

Configuring Redis Storage Rules

    backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    backend.redis.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
    backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
    backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    backend.redis.hook.session.unsubscribed.1 = {"topic": "#", "action": {"commands": ["DEL mqtt:acked:${clientid}:${topic}"]}, "pool": "pool1"}
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "expired_time" : 3600, "pool": "pool1"}
    backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "expired_time" : 3600, "pool": "pool1"}
    backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
    backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
    ## backend.redis.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_keep_latest"}, "pool": "pool1"}
    ## backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_store_keep_latest"}, "expired_time" : 3600, "pool": "pool1"}
    ## backend.redis.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_keep_latest"}, "pool": "pool1"}
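Each rule above has the shape `backend.redis.hook.<hook>.<n> = <JSON object>`. The sketch below splits one rule into its hook name, sequence number and JSON body, which can help when linting a rule file. `parse_hook_rule` is a hypothetical helper written for illustration; it is not how EMQ X itself reads the file.

```python
import json


def parse_hook_rule(line):
    """Split one `backend.redis.hook.<hook>.<n> = <json>` line into its parts."""
    key, _, value = line.partition("=")
    parts = key.strip().split(".")
    # Expected key shape: backend.redis.hook.<hook name>.<sequence number>
    if parts[:3] != ["backend", "redis", "hook"] or len(parts) < 5:
        raise ValueError(f"not a hook rule: {line!r}")
    hook = ".".join(parts[3:-1])   # e.g. "message.publish"
    index = int(parts[-1])         # e.g. 1
    rule = json.loads(value)       # the right-hand side is a JSON object
    return hook, index, rule


line = ('backend.redis.hook.message.publish.1 = {"topic": "#", '
        '"action": {"function": "on_message_publish"}, '
        '"expired_time" : 3600, "pool": "pool1"}')
hook, index, rule = parse_hook_rule(line)
print(hook, index, rule["action"]["function"], rule.get("topic"), rule["pool"])
```

A rule whose action uses `commands` instead of `function` parses the same way; only the contents of `rule["action"]` differ.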

Redis Storage Rule Descriptions

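| Hook | Topic | Action/Function | Description |
| --- | --- | --- | --- |
| client.connected | | on_client_connected | Store the client's online state |
| session.created | | on_subscribe_lookup | Subscribe to topics |
| client.disconnected | | on_client_disconnected | Store the client's offline state |
| session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one-to-one offline messages |
| session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one-to-many offline messages |
| session.subscribed | # | on_retain_lookup | Fetch retained messages |
| session.unsubscribed | # | | Delete acked messages |
| message.publish | # | on_message_publish | Store published messages |
| message.publish | # | on_message_retain | Store retained messages |
| message.publish | # | on_retain_delete | Delete retained messages |
| message.acked | queue/# | on_message_acked_for_queue | Handle ACKs for one-to-one messages |
| message.acked | pubsub/# | on_message_acked_for_pubsub | Handle ACKs for one-to-many messages |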

Redis Command Parameters

| Hook | Available parameters | Example (fields must be separated by exactly one space) |
| --- | --- | --- |
| client.connected | clientid | SET conn:${clientid} ${clientid} |
| client.disconnected | clientid | SET disconn:${clientid} ${clientid} |
| session.subscribed | clientid, topic, qos | HSET sub:${clientid} ${topic} ${qos} |
| session.unsubscribed | clientid, topic | SET unsub:${clientid} ${topic} |
| message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} ${msgid} |
| message.acked | msgid, topic, clientid | HSET ack:${clientid} ${topic} ${msgid} |
| message.deliver | msgid, topic, clientid | HSET deliver:${clientid} ${topic} ${msgid} |

Configuring Actions with Redis Commands

The Redis backend also lets you configure an action as a list of Redis commands, for example:

    ## Run a Redis command after a client connects to the EMQ X server
    backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} ${clientid}"]}, "pool": "pool1"}
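To see how a command template with `${...}` placeholders turns into a concrete Redis command, the sketch below splits the template on single spaces and then fills each field. This is only an illustration of the idea under the assumption that fields are split before substitution; `render_command` is a hypothetical helper, and the plugin's actual substitution logic is not described in this document.

```python
from string import Template


def render_command(command, params):
    """Split a command template on single spaces, then fill ${name} placeholders."""
    # Splitting first keeps a substituted value that itself contains spaces
    # (for example a payload) inside one field.
    fields = command.split(" ")
    return [Template(field).substitute(params) for field in fields]


connected = render_command("SET conn:${clientid} ${clientid}", {"clientid": "test"})
subscribed = render_command(
    "HSET sub:${clientid} ${topic} ${qos}",
    {"clientid": "test", "topic": "topic1", "qos": 1},
)
print(connected)
print(subscribed)
```

With this approach, two consecutive spaces in a template would produce an empty field, which is one plausible reason for the single-space requirement.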

Redis Hash for Device Online State

The mqtt:client Hash stores the device online state:

    hmset
    key = mqtt:client:${clientid}
    value = {state:int, online_at:timestamp, offline_at:timestamp}
    hset
    key = mqtt:node:${node}
    field = ${clientid}
    value = ${ts}

Query the device online state:

    HGETALL "mqtt:client:${clientid}"

For example, after the client with ClientId test comes online:

    HGETALL mqtt:client:test
    1) "state"
    2) "1"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "undefined"

For example, after the client with ClientId test goes offline:

    HGETALL mqtt:client:test
    1) "state"
    2) "0"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "1481685924"
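An HGETALL reply is a flat list that alternates field names and values. The sketch below pairs the two example replies into dictionaries and derives the online flag from them. `reply_to_dict` and `is_online` are hypothetical helper names, and reading "1" as online and "0" as offline is inferred from the two examples rather than from a formal specification.

```python
def reply_to_dict(reply):
    """Pair up a flat HGETALL reply: [field1, value1, field2, value2, ...]."""
    return dict(zip(reply[0::2], reply[1::2]))


def is_online(status):
    # In the examples, state is "1" while connected and "0" after disconnecting.
    return status.get("state") == "1"


online = reply_to_dict(
    ["state", "1", "online_at", "1481685802", "offline_at", "undefined"]
)
offline = reply_to_dict(
    ["state", "0", "online_at", "1481685802", "offline_at", "1481685924"]
)

print(is_online(online), is_online(offline))
# Length of the finished session in seconds, from the two timestamps.
print(int(offline["offline_at"]) - int(offline["online_at"]))
```

Note that `offline_at` is the string "undefined" while the client is still online, so it should be checked before being converted to a number.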

Redis Hash for Retained Messages

The mqtt:retain Hash stores retained messages:

    hmset
    key = mqtt:retain:${topic}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

Query a retained message:

    HGETALL "mqtt:retain:${topic}"

For example, to view the retained message for the topic named topic:

    HGETALL mqtt:retain:topic
    1) "id"
    2) "6P9NLcJ65VXBbC22sYb4"
    3) "from"
    4) "test"
    5) "qos"
    6) "1"
    7) "topic"
    8) "topic"
    9) "retain"
    10) "true"
    11) "payload"
    12) "Hello world!"
    13) "ts"
    14) "1481690659"

Redis Hash for Message Storage

The mqtt:msg Hash stores MQTT messages:

    hmset
    key = mqtt:msg:${msgid}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
    zadd
    key = mqtt:msg:${topic}
    field = 1
    value = ${msgid}

Redis SET for Message Acknowledgement

The mqtt:acked SET stores client message acknowledgements:

    set
    key = mqtt:acked:${clientid}:${topic}
    value = ${msgid}

Redis Hash for Subscriptions

The mqtt:sub Hash stores subscriptions:

    hset
    key = mqtt:sub:${clientid}
    field = ${topic}
    value = ${qos}

Subscribe a client to a topic:

    HSET mqtt:sub:${clientid} ${topic} ${qos}

For example, subscribe the client with ClientId test to the topics topic1 and topic2:

    HSET "mqtt:sub:test" "topic1" 1
    HSET "mqtt:sub:test" "topic2" 2

Query the topics that the client with ClientId test has subscribed to:

    HGETALL mqtt:sub:test
    1) "topic1"
    2) "1"
    3) "topic2"
    4) "2"

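Redis SUB/UNSUB Event Publishing

When a device needs to subscribe to or unsubscribe from a topic, the application server publishes an event message to Redis: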

    PUBLISH
    channel = "mqtt_channel"
    message = {type: string, topic: string, clientid: string, qos: int}
    *type: [subscribe/unsubscribe]

For example, the client with ClientId test subscribes to the topic topic0:

    PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"

For example, the client with ClientId test unsubscribes from a topic:

    PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
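Hand-escaping the JSON message, as in the commands above, is error-prone, so an application server would typically build it with a JSON encoder. The following is a minimal sketch under that assumption; `build_event` is a hypothetical helper, not an EMQ X API. The message format above lists `qos` as an int while the subscribe example sends the string "0", so the sketch passes through whatever value the caller supplies rather than choosing one.

```python
import json


def build_event(event_type, topic, clientid, qos=None):
    """Build the JSON message for a subscribe or unsubscribe event."""
    if event_type not in ("subscribe", "unsubscribe"):
        raise ValueError(f"unsupported event type: {event_type!r}")
    event = {"type": event_type, "topic": topic, "clientid": clientid}
    # The unsubscribe example carries no qos, so it is only added when given.
    if qos is not None:
        event["qos"] = qos
    return json.dumps(event)


subscribe_msg = build_event("subscribe", "topic0", "test", qos=0)
unsubscribe_msg = build_event("unsubscribe", "test_topic0", "test")

# Arguments for a PUBLISH call on the channel configured for the pool.
print(["PUBLISH", "mqtt_channel", subscribe_msg])
print(["PUBLISH", "mqtt_channel", unsubscribe_msg])
```

The resulting strings can be passed as the message argument of PUBLISH by any Redis client.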

WARNING

The Redis PUB/SUB feature cannot be used with Redis Cluster.

Enabling the Redis Data Storage Plugin

    ./bin/emqx_ctl plugins load emqx_backend_redis