使用 Python 开发 MQTT 客户端

Python 是一种简单易学而又功能强大的解释性语言,它语法简洁,拥有丰富的标准库和第三方库,使用户能够专注于解决问题而非语言本身,非常适合快速开发。

本章节以简单的例子讲解如何建立一个初步的 Python MQTT 客户端。在这里,我们会用到 paho-mqtt 库。Paho 是 Eclipse 的一个开源 MQTT 项目,包含多种语言实现,Python 是其中之一。

安装 paho-mqtt

如果在您的系统中已经有了 Python 环境(大多数 linux 发布和 MacOS 中已经包含 Python 环境,在 Windows 下需要单独安装),使用以下命令即可安装 paho-mqtt:

  1. pip install paho-mqtt

或者使用 python 虚拟环境 virtualenv 建立一个和其他项目隔离的 mqtt 客户端项目环境,然后在此环境中安装 paho-mqtt:

  1. virtualenv mqtt-client
  2. source mqtt-client/bin/active
  3. pip install paho-mqtt

实现一个简单的客户端

paho-mqtt 提供了 3 个类,Client、Publish 和 Subscribe。后两者仅提供了简单的方法用于一次性的发布和订阅消息,但并不保持连接。Client 包含了连接、订阅、发布和回调函数。编写一个 MQTT 客户端,一般会使用 Client 类,用 Client 的实例来建立并维持和 Broker 的连接、订阅和发送消息,并在需要时断开连接:

  • 建立一个 Client 实例
  • 使用 Client 实例的 connect*() 函数进行连接
  • 用 loop*() 函数保持和处理客户端和 broker 之间连接和数据流
  • 使用 subscribe() 函数订阅主题
  • 使用 publish() 函数发布消息
  • 使用 disconnect() 函数连断开和 broker 的连接

在应用程序需要处理事件的时候,回调函数会被调用。

这里我们只对 Paho 客户端的 Python 实现做简单的介绍,关于该客户端的更详尽的说明请参阅 官方文档

初始化和建立连接

使用 Client 类的构造函数如下:

  1. Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

构造函数的参数定义了 Client 的基本属性,并都有缺省值:

  • client_id:MQTT 协议的 client ID;
  • clean_session:MQTT 协议的 clean_session 属性;
  • userdata:用户定义的任何类型的数据。会在回调函数中以 userdata 参数传递给该回调函数;
  • protocol:使用的 MQTT 协议版本;
  • transport:使用的传输协议。

除了以上这些基本属性,paho 客户端的 Client 类还提供了其他函数来配置客户端,如设置 Inflight 窗口大小,配置 TLS 连接,设置 Will 消息,配置 Logger 等。

  1. # 导入 paho-mqtt 的 Client:
  2. import paho.mqtt.client as mqtt
  3. # 用于响应服务器端 CONNACK 的 callback,如果连接正常建立,rc 值为 0
  4. def on_connect(client, userdata, flags, rc):
  5. print("Connection returned with result code:" + str(rc))
  6. # 用于响应服务器端 PUBLISH 消息的 callback,打印消息主题和内容
  7. def on_message(client, userdata, msg):
  8. print("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
  9. # 在连接断开时的 callback,打印 result code
  10. def on_disconnect(client, userdata, rc):
  11. print("Connection returned result:"+ str(rc))
  12. # 构造一个 Client 实例
  13. client = mqtt.Client()
  14. client.on_connect = on_connect
  15. client.on_disconnect= on_disconnect
  16. client.on_message = on_message
  17. # 连接 broker
  18. # connect() 函数是阻塞的,在连接成功或失败后返回。如果想使用异步非阻塞方式,可以使用 connect_async() 函数。
  19. client.connect("192.168.1.165", 1883, 60)

网络循环

在连接到 broker 之后,需要网络循环函数来处理消息收发和保持和 broker 的连接。paho 提供 loop(), loop_forever(),loop_start() 和 loop_stop() 四个函数处理网络循环。 最基本的方式是使用 loop() 函数:

  1. loop(timeout = 1.0, max_packets = 1)

参数中的 timeout 单位为秒,设置不应超过 keepalive 值,否则可能会导致经常性的断线。max_packets 参数现已废弃,不应再使用。loop() 函数会阻塞到 select() 调用返回套接字可读 / 写或者发生 timeout。

loop_forever() 以阻塞的方式处理网络循环,直到客户端调用 disconnect() 之前都不会返回。它自动处理重连。

loop_start()/loop_stop() 以实现线程接口的方式异步非阻塞的处理网络循环。loop_start() 会在后台起一个线程自动调用 loop() 函数,这样主线程可以处理其他工作。loop_start() 函数自动处理重连。调用 loop_stop() 会中止这个后台线程。

例程:

  1. client.loop_start()

发布消息

publish() 函数发送一条消息至 broker。

  1. publish(topic, payload=None, qos=0, retain=False)

参数 topic 为发送消息的主题,不可为空。payload 长度和 qos 参数的设定需要符合 MQTT 协议标准,retain 默认是 False。

publish() 函数返回一个 MQTTMessageInfo 对象,该对象的 rc 属性为 result code,mid 属性为 message ID。其他属性和方法请参阅 官方文档

当消息被发送到 broker 之后,on_publish() 回调函数会被调用。

例程:

  1. client.publish("hello", payload = "Hello world!")

订阅消息

subscribe() 函数为客户端订阅一个或多个主题。

  1. subscribe(topic, qos = 0)

使用 subscribe() 函数有 3 种形式:

  • subscribe(“my/topic”, 2)
    topic 为字符串,指定需要订阅的主题。qos 为订阅的 QoS 级别,默认为 0
  • subscribe((“my/topic”, 1))
    topic 为二元组,二元组的第一个元素为字符串,指定需要订阅的主题,第二个元素为订阅的 QoS 级别。这两个元素都必须出现在元组内。qos 未使用。
  • subscribe([(“my/topic”, 0), (“another/topic”, 2)])
    topic 为一个二元组列表,列表中的元素同第二种方法。qos 未使用。这种方法可以以一次函数调用完成多个订阅。

当 broker 确认订阅以后,on_subscribe() 回调函数会被调用。

例程:

  1. client.subscribe([("temperature", 0), ("humidity", 0)])

退订消息

退订一个主题:

  1. unsubscribe(topic)

当 broker 确认退订以后,on_unsubscribe() 回调函数会被调用。

完整例程

  1. #-*-coding:utf-8-*-
  2. # 导入 paho-mqtt 的 Client:
  3. import paho.mqtt.client as mqtt
  4. import time
  5. unacked_sub = [] #未获得服务器响应的订阅消息 id 列表
  6. # 用于响应服务器端 CONNACK 的 callback,如果连接正常建立,rc 值为 0
  7. def on_connect(client, userdata, flags, rc):
  8. print("Connection returned with result code:" + str(rc))
  9. # 用于响应服务器端 PUBLISH 消息的 callback,打印消息主题和内容
  10. def on_message(client, userdata, msg):
  11. print("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
  12. # 在连接断开时的 callback,打印 result code
  13. def on_disconnect(client, userdata, rc):
  14. print("Disconnection returned result:"+ str(rc))
  15. # 在订阅获得服务器响应后,从为响应列表中删除该消息 id
  16. def on_subscribe(client, userdata, mid, granted_qos):
  17. unacked_sub.remove(mid)
  18. # 构造一个 Client 实例
  19. client = mqtt.Client()
  20. client.on_connect = on_connect
  21. client.on_disconnect= on_disconnect
  22. client.on_message = on_message
  23. client.on_subscribe = on_subscribe
  24. # 连接 broker
  25. # connect() 函数是阻塞的,在连接成功或失败后返回。如果想使用异步非阻塞方式,可以使用 connect_async() 函数。
  26. client.connect("192.168.1.165", 1883, 60)
  27. client.loop_start()
  28. # 订阅单个主题
  29. result, mid = client.subscribe("hello", 0)
  30. unacked_sub.append(mid)
  31. # 订阅多个主题
  32. result, mid = client.subscribe([("temperature", 0), ("humidity", 0)])
  33. unacked_sub.append(mid)
  34. while len(unacked_sub) != 0:
  35. time.sleep(1)
  36. client.publish("hello", payload = "Hello world!")
  37. client.publish("temperature", payload = "24.0")
  38. client.publish("humidity", payload = "65%")
  39. # 断开连接
  40. time.sleep(5) #等待消息处理结束
  41. client.loop_stop()
  42. client.disconnect()

运行结果:

  1. Connection returned with result code: 0
  2. Received message, topic: hello payload: Hello world!
  3. Received message, topic: temperature payload: 24.0
  4. Received message, topic: humidity payload: 65%
  5. Disconnection returned result: 0