0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在Python中使用MQTT

瑞科慧联(RAK) 2022-12-22 10:41 次阅读

Python 是一种跨平台的计算机程序设计语言,是ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。

MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。

本文主要介绍如何在 Python 项目中使用paho-mqtt客户端库 ,实现客户端与MQTT服务器的连接、订阅、取消订阅、收发消息等功能。

一、项目准备

本项目使用 Python 3.10进行开发测试。

用户可用以下命令来确认 Python的版本:

python3 --version

Python 3.10.9

测试设备:

瑞科慧联(RAK)网关RAK7268 V2、带温湿度传感器的数据采集器Sensor Hub

二、选择 MQTT 客户端库

paho-mqtt是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。

三、Pip 安装 Paho MQTT 客户端

Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、连接 MQTT 服务器

本文将使用瑞科慧联LoRaWAN®网关提供的内置 MQTT服务,该服务基于 Mosquitto的开源消息代理。服务器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、导入 Paho MQTT客户端

from paho.mqtt import client as mqtt

3、设置 MQTT Broker 连接参数

设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Pythonrandom.randint函数随机生成 MQTT 客户端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、编写 MQTT 连接函数

编写连接回调函数 on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据rc来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到broker.emqx.io

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

5、发布消息

定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端publish函数向/python/mqtt主题发送消息。

ddefon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

6、订阅消息

编写消息回调函数 on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代码

消息订阅代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦订阅到消息, 回调此方法"""

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解码前的data为: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解码后的data为: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("温度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("湿度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("湿度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息发布代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要发布的消息内容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

测试

消息发布

运行 MQTT消息发布代码,将看到客户端连接成功,并且成功将消息发布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息订阅

通过瑞科慧联带温湿度传感器的 Sensor hub进行数据传输,订阅并解析数据结果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、总结

至此,我们完成了使用paho-mqtt客户端连接到LoRaWAN®网关内置 MQTT服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。

与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 物联网
    +关注

    关注

    2909

    文章

    44534

    浏览量

    372694
  • python
    +关注

    关注

    56

    文章

    4792

    浏览量

    84620
  • MQTT
    +关注

    关注

    5

    文章

    650

    浏览量

    22482
收藏 人收藏

    评论

    相关推荐

    何在Python中使用socket

    和UDP。 2. 创建Socket 在Python中,我们使用 socket 模块来创建socket。以下是创建一个TCP socket的示例代码: import socket # 创建一个socket
    的头像 发表于 11-01 16:10 210次阅读

    何在智能手机系统中使用bq27505

    电子发烧友网站提供《如何在智能手机系统中使用bq27505.pdf》资料免费下载
    发表于 10-17 10:21 0次下载
    如<b class='flag-5'>何在</b>智能手机系统<b class='flag-5'>中使</b>用bq27505

    何在MSP430™MCU中使用智能模拟组合

    电子发烧友网站提供《如何在MSP430™MCU中使用智能模拟组合.pdf》资料免费下载
    发表于 09-14 10:19 0次下载
    如<b class='flag-5'>何在</b>MSP430™MCU<b class='flag-5'>中使</b>用智能模拟组合

    何在反向降压-升压拓扑中使用TPS6290x

    电子发烧友网站提供《如何在反向降压-升压拓扑中使用TPS6290x.pdf》资料免费下载
    发表于 09-13 10:07 0次下载
    如<b class='flag-5'>何在</b>反向降压-升压拓扑<b class='flag-5'>中使</b>用TPS6290x

    何在汽车CAN应用中使用负边缘触发触发器节省电力

    电子发烧友网站提供《如何在汽车CAN应用中使用负边缘触发触发器节省电力.pdf》资料免费下载
    发表于 09-13 10:06 0次下载
    如<b class='flag-5'>何在</b>汽车CAN应用<b class='flag-5'>中使</b>用负边缘触发触发器节省电力

    何在新兴的低轨卫星应用中使用数字隔离器隔离信号

    电子发烧友网站提供《如何在新兴的低轨卫星应用中使用数字隔离器隔离信号.pdf》资料免费下载
    发表于 09-12 09:37 0次下载
    如<b class='flag-5'>何在</b>新兴的低轨卫星应用<b class='flag-5'>中使</b>用数字隔离器隔离信号

    何在RTOS中使用spi_interface.c?

    何在 RTOS 中使用 spi_interface.c?
    发表于 07-10 06:29

    求助,请问如何在RTOS SDK 1.5的PlatformIO IDE ESP8266实现MQTT

    ESP8266设备连接到 mqtt 代理。但 PlatformIO IDE 内置的 RTOS SDK 1.5 版本不支持 mqtt。此 SDK 没有 mqtt 示例。所以你能不能让我知道我如
    发表于 07-08 06:22

    请问cmakelists中的变量如何在程序中使用?

    大家好, 我有个问题请教,cmakelists.txt中的变量如何在程序中使用?比如以下cmakelists.txt文件中的PROJECT_VER变量,我如何在c程序中使用?试了很多办
    发表于 06-11 07:34

    工业计算机是什么?如何在不同行业中使用?

    工业电脑是专为在工业环境中使用而设计的计算机。它们可用于各个行业,包括制造、运 输和能源。它们通常比普通计算机更强大,并且能够在大多数计算机无法运行的环境中运行。在本文中,我们将更深入地了解什么是工业计算机以及它们如何在不同行业中使
    的头像 发表于 04-01 15:45 787次阅读
    工业计算机是什么?如<b class='flag-5'>何在</b>不同行业<b class='flag-5'>中使</b>用?

    Raspberry Pi树莓派使用Python实现MQTT通信设计

    这次的例子,主要讲述如何基于PYTHONMQTT 客户端的使用方法
    的头像 发表于 03-14 11:45 794次阅读
    Raspberry Pi树莓派使用<b class='flag-5'>Python</b>实现<b class='flag-5'>MQTT</b>通信设计

    何在测试中使用ChatGPT

    Dimitar Panayotov 在 2023 年 QA Challenge Accepted 大会 上分享了他如何在测试中使用 ChatGPT。
    的头像 发表于 02-20 13:57 744次阅读

    如何使用linux下gdb来调试python程序

    中,我们将介绍如何在Linux中使用GDB来调试Python程序。 一、安装GDB和Python调试符号 在使用GDB调试Python程序之
    的头像 发表于 01-31 10:41 2561次阅读

    何在DAVE IDE中使用XMC7200?

    能否在 DAVE IDE 中为 XMC 7200 EVK KIT 构建应用程序。我尝试打开一个项目但它最多只能显示 XMC48000。如何在 DAVE IDE 中使用 XMC7200 请帮忙。
    发表于 01-26 06:32

    何在ModustoolBox中使用XMC4000系列库?

    我发现 BSP Assistant 只能用于 XMC7000 系列,但我使用的是 XMC4000 系列。 如何在 ModustoolBox 中使用 XMC4000 系列库?
    发表于 01-24 06:16