论坛网站模板源码下载,修改网站的设计,深圳平台推广,农村建设网站的重要性作为嵌入式开发爱好者#xff0c;在完成 ESP8266 结合 MQTT 协议的物联网通信实战后#xff0c;我决定将整个过程记录并分享出来。本文以正点原子 F429 开发板 ESP8266 无线模块为硬件载体#xff0c;基于 RT-Thread Studio 开发环境#xff0c;搭配 EMQX 云服务器#x…作为嵌入式开发爱好者在完成 ESP8266 结合 MQTT 协议的物联网通信实战后我决定将整个过程记录并分享出来。本文以正点原子 F429 开发板 ESP8266 无线模块为硬件载体基于 RT-Thread Studio 开发环境搭配 EMQX 云服务器服务端和 MQTTXPC 客户端实现 MCU 与云端的 MQTT 双向通信。整个过程涵盖硬件连接、软件配置、服务器部署及代码开发希望能为同学们提供一些参考。一、配置 UART 串口实现 MCU 与 ESP8266 通信我的硬件方案中MCU 与 ESP8266 采用串口通信具体使用开发板的 UART3 接口连接 ESP8266 的串口引脚。要实现这一通信链路首先需要在 RT-Thread Studio 中完成 UART3 的驱动配置与引脚映射1.打开 RT-Thread Settings启用串口设备驱动程序组件这是 RT-Thread 操作硬件串口的基础2.在board.h文件中添加 UART3 的引脚定义代码指定 TX/RX 对应的硬件引脚#define BSP_USING_UART3 #define BSP_UART3_TX_PIN PB10 #define BSP_UART3_RX_PIN PB11注串口的波特率、数据位等参数可参考 ESP8266 模块的默认配置正点原子 F429 开发板保持默认即可。配置完成后可通过 RT-Thread 的list device命令查看串口设备是否成功挂载这是验证串口配置是否生效的关键步骤。二、配置 AT-DEVICE 软件包实现 ESP8266 联网ESP8266 模块通过 AT 指令与 MCU 交互RT-Thread 提供的at_device软件包已封装好 ESP8266 的 AT 指令集能大幅简化联网开发流程具体配置步骤如下1.在 RT-Thread Studio 的软件包中心搜索并添加at_device软件包2.启用软件包中乐鑫 ESP8266 的驱动支持并打开示例工程在配置项中填写需要连接的 WiFi 名称、密码以及设备的标识名称3.进入组件的网络配置项启用 AT 客户端功能其余配置保持默认即可。完成上述配置后编译并下载程序到开发板此时可在串口终端中看到 ESP8266 连接 WiFi 的日志信息提示 “wifi 已经连接成功”。为了进一步验证网络通畅性还可以使用ping命令如ping www.bing.com测试模块的网络连接状态。三、部署 EMQX 云服务器搭建 MQTT 服务端MQTT 通信需要服务端作为消息中转本文选用 EMQX Cloud云端 MQTT 服务器其提供的 Serverless 版本可免费使用需注意该版本仅支持 TLS 协议具体部署步骤如下1.访问 EMQX Cloud 官网https://cloud.emqx.com/console/deployments注册账号并创建项目2.在项目中新建 Serverless 类型的部署等待部署完成3.部署添加客户端认证信息自定义连接所需的用户名和密码4.下载服务器的 CA 证书TLS 连接必需并记录服务器的连接地址、端口、认证账号密码等关键参数这些是客户端连接的核心信息。部署完成后可使用 MQTTXPC 端 MQTT 客户端工具创建客户端并连接该服务器若 EMQX Cloud 后台显示连接数为 1说明服务器配置成功。四、配置 MQTT 客户端实现 MCU 与服务器通信RT-Thread 集成了 Paho MQTT 客户端库结合前文的 TLS 协议要求需完成以下配置并编写代码1.启用 Paho MQTT 客户端库由于使用 TLS 协议需确保 MQTT 线程的栈大小大于 6144 字节否则会编译报错2.配置 TLS 协议相关参数本文选择使用用户证书的方式将下载的 CA 证书拷贝到项目的certs文件夹根目录执行scons命令重新编译使证书内容被加载到const char mbedtls_root_certificate[]数组中3.需注意配置足够的最大字节数若该值过小会导致证书验证失败。4.完成上述配置后参考 MQTT 例程编写 MCU 客户端的代码代码见下文实现 MQTT 连接、消息发布、订阅及回调处理等功能。#include stdlib.h #include string.h #include stdint.h #include rtthread.h // RT-Thread操作系统核心头文件 // 启用RT-Thread的调试日志功能配置 #define DBG_ENABLE #define DBG_SECTION_NAME mqtt_port // 调试日志的模块名称便于区分日志来源 #define DBG_LEVEL DBG_LOG // 日志级别LOG级别包含INFO/DEBUG/ERROR等 #define DBG_COLOR // 启用日志颜色便于终端区分不同级别日志 #include rtdbg.h // RT-Thread调试日志头文件 #include paho_mqtt.h // Paho MQTT客户端库核心头文件 // 条件编译启用TLS时显式包含TLS客户端相关头文件库头文件已包含此处为冗余保障 #ifdef MQTT_USING_TLS #include tls_client.h // RT-Thread的mbedtls封装客户端头文件 #endif #include main.h // 硬件相关的主头文件如LED引脚定义、HAL库等 /** * MQTT相关配置宏定义TLS版本 * 说明基于TCP正常工作版本改造仅将协议改为sslTLS其余配置保持一致 */ // MQTT TLS连接的URIssl协议 服务器地址 TLS端口8883为MQTTS默认端口 #define MQTT_URI ssl://p6121ba8.ala.cn-hangzhou.emqxsl.cn:8883 // 发布主题STM32向PC发送数据的主题stm32 to pc #define MQTT_PUBTOPIC /stm32topc/test // 订阅主题STM32接收PC数据的主题pc to stm32 #define MQTT_SUBTOPIC /pctostm32/test // 遗嘱消息当STM32异常断开连接时MQTT服务器会自动发布此消息到指定主题 #define MQTT_WILLMSG Goodbye! // MQTT服务器认证信息用户名密码需与服务器配置一致 #define MQTT_USERNAME #define MQTT_PASSWORD /* 全局变量定义 */ static MQTTClient client; // MQTT客户端上下文结构体存储客户端所有状态和配置 static int is_started 0; // MQTT客户端启动状态标记0-未启动1-已启动 // ************************* MQTT回调函数定义 ************************* /** * brief 预设订阅主题的消息回调函数对应MQTT_SUBTOPIC * param c: MQTT客户端上下文指针库传入避免依赖全局变量提升健壮性 * param msg_data: 接收到的消息数据包含主题和有效载荷 * note 1. 修复了原代码中使用全局client的问题改为使用传入的c指针 * 2. 处理PC发送的消息如LED控制并将接收到的消息回发至发布主题 */ static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data) { // 安全处理确保消息有效载荷以\0结尾避免字符串越界访问 if (msg_data-message-payloadlen c-readbuf_size - 1) { *((char *)msg_data-message-payload msg_data-message-payloadlen) \0; } // 打印接收到的订阅消息主题内容 LOG_D(mqtt sub callback: %.*s %s, msg_data-topicName-lenstring.len, // 主题长度 msg_data-topicName-lenstring.data, // 主题内容 (char *)msg_data-message-payload); // 消息内容 // 业务逻辑根据接收到的消息控制LED灯 if (rt_strcmp((char *)msg_data-message-payload, led on) 0) { HAL_GPIO_WritePin(LED0_GPIO_Port, LED1_Pin, GPIO_PIN_RESET); // LED亮根据硬件电路配置 rt_kprintf(led on\n); } else if (rt_strcmp((char *)msg_data-message-payload, led off) 0) { HAL_GPIO_WritePin(LED0_GPIO_Port, LED1_Pin, GPIO_PIN_SET); // LED灭根据硬件电路配置 rt_kprintf(led off\n); } // 额外逻辑将接收到的消息回发至发布主题实现回声功能 int ret paho_mqtt_publish(client, QOS1, MQTT_PUBTOPIC, (char *)msg_data-message-payload); if (ret 0) { LOG_D(publish to default topic %s: %s, MQTT_PUBTOPIC, (char *)msg_data-message-payload); } else { LOG_E(publish to default topic %s failed, ret: %d, MQTT_PUBTOPIC, ret); } } /** * brief MQTT默认订阅消息回调函数未匹配预设主题的消息会进入此回调 * param c: MQTT客户端上下文指针 * param msg_data: 接收到的消息数据 * note 处理所有未被预设主题过滤器匹配的订阅消息 */ static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data) { // 安全处理确保消息有效载荷以\0结尾 if (msg_data-message-payloadlen c-readbuf_size - 1) { *((char *)msg_data-message-payload msg_data-message-payloadlen) \0; } // 打印默认回调的消息 LOG_D(mqtt sub default callback: %.*s %s, msg_data-topicName-lenstring.len, msg_data-topicName-lenstring.data, (char *)msg_data-message-payload); } /** * brief MQTT连接成功回调函数 * param c: MQTT客户端上下文指针 * note 可在此添加连接成功后的初始化逻辑如发布上线消息 */ static void mqtt_connect_callback(MQTTClient *c) { LOG_D(enter mqtt_connect_callback!); } /** * brief MQTT客户端上线回调函数 * param c: MQTT客户端上下文指针 * note 客户端成功连接并上线时触发 */ static void mqtt_online_callback(MQTTClient *c) { LOG_D(enter mqtt_online_callback!); } /** * brief MQTT客户端离线回调函数 * param c: MQTT客户端上下文指针 * note 客户端断开连接主动/异常时触发可在此添加重连逻辑或资源清理 */ static void mqtt_offline_callback(MQTTClient *c) { LOG_D(enter mqtt_offline_callback!); } // ************************* MQTT客户端核心功能函数 ************************* /** * brief 启动MQTT客户端注册为RT-Thread的MSH命令mqtt_start * param argc: 命令行参数个数MSH命令调用时argc1表示无额外参数 * param argv: 命令行参数数组 * return 0-成功-1-失败 * note 1. 基于TCP正常版本改造仅新增TLS相关配置库原生处理TLS无需手动创建mbedtls会话 * 2. 分步骤配置MQTT客户端参数、申请内存、设置回调、启动客户端线程 */ static int mqtt_start(int argc, char **argv) { // MQTT连接参数初始化使用库提供的初始化宏避免手动赋值遗漏 MQTTPacket_connectData condata MQTTPacket_connectData_initializer; static char cid[20] {0}; // 存储客户端ID需唯一避免冲突 // 命令行参数校验mqtt_start命令不接受额外参数 if (argc ! 1) { rt_kprintf(mqtt_start --start a mqtt worker thread.\n); return -1; } // 状态校验避免重复启动MQTT客户端 if (is_started) { LOG_E(mqtt client is already connected.); return -1; } /* 第一步配置MQTT客户端上下文参数 */ { client.isconnected 0; // 初始化连接状态未连接 client.uri MQTT_URI; // 设置MQTT连接的URITLS版本为ssl://xxx:8883 /* 生成唯一的客户端ID基于系统tick值确保每次启动的ID不同避免冲突 */ rt_snprintf(cid, sizeof(cid), rtthread%d, rt_tick_get()); /* 配置MQTT连接核心参数 */ rt_memcpy(client.condata, condata, sizeof(condata)); // 拷贝初始化的连接参数 client.condata.clientID.cstring cid; // 设置客户端ID client.condata.keepAliveInterval 30; // 心跳间隔30秒避免服务器判定客户端离线 client.condata.cleansession 1; // 清理会话断开后清除服务器端的会话状态 /* 配置MQTT服务器认证信息用户名密码 */ client.condata.username.cstring MQTT_USERNAME; client.condata.password.cstring MQTT_PASSWORD; /* 配置MQTT遗嘱消息参数客户端异常断开时触发 */ client.condata.willFlag 1; // 启用遗嘱消息 client.condata.will.qos 1; // 遗嘱消息的QoS等级1至少一次送达 client.condata.will.retained 0; // 不保留遗嘱消息 client.condata.will.topicName.cstring MQTT_PUBTOPIC; // 遗嘱消息的发布主题 client.condata.will.message.cstring MQTT_WILLMSG; // 遗嘱消息内容 /* 申请MQTT发送/接收缓冲区使用rt_calloc自动初始化为0 */ client.buf_size client.readbuf_size 1024; // 缓冲区大小1024字节可根据需求调整 client.buf rt_calloc(1, client.buf_size); // 发送缓冲区 client.readbuf rt_calloc(1, client.readbuf_size); // 接收缓冲区 // 内存申请失败处理 if (!(client.buf client.readbuf)) { LOG_E(no memory for MQTT client buffer!); return -1; } /* 设置MQTT事件回调函数 */ client.connect_callback mqtt_connect_callback; // 连接成功回调 client.online_callback mqtt_online_callback; // 客户端上线回调 client.offline_callback mqtt_offline_callback; // 客户端离线回调 /* 配置预设订阅主题提前订阅MQTT_SUBTOPIC无需手动调用subscribe命令 */ client.messageHandlers[0].topicFilter rt_strdup(MQTT_SUBTOPIC); // 动态复制主题字符串避免常量指针问题 client.messageHandlers[0].callback mqtt_sub_callback; // 绑定主题的回调函数 client.messageHandlers[0].qos QOS1; // 订阅的QoS等级1 /* 设置默认订阅回调函数处理未匹配预设主题的消息 */ client.defaultMessageHandler mqtt_sub_default_callback; // 第二步TLS相关配置库原生支持无需手动创建mbedtls会话仅做可选配置 #ifdef MQTT_USING_TLS // 可选配置1手动创建TLS会话若库未自动处理需调用tls_client_create接口根据实际库版本调整 // client.tls_session tls_client_create(MQTT_URI); // 可选配置2关闭TLS证书验证测试环境使用生产环境强烈不推荐存在安全风险 // tls_client_set_verify(client.tls_session, 0); #endif // 可选配置将发布模式改为非阻塞避免库中pub_sem信号量死锁问题也可修复库的信号量释放逻辑 // paho_mqtt_control(client, MQTT_CTRL_PUBLISH_BLOCK, RT_NULL); } /* 第三步启动MQTT客户端线程库会自动处理TLS连接、管道、信号量的初始化 */ if (paho_mqtt_start(client) ! 0) { LOG_E(start mqtt client failed!); // 启动失败时释放已申请的内存避免内存泄漏 rt_free(client.buf); rt_free(client.readbuf); rt_free(client.messageHandlers[0].topicFilter); return -1; } is_started 1; // 更新客户端启动状态 LOG_I(mqtt client start success, client ID: %s, cid); // 打印启动成功信息包含客户端ID return 0; } /** * brief 停止MQTT客户端注册为RT-Thread的MSH命令mqtt_stop * param argc: 命令行参数个数 * param argv: 命令行参数数组 * return 0-成功-1-失败 * note 1. 停止MQTT客户端线程释放所有申请的内存 * 2. 库会自动处理TLS连接、管道、信号量的释放无需手动操作 */ static int mqtt_stop(int argc, char **argv) { // 命令行参数提示若传入额外参数打印使用说明 if (argc ! 1) { rt_kprintf(mqtt_stop --stop mqtt worker thread and free mqtt client object.\n); } // 仅在客户端已启动时执行停止逻辑 if (is_started) { // 停止MQTT客户端线程库自动释放TLS连接、管道、信号量等资源 paho_mqtt_stop(client); is_started 0; // 重置启动状态 /* 释放申请的内存避免内存泄漏 */ if (client.buf) { rt_free(client.buf); client.buf RT_NULL; } if (client.readbuf) { rt_free(client.readbuf); client.readbuf RT_NULL; } if (client.messageHandlers[0].topicFilter) { rt_free(client.messageHandlers[0].topicFilter); client.messageHandlers[0].topicFilter RT_NULL; } LOG_I(mqtt client stop success, memory released); } else { LOG_W(mqtt client is not running); // 提示客户端未启动 } return 0; } /** * brief MQTT发布消息函数注册为RT-Thread的MSH命令mqtt_publish * param argc: 命令行参数个数2-发布到默认主题3-发布到指定主题 * param argv: 命令行参数数组argv[1]消息/主题argv[2]消息 * return 0-成功-1-失败 * note 1. 修复了原代码中未判断发布返回值的问题添加了错误日志 * 2. 支持两种发布模式默认主题、指定主题 */ static int mqtt_publish(int argc, char **argv) { int ret 0; // 存储发布操作的返回值 // 状态校验客户端未启动时禁止发布消息 if (is_started 0) { LOG_E(mqtt client is not connected.); return -1; } // 模式1发布到默认主题mqtt_publish message if (argc 2) { ret paho_mqtt_publish(client, QOS1, MQTT_PUBTOPIC, argv[1]); if (ret 0) { LOG_D(publish to default topic %s: %s, MQTT_PUBTOPIC, argv[1]); } else { LOG_E(publish to default topic %s failed, ret: %d, MQTT_PUBTOPIC, ret); } } // 模式2发布到指定主题mqtt_publish topic message else if (argc 3) { ret paho_mqtt_publish(client, QOS1, argv[1], argv[2]); if (ret 0) { LOG_D(publish to topic %s: %s, argv[1], argv[2]); } else { LOG_E(publish to topic %s failed, ret: %d, argv[1], ret); } } // 参数错误打印使用说明 else { rt_kprintf(mqtt_publish message --mqtt publish message to default topic.\n); rt_kprintf(mqtt_publish topic message --mqtt publish message to specified topic.\n); return -1; } return ret; } // ************************* MQTT订阅/取消订阅功能函数 ************************* /** * brief 新订阅主题的消息回调函数手动订阅时使用区别于预设主题的回调 * param client: MQTT客户端上下文指针 * param msg_data: 接收到的消息数据 * note 处理通过mqtt_subscribe命令手动订阅的主题消息 */ static void mqtt_new_sub_callback(MQTTClient *client, MessageData *msg_data) { // 安全处理确保消息有效载荷以\0结尾 if (msg_data-message-payloadlen client-readbuf_size - 1) { *((char *)msg_data-message-payload msg_data-message-payloadlen) \0; } // 打印手动订阅的消息 LOG_D(mqtt new subscribe callback: %.*s %s, msg_data-topicName-lenstring.len, msg_data-topicName-lenstring.data, (char *)msg_data-message-payload); } /** * brief MQTT订阅主题函数注册为RT-Thread的MSH命令mqtt_subscribe * param argc: 命令行参数个数必须为2即mqtt_subscribe topic * param argv: 命令行参数数组argv[1]要订阅的主题 * return 0-成功-1-失败 * note 手动订阅指定主题使用mqtt_new_sub_callback处理消息 */ static int mqtt_subscribe(int argc, char **argv) { // 参数校验必须传入订阅的主题 if (argc ! 2) { rt_kprintf(mqtt_subscribe [topic] --send an mqtt subscribe packet and wait for suback before returning.\n); return -1; } // 状态校验客户端未启动时禁止订阅 if (is_started 0) { LOG_E(mqtt client is not connected.); return -1; } // 调用库接口订阅主题QoS1 int ret paho_mqtt_subscribe(client, QOS1, argv[1], mqtt_new_sub_callback); if (ret 0) { LOG_D(subscribe topic %s success, argv[1]); } else { LOG_E(subscribe topic %s failed, ret: %d, argv[1], ret); } return ret; } /** * brief MQTT取消订阅主题函数注册为RT-Thread的MSH命令mqtt_unsubscribe * param argc: 命令行参数个数必须为2即mqtt_unsubscribe topic * param argv: 命令行参数数组argv[1]要取消订阅的主题 * return 0-成功-1-失败 * note 取消之前订阅的指定主题 */ static int mqtt_unsubscribe(int argc, char **argv) { // 参数校验必须传入取消订阅的主题 if (argc ! 2) { rt_kprintf(mqtt_unsubscribe [topic] --send an mqtt unsubscribe packet and wait for unsuback before returning.\n); return -1; } // 状态校验客户端未启动时禁止取消订阅 if (is_started 0) { LOG_E(mqtt client is not connected.); return -1; } // 调用库接口取消订阅主题 int ret paho_mqtt_unsubscribe(client, argv[1]); if (ret 0) { LOG_D(unsubscribe topic %s success, argv[1]); } else { LOG_E(unsubscribe topic %s failed, ret: %d, argv[1], ret); } return ret; } // 导出命令mqtt_start - 启动MQTT客户端 MSH_CMD_EXPORT(mqtt_start, startup mqtt client); // 导出命令mqtt_stop - 停止MQTT客户端 MSH_CMD_EXPORT(mqtt_stop, stop mqtt client); // 导出命令mqtt_publish - 发布消息到指定主题 MSH_CMD_EXPORT(mqtt_publish, mqtt publish message to specified topic); // 导出命令mqtt_subscribe - 订阅指定主题 MSH_CMD_EXPORT(mqtt_subscribe, mqtt subscribe topic); // 导出命令mqtt_unsubscribe - 取消订阅指定主题 MSH_CMD_EXPORT(mqtt_unsubscribe, mqtt unsubscribe topic);烧写程序到开发板后通过串口终端输入mqtt_start命令启动 MQTT 线程此时 MCU 即可与 PC 端的 MQTTX 客户端实现双向通信MCU 可接收 PC 发送的消息如控制 LED 亮灭并将消息回发至指定主题PC 也可接收 MCU 发布的消息。五、实战小结第一次配置这套方案时我遇到了不少问题比如 TLS 证书验证失败、MQTT 线程栈空间不足、消息发布订阅无响应等。解决这些问题的关键是开启 RT-Thread 的日志功能通过日志信息逐步定位问题所在 —— 比如证书验证失败时检查证书路径和最大字节数配置线程异常时调整栈空间大小。只要耐心排查这些问题都能逐一解决。