新手友好 | 简单几步快速实现MQTT与云平台通信

科技   2024-11-15 17:20   湖北  

MQTT介绍


MQTT 协议是当今世界上最受欢迎的物联网协议,没有之一。MQTT 协议为设备提供了稳定、可靠、简单易用的通信基础,截至目前通过 MQTT 协议连接的设备已经过亿,广泛应用于 IoT、M2M 等领域。


目前 MQTT 主流版本有 MQTT3.1.1 和 MQTT5。MQTT5 完全兼容 MQTT3.1.1,是在 MQTT3.1.1 的基础上进行完善补充。目前 MQTT3.1.1 的使用人数还是更多,所以本文用 MQTT3.1.1 来讲解。



1. MQTT是什么


MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)协议是基于 TCP/IP 协议栈构建的异步通信消息协议,是一种轻量级的、客户端服务端架构的、发布/订阅模式的消息传输协议。


MQTT协议最初版本是在1999年建立的,发明人是 Andy Stanford-Clark 和 Arlen Nipper。MQTT 是他们为了利用卫星通讯监控输油管道所开发的协议,由此可见,MQTT 就是专为低带宽、高延迟或不可靠的网络而设计的。


MQTT 协议特点:


•简单易用,方便集成

•安全可靠,支持TLS/SSL加密和认证机制

•轻量级,占用带宽小,支持多种消息传输模式

•灵活性,可知设备连接状态,可控数据传输质量



2. MQTT原理


在 MQTT 协议通讯中,最重要的两个角色是服务端和客户端。客户端向一「主题」「发布」消息,服务端处理并推送给「订阅」了该「主题」的其他客户端。


这么说是不是一头雾水?我打个比方,将整个 MQTT 比作我们熟悉的视频软件,一一对应关系如下。



假如你是张三,一名普通的抖音用户,你关注了李四的抖音账号。在这里,张三跟李四不会直接产生关系,而是会通过抖音服务器。抖音服务器就是「服务端」,所有抖音用户就是「客户端」,你关注李四的这个动作,就叫作「订阅」。


李四如果「发布」了一条视频,那么张三、王五、老六,等等所有关注了李四的粉丝都会收到这个视频推送。这是因为抖音里没有主题的概念,只要李四有发视频,粉丝都会收到推送。


这就是 MQTT 的基础概念。


2.1 服务端


MQTT 服务端通常是一台服务器,它充当着 MQTT 信息传输的中心节点。其主要功能是接收来自 MQTT 客户端的信息并将其传递给其他 MQTT 客户端。此外,MQTT 服务端还负责管理客户端,确保客户端之间的通信畅通无阻,并确保 MQTT 消息被正确接收和准确投递。



服务端一般就是云平台,OneNET、阿里云、腾讯云等;也可以用 EMQ 或 Mosquitto 自己搭建服务端。


2.2 客户端


MQTT 客户端可以向服务端通过「发布」发送信息,也可以从服务端「订阅」来收取信息。


客户端一般就是我们的单片机,STM32、C51、树莓派等。


2.3 主题


在 MQTT 通讯中,客户端订阅的是一个个「主题」。MQTT 服务端在管理信息通讯时,使用「主题」来控制。


2.4 发布与订阅的特点


相互独立:客户端相互独立,彼此没有直接联系,不用知道对方的任何状态、情况。


空间分离:客户端只要连接同一个 MQTT 通讯网络,无论是互联网或者局域网都可以通讯。


时间异步:客户端的发布与订阅无需同步。若有客户端断连,服务端保存信息,待客户端上线后推送。



3. MQTT报文


MQTT 协议通过交换预定义的 MQTT 控制报文来通信。MQTT 控制报文简称 MQTT 报文,接下来我将详细介绍 MQTT 报文。


3.1 报文结构


一个 MQTT 报文由固定报头、可变报头、有效载荷三部分组成:


固定报头(Fixed header),所有 MQTT 报文有,表示报文类型及报文的分组类标识。可变报头(Variable header),部分 MQTT 报文有,报文类型决定了可变头是否存在及其具体内容。有效载荷/消息体(Payload),部分 MQTT 报文有,存放报文的具体内容。示意图如下:


整体的报文结构介绍完,下面介绍每个的细节。如果看不懂,没关系,后面会有案例,更加清晰明了。


3.2 固定报头(Fixed header)


3.2.1 消息类型(message type)


位于 byte 1 的第 7~4 位,表示 MQTT 报文类型,有下面这么多类型:



3.2.2 标志位(DUP、QoS Level、RET)


位于 byte 1 的第 3~0 位,表示 MQTT 报文的分组类标识。在不使用标识位的消息类型中,标识位被做为保留位。如果收到无效的标志时,接收端必须关闭网络连接。


DUP:发布消息的副本

QoS:发布消息的服务质量

RETAIN:发布保留标识



3.2.3 剩余长度(Remaining Length)


位于 byte 2 的第 3~0 位,表示当前剩余字节数,包括可变报头和负载的数据。剩余长度不包括用于编码剩余长度字段本身的字节数。


3.3 可变报头(Variable header)


某些 MQTT 报文有可变头。它在固定头和有效载荷之间。可变头的内容根据报文类型的不同而不同。可变头的报文标识符字段存在于在多个类型的报文里。


可变报头在后续的报文案例中会详细介绍。


3.4 有效载荷(Payload)


有效载荷就是应用消息,但并不是所有的报文都有有效载荷,只有部分 MQTT 报文有有效载荷,具体如下:




4. QoS,服务质量


QoS(Quality of Service,服务质量)。在数据通信的过程中,有的消息很重要,不可以丢失;有的消息不重要,丢了也没关系。所以在 MQTT 中可以配置 QoS,给不同重要的消息不同的服务质量。


MQTT 协议有三种服务质量级别:


QoS = 0:最多发一次

QoS = 1:最少发一次

QoS = 2:保证收一次 对于不同重要级的消息选择不同的 QoS,较为重要消息的使用 QoS = 1 和 QoS = 2。


4.1 QoS = 0:最多发一次


这种服务质量消息最多只发送一次。接收者不会发送响应,发送者也不会重试。消息可能送达一次也可能根本没送达。


想象你是一个快递员,而你要将包裹(消息)送到不同的收件人(订阅者)。QoS 级别就像你和收件人之间的交付服务等级,它决定了你在送货过程中提供的保证。


QoS 0(最多发一次)相当于你将包裹送给收件人后,没有任何确认回执。你只是简单地把包裹放在门口,然后离开。在这种情况下,你无法确定包裹是否成功被收件人接收,也无法知道是否有其他人偷了这个包裹。



4.2 QoS = 1:最少发一次


服务质量确保消息至少送达一次。QoS 1 的 PUBLISH 报文的可变报头中包含一个报文标识符,需要 PUBACK 报文确认。


QoS 1(最少发一次)相当于你在送货后要求收件人给你一个回执确认。你将包裹送给收件人,然后等待他给你一个回执,告诉你已经收到包裹。如果你没有收到回执,你会重新尝试送货,直到收到回执为止。这样,你可以确保包裹被收件人接收,但可能会增加一些延迟和工作量。



4.3 QoS = 2:保证收一次


这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。QoS 2 的消息可变报头中有报文标识符。QoS 2 的 PUBLISH 报文的接收者使用一个两步确认过程来确认收到。


QoS 2(保证收一次)相对于你要确认对方可以收货再发货。你在送货前给收件人发消息问他在不在家,收件人告诉你他在家,你把将包裹送给收件人,然后等待他给你一个回执,告诉你已经收到包裹。如果他没回消息,不在家,就继续发消息直到收件人回消息,告诉你他在家,再送包裹。




5. MQTT心跳机制


MQTT心跳机制可以比喻为人体的心脏跳动,两者都是为了维持正常的运行状态和连接的稳定性。


当MQTT客户端定期发送心跳包时,它就像是我们的心脏,定时地向服务器发送信号,表明自己的存在和健康状况。如果服务器在一定时间内没有接收到心跳包,就会认为客户端出现异常或离线,类似于身体出现问题时,医生会检查心跳情况来判断身体的健康状况。


客户端定时向服务端发送心跳请求(PINGREQ),告诉服务端,我还和你连接着哦。服务端收到心跳请求后,会回复一条心跳响应(PINGRESP),告诉客户端,我知道你还连着我啦。



通过心跳机制,MQTT 可以实时监测客户端的连接状态,及时发现和处理异常情况,确保通信的可靠性和稳定性。就像我们依赖心脏维持身体的正常运转一样,MQTT的心跳机制也是保障通信链路顺畅运行的重要机制之一。



6. MQTT遗嘱


遗嘱,和前面的心跳一样,有心跳请求证明客户端还连着服务端,客服端还活着。那么遗嘱就很生动形象了,客户端先把自己的遗嘱给服务端,万一客户端嘎了,服务端就可以执行遗嘱了。


MQTT遗嘱是一种机制,允许客户端在「活着」的时候设置并发送遗嘱消息,以便在客户端意外断线时由服务端公布。


意外断线指的是当客户端在没有发送 DISCONNECT 报文的情况下失去了心跳信号,这通常发生在网络故障或电池耗尽等情况下。此时,服务端会察觉到客户端的异常断开,并将客户端的遗嘱消息发布出来。然而,如果客户端正常断开连接并发送了 DISCONNECT 报文,遗嘱则不会启动,服务端也不会发布客户端的遗嘱消息。


通过合理设置和使用 MQTT 遗嘱机制,可以增强客户端在服务端管理中的作用,并提供实时的设备状态信息。



7. DHT11上传到OneNET平台


登入OneNet平台,创建一个DHT11设备如下:



记录:


设备ID:dht11_01

产品ID:16o95O4eF8

设备密钥:

bk9vd3h3VzFxTk1PMHdKMHNYbHdsUmFIcFdKZFZxd1g=


ESP8266初始化:


void esp8266_init(uint32_t baudrate){    printf("esp8266初始化开始...\r\n");    esp8266_uart_init(baudrate);        //esp8266其他初始化    printf("1. 测试esp8266是否存在...\r\n");    while(esp8266_at_test())        delay_ms(500);        printf("2. 设置工作模式为STA...\r\n");    while(esp8266_set_mode(ESP8266_STA_MODE))        delay_ms(500);        printf("3. 设置单路链接模式...\r\n");    while(esp8266_connection_mode(ESP8266_SINGLE_CONNECTION))        delay_ms(500);        printf("4. 连接wifi,SSID: %s, PWD: %s\r\n", WIFI_SSID, WIFI_PWD);    while(esp8266_join_ap(WIFI_SSID, WIFI_PWD))        delay_ms(1500);        printf("5. 连接TCP服务器,server_ip:%s, server_port:%s\r\n", TCP_SERVER_IP, TCP_SERVER_PORT);    while(esp8266_connect_tcp_server(TCP_SERVER_IP, TCP_SERVER_PORT))        delay_ms(500);        printf("6. 进入透传模式..\r\n");    while(esp8266_enter_unvarnished())        delay_ms(500);        printf("ESP8266已连接上TCP服务器并进入透传模式\r\n");    printf("ESP8266初始化完成\r\n");}


MQTT初始化:


void mqtt_init(void){    mqtt_login_init(PRODUCT_KEY,DEVICE_NAME,DEVICE_SECRET);    //缓冲区赋值    mqtt_rxbuf = _mqtt_rxbuf;    mqtt_rxlen = sizeof(_mqtt_rxbuf);    mqtt_txbuf = _mqtt_txbuf;    mqtt_txlen = sizeof(_mqtt_txbuf);    memset(mqtt_rxbuf,0,mqtt_rxlen);    memset(mqtt_txbuf,0,mqtt_txlen);        //无条件先主动断开    mqtt_disconnect();    delay_ms(100);    mqtt_disconnect();    delay_ms(100);}


宏定义设备ID,产品ID,设备密钥:


//云服务器设备证书#define PRODUCT_KEY "16o95O4eF8"#define DEVICE_NAME "dht11_01"#define DEVICE_SECRET "J6OmW6IjqBcXO9beNPOdkxI3Ves%3D"
//订阅与发布主题#define RELY_PUBLISH_TOPIC  "$sys/16o95O4eF8/dht11_01/thing/property/set_reply"  #define SET_TOPIC  "$sys/16o95O4eF8/dht11_01/thing/property/set"#define POST_TOPIC "$sys/16o95O4eF8/dht11_01/thing/property/post"


获得DHT11标识符,实现JSON解析数据上传云平台:



#include "sys.h"#include "delay.h"#include "uart1.h"#include "dht11.h"#include "esp8266.h"#include "onenet.h"
int main(void){    uint8_t data_send[512] = {0};    uint8_t dht11_data[4] = {0};        while(1)    {            memset(dht11_data, 0, 4);        dht11_read(dht11_data);        sprintf((char *)data_send, "{\"id\":\"1386772172\",\"version\":\"1.0\",\"params\":{\"CurrentTemperature\":{\"value\":%d.%d},\"CurrentHumidity\":{\"value\":%d.%d}}}",            dht11_data[2], dht11_data[3], dht11_data[0], dht11_data[1]);              mqtt_publish_data(POST_TOPIC, (char *)data_send, 0);                delay_ms(3000);                printf("\r\n~~~~~~~~~~~~~~~~~发送心跳包~~~~~~~~~~~~~~~~~\r\n");        mqtt_send_heart();        printf("\r\n~~~~~~~~~~~~~~~~~心跳包结束~~~~~~~~~~~~~~~~~\r\n");    }}


实验结果:














讲师招募


招募要求

完成符合要求的机器人相关视频制作

总时长需达到 3小时以上

视频内容需为精品课程,确保高质量和专业性


讲师奖励

享受课程收入分成

赠送 2门 古月学院在售精品课程(训练营除外)


联系我们

添加工作人员微信:GYH-xiaogu




古月居
专业的ROS机器人知识社区和产业服务平台
 最新文章