EMQX Broker 写入

MQTT 是流行的物联网数据传输协议,EMQX是一开源的 MQTT Broker 软件,无需任何代码,只需要在 EMQX Dashboard 里使用“规则”做简单配置,即可将 MQTT 的数据直接写入 TDengine。EMQX 支持通过 发送到 Web 服务的方式保存数据到 TDengine,也在企业版上提供原生的 TDengine 驱动实现直接保存。

前置条件

要让 EMQX 能正常添加 TDengine 数据源,需要以下几方面的准备工作。

  • TDengine 集群已经部署并正常运行
  • taosAdapter 已经安装并正常运行。具体细节请参考 taosAdapter 的使用手册
  • 如果使用后文介绍的模拟写入程序,需要安装合适版本的 Node.js,推荐安装 v12

安装并启动 EMQX

用户可以根据当前的操作系统,到 EMQX 官网下载安装包,并执行安装。下载地址如下:https://www.emqx.io/zh/downloads。安装后使用 sudo emqx startsudo systemctl start emqx 启动 EMQX 服务。

注意:本文基于 EMQX v4.4.5 版本,其他版本由于相关配置界面、配置方法以及功能可能随着版本升级有所区别。

创建数据库和表

在 TDengine 中为接收 MQTT 数据创建相应数据库和表结构。进入 TDengine CLI 复制并执行以下 SQL 语句:

  1. CREATE DATABASE test;
  2. USE test;
  3. CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volume FLOAT, pm10 FLOAT, pm25 FLOAT, so2 FLOAT, no2 FLOAT, co FLOAT, sensor_id NCHAR(255), area TINYINT, coll_time TIMESTAMP);

注:表结构以博客数据传输、存储、展现,EMQX + TDengine 搭建 MQTT 物联网数据可视化平台为例。后续操作均以此博客场景为例进行,请你根据实际应用场景进行修改。

配置 EMQX 规则

由于 EMQX 不同版本配置界面所有不同,这里仅以 v4.4.3 为例,其他版本请参考相应官网文档。

登录 EMQX Dashboard

使用浏览器打开网址 http://IP:18083 并登录 EMQX Dashboard。初次安装用户名为 admin 密码为:public

TDengine Database EMQX login dashboard

创建规则(Rule)

选择左侧“规则引擎(Rule Engine)”中的“规则(Rule)”并点击“创建(Create)”按钮:

TDengine Database EMQX rule engine

编辑 SQL 字段

复制以下内容输入到 SQL 编辑框:

  1. SELECT
  2. payload
  3. FROM
  4. "sensor/data"

其中 payload 代表整个消息体, sensor/data 为本规则选取的消息主题。

TDengine Database EMQX create rule

新增“动作(action handler)”

TDengine Database EMQX

新增“资源(Resource)”

TDengine Database EMQX create resource

选择“发送数据到 Web 服务”并点击“新建资源”按钮:

编辑“资源(Resource)”

选择“WebHook”并填写“请求 URL”为 taosAdapter 提供 REST 服务的地址,如果是本地启动的 taosadapter, 那么默认地址为:

  1. http://127.0.0.1:6041/rest/sql

其他属性请保持默认值。

TDengine Database EMQX edit resource

编辑“动作(action)”

编辑资源配置,增加 Authorization 认证的键/值配对项。默认用户名和密码对应的 Authorization 值为:

  1. Basic cm9vdDp0YW9zZGF0YQ==

相关文档请参考TDengine REST API 文档

在消息体中输入规则引擎替换模板:

  1. INSERT INTO test.sensor_data VALUES(
  2. now,
  3. ${payload.temperature},
  4. ${payload.humidity},
  5. ${payload.volume},
  6. ${payload.PM10},
  7. ${payload.pm25},
  8. ${payload.SO2},
  9. ${payload.NO2},
  10. ${payload.CO},
  11. '${payload.id}',
  12. ${payload.area},
  13. ${payload.ts}
  14. )

TDengine Database EMQX edit action

最后点击左下方的 “Create” 按钮,保存规则。

编写模拟测试程序

  1. // mock.js
  2. const mqtt = require('mqtt')
  3. const Mock = require('mockjs')
  4. const EMQX_SERVER = 'mqtt://localhost:1883'
  5. const CLIENT_NUM = 10
  6. const STEP = 5000 // Data interval in ms
  7. const AWAIT = 5000 // Sleep time after data be written once to avoid data writing too fast
  8. const CLIENT_POOL = []
  9. startMock()
  10. function sleep(timer = 100) {
  11. return new Promise(resolve => {
  12. setTimeout(resolve, timer)
  13. })
  14. }
  15. async function startMock() {
  16. const now = Date.now()
  17. for (let i = 0; i < CLIENT_NUM; i++) {
  18. const client = await createClient(`mock_client_${i}`)
  19. CLIENT_POOL.push(client)
  20. }
  21. // last 24h every 5s
  22. const last = 24 * 3600 * 1000
  23. for (let ts = now - last; ts <= now; ts += STEP) {
  24. for (const client of CLIENT_POOL) {
  25. const mockData = generateMockData()
  26. const data = {
  27. ...mockData,
  28. id: client.clientId,
  29. area: 0,
  30. ts,
  31. }
  32. client.publish('sensor/data', JSON.stringify(data))
  33. }
  34. const dateStr = new Date(ts).toLocaleTimeString()
  35. console.log(`${dateStr} send success.`)
  36. await sleep(AWAIT)
  37. }
  38. console.log(`Done, use ${(Date.now() - now) / 1000}s`)
  39. }
  40. /**
  41. * Init a virtual mqtt client
  42. * @param {string} clientId ClientID
  43. */
  44. function createClient(clientId) {
  45. return new Promise((resolve, reject) => {
  46. const client = mqtt.connect(EMQX_SERVER, {
  47. clientId,
  48. })
  49. client.on('connect', () => {
  50. console.log(`client ${clientId} connected`)
  51. resolve(client)
  52. })
  53. client.on('reconnect', () => {
  54. console.log('reconnect')
  55. })
  56. client.on('error', (e) => {
  57. console.error(e)
  58. reject(e)
  59. })
  60. })
  61. }
  62. /**
  63. * Generate mock data
  64. */
  65. function generateMockData() {
  66. return {
  67. "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
  68. "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
  69. "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
  70. "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
  71. "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
  72. "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
  73. "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
  74. "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
  75. "area": Mock.Random.integer(0, 20),
  76. "ts": 1596157444170,
  77. }
  78. }

查看源码

注意:代码中 CLIENT_NUM 在开始测试中可以先设置一个较小的值,避免硬件性能不能完全处理较大并发客户端数量。

TDengine Database EMQX client num

执行测试模拟发送 MQTT 数据

  1. npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
  2. node mock.js

TDengine Database EMQX run-mock

验证 EMQX 接收到数据

在 EMQX Dashboard 规则引擎界面进行刷新,可以看到有多少条记录被正确接收到:

TDengine Database EMQX rule matched

验证数据写入到 TDengine

使用 TDengine CLI 程序登录并查询相应数据库和表,验证数据是否被正确写入到 TDengine 中:

TDengine Database EMQX result in taos

TDengine 详细使用方法请参考 TDengine 官方文档。 EMQX 详细使用方法请参考 EMQX 官方文档