使用 JavaScript 开发 MQTT 客户端

作为标准的 MQTT 协议的一部分,EMQ X 支持通过 WebSocket 端口与客户端进行通信。WebSocket 是一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC 7936 补充规范。WebSocket API 也被 W3C 定为标准。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

客户端库比较

Paho.mqtt.js

Paho 是 Eclipse 的一个 MQTT 客户端项目,Paho JavaScript Client 是其基于浏览器 JavaScript 运行环境的库,它使用 WebSockets 连接到 MQTT 服务器。相较于 MQTT.js 来说,其功能较少,不推荐使用。

MQTT.js

MQTT.js 可用于 Node.js 环境和浏览器环境。在 Node.js 上可以通过全局安装使用命令行连接,同时还支持 MQTT ,MQTT TLS 证书连接;值得一提的是 MQTT.js 对微信小程序有较好的支持。

安装 MQTT.js

如果在您的系统中已经有了 Node.js 环境,可使用 NPM 安装:

npm 安装使用

  1. npm i mqtt
  2. import mqtt from 'mqtt'

或使用 CDN 引用

  1. <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
  2. <script>
  3. // 将在全局初始化一个 mqtt 变量
  4. console.log(mqtt)
  5. </script>

实现一个简单的客户端

mqtt.js 在浏览器环境下支持 WebSocket MQTT 连接,Node.js 环境下支持 TCP(SSL/TLS) MQTT 连接,请根据您的使用场景选择对应的连接方式:

  • 使用 connect() 函数连接并返回一个 client 实例
  • client 有多个事件,在 client 事件中使用回调函数处理相关逻辑:
    • connect:连接成功事件
    • reconnect:连接错误、异常断开后重连的事件
    • error:连接错误、终止连接事件
    • message:收到订阅消息事件
  • client 有多个基本函数
    • 使用 subscribe() 函数订阅主题
    • 使用 publish() 函数发布消息
    • 使用 end() 函数连断开和 broker 的连接

这里我们只对 MQTT.js 客户端做简单的介绍,关于该客户端的更详尽的说明请参阅 MQTT.js 文档

初始化和建立连接

EMQ X 使用 8083 端口用于 WebSocket 普通连接,8084 用于 SSL 上的 WebSocket 连接。

本地连接 URL 为: ws://localhost:8083/mqtt

上述示范的连接地址可以拆分为: ws: // localhost : 8083 /mqtt

协议 // 域名 : 端口 / 路径

建立连接代码如下:

  1. // <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
  2. // const mqtt = require('mqtt')
  3. import mqtt from 'mqtt'
  4. // 连接选项
  5. const options = {
  6. // 超时时间
  7. connectTimeout: 4000,
  8. // 认证信息
  9. clientId: 'emqx',
  10. // username: 'emqx',
  11. // password: 'emqx',
  12. // 心跳时间
  13. keepalive: 60,
  14. clean: true,
  15. }
  16. // WebSocket 连接字符串
  17. const WebSocket_URL = 'ws://localhost:8083/mqtt'
  18. // TCP/TLS 连接字符串,仅限 Node.js 环境
  19. const TCP_URL = 'mqtt://localhost:1883'
  20. const TCP_TLS_URL = 'mqtts://localhost:8883'
  21. const client = mqtt.connect(WebSocket_URL, options)
  22. client.on('connect', () => {
  23. console.log('连接成功')
  24. })
  25. client.on('reconnect', (error) => {
  26. console.log('正在重连:', error)
  27. })
  28. client.on('error', (error) => {
  29. console.log('连接失败:', error)
  30. })

指定地址的时候请注意以下的内容:

  • 连接地址没有指明协议:WebSocket 使用 ws(非加密)、wss(SSL 加密) 作为协议标识。MQTT.js 客户端支持多种协议,连接地址需指明协议类型;
  • 连接地址没有指明端口:MQTT 协议并未对 WebSocket 接入端口做出规定,EMQ X 默认使用 8083 8084 分别作为非加密连接、加密连接端口。而 WebSocket 协议默认端口同 HTTP 保持一致 (80/443),不填写端口则表明使用 WebSocket 的默认端口连接;而使用标准 MQTT 连接时则无需指定端口,如 MQTT.js 在 Node.js 端可以使用 mqtt://localhost 连接至标准 MQTT 8083 端口,当连接地址是 mqtts://localhost 则连接到 8884 端口;
  • 错误的路径:EMQ X 默认使用 /mqtt 作为 WebSocket 连接路径,连接时需指明;
  • 协议与端口不符:使用了 wss 连接却连接到 8083 端口;
  • 在 HTTPS 站点下使用非加密的 WebSocket 连接: Google Chrome 等浏览器在 HTTPS 站点下会自动禁止使用非加密的 ws 协议发起连接请求;
  • 证书与连接地址不符:服务器端配置了错误的 SSL 证书。

发布消息

使用 publish() 函数发布消息到主题,发布的主题必须符合 MQTT 发布主题规则,否则客户端将断开连接。发布之前无需订阅该主题,但要确保客户端已成功连接:

  1. // 监听接收消息事件
  2. client.on('message', (topic, message) => {
  3. console.log('收到来自', topic, '的消息', message.toString())
  4. })
  5. // 判断是否已成功连接
  6. if (!client.connected) {
  7. console.log('客户端未连接')
  8. return
  9. }
  10. // publich(topic, payload, options/callback)
  11. client.publish('hello', 'hello EMQ X', (error) => {
  12. console.log(error || '消息发布成功')
  13. })

订阅消息

客户端连接成功之后才能订阅主题,订阅的主题必须符合 MQTT 订阅主题规则;

由于 JavaScript 异步非阻塞特性,只有在 connect 事件后才能确保客户端已成功连接,可通过 client.connected 属性判断是否连接成功:

错误示例

  1. import mqtt from 'mqtt'
  2. client = mqtt.connect('ws://localhost:8083/mqtt')
  3. client.on('connect', handleConnect)
  4. client.subscribe('hello')
  5. client.publish('hello', 'Hello EMQ X')

正确示例

  1. client.on('connect', () => {
  2. console.log('成功连接服务器')
  3. // 订阅一个主题
  4. client.subscribe('hello', { qos: 1 }, (error) => {
  5. if (!error) {
  6. cosnole.log('订阅成功')
  7. client.publish('hello', 'Hello EMQ X', { qos: 1, rein: false }, (error) => {
  8. console.log(error || '发布成功')
  9. })
  10. }
  11. })
  12. // 订阅多个主题
  13. client.subscribe(
  14. ['hello', 'one/two/three/#', '#'],
  15. { qos: 1 },
  16. (err) => {
  17. console.log(err || '订阅成功')
  18. },
  19. )
  20. // 订阅不同 qos 的不同主题
  21. client.subscribe(
  22. [
  23. { hello: 1 },
  24. { 'one/two/three': 2 },
  25. { '#': 0 }
  26. ],
  27. (err) => {
  28. console.log(err || '订阅成功')
  29. },
  30. )
  31. })

取消订阅

  1. client.unsubscribe(
  2. // topic, topic Array, topic Array-Onject
  3. 'hello',
  4. (err) => {
  5. console.log(err || '取消订阅成功')
  6. },
  7. )

完整示例

  1. // 导入 MQTT.js
  2. import mqtt from 'mqtt'
  3. // <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
  4. // const mqtt = require('mqtt')
  5. // WebSocket 连接字符串
  6. const WebSocket_URL = 'ws://localhost:8083/mqtt'
  7. // TCP/TLS 连接字符串,仅限 Node.js 环境
  8. const TCP_URL = 'mqtt://localhost:1883'
  9. const TCP_TLS_URL = 'mqtts://localhost:8883'
  10. // 连接选项
  11. const options = {
  12. // 超时时间
  13. connectTimeout: 4000,
  14. // 认证信息
  15. clientId: 'emqx',
  16. // username: 'emqx',
  17. // password: 'emqx',
  18. // 心跳时间
  19. keepalive: 60,
  20. clean: true,
  21. }
  22. const client = mqtt.connect(TCP_URL, options)
  23. // 连接成功后初始化订阅
  24. client.on('connect', () => {
  25. console.log('Connected to', TCP_URL)
  26. // 订阅主题
  27. client.subscribe('hello', (err) => {
  28. console.log(err || '订阅成功')
  29. })
  30. // 发布消息
  31. client.publish('hello', 'Hello EMQ X', (err) => {
  32. console.log(err || '发布成功')
  33. })
  34. })
  35. // 为 message 时间添加处理函数
  36. client.on('message', (topic, message) => {
  37. console.log('收到来自', topic, '的消息:', message.toString())
  38. // 断开连接
  39. client.end()
  40. })

运行结果:

  1. Connected to mqtt://localhost:1883
  2. 发布成功
  3. 订阅成功
  4. 收到来自 hello 的消息: Hello EMQ X