0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在Python中使用MQTT

瑞科慧联(RAK) 2022-12-22 10:41 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

Python 是一种跨平台的计算机程序设计语言,是ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。

MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。

本文主要介绍如何在 Python 项目中使用paho-mqtt客户端库 ,实现客户端与MQTT服务器的连接、订阅、取消订阅、收发消息等功能。

一、项目准备

本项目使用 Python 3.10进行开发测试。

用户可用以下命令来确认 Python的版本:

python3 --version

Python 3.10.9

测试设备:

瑞科慧联(RAK)网关RAK7268 V2、带温湿度传感器的数据采集器Sensor Hub

二、选择 MQTT 客户端库

paho-mqtt是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。

三、Pip 安装 Paho MQTT 客户端

Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、连接 MQTT 服务器

本文将使用瑞科慧联LoRaWAN®网关提供的内置 MQTT服务,该服务基于 Mosquitto的开源消息代理。服务器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、导入 Paho MQTT客户端

from paho.mqtt import client as mqtt

3、设置 MQTT Broker 连接参数

设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Pythonrandom.randint函数随机生成 MQTT 客户端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、编写 MQTT 连接函数

编写连接回调函数 on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据rc来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到broker.emqx.io

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

5、发布消息

定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端publish函数向/python/mqtt主题发送消息。

ddefon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

6、订阅消息

编写消息回调函数 on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代码

消息订阅代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦订阅到消息, 回调此方法"""

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解码前的data为: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解码后的data为: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("温度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("湿度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("湿度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息发布代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要发布的消息内容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

测试

消息发布

运行 MQTT消息发布代码,将看到客户端连接成功,并且成功将消息发布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息订阅

通过瑞科慧联带温湿度传感器的 Sensor hub进行数据传输,订阅并解析数据结果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、总结

至此,我们完成了使用paho-mqtt客户端连接到LoRaWAN®网关内置 MQTT服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。

与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 物联网
    +关注

    关注

    2950

    文章

    48132

    浏览量

    418564
  • python
    +关注

    关注

    58

    文章

    4885

    浏览量

    90314
  • MQTT
    +关注

    关注

    5

    文章

    739

    浏览量

    25260
收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    RK3576 单板机 C/Python/MQTT 应用开发手册(二)

    本文为创龙科技RK3576 单板机应用开发指南,包含开发环境配置、GDB 调试、外设控制(LED/CAN/ 串口)、Python 开发及 MQTT 通信案例。提供完整编译命令、代码解析与实测
    的头像 发表于 04-21 10:43 79次阅读
    RK3576 单板机 C/<b class='flag-5'>Python</b>/<b class='flag-5'>MQTT</b> 应用开发手册(二)

    RK3576 单板机 C/Python/MQTT 应用开发手册(一)

    本文为创龙科技RK3576 单板机应用开发指南,包含开发环境配置、GDB 调试、外设控制(LED/CAN/ 串口)、Python 开发及 MQTT 通信案例。提供完整编译命令、代码解析与实测
    的头像 发表于 04-20 13:55 209次阅读
    RK3576 单板机 C/<b class='flag-5'>Python</b>/<b class='flag-5'>MQTT</b> 应用开发手册(一)

    何在 S32 配置工具中添加 ADC 并在 Simulink 中使用?

    对于 S32K3,如何在 S32 配置工具中添加 ADC 并在 Simulink 中使用?
    发表于 04-07 07:09

    何在 VisionFive 上使用 Python 包?

    VisionFive Fedora 下的本地目录,请在源代码目录下执行以下命令: 提示:源代码可从以下位置下载:愿景五.gpio. sudo yum install python
    发表于 03-30 08:28

    【创芯工坊】PowerWriter 0048 如何在其他IDE中使用PowerWriter的Debugger(烧录器常见使用问题)

    【创芯工坊】PowerWriter 0048 如何在其他IDE中使用PowerWriter的Debugger(烧录器常见使用问题)
    发表于 03-26 10:38

    何在 S32 DS 中使用 BMS GEN2 SDK?

    do not support the BJB MC33777. 如何在 S32 DS 中使用 BMS GEN2 SDK?
    发表于 03-23 08:16

    何在 Vision Five 2 上安装 python 库?

    这可能是一个完全愚蠢的问题,但我如何在 Vision Five 2 上安装 python 库。 使用该命令后,它给了我这个错误。 默认为用户安装,因为普通站点包不可写 错误:找不到满足要求
    发表于 03-06 07:51

    DR1平台Linux应用开发指南:含GDB调试、PythonMQTT实战

    流程,以及 LED、按键、CAN、TCP/UDP、串口等常用开发案例,同时覆盖 Python 脚本开发与 MQTT 消息发布 / 订阅实战。文档基于 Ubuntu22.04
    的头像 发表于 01-05 16:48 4827次阅读
    DR1平台Linux应用开发指南:含GDB调试、<b class='flag-5'>Python</b>及<b class='flag-5'>MQTT</b>实战

    何在AMD Vitis Unified IDE中使用系统设备树

    您将在这篇博客中了解系统设备树 (SDT) 以及如何在 AMD Vitis Unified IDE 中使用 SDT 维护来自 XSA 的硬件元数据。本文还讲述了如何对 SDT 进行操作,以便在 Vitis Unified IDE 中实现更灵活的使用场景。
    的头像 发表于 11-18 11:13 3323次阅读
    如<b class='flag-5'>何在</b>AMD Vitis Unified IDE<b class='flag-5'>中使</b>用系统设备树

    何在vivadoHLS中使用.TLite模型

    本帖欲分享如何在vivadoHLS中使用.TLite模型。在Vivado HLS中导入模型后,需要设置其输入和输出接口以与您的设计进行适配。 1. 在Vivado HLS项目中导入模型文件 可以
    发表于 10-22 06:29

    物联网MQTT网关是什么

    物联网MQTT网关是一种采用MQTT物联网协议的智能设备或软件组件,其核心功能是连接不同通信协议的物联网设备与消息代理服务器,实现设备间的数据交换与集中管理,同时支持边缘计算、安全防护和协议转换
    的头像 发表于 08-29 15:24 1275次阅读

    请问如何在 Keil μVision 或 IAR EWARM 中使用观察点进行调试?

    何在 Keil μVision 或 IAR EWARM 中使用观察点进行调试?
    发表于 08-20 06:29

    第二十三章 W55MH32 MQTT_OneNET示例

    本文讲解了如何在 W55MH32 芯片上实现 MQTT 协议并连接 OneNET 平台,通过实战例程展示了从准备工作、连接配置到消息订阅、发布及接收处理的完整过程。文章详细介绍了 MQTT 协议
    的头像 发表于 07-24 14:59 1358次阅读
    第二十三章 W55MH32 <b class='flag-5'>MQTT</b>_OneNET示例

    精通 MQTT:消息队列遥测传输指南!

    ,解释了其关键组件,并演示了如何使用Python实现MQTT客户端。MQTT代理MQTT系统的核心是代理,它负责管理客户端之间的消息交换。MQTT
    的头像 发表于 06-16 16:56 1136次阅读
    精通 <b class='flag-5'>MQTT</b>:消息队列遥测传输指南!

    何在MQTT中发布和订阅实体

    MQTT中发布和订阅实体(主题)是MQTT通信的核心操作,下面将详细介绍其原理、步骤以及示例代码,帮助你全面理解这一过程。 一、MQTT发布与订阅的基本概念 发布(Publish):客户端将
    的头像 发表于 05-20 17:21 1616次阅读