Cassandra Message Storage

Configuring the Cassandra Server

Configuration file: emqx_backend_cassa.conf

Connection pools for multiple Cassandra servers can be configured:

    ## Cassandra node address
    backend.ecql.pool1.nodes = 127.0.0.1:9042
    ## Cassandra connection pool size
    backend.ecql.pool1.size = 8
    ## Cassandra auto-reconnect interval (s)
    backend.ecql.pool1.auto_reconnect = 1
    ## Cassandra authentication username/password
    backend.ecql.pool1.username = cassandra
    backend.ecql.pool1.password = cassandra
    ## Cassandra Keyspace
    backend.ecql.pool1.keyspace = mqtt
    ## Cassandra Logger type
    backend.ecql.pool1.logger = info
    ##--------------------------------------------------------------------
    ## Cassandra Backend Hooks
    ##--------------------------------------------------------------------
    ## Client Connected Record
    backend.cassa.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    ## Subscribe Lookup Record
    backend.cassa.hook.session.created.1 = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}
    ## Client Disconnected Record
    backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    ## Lookup Unread Message QoS > 0
    backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
    ## Lookup Retain Message
    backend.cassa.hook.session.subscribed.2 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    ## Store Publish Message QoS > 0
    backend.cassa.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    ## Delete Acked Record
    backend.cassa.hook.session.unsubscribed.1 = {"topic": "#", "action": {"cql": ["delete from acked where client_id = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
    ## Store Retain Message
    backend.cassa.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    ## Delete Retain Message
    backend.cassa.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    ## Store Ack
    backend.cassa.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
    ## Fetch offline messages
    ## "offline_opts": options for fetching offline messages
    ##   - max_returned_count: maximum number of offline messages fetched at a time
    ##   - time_range: only fetch messages that fall within this time range
    ## backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
    ## To store QoS 0 messages as well, enable the following setting
    ## Warning: when the following setting is enabled, 'on_message_fetch' must be disabled, otherwise QoS 1 and QoS 2 messages will be stored twice
    ## backend.cassa.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
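Each hook rule is a JSON object on a single line, so one missing quote is enough to make a rule invalid. As a rough aid, the following Python sketch checks one hook line before it goes into the configuration file. The helper name parse_hook_line and the key pattern backend.cassa.hook.<hook>.<index> are assumptions inferred from the examples above; this is not how the plugin itself parses its configuration.

```python
import json
import re

# Assumed key layout, inferred from the examples: backend.cassa.hook.<hook>.<index>
HOOK_KEY = re.compile(r"^backend\.cassa\.hook\.([a-z_.]+)\.(\d+)$")


def parse_hook_line(line):
    """Split one 'key = value' hook line and decode its JSON value."""
    # Split on the first '=' only; CQL inside the JSON may contain '=' too.
    key, _, value = line.partition("=")
    match = HOOK_KEY.match(key.strip())
    if match is None:
        raise ValueError(f"not a hook key: {key.strip()!r}")
    # json.JSONDecodeError (a ValueError subclass) is raised on malformed JSON.
    rule = json.loads(value)
    # Assumption: every rule needs 'action' and 'pool', as in all examples above.
    if "action" not in rule or "pool" not in rule:
        raise ValueError("hook rule needs both 'action' and 'pool'")
    return {"hook": match.group(1), "index": int(match.group(2)), "rule": rule}
```

Calling parse_hook_line on each uncommented hook line of the file would flag a rule with unbalanced quotes or a missing "pool" entry before the plugin is loaded.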

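The backend message storage rules are:

| hook | topic | action | Description |
| --- | --- | --- | --- |
| client.connected | | on_client_connected | Store the client's online state |
| session.created | | on_subscribe_lookup | Subscribe to topics |
| client.disconnected | | on_client_disconnected | Store the client's offline state |
| session.subscribed | # | on_message_fetch | Fetch offline messages |
| session.subscribed | # | on_retain_lookup | Fetch retained messages |
| session.unsubscribed | # | | Delete acked messages |
| message.publish | # | on_message_publish | Store published messages |
| message.publish | # | on_message_retain | Store retained messages |
| message.publish | # | on_retain_delete | Delete retained messages |
| message.acked | # | on_message_acked | Process message ACKs |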

Parameters available in custom CQL statements:

| hook | Available parameters | Example (${name} in a CQL statement denotes an available parameter) |
| --- | --- | --- |
| client.connected | clientid | insert into conn(clientid) values(${clientid}) |
| client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
| session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
| session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
| message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
| message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
| message.deliver | msgid, topic, clientid | insert into deliver(msgid, topic) values(${msgid}, ${topic}) |
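To make the ${name} convention concrete, here is a minimal Python sketch that rewrites a statement into a parameterized query plus an ordered list of values. How the plugin performs the substitution internally is not described here; the helper name bind_cql and the %s marker style are assumptions chosen for illustration only.

```python
import re

# Matches ${name} placeholders as used in the examples above.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def bind_cql(template, params):
    """Turn ${name} placeholders into %s markers plus an ordered value list."""
    values = []

    def replace(match):
        # A KeyError here means the hook does not provide that parameter.
        values.append(params[match.group(1)])
        return "%s"

    query = PLACEHOLDER.sub(replace, template)
    return query, values
```

For a message.publish event, bind_cql("insert into msg(msgid, topic) values(${msgid}, ${topic})", params) keeps only the msgid and topic values, in the order in which they appear in the statement.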

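Configuring custom CQL statements:

Because requirements differ between deployments, the built-in functions of the Cassandra backend may not cover every use case. In that case you can configure your own CQL statements.

Add the following to etc/plugins/emqx_backend_cassa.conf:

    ## After a client connects to the EMQ X server, execute a CQL statement (multiple CQL statements are supported)
    backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}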
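Writing the JSON value by hand is error-prone, so a custom rule can also be generated. The sketch below builds such a line with Python's json module; the helper name build_cql_hook is hypothetical, and the field order simply mirrors the examples in this document.

```python
import json


def build_cql_hook(hook, index, statements, pool, topic=None):
    """Build one 'backend.cassa.hook.<hook>.<index> = {...}' line with a cql action."""
    rule = {}
    if topic is not None:
        rule["topic"] = topic  # optional topic filter such as "#"
    rule["action"] = {"cql": list(statements)}  # one or more CQL statements
    rule["pool"] = pool
    # json.dumps guarantees balanced quotes and brackets in the value.
    return f"backend.cassa.hook.{hook}.{index} = {json.dumps(rule)}"


print(build_cql_hook(
    "client.connected", 3,
    ["insert into conn(clientid) values(${clientid})"],
    "pool1",
))
```

With these arguments the function reproduces the client.connected.3 rule shown above.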

Creating a Keyspace in Cassandra

    CREATE KEYSPACE mqtt WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
    USE mqtt;

Importing the Cassandra Table Schema

    cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"

Cassandra Client Status Table (Client Table)

mqtt.client stores the online state of devices:

    CREATE TABLE mqtt.client (
        client_id text PRIMARY KEY,
        connected timestamp,
        disconnected timestamp,
        node text,
        state int
    );

Query the online state of a device:

    select * from mqtt.client where client_id = ${clientid};

For example, after the client with ClientId test comes online:

    select * from mqtt.client where client_id = 'test';

     client_id | connected                       | disconnected | node           | state
    -----------+---------------------------------+--------------+----------------+-------
     test      | 2017-02-14 08:27:29.872000+0000 | null         | emqx@127.0.0.1 | 1

For example, after the client with ClientId test goes offline:

    select * from mqtt.client where client_id = 'test';

     client_id | connected                       | disconnected                    | node           | state
    -----------+---------------------------------+---------------------------------+----------------+-------
     test      | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1 | 0
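As an illustration of how an application might read such a row, the following Python sketch interprets the state column and derives the session length from the two timestamps. It assumes that state 1 means online and 0 means offline, as the two examples suggest, and that timestamps arrive as strings in the cqlsh format shown above; the helper name describe_client is hypothetical.

```python
from datetime import datetime

# Timestamp layout as printed by cqlsh in the examples above.
TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"


def describe_client(row):
    """Summarize a mqtt.client row given as a dict of column name to value."""
    online = row["state"] == 1  # assumption: 1 = online, 0 = offline
    if row["disconnected"] is None:
        # No disconnect recorded yet, so no session length can be derived.
        return {"online": online, "session_seconds": None}
    connected = datetime.strptime(row["connected"], TS_FORMAT)
    disconnected = datetime.strptime(row["disconnected"], TS_FORMAT)
    return {
        "online": online,
        "session_seconds": (disconnected - connected).total_seconds(),
    }
```

Applied to the offline row above, the sketch reports the client as offline with a session of 6 seconds.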

Cassandra Subscription Table (Sub Table)

mqtt.sub stores subscription relationships:

    CREATE TABLE mqtt.sub (
        client_id text,
        topic text,
        qos int,
        PRIMARY KEY (client_id, topic)
    );

Client test subscribes to the topics test_topic1 and test_topic2:

    insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic1', 1);
    insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic2', 2);

Query the topics a client has subscribed to:

    select * from mqtt.sub where client_id = ${clientid};

Query the topics subscribed to by the client with ClientId 'test':

    select * from mqtt.sub where client_id = 'test';

     client_id | topic       | qos
    -----------+-------------+-----
     test      | test_topic1 | 1
     test      | test_topic2 | 2

Cassandra Message Table (Msg Table)

mqtt.msg stores MQTT messages:

    CREATE TABLE mqtt.msg (
        topic text,
        msgid text,
        arrived timestamp,
        payload text,
        qos int,
        retain int,
        sender text,
        PRIMARY KEY (topic, msgid)
    ) WITH CLUSTERING ORDER BY (msgid DESC);

Query the messages published by a client. Because sender is not part of the primary key, Cassandra accepts this query only with ALLOW FILTERING or a secondary index on sender:

    select * from mqtt.msg where sender = ${clientid} allow filtering;

Query the messages published by the client with ClientId 'test':

    select * from mqtt.msg where sender = 'test' allow filtering;

     topic | msgid                | arrived                         | payload      | qos | retain | sender
    -------+----------------------+---------------------------------+--------------+-----+--------+--------
     hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1   | 0      | test
     world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1   | 0      | test
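The primary key of mqtt.msg is (topic, msgid), so a lookup by sender cannot be answered from the key alone. The Python sketch below encodes a deliberately simplified version of Cassandra's rule for equality-only WHERE clauses on tables without secondary indexes. The function name needs_allow_filtering is hypothetical and the check is an approximation for reasoning about the tables in this document, not a reimplementation of the CQL planner.

```python
def needs_allow_filtering(partition_key, clustering_key, where_columns):
    """Simplified check: equality-only WHERE clause, no secondary indexes."""
    where = set(where_columns)
    if not where:
        # A plain full-table read has no restriction to filter on.
        return False
    # Restricting a column outside the primary key forces a scan.
    if where - set(partition_key) - set(clustering_key):
        return True
    # Every partition key column must be restricted to locate the partition.
    if not set(partition_key) <= where:
        return True
    # Clustering columns must be restricted as a prefix, in declared order.
    restricted = [col in where for col in clustering_key]
    return restricted != sorted(restricted, reverse=True)


# mqtt.msg: PRIMARY KEY (topic, msgid)
print(needs_allow_filtering(["topic"], ["msgid"], ["sender"]))  # True
print(needs_allow_filtering(["topic"], ["msgid"], ["topic"]))   # False
```

Under this rule, the mqtt.client and mqtt.sub lookups by client_id are served directly from the partition key, while a lookup of mqtt.msg by sender is not.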

Cassandra Retained Message Table (Retain Message Table)

mqtt.retain stores retained messages:

    CREATE TABLE mqtt.retain (
        topic text PRIMARY KEY,
        msgid text
    );

Query a retained message:

    select * from mqtt.retain where topic = ${topic};

Query the retained message whose topic is 't/retain':

    select * from mqtt.retain where topic = 't/retain';

     topic  | msgid
    --------+----------------------
     retain | 2PguFrHsrzEvIIBdctmb

Cassandra Message Acknowledgement Table (Message Acked Table)

mqtt.acked stores client message acknowledgements:

    CREATE TABLE mqtt.acked (
        client_id text,
        topic text,
        msgid text,
        PRIMARY KEY (client_id, topic)
    );

Enabling the Cassandra Storage Plugin

    ./bin/emqx_ctl plugins load emqx_backend_cassa