Oracle Database

EMQX 支持将数据桥接到 Oracle Database,您可以将 MQTT 消息和客户端事件存储到 Oracle Database 中,也可以通过事件触发对 Oracle Database 中数据的更新或删除操作,从而实现对诸如设备在线状态、上下线历史等的记录。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用Oracle Database - 图1 (opens new window)

前置准备

功能清单

快速开始

本节将带您安装 Oracle Database 服务器并创建数据表,然后在 EMQX 创建 Oracle Database 的数据桥接,之后再通过创建规则来将消息和事件转发至 Oracle Database,以及验证数据桥接和规则是否正常工作。

本教程假定 EMQX 与 Oracle Database 均在本地运行,如您在远程运行 EMQX 及 Oracle Database,请根据实际情况调整相应配置。

安装 Oracle Database 服务器

通过 Docker 安装并启动 Oracle Database:

  1. # 启动本地 Oracle Database 的 docker 镜像
  2. docker run --name oracledb -p 1521:1521 -d oracleinanutshell/oracle-xe-11g:1.0.0
  3. # 启动远程 Oracle Database 的 docker 镜像
  4. docker run --name oracledb -p 1521:1521 -e ORACLE_ALLOW_REMOTE=true -d oracleinanutshell/oracle-xe-11g:1.0.0
  5. # 出于性能考虑,您可能希望禁用磁盘异步IO:
  6. docker run --name oracledb -p 1521:1521 -e ORACLE_DISABLE_ASYNCH_IO=true -d oracleinanutshell/oracle-xe-11g:1.0.0
  7. # 进入容器
  8. docker exec -it oracledb bash
  9. # 连接到默认数据库 "XE"
  10. # 用户名: "system"
  11. # 密码: "oracle"
  12. sqlplus

创建数据表

使用下面的 SQL 语句在 Oracle Database 中创建数据表 t_mqtt_msgs,该数据表用来存储每一条消息的消息 ID, 客户端 ID,主题, QoS, retain 标记, 消息 payload 和时间戳。

  1. CREATE TABLE t_mqtt_msgs (
  2. msgid VARCHAR2(64),
  3. sender VARCHAR2(64),
  4. topic VARCHAR2(255),
  5. qos NUMBER(1),
  6. retain NUMBER(1),
  7. payload NCLOB,
  8. arrived TIMESTAMP
  9. );

使用下面的 SQL 语句在 Oracle Database 中创建数据表 t_emqx_client_events,该数据表用来存储每一个事件的客户端 ID, 事件类型和创建时间。

  1. CREATE TABLE t_emqx_client_events (
  2. clientid VARCHAR2(255),
  3. event VARCHAR2(255),
  4. created_at TIMESTAMP
  5. );

创建 Oracle Database 数据桥接

消息存储和事件记录需要设置不同的 SQL 模版,因此您需要分别创建两个不同的数据桥接。

  1. 登陆 EMQX Dashboard,点击左侧目录菜单中的数据集成 -> 数据桥接

  2. 点击页面右上角的创建

  3. 数据桥接类型中选择 Oracle Database,点击下一步

  4. 输入数据桥接名称,名称应为大/小写字母和数字的组合。

  5. 输入以下连接信息:

    • 服务器地址:输入 http://127.0.0.1:1521,如果 Oracle Database 服务器在远程运行,则需输入实际地址。
    • 数据库名字: 输入 XE
    • Oracle Database SID: Input XE
    • 用户名: 输入 system
    • 密码: 输入 oracle
  6. 根据业务实现配置 SQL 模版

    注意:此处为预处理 SQL,字段不应当包含引号,SQL 末尾不要带分号 ;

    • 想要为消息存储创建数据桥接,使用下面的 SQL 语句:

      1. INSERT INTO t_mqtt_msgs(msgid, sender, topic, qos, retain, payload, arrived) VALUES(
      2. ${id},
      3. ${clientid},
      4. ${topic},
      5. ${qos},
      6. ${flags.retain},
      7. ${payload},
      8. TO_TIMESTAMP('1970-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(${timestamp}/1000, 'SECOND')
      9. )
    • 想要为上下线状态记录创建数据桥接,使用下面的 SQL 语句:

      1. INSERT INTO t_emqx_client_events(clientid, event, created_at) VALUES (
      2. ${clientid},
      3. ${event},
      4. TO_TIMESTAMP('1970-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(${timestamp}/1000, 'SECOND')
      5. )
  7. 其余选项均设为默认值。

  8. 在完成创建之前,您可以点击测试连接来测试桥接可以连接到 Oracle Database 服务器。

  9. 点击创建按钮完成数据桥接创建。

    在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 Oracle Database 的数据。详细步骤参考创建 Oracle Database 数据桥接规则

至此,您已经完成数据桥接的创建,在 Dashboard 的数据桥接页面,可以看到 Oracle Database 数据桥接的状态为已连接

创建 Oracle Database 数据桥接规则

在成功创建 Oracle Database 数据桥接之后,您需要继续为消息存储和设备上下线记录创建两条不同的转发规则。

  1. 转到 Dashboard 数据集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID my_rule,在 SQL 编辑器中根据业务实现需要输入规则:

    • 如需实现对指定主题消息的转发,例如将 t/# 主题的 MQTT 消息转发至 Oracle Database,输入以下 SQL 语法:

      注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(SELECT 部分)包含所有 SQL 模板中用到的变量。

      1. SELECT
      2. *
      3. FROM
      4. "t/#"
    • 如需实现设备上下线记录,输入以下 SQL 语法:

      1. SELECT
      2. *
      3. FROM
      4. "$events/client_connected", "$events/client_disconnected"
  4. 点击添加动作,在动作下拉框中选择使用数据桥接转发选项,选择先前创建好的 Oracle Database 数据桥接。点击添加

  5. 点击创建按钮完成规则创建。

至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果转发至 Oracle Database。

测试桥接和规则

使用 MQTTX 向 t/1 主题发布消息,此操作同时会触发上下线事件:

  1. mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Oracle Database" }'

分别查看两个数据桥接运行统计,命中、发送成功次数均 +1。

查看数据是否被写入 t_mqtt_msgs 数据表。

  1. SELECT * FROM t_mqtt_msgs;
  2. MSGID SENDER TOPIC QOS RETAIN PAYLOAD ARRIVED
  3. -------------------------------- ------ ----- --- ------ ---------------------------------- ----------------------------
  4. 0005FA6CE9EF9F24F442000048100002 emqx_c t/1 0 0 { "msg": "hello Oracle Database" } 28-APR-23 08.22.51.760000 AM

查看数据是否被写入 t_emqx_client_events 数据表。

  1. SELECT * FROM t_emqx_client_events;
  2. CLIENTID EVENT CREATED_AT
  3. -------- ------------------- ----------------------------
  4. emqx_c client.connected 28-APR-23 08.22.51.757000 AM
  5. emqx_c client.disconnected 28-APR-23 08.22.51.760000 AM