PostgreSQL

通过 PostgreSQL 数据桥接可以将 MQTT 消息和客户端事件存储到 PostgreSQL 中,也可以通过事件触发对 PostgreSQL 中数据的更新或删除操作,从而实现对诸如设备在线状态、上下线历史等的记录。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用PostgreSQL - 图1 (opens new window)

提示

本章节内容同样适用于 TimescaleDB 以及 MatrixDB。

先决条件

特性

配置参数

快速开始

安装 PostgreSQL

通过 Docker 安装并启动 PostgreSQL:

  1. # 启动一个 PostgreSQL 容器并设置密码为 public
  2. docker run --name PostgreSQL -p 5432:5432 -e POSTGRES_PASSWORD=public -d postgres
  3. # 进入容器
  4. docker exec -it PostgreSQL bash
  5. # 在容器中连接到 PostgreSQL 服务器,需要输入预设的密码
  6. psql -U postgres -W
  7. # 创建并选择数据库
  8. CREATE DATABASE emqx_data;
  9. \c emqx_data;

在 PostgreSQL 中创建两张表:

数据表 t_mqtt_msg存储每条消息的发布者客户端 ID、主题、Payload 以及发布时间:

  1. CREATE TABLE t_mqtt_msg (
  2. id SERIAL primary key,
  3. msgid character varying(64),
  4. sender character varying(64),
  5. topic character varying(255),
  6. qos integer,
  7. retain integer,
  8. payload text,
  9. arrived timestamp without time zone
  10. );

数据表 emqx_client_events 存储上下线的客户端 ID、事件类型以及事件发生时间:

  1. CREATE TABLE emqx_client_events (
  2. id SERIAL primary key,
  3. clientid VARCHAR(255),
  4. event VARCHAR(255),
  5. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  6. );

连接到 PostgreSQL

需要创建两个 PostgreSQL 数据桥接分别完成消息存储与事件记录:

消息存储

本节我们将创建第一个 PostgreSQL 数据桥接来实现对客户端发布消息的存储。

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。

  2. 点击页面右上角的创建

  3. 在数据桥接类型中选择 PostgreSQL,点击下一步

  4. 输入数据桥接名称,要求是大小写英文字母和数字的组合。

  5. 输入 PostgreSQL 连接信息,主机列表填写 127.0.0.1:5432数据库填写 emqx_data用户名postgres密码public

  6. 配置 SQL 模板,使用如下 SQL 完成数据插入,此处为预处理 SQL,字段不应当包含引号,SQL 末尾不要带分号 ;

    1. INSERT INTO t_mqtt_msg(msgid, sender, topic, qos, payload, arrived) VALUES(
    2. ${id},
    3. ${clientid},
    4. ${topic},
    5. ${qos},
    6. ${payload},
    7. TO_TIMESTAMP((${timestamp} :: bigint)/1000)
    8. )
  7. 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考配置参数

  8. 点击创建按钮完成数据桥接创建。

至此您已经完成数据桥接创建,接下来将继续创建一条规则来指定需要写入的数据。

创建数据转发规则

  1. 转到 Dashboard 数据集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID my_rule,在 SQL 编辑器中输入规则,此处选择将 t/# 主题的 MQTT 消息存储至 PostgreSQL,请确规则选择出来的字段(SELECT 部分)包含所有 SQL 模板中用到的变量,此处规则 SQL 如下:

    1. SELECT
    2. *
    3. FROM
    4. "t/#"
  4. 添加动作,在动作下拉框中选择 使用数据桥接转发 选项,选择先前创建好的 PostgreSQL 数据桥接。

  5. 点击最下方创建按钮完成规则创建。

至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果交由 PostgreSQL 存储。

上下线记录

在上面一个章节,我们实现了对指定主题消息的存储,本节我们将演示如何通过 PostgreSQL 实现对设备上下线的记录。

注意:除 SQL 模板与规则外,其他操作步骤与消息存储章节完全相同。

数据桥接的 SQL 模板如下,请注意字段不应当包含引号,SQL 末尾不要带分号 ;:

  1. INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
  2. ${clientid},
  3. ${event},
  4. TO_TIMESTAMP((${timestamp} :: bigint)/1000)
  5. )

规则 SQL 如下:

  1. SELECT
  2. *
  3. FROM
  4. "$events/client_connected", "$events/client_disconnected"

测试数据桥接与规则

使用 MQTTX 向 t/1 主题发布消息,此操作同时会触发上下线事件:

  1. mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello PostgreSQL" }'

分别查看两个数据桥接运行统计,命中、发送成功次数均 +1。

查看数据是否已经写入表中,emqx_messages 表:

  1. emqx_data=# select * from t_mqtt_msg;
  2. id | msgid | sender | topic | qos | retain | payload
  3. | arrived
  4. ----+----------------------------------+--------+-------+-----+--------+-------------------------------+---------------------
  5. 1 | 0005F298A0F0AEE2F443000012DC0002 | emqx_c | t/1 | 0 | | { "msg": "hello PostgreSQL" } | 2023-01-19 07:10:32
  6. (1 row)

emqx_client_events 表:

  1. emqx_data=# select * from emqx_client_events;
  2. id | clientid | event | created_at
  3. ----+----------+---------------------+---------------------
  4. 3 | emqx_c | client.connected | 2023-01-19 07:10:32
  5. 4 | emqx_c | client.disconnected | 2023-01-19 07:10:32
  6. (2 rows)