Python 是一种跨平台的计算机程序设计语言,是ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。
MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。
本文主要介绍如何在 Python 项目中使用paho-mqtt客户端库 ,实现客户端与MQTT服务器的连接、订阅、取消订阅、收发消息等功能。
一、项目准备
本项目使用 Python 3.10进行开发测试。
用户可用以下命令来确认 Python的版本:
python3 --version
Python 3.10.9
测试设备:
瑞科慧联(RAK)网关RAK7268 V2、带温湿度传感器的数据采集器Sensor Hub
二、选择 MQTT 客户端库
paho-mqtt是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。
三、Pip 安装 Paho MQTT 客户端
Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。
pip3install paho.mqtt
四、Python MQTT 使用
1、连接 MQTT 服务器
本文将使用瑞科慧联LoRaWAN®网关提供的内置 MQTT服务,该服务基于 Mosquitto的开源消息代理。服务器接入信息如下:
- Broker:192.168.230.1
- TCP Port:1883
2、导入 Paho MQTT客户端
from paho.mqtt import client as mqtt
3、设置 MQTT Broker 连接参数
设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Pythonrandom.randint函数随机生成 MQTT 客户端 id。
MQTT_SERVER_IP ="192.168.230.1"
MQTT_PORT =1883
4、编写 MQTT 连接函数
编写连接回调函数 on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据rc来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到broker.emqx.io。
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""连接MQTT服务器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回连接状态的回调函数
mqttClient.on_message=on_message # 返回订阅消息回调函数
MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服务器账号密码
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 启用线程连接
returnmqttClient
5、发布消息
定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端publish函数向/python/mqtt主题发送消息。
ddefon_publish():
# 发布消息
msg_count=0
whileTrue:
time.sleep(1)
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
topic='application/1/device/0000000000000444/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息
msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'
result=mqttClient.publish(topic,msg)
status=result[0]
ifstatus==0:
print('第{}条消息发送成功'.format(msg_count))
else:
print('第{}条消息发送失败'.format(msg_count))
msg_count+=1
6、订阅消息
编写消息回调函数 on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。
defon_subscribe():
"""订阅主题:mqtt/demo"""
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
whileTrue:
mqttClient.subscribe("application/#",2)
time.sleep(1)
7、完整代码
消息订阅代码
#!/usr/bin/python
frompaho.mqttimportclientasmqtt
importtime
importjson
# from settings import *
importbase64
"""
网关通过mqtt发出数据
json - ok
probuf - no
"""
MQTT_SERVER_IP="192.168.230.1"
MQTT_PORT=1883
defon_connect(client,userdata,flags,rc):
"""一旦连接成功, 回调此方法"""
rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]
print("connect:",rc_status[rc])
defon_message(client,userdata,msg):
"""一旦订阅到消息, 回调此方法"""
print("主题"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))
print("主题"+msg.topic +" 消息"+str(msg.payload.decode()))
try:
temp=json.loads(msg.payload.decode())
# client.disconnect()
deveui=temp['devEUI']
print("devEUI: ",deveui)
data=temp['data']
print("解码前的data为: ",data)
data_decode=base64.b64decode(data).hex()
print("解码后的data为: ",data_decode)
str1=data_decode[4:]
ifstr1[0:4]=="0167":
a=int(str1[4:8],16)*0.1
print("温度:",a,"℃")
ifstr1[8:12]=="0268":
b=int(str1[12:16],16)
print("湿度:",b,"%RH")
elifstr1[0:4]=="0268":
c=int(str1[4:8],16)
print("湿度:",c,"%RH")
exceptExceptionase:
print(e)
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""连接MQTT服务器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回连接状态的回调函数
mqttClient.on_message=on_message # 返回订阅消息回调函数
MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服务器账号密码
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 启用线程连接
returnmqttClient
defon_subscribe():
"""订阅主题:mqtt/demo"""
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
whileTrue:
mqttClient.subscribe("application/#",2)
# allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")
# mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)
time.sleep(1)
if__name__=='__main__':
on_subscribe()
消息发布代码
#!/usr/bin/python
frompaho.mqttimportclientasmqtt
importtime
importjson
# from settings import *
importbase64
"""
网关通过mqtt发出数据
json - ok
probuf - no
"""
MQTT_SERVER_IP="192.168.230.1"
MQTT_PORT=1883
defon_connect(client,userdata,flags,rc):
"""一旦连接成功, 回调此方法"""
rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]
print("connect:",rc_status[rc])
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""连接MQTT服务器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回连接状态的回调函数
MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服务器账号密码
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 启用线程连接
returnmqttClient
defon_publish():
# 发布消息
msg_count=0
whileTrue:
time.sleep(1)
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
topic='application/x/device/x/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息
msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要发布的消息内容
result=mqttClient.publish(topic,msg)
status=result[0]
ifstatus==0:
print('第{}条消息发送成功'.format(msg_count))
else:
print('第{}条消息发送失败'.format(msg_count))
msg_count+=1
if__name__=='__main__':
on_publish()
测试
消息发布
运行 MQTT消息发布代码,将看到客户端连接成功,并且成功将消息发布。
消息订阅
通过瑞科慧联带温湿度传感器的 Sensor hub进行数据传输,订阅并解析数据结果如下:
五、总结
至此,我们完成了使用paho-mqtt客户端连接到LoRaWAN®网关内置 MQTT服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。
与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!
-
物联网
+关注
关注
2902文章
44196浏览量
370789 -
python
+关注
关注
55文章
4777浏览量
84421 -
MQTT
+关注
关注
5文章
648浏览量
22407
发布评论请先 登录
相关推荐
评论