0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

浅析ESP32运行MQTT客户端进行主题的发布和订阅的方法

jf_Ufm3lVrj 来源:凌顺实验室 2024-03-18 10:05 次阅读

ESP32 MQTT的库有很多,凌顺实验室(lingshunlab.com)这次主要使用AsyncMQTT_ESP32,以后有机会再更多的MQTT其他库的使用方法。

前提条件

树莓派部署本地的MQTT服务端,具体安装请查看以下连接:

https://lingshunlab.com/book/raspberry-pi/raspberry-pi-install-mosquitto-mqtt-server-and-test-mqtt

ESP32和树莓派在同一WIFI网络里面

效果实现

凌顺实验室(lingshunlab.com)在本示例展示了使用两个ESP32,分别实现发布MQTT的主题消息和订阅并输出MQTT的主题内容。当然,可能会问能不能一个ESP32同时又是发布者,又是订阅者?答案是可以的,因为作为客户端,都是对中间商做信息交换。

BOM

需要准备2个ESP32,
一个ESP32用于发布,
一个ESP32用于订阅。

库的安装

可以在Arduino IDE的库管理里搜索并安装:

点击菜单栏的「工具」---> 「库管理」,然后在搜索框中输入“AsyncMQTT_ESP32”,点击安装即可

下图在我本地已经安装好了:

cf8d37e4-e402-11ee-a297-92fbcf53809c.png

又或者在Github中下载,并安装到Arduino的 "libraries"文件夹里

Github 地址:

https://github.com/khoih-prog/AsyncMQTT_ESP32

程序提点

1, 首先,需要加载AsyncMQTT_ESP32的库

#include 

2,配置MQTT的服务器信息,可以是IP或者域名的方式

//#define MQTT_HOST         IPAddress(192, 168, 100, 100)
#define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

3,设置主题,发布需要主题,订阅也需要主题

const char *Topic  = "lingshunlab/ESP32";               // 主题

4,创建MQTT客户端的实例

// 创建MQTT客户端的实例,名为mqttClient
AsyncMqttClient mqttClient;

5,认识mqttClient的可用的回调函数

当MQTT触发特定事件的时候,可以配置自定义的函数

  mqttClient.onConnect(onMqttConnect);  // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
  mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当MQTT取消订阅主题时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息

6, 连接MQTT服务器

  mqttClient.setServer(MQTT_HOST, MQTT_PORT); //连接MQTT服务器

7,发布主题

通过以下代码,可以对配置好的主题发布消息

// 发布主题消息
  uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
  Serial.print("Publisshing at QoS 2, packetId: ");
  Serial.println(packetIdPub);
  delay(2000);

8,订阅主题

通过以下代码,可以订阅配置好的主题

// 订阅MQTT主题,并QoS设置为2
  uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);

9,当发生主题消息变化的时候的回调函数

mqttClient的回调函数有很多种,请仔细学习查看例子中其他的回调函数。在这里,特别说明一下onMessage的回调函数onMqttMessage(这个函数名称你可以自己定义,随喜),里面有不少参数,例如topic,payload等,其中payload即是消息的内容,可以通过输出显示。

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);   // 输出消息内容
}

10,请查看AsyncMQTT_ESP32的官方例子

可以学习到FreeRTOS的多线程如何应用。

程序代码

发布主题de完整代码

#include 

// 配置 WIFI 
#define WIFI_SSID         "***your wifi***"
#define WIFI_PASSWORD     "***your wifi password***"

// 加载AsyncMQTT_ESP32库
#include  

// 配置MQTT服务器地址和端口
#define MQTT_HOST         IPAddress(192,168,100,100)    // Broker IP
// #define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

const char *PubTopic  = "lingshunlab/ESP32";               // 发布消息的主题

AsyncMqttClient mqttClient; // 创建 MQTT客户端实例

void onMqttConnect(bool sessionPresent)   // 编写对应的回调函数
{
  Serial.println("=====On MQTT Connect=====");
  Serial.print("Connected to MQTT broker: ");
  Serial.print(MQTT_HOST);
  Serial.print(", port: ");
  Serial.println(MQTT_PORT);
  Serial.print("PubTopic: ");
  Serial.println(PubTopic);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  (void) reason;

  Serial.println("Disconnected from MQTT.");
}

void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(const uint16_t& packetId)
{
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);   // 输出消息内容
}

void onMqttPublish(const uint16_t& packetId)
{
  Serial.println("Publish acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void setup()
{
  Serial.begin(115200);
  while (!Serial && millis() < 5000);
  delay(500);

  // 连接WIFI
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.waitForConnectResult() != WL_CONNECTED) {
        Serial.println("Connection Failed! Rebooting...");
        delay(5000);
        ESP.restart(); // 重启esp32
      }
  delay(500);

  mqttClient.onConnect(onMqttConnect);  // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
  mqttClient.connect(); // 连接 MQTT

  delay(500);
}

void loop()
{
  // 发布主题消息
  uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
  Serial.print("Publisshing at QoS 2, packetId: ");
  Serial.println(packetIdPub);
  delay(2000);
}

上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,再之后每隔2秒发布一个对应主题的消息

cfa87fe0-e402-11ee-a297-92fbcf53809c.png

订阅主题de完整代码

#include 

// 配置 WIFI 
#define WIFI_SSID         "***your wifi***"
#define WIFI_PASSWORD     "***your wifi password***"

// 加载 AsyncMQTT_ESP32 库
#include 

// 配置MQTT服务器地址和端口
#define MQTT_HOST         IPAddress(192,168,1,55)    // Broker IP
// #define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

const char *SubTopic  = "lingshunlab/ESP32";        // 订阅的主题

AsyncMqttClient mqttClient;

void onMqttConnect(bool sessionPresent)
{
  Serial.println("=====On MQTT Connect=====");
  Serial.print("Connected to MQTT broker: ");
  Serial.print(MQTT_HOST);
  Serial.print(", port: ");
  Serial.println(MQTT_PORT);
  Serial.print("PubTopic: ");
  Serial.println(SubTopic);

  // 订阅MQTT主题,并QoS设置为2
  uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  (void) reason;

  Serial.println("Disconnected from MQTT.");
}

void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  Serial.println("=====On MQTT Subscribe=====");
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(const uint16_t& packetId)
{
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);  
}

void setup()
{
  Serial.begin(115200);
  while (!Serial && millis() < 5000);
  delay(500);

  // 连接WIFI
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.waitForConnectResult() != WL_CONNECTED) {
        Serial.println("Connection Failed! Rebooting...");
        delay(5000);
        ESP.restart();
      }
  delay(500);

  mqttClient.onConnect(onMqttConnect); // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect);  // 设置 当MQTT断开连接时的回调函数
  mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT收到主题消息时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
  mqttClient.connect(); // 连接 MQTT

  delay(500);
}

void loop()
{

}

上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,当连接MQTT时,则会订阅主题,之后每隔2秒就会收到主题发布的消息

cfc415ac-e402-11ee-a297-92fbcf53809c.png




审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 回调函数
    +关注

    关注

    0

    文章

    87

    浏览量

    11553
  • Arduino
    +关注

    关注

    188

    文章

    6468

    浏览量

    186906
  • 树莓派
    +关注

    关注

    116

    文章

    1705

    浏览量

    105599
  • MQTT
    +关注

    关注

    5

    文章

    650

    浏览量

    22479
  • ESP32
    +关注

    关注

    18

    文章

    968

    浏览量

    17194

原文标题:ESP32 运行MQTT客户端进行主题的发布和订阅

文章出处:【微信号:凌顺实验室,微信公众号:凌顺实验室】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    esp32c3想把数据上传到aws平台上,怎么保存历史记录呢?

    现在只能在mqtt测试客户端发布订阅消息,但一刷新界面,又得重新订阅主题,并且之前的消息记录都
    发表于 06-20 06:04

    MQTT协议介绍之三:发送和接收消息

    强烈建议您先阅读。上一节我们研究了建立MQTT客户端和代理之间的连接,所以本周我们将讨论发送和接收消息。发布MQTT客户端连接到代理后,可以
    发表于 08-25 19:54

    MQTT协议介绍之一:发布/订阅

    MQTT协议的内容。这里先介绍MQTT信息和协议背景。MQTT介绍MQTT客户端服务器发布/
    发表于 08-25 19:58

    MQTT 协议开发入门

    者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布
    发表于 11-05 09:38

    MQTT 协议 开发入门

    ,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消
    发表于 11-07 15:51

    MQTT的通信模型及消息

    同时是订阅者 。  MQTT 消息的发布者和订阅者都是客户端,服务器只是作为一个中转的存在,将发布
    发表于 01-19 15:57

    基于Eclipse paho-mqtt源码包的MQTT客户端设计(上)

    之间,它可以:接受来自客户的网络连接接收客户发布的应用信息处理来自客户端订阅和退订请求向订阅
    发表于 08-04 16:28

    基于Eclipse paho-mqtt源码包的MQTT客户端设计(下)

    ,如果有订阅的 Topic 没有设置回调函数,则使用该默认回调函数启动 MQTT 客户端配置完成 MQTT 客户端实例后,需要启动
    发表于 08-04 16:33

    esp32c3想把数据上传到aws平台上但怎么保存历史记录啊?

    现在只能在mqtt测试客户端发布订阅消息,但一刷新界面,又得重新订阅主题,并且之前的消息记录都
    发表于 02-17 08:19

    esp32-c3 mqtt遗嘱实验没成功的原因?

    ;/test" topic 的消息 消息内容cono 好像去掉了后面一部分。后来改变esp32-c3(发布者)的内容都还是订阅客户端这个topic出现。
    发表于 03-07 09:00

    如何在rtos mqtt示例中获取客户端句柄?

    中,我调用 代码:全选esp_mqtt_client_publish() 但我没有客户端句柄,它只在回调中设置。我尝试使用客户端句柄的全局副本,但它没有用,除了是个坏主意: 代码:全选
    发表于 05-15 08:13

    NodeMCU项目(三)MQTT客户端

    的限制。NodeMCU读取连接的温湿度传感器的信息,手机客户端订阅该信息,可以实现远程监控;NodeMCU订阅手机发布的命令主题,可以实现远
    发表于 11-05 17:05 1次下载
    NodeMCU项目(三)<b class='flag-5'>MQTT</b><b class='flag-5'>客户端</b>

    盘点2023年值得尝试的MQTT客户端工具

    随着物联网(IoT)的快速发展,MQTT协议被许多公司和开发人员广泛使用。在学习和使用 MQTT 的过程中,MQTT 客户端工具用于连接 MQTT
    的头像 发表于 07-13 10:11 5181次阅读
    盘点2023年值得尝试的<b class='flag-5'>MQTT</b><b class='flag-5'>客户端</b>工具

    MQTT中服务客户端

    服务器(broker),它是 MQTT 信息传输的枢纽,负责将 MQTT 客户端发送来的信息传递给 MQTT 客户端
    的头像 发表于 07-30 14:55 2618次阅读

    服务如何控制客户端之间的信息通讯

    进行管理。 比如上图所示,假设我们需要利用手机和电脑获取开发板在运行过程中 SoC 芯片的温度,那么首先电脑和手机这两个客户端需要向 MQTT服务器
    的头像 发表于 07-30 15:10 801次阅读
    服务<b class='flag-5'>端</b>如何控制<b class='flag-5'>客户端</b>之间的信息通讯