Cassandra Backend

TIP

Starting with EMQX version 3.1, a rule engine is available that replaces the backend plug-ins, and using it is recommended. See Save data to Cassandra to set up Cassandra persistence in the rule engine.

Config file: etc/plugins/emqx_backend_cassa.conf

Configure Cassandra Cluster

Multi-node Cassandra clusters are supported:

    ## Cassandra Node
    backend.ecql.pool1.nodes = 127.0.0.1:9042
    ## Cassandra Pool Size
    backend.ecql.pool1.size = 8
    ## Cassandra auto reconnect flag
    backend.ecql.pool1.auto_reconnect = 1
    ## Cassandra Username
    backend.ecql.pool1.username = cassandra
    ## Cassandra Password
    backend.ecql.pool1.password = cassandra
    ## Cassandra Keyspace
    backend.ecql.pool1.keyspace = mqtt
    ## Cassandra Logger type
    backend.ecql.pool1.logger = info
    ## Max number of fetch offline messages. Without count limit if infinity
    ## backend.cassa.max_returned_count = 500
    ## Time Range. Without time limit if infinity
    ## d - day
    ## h - hour
    ## m - minute
    ## s - second
    ## backend.cassa.time_range = 2h
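
The time_range value combines a number with one of the unit suffixes listed in the comments (d, h, m, s), or is the word infinity. As a rough illustration of those semantics, the following Python sketch converts such a value to seconds. The helper name parse_time_range is hypothetical and this is not the plug-in's actual parser, which is not shown in this document.

```python
import re
from typing import Optional

# Seconds per unit, following the suffixes listed in the config comments.
_UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}


def parse_time_range(value: str) -> Optional[int]:
    """Convert a value such as '2h' to seconds; return None for 'infinity'.

    Hypothetical helper for illustration only, not part of EMQX.
    """
    value = value.strip()
    if value == "infinity":
        return None  # no time limit
    match = re.fullmatch(r"(\d+)([dhms])", value)
    if match is None:
        raise ValueError(f"unsupported time_range: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


print(parse_time_range("2h"))  # 7200
```

Returning None for infinity keeps "no limit" distinct from a zero-second window.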

Configure Cassandra Persistence Hooks

    ## Client Connected Record
    backend.cassa.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    ## Subscribe Lookup Record
    backend.cassa.hook.client.connected.2 = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}
    ## Client Disconnected Record
    backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    ## Lookup Unread Message QoS > 0
    backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
    ## Lookup Retain Message
    backend.cassa.hook.session.subscribed.2 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    ## Store Publish Message QoS > 0
    backend.cassa.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    ## Delete Acked Record
    backend.cassa.hook.session.unsubscribed.1 = {"topic": "#", "action": {"cql": ["delete from acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
    ## Store Retain Message
    backend.cassa.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    ## Delete Retain Message
    backend.cassa.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    ## Store Ack
    backend.cassa.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
    ## Get offline messages
    ### "offline_opts": Options for fetching offline messages
    ### max_returned_count: Maximum number of offline messages fetched at a time
    ### time_range: Fetch only messages within the given time range
    ## backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
    ## If you need to store QoS 0 messages, you can enable the following configuration
    ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise QoS 1 and QoS 2 messages will be stored twice
    ## backend.cassa.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
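
Each hook setting pairs a key of the form backend.cassa.hook.<hook>.<n> with a JSON value holding an action, a pool and, optionally, a topic filter. The Python sketch below shows one way to read that structure, which can help when checking a hand-edited configuration. The helper name parse_hook_line is hypothetical and not part of EMQX, and treating the trailing number as an ordering index is an assumption.

```python
import json


def parse_hook_line(line: str) -> dict:
    """Split 'backend.cassa.hook.<hook>.<n> = <json>' into its parts.

    Hypothetical helper for illustration only, not part of EMQX.
    """
    # Split at the first '=' only; the JSON value may contain '=' itself.
    key, _, raw = line.partition("=")
    key = key.strip()
    prefix = "backend.cassa.hook."
    if not key.startswith(prefix):
        raise ValueError(f"not a hook setting: {key!r}")
    hook, _, index = key[len(prefix):].rpartition(".")
    spec = json.loads(raw)
    action = spec["action"]
    return {
        "hook": hook,                        # e.g. 'message.publish'
        "index": int(index),                 # numeric suffix of the key
        "topic": spec.get("topic"),          # None when no topic filter is set
        "pool": spec["pool"],
        "function": action.get("function"),  # built-in action, if any
        "cql": action.get("cql", []),        # custom CQL statements, if any
    }


line = (
    'backend.cassa.hook.session.unsubscribed.1 = {"topic": "#", "action": '
    '{"cql": ["delete from acked where clientid = ${clientid} and topic = ${topic}"]}, '
    '"pool": "pool1"}'
)
parsed = parse_hook_line(line)
print(parsed["hook"], parsed["pool"], parsed["cql"])
```

An action carries either a function name or a list of CQL statements, so the sketch returns both fields and leaves the unused one empty.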

Description of Cassandra Persistence Hooks

hook                | topic | action                 | Description
--------------------+-------+------------------------+---------------------------------
client.connected    |       | on_client_connected    | Store client connected state
client.connected    |       | on_subscribe_lookup    | Subscribed topics
client.disconnected |       | on_client_disconnected | Store client disconnected state
session.subscribed  | #     | on_message_fetch       | Fetch offline messages
session.subscribed  | #     | on_retain_lookup       | Lookup retained messages
message.publish     | #     | on_message_publish     | Store published messages
message.publish     | #     | on_message_retain      | Store retained messages
message.publish     | #     | on_retain_delete       | Delete retained messages
message.acked       | #     | on_message_acked       | Process ACK

CQL Parameters Description

Custom CQL commands can use the following parameters:

hook                 | Parameter                            | Example (${name} in CQL represents an available parameter)
---------------------+--------------------------------------+--------------------------------------------------------------
client.connected     | clientid                             | insert into conn(clientid) values(${clientid})
client.disconnected  | clientid                             | insert into disconn(clientid) values(${clientid})
session.subscribed   | clientid, topic, qos                 | insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed | clientid, topic                      | delete from sub where topic = ${topic}
message.publish      | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked        | msgid, topic, clientid               | insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered    | msgid, topic, clientid               | insert into delivered(msgid, topic) values(${msgid}, ${topic})
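
Because each hook exposes a different set of parameters, a custom CQL statement that names a parameter its hook does not provide is an easy mistake to make. The Python sketch below checks a statement against the table above. The names HOOK_PARAMS and unknown_params are hypothetical, and this is an offline check under the assumption that the table is complete, not something the plug-in does itself.

```python
import re

# Parameters available to each hook, copied from the table above.
HOOK_PARAMS = {
    "client.connected": {"clientid"},
    "client.disconnected": {"clientid"},
    "session.subscribed": {"clientid", "topic", "qos"},
    "session.unsubscribed": {"clientid", "topic"},
    "message.publish": {"msgid", "topic", "payload", "qos", "clientid"},
    "message.acked": {"msgid", "topic", "clientid"},
    "message.delivered": {"msgid", "topic", "clientid"},
}


def unknown_params(hook: str, cql: str) -> set:
    """Return the ${name} placeholders in cql that the hook does not offer."""
    used = set(re.findall(r"\$\{(\w+)\}", cql))
    return used - HOOK_PARAMS[hook]


# 'qos' is not available on client.connected, so it is reported.
print(unknown_params(
    "client.connected",
    "insert into conn(clientid, qos) values(${clientid}, ${qos})",
))
```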

Configure ‘action’ with CQL

The Cassandra backend supports CQL in ‘action’:

    ## After a client connects to the EMQX server, execute a CQL command (multiple commands are also supported):
    backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
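
To make the role of the ${name} placeholders concrete, the Python sketch below turns a statement from the "cql" list into a statement with positional markers plus an ordered list of values. How the plug-in substitutes values internally is not described in this document, so the prepared-statement style used here is an assumption, and the function name bind is hypothetical.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def bind(cql: str, params: dict) -> tuple:
    """Replace each ${name} with '?' and collect the matching values in order.

    Hypothetical helper for illustration only, not part of EMQX.
    """
    values = []

    def _swap(match):
        # Raises KeyError if the statement names a parameter that is missing.
        values.append(params[match.group(1)])
        return "?"

    statement = _PLACEHOLDER.sub(_swap, cql)
    return statement, values


statement, values = bind(
    "insert into conn(clientid) values(${clientid})",
    {"clientid": "test"},
)
print(statement)  # insert into conn(clientid) values(?)
print(values)     # ['test']
```

Passing values separately from the statement text avoids quoting problems when a client ID or topic contains quote characters.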

Initializing Cassandra

Create the keyspace:

    CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
    USE mqtt;

Import Cassandra tables:

    cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"

TIP

You can choose any keyspace name; it should match the value of backend.ecql.pool1.keyspace.

Cassandra Client Connection Table

mqtt.client stores client connection states:

    CREATE TABLE mqtt.client (
        clientid text,
        node text,
        state int,
        connected timestamp,
        disconnected timestamp,
        PRIMARY KEY(clientid)
    );

Query a client’s connection state:

    select * from mqtt.client where clientid = ${clientid};

If client ‘test’ is online:

    select * from mqtt.client where clientid = 'test';

     clientid | connected                       | disconnected | node           | state
    ----------+---------------------------------+--------------+----------------+-------
     test     | 2017-02-14 08:27:29.872000+0000 | null         | emqx@127.0.0.1 | 1

If client ‘test’ is offline:

    select * from mqtt.client where clientid = 'test';

     clientid | connected                       | disconnected                    | node           | state
    ----------+---------------------------------+---------------------------------+----------------+-------
     test     | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1 | 0

Cassandra Subscription Table

mqtt.sub stores subscriptions of clients:

    CREATE TABLE mqtt.sub (
        clientid text,
        topic text,
        qos int,
        PRIMARY KEY(clientid, topic)
    );

Client ‘test’ subscribes to topics ‘test_topic1’ and ‘test_topic2’:

    insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic1', 1);
    insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic2', 2);

Query subscriptions of a client:

    select * from mqtt.sub where clientid = ${clientid};

Query subscriptions of client ‘test’:

    select * from mqtt.sub where clientid = 'test';

     clientid | topic       | qos
    ----------+-------------+-----
     test     | test_topic1 | 1
     test     | test_topic2 | 2

Cassandra Message Table

mqtt.msg stores MQTT messages:

    CREATE TABLE mqtt.msg (
        topic text,
        msgid text,
        sender text,
        qos int,
        retain int,
        payload blob,
        arrived timestamp,
        PRIMARY KEY(topic, msgid)
    ) WITH CLUSTERING ORDER BY (msgid DESC);

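Query messages published by a client (sender is not part of the primary key, so the query needs ALLOW FILTERING unless a secondary index exists on sender):

    select * from mqtt.msg where sender = ${clientid} allow filtering;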

Query messages published by client ‘test’:

    select * from mqtt.msg where sender = 'test' allow filtering;

     topic | msgid                | arrived                         | payload      | qos | retain | sender
    -------+----------------------+---------------------------------+--------------+-----+--------+--------
     hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1   | 0      | test
     world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1   | 0      | test

Cassandra Retained Message Table

mqtt.retain stores retained messages:

    CREATE TABLE mqtt.retain (
        topic text,
        msgid text,
        PRIMARY KEY(topic)
    );

Query retained messages:

    select * from mqtt.retain where topic = ${topic};

Query retained messages with topic ‘retain’:

    select * from mqtt.retain where topic = 'retain';

     topic  | msgid
    --------+----------------------
     retain | 2PguFrHsrzEvIIBdctmb

Cassandra Acknowledgement Table

mqtt.acked stores acknowledgements from the clients:

    CREATE TABLE mqtt.acked (
        clientid text,
        topic text,
        msgid text,
        PRIMARY KEY(clientid, topic)
    );

Enable Cassandra Backend

    ./bin/emqx_ctl plugins load emqx_backend_cassa