PostgreSQL Backend

TIP

After EMQX version 3.1, a powerful rule engine is introduced to replace plug-ins. It is recommended that you use it. See Save data to PostgreSQL to setup Save data to PostgreSQL in rule engine.

Config file: emqx_backend_pgsql.conf

Configure PostgreSQL Server

TIP

Support PostgreSQL 13 and below versions

Connection pool of multiple PostgreSQL servers is supported:

  1. ## Pgsql Server
  2. backend.pgsql.pool1.server = 127.0.0.1:5432
  3. ## Pgsql Pool Size
  4. backend.pgsql.pool1.pool_size = 8
  5. ## Pgsql Username
  6. backend.pgsql.pool1.username = root
  7. ## Pgsql Password
  8. backend.pgsql.pool1.password = public
  9. ## Pgsql Database
  10. backend.pgsql.pool1.database = mqtt
  11. ## Pgsql Ssl
  12. backend.pgsql.pool1.ssl = false
  13. ## Max number of fetch offline messages. Without count limit if infinity
  14. ## backend.pgsql.max_returned_count = 500
  15. ## Time Range. Without time limit if infinity
  16. ## d - day
  17. ## h - hour
  18. ## m - minute
  19. ## s - second
  20. ## backend.pgsql.time_range = 2h

Configure PostgreSQL Persistence Hooks

  1. ## Client Connected Record
  2. backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
  3. ## Subscribe Lookup Record
  4. backend.pgsql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
  5. ## Client DisConnected Record
  6. backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
  7. ## Lookup Unread Message QOS > 0
  8. backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
  9. ## Lookup Retain Message
  10. backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
  11. ## Store Publish Message QOS > 0
  12. backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
  13. ## Store Retain Message
  14. backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
  15. ## Delete Retain Message
  16. backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
  17. ## Store Ack
  18. backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
  19. ## Get offline messages
  20. ### "offline_opts": Get configuration for offline messages
  21. ### max_returned_count: Maximum number of offline messages get at a time
  22. ### time_range: Get only messages in the current time range
  23. ## backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
  24. ## If you need to store Qos0 messages, you can enable the following configuration
  25. ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
  26. ## backend.pgsql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

Description of PostgreSQL Persistence Hooks

hooktopicactionDescription
client.connectedon_client_connectedStore client connected state
client.connectedon_subscribe_lookupSubscribed topics
client.disconnectedon_client_disconnectedStore client disconnected state
session.subscribed#on_message_fetchFetch offline messages
session.subscribed#on_retain_lookupLookup retained messages
message.publish#on_message_publishStore published messages
message.publish#on_message_retainStore retained messages
message.publish#on_retain_deleteDelete retained messages
message.acked#on_message_ackedProcess ACK

SQL Parameters Description

hookParametersExample (${name} represents available parameter)
client.connectedclientidinsert into conn(clientid) values(${clientid})
client.disconnectedclientidinsert into disconn(clientid) values(${clientid})
session.subscribedclientid, topic, qosinsert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribedclientid, topicdelete from sub where topic = ${topic}
message.publishmsgid, topic, payload, qos, clientidinsert into msg(msgid, topic) values(${msgid}, ${topic})
message.ackedmsgid, topic, clientidinsert into ack(msgid, topic) values(${msgid}, ${topic})
message.deliveredmsgid, topic, clientidinsert into delivered(msgid, topic) values(${msgid}, ${topic})

Configure ‘action’ with SQL

PostgreSQL backend supports SQL in ‘action’:

  1. ## After a client is connected to the EMQX server, it executes a SQL command (multiple command also supported)
  2. backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

Create PostgreSQL DB

  1. createdb mqtt -E UTF8 -e

Import PostgreSQL DB & Table Schema

  1. \i etc/sql/emqx_backend_pgsql.sql

TIP

DB name is free of choice

PostgreSQL Client Connection Table

mqtt_client stores client connection states:

  1. CREATE TABLE mqtt_client(
  2. id SERIAL primary key,
  3. clientid character varying(100),
  4. state integer,
  5. node character varying(100),
  6. online_at timestamp,
  7. offline_at timestamp,
  8. created timestamp without time zone,
  9. UNIQUE (clientid)
  10. );

Query a client’s connection state:

  1. select * from mqtt_client where clientid = ${clientid};

E.g., if client ‘test’ is online:

  1. select * from mqtt_client where clientid = 'test';
  2. id | clientid | state | node | online_at | offline_at | created
  3. ----+----------+-------+----------------+---------------------+---------------------+---------------------
  4. 1 | test | 1 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL | 2016-12-24 09:40:22
  5. (1 rows)

Client ‘test’ is offline:

  1. select * from mqtt_client where clientid = 'test';
  2. id | clientid | state | nod | online_at | offline_at | created
  3. ----+----------+-------+----------------+---------------------+---------------------+---------------------
  4. 1 | test | 0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22
  5. (1 rows)

PostgreSQL Subscription Table

mqtt_sub stores subscriptions of clients:

  1. CREATE TABLE mqtt_sub(
  2. id SERIAL primary key,
  3. clientid character varying(100),
  4. topic character varying(200),
  5. qos integer,
  6. created timestamp without time zone,
  7. UNIQUE (clientid, topic)
  8. );

E.g., client ‘test’ subscribes to topic ‘test_topic1’ and ‘test_topic2’:

  1. insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
  2. insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);

Query subscription of a client:

  1. select * from mqtt_sub where clientid = ${clientid};

Query subscription of client ‘test’:

  1. select * from mqtt_sub where clientid = 'test';
  2. id | clientId | topic | qos | created
  3. ----+--------------+-------------+------+---------------------
  4. 1 | test | test_topic1 | 1 | 2016-12-24 17:09:05
  5. 2 | test | test_topic2 | 2 | 2016-12-24 17:12:51
  6. (2 rows)

PostgreSQL Message Table

mqtt_msg stores MQTT messages:

  1. CREATE TABLE mqtt_msg (
  2. id SERIAL primary key,
  3. msgid character varying(60),
  4. sender character varying(100),
  5. topic character varying(200),
  6. qos integer,
  7. retain integer,
  8. payload text,
  9. arrived timestamp without time zone
  10. );

Query messages published by a client:

  1. select * from mqtt_msg where sender = ${clientid};

Query messages published by client ‘test’:

  1. select * from mqtt_msg where sender = 'test';
  2. id | msgid | topic | sender | node | qos | retain | payload | arrived
  3. ----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------
  4. 1 | 53F98F80F66017005000004A60003 | hello | test | NULL | 1 | 0 | hello | 2016-12-24 17:25:12
  5. 2 | 53F98F9FE42AD7005000004A60004 | world | test | NULL | 1 | 0 | world | 2016-12-24 17:25:45
  6. (2 rows)

PostgreSQL Retained Message Table

mqtt_retain stores retained messages:

  1. CREATE TABLE mqtt_retain(
  2. id SERIAL primary key,
  3. topic character varying(200),
  4. msgid character varying(60),
  5. sender character varying(100),
  6. qos integer,
  7. payload text,
  8. arrived timestamp without time zone,
  9. UNIQUE (topic)
  10. );

Query retained messages:

  1. select * from mqtt_retain where topic = ${topic};

Query retained messages with topic ‘retain’:

  1. select * from mqtt_retain where topic = 'retain';
  2. id | topic | msgid | sender | node | qos | payload | arrived
  3. ----+----------+-------------------------------+---------+------+------+---------+---------------------
  4. 1 | retain | 53F33F7E4741E7007000004B70001 | test | NULL | 1 | www | 2016-12-24 16:55:18
  5. (1 rows)

PostgreSQL Acknowledgement Table

mqtt_acked stores acknowledgements from the clients:

  1. CREATE TABLE mqtt_acked (
  2. id SERIAL primary key,
  3. clientid character varying(100),
  4. topic character varying(100),
  5. mid integer,
  6. created timestamp without time zone,
  7. UNIQUE (clientid, topic)
  8. );

Enable PostgreSQL Backend

  1. ./bin/emqx_ctl plugins load emqx_backend_pgsql