MongoDB Backend

TIP

Since EMQX 3.1, a rule engine is available that replaces this plug-in, and it is the recommended approach. See Save data to MongoDB for how to store data in MongoDB through the rule engine.

Config file: emqx_backend_mongo.conf

Configure MongoDB Server

Connection pools for multiple MongoDB servers are supported:

    ## MongoDB Server Pools
    ## Mongo Topology Type single|unknown|sharded|rs
    backend.mongo.pool1.type = single

    ## If type rs, need config setname
    ## backend.mongo.pool1.rs_set_name = testrs

    ## Mongo Server 127.0.0.1:27017,127.0.0.2:27017...
    backend.mongo.pool1.server = 127.0.0.1:27017

    ## MongoDB Pool Size
    backend.mongo.pool1.c_pool_size = 8

    ## MongoDB Database
    backend.mongo.pool1.database = mqtt

    ## Mongo User
    ## backend.mongo.pool1.login = emqtt

    ## Mongo Password
    ## backend.mongo.pool1.password = emqtt

    ## MongoDB AuthSource
    ## Value: String
    ## Default: mqtt
    ## backend.mongo.pool1.auth_source = admin

    ## Whether to enable SSL connection.
    ##
    ## Value: true | false
    ## backend.mongo.pool1.ssl = false

    ## SSL keyfile.
    ##
    ## Value: File
    ## backend.mongo.pool1.keyfile =

    ## SSL certfile.
    ##
    ## Value: File
    ## backend.mongo.pool1.certfile =

    ## SSL cacertfile.
    ##
    ## Value: File
    ## backend.mongo.pool1.cacertfile =

    ## Value: unsafe | safe
    ## backend.mongo.pool1.w_mode = safe

    ## Value: master | slave_ok
    ## backend.mongo.pool1.r_mode = slave_ok

    ## Mongo Topology Options
    ## backend.mongo.topology.pool_size = 1
    ## backend.mongo.topology.max_overflow = 0
    ## backend.mongo.topology.overflow_ttl = 1000
    ## backend.mongo.topology.overflow_check_period = 1000
    ## backend.mongo.topology.local_threshold_ms = 1000
    ## backend.mongo.topology.connect_timeout_ms = 20000
    ## backend.mongo.topology.socket_timeout_ms = 100
    ## backend.mongo.topology.server_selection_timeout_ms = 30000
    ## backend.mongo.topology.wait_queue_timeout_ms = 1000
    ## backend.mongo.topology.heartbeat_frequency_ms = 10000
    ## backend.mongo.topology.min_heartbeat_frequency_ms = 1000

    ## Max number of fetch offline messages. Without count limit if infinity
    ## backend.mongo.max_returned_count = 500

    ## Time Range. Without time limit if infinity
    ## d - day
    ## h - hour
    ## m - minute
    ## s - second
    ## backend.mongo.time_range = 2h
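
The unit suffixes listed for time_range (d, h, m, s) map to seconds in the obvious way. The sketch below shows that mapping; parseTimeRange is a hypothetical helper written for illustration and is not part of EMQX. It assumes the value is either the word infinity or a single integer followed by one unit letter, which is the only form the configuration comments show.

```javascript
// Seconds per unit, following the d/h/m/s legend in the config comments.
const UNIT_SECONDS = { d: 86400, h: 3600, m: 60, s: 1 };

// Hypothetical helper: convert a time_range value such as "2h" to seconds.
// Returns Infinity for "infinity" (no time limit).
function parseTimeRange(value) {
  if (value === "infinity") return Infinity;
  const match = /^(\d+)([dhms])$/.exec(value);
  if (!match) throw new Error(`invalid time_range: ${value}`);
  return Number(match[1]) * UNIT_SECONDS[match[2]];
}

console.log(parseTimeRange("2h")); // 7200
```

With the sample setting of 2h, only messages that arrived within the last 7200 seconds would fall inside the range.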

Configure MongoDB Persistence Hooks

    ## Client Connected Record
    backend.mongo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

    ## Subscribe Lookup Record
    backend.mongo.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

    ## Client DisConnected Record
    backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

    ## Lookup Unread Message QOS > 0
    backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

    ## Lookup Retain Message
    backend.mongo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

    ## Store Publish Message QOS > 0, payload_format options mongo_json | plain_text
    backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}

    ## Store Retain Message, payload_format options mongo_json | plain_text
    backend.mongo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1", "payload_format": "mongo_json"}

    ## Delete Retain Message
    backend.mongo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

    ## Store Ack
    backend.mongo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

    ## Get offline messages
    ### "offline_opts": Get configuration for offline messages
    ### max_returned_count: Maximum number of offline messages get at a time
    ### time_range: Get only messages in the current time range
    ## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

    ## If you need to store Qos0 messages, you can enable the following configuration
    ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
    ## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
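
Each hook value is a JSON object, so a stray quote or missing brace is easy to introduce when editing by hand. As a rough sketch, a hook line could be checked before the plug-in is loaded like this. parseHookLine is a hypothetical helper, not an EMQX tool, and the checks it performs (an action.function string and a pool string) are inferred only from the sample lines in this section.

```javascript
// Hypothetical helper: split one hook line into its hook point, sequence
// number, and parsed JSON value. For checking config files only.
function parseHookLine(line) {
  const match = /^backend\.mongo\.hook\.([a-z.]+)\.(\d+)\s*=\s*(.+)$/.exec(line.trim());
  if (!match) throw new Error("not a backend.mongo.hook line");
  const value = JSON.parse(match[3]); // throws on malformed JSON
  if (!value.action || typeof value.action.function !== "string") {
    throw new Error("missing action.function");
  }
  if (typeof value.pool !== "string") throw new Error("missing pool");
  return { hook: match[1], seq: Number(match[2]), ...value };
}

const parsed = parseHookLine(
  'backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}'
);
console.log(parsed.hook, parsed.seq, parsed.action.function, parsed.pool);
```

Commented-out lines (those starting with ##) are rejected by the pattern, so only active hooks pass the check.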

Description of MongoDB Persistence Hooks

    hook                 topic  action                  Description
    client.connected     -      on_client_connected     Store client connected state
    client.connected     -      on_subscribe_lookup     Subscribed topics
    client.disconnected  -      on_client_disconnected  Store client disconnected state
    session.subscribed   #      on_message_fetch        Fetch offline messages
    session.subscribed   #      on_retain_lookup        Lookup retained messages
    message.publish      #      on_message_publish      Store published messages
    message.publish      #      on_message_retain       Store retained messages
    message.publish      #      on_retain_delete        Delete retained messages
    message.acked        #      on_message_acked        Process ACK

Create MongoDB DB & Collections

    use mqtt
    db.createCollection("mqtt_client")
    db.createCollection("mqtt_sub")
    db.createCollection("mqtt_msg")
    db.createCollection("mqtt_retain")
    db.createCollection("mqtt_acked")
    // createIndex replaces the deprecated ensureIndex; 1 means ascending order
    db.mqtt_client.createIndex({clientid: 1, node: 1})
    db.mqtt_sub.createIndex({clientid: 1})
    db.mqtt_msg.createIndex({sender: 1, topic: 1})
    db.mqtt_retain.createIndex({topic: 1})

TIP

The database name is arbitrary, as long as it matches the backend.mongo.pool1.database setting.

MongoDB MQTT Client Collection

mqtt_client stores MQTT clients’ connection states:

    {
        clientid: string,
        state: 0,1, // 0 = disconnected, 1 = connected
        node: string,
        online_at: timestamp,
        offline_at: timestamp
    }

Query client’s connection state:

    db.mqtt_client.findOne({clientid: ${clientid}})

E.g., if client ‘test’ is online:

    db.mqtt_client.findOne({clientid: "test"})
    {
        "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
        "clientid" : "test",
        "state" : 1,
        "node" : "emqx@127.0.0.1",
        "online_at" : 1482976411,
        "offline_at" : null
    }

If client ‘test’ is offline:

    db.mqtt_client.findOne({clientid: "test"})
    {
        "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
        "clientid" : "test",
        "state" : 0,
        "node" : "emqx@127.0.0.1",
        "online_at" : 1482976411,
        "offline_at" : 1482976501
    }
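
To make the state and timestamp fields easier to read, a document returned by findOne can be summarized as in the sketch below. describeClient is a hypothetical helper, and it assumes that online_at and offline_at are Unix timestamps in seconds, which is what the ten-digit sample values suggest.

```javascript
// Hypothetical helper: summarize an mqtt_client document.
// Assumes online_at / offline_at are Unix timestamps in seconds.
function describeClient(doc) {
  const online = doc.state === 1;
  const toIso = (t) => (t == null ? null : new Date(t * 1000).toISOString());
  return {
    clientid: doc.clientid,
    online,
    onlineAt: toIso(doc.online_at),
    offlineAt: toIso(doc.offline_at),
    // Session length is only known once the client has disconnected.
    sessionSeconds:
      online || doc.offline_at == null ? null : doc.offline_at - doc.online_at,
  };
}

const summary = describeClient({
  clientid: "test",
  state: 0,
  online_at: 1482976411,
  offline_at: 1482976501,
});
console.log(summary.online, summary.sessionSeconds); // false 90
```

For the offline sample document, the session lasted 90 seconds (1482976501 minus 1482976411).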

MongoDB Subscription Collection

mqtt_sub stores subscriptions of clients:

    {
        clientid: string,
        topic: string,
        qos: 0,1,2
    }

E.g., client ‘test’ subscribes to topics ‘test_topic1’ and ‘test_topic2’:

    db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
    db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})

Query the subscriptions of client ‘test’:

    db.mqtt_sub.find({clientid: "test"})
    { "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
    { "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }

MongoDB Message Collection

mqtt_msg stores MQTT messages:

    {
        _id: int,
        topic: string,
        msgid: string,
        sender: string,
        qos: 0,1,2,
        retain: boolean (true, false),
        payload: string,
        arrived: timestamp
    }

Query messages published by a client:

    db.mqtt_msg.find({sender: ${clientid}})

Query messages published by client ‘test’:

    db.mqtt_msg.find({sender: "test"})
    {
        "_id" : 1,
        "topic" : "/World",
        "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
        "sender" : "test",
        "qos" : 1,
        "retain" : 1,
        "payload" : "Hello world!",
        "arrived" : 1482976729
    }
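
When inspecting mqtt_msg by hand, it can help to apply the same kind of limits that max_returned_count and time_range express in the configuration. The sketch below builds the arguments for such a manual query. recentMessagesQuery is a hypothetical helper, the sort order is an arbitrary choice, and nothing here is claimed to match how the plug-in itself fetches messages; arrived is assumed to be a Unix timestamp in seconds.

```javascript
// Hypothetical helper: build the arguments for a manual mqtt_msg query that
// returns a client's most recent messages inside a time window.
function recentMessagesQuery(sender, nowSeconds, rangeSeconds, maxCount) {
  return {
    filter: { sender, arrived: { $gte: nowSeconds - rangeSeconds } },
    sort: { arrived: -1 }, // newest first
    limit: maxCount,
  };
}

const q = recentMessagesQuery("test", 1482980000, 7200, 500);
// In the mongo shell:
//   db.mqtt_msg.find(q.filter).sort(q.sort).limit(q.limit)
console.log(JSON.stringify(q.filter));
// {"sender":"test","arrived":{"$gte":1482972800}}
```

The sample message above, with arrived 1482976729, falls inside this two-hour window.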

MongoDB Retained Message Collection

mqtt_retain stores retained messages:

    {
        topic: string,
        msgid: string,
        sender: string,
        qos: 0,1,2,
        payload: string,
        arrived: timestamp
    }

Query retained messages:

    db.mqtt_retain.findOne({topic: ${topic}})

Query the retained message with topic ‘/World’:

    db.mqtt_retain.findOne({topic: "/World"})
    {
        "_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
        "topic" : "/World",
        "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
        "sender" : "c1",
        "qos" : 1,
        "payload" : "Hello world!",
        "arrived" : 1482976729
    }

MongoDB Acknowledgement Collection

mqtt_acked stores acknowledgements from the clients:

    {
        clientid: string,
        topic: string,
        mongo_id: int
    }
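
The schema does not say how mongo_id relates to the other collections. One plausible reading, stated here as an assumption, is that it holds the integer _id of the last mqtt_msg document the client acknowledged on that topic. Under that assumption, the messages a client has not yet acknowledged could be looked up by hand as sketched below; unreadFilter is a hypothetical helper.

```javascript
// Assumption: mongo_id is the _id of the last mqtt_msg document that the
// client acknowledged on this topic. The source does not confirm this.
function unreadFilter(ack) {
  return { topic: ack.topic, _id: { $gt: ack.mongo_id } };
}

const filter = unreadFilter({ clientid: "test", topic: "/World", mongo_id: 1 });
// In the mongo shell: db.mqtt_msg.find(filter).sort({_id: 1})
console.log(JSON.stringify(filter)); // {"topic":"/World","_id":{"$gt":1}}
```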

Enable MongoDB Backend

    ./bin/emqx_ctl plugins load emqx_backend_mongo