手把手搭建自己私有的MQTT服务器,完成设备上云

2024-09-19 13:21   重庆  

一、前言

在众多物联网通信协议中,MQTT(Message Queuing Telemetry Transport)因其轻量、高效的特点而被广泛应用于各种物联网场景。它不仅能够满足设备低功耗的需求,还支持设备与云端之间稳定可靠的消息传递,是实现设备上云的理想选择之一。本文介绍利用EMQX服务器搭建自己私有的MQTT服务器,构建一套完整的物联网解决方案,涵盖从设备接入到数据存储,再到数据转发及上位机开发等多个关键环节。

EMQX,全称为Erlang/Enterprise Middleware MQTT Broker,是一款基于Erlang/OTP平台开发的开源物联网消息中间件。它专为大规模物联网应用设计,能够处理海量并发连接,并提供稳定的消息发布/订阅服务。作为一款高性能的MQTT协议服务器,EMQX不仅支持标准的MQTT v3.1、v3.1.1以及最新的v5.0版本协议,还提供了丰富的扩展功能来满足不同场景下的需求。

EMQX的核心优势在于其卓越的性能表现和高度可伸缩性。单个EMQX集群可以轻松管理数百万级别的设备连接,同时保持低延迟的消息传递能力。这使得EMQX成为构建大型物联网系统时的理想选择之一。此外,通过灵活配置规则引擎,用户可以根据业务逻辑定制化处理接收到的数据流,实现复杂事件处理、数据转换等功能。例如,当特定条件被触发时,可以自动执行预设的动作或将信息转发给其他系统进行进一步分析。

安全性方面,EMQX支持多种认证机制如用户名密码验证、客户端证书验证等,以确保只有授权用户才能访问敏感资源;同时也提供了TLS/SSL加密通信能力,保障了数据传输过程中的安全性和完整性。对于需要严格遵守行业标准的企业来说,这些特性尤为重要。

在集成度方面,EMQX展现了极高的灵活性与兼容性。无论是与其他数据库系统的对接(如MySQL, PostgreSQL, MongoDB等),还是与各种云服务提供商(如阿里云、AWS)的无缝衔接,EMQX都能很好地适应并促进整个生态系统的健康发展。EMQX还配备了详细的文档资料和技术支持服务,帮助开发者快速上手并解决遇到的问题。


本文章里用到的全部工具软件都可以在这里下载(放在网盘里了)。

https://pan.quark.cn/s/145a9b3f7f53


二、准备服务器

EMQX-服务器部署目前支持以下操作系统:

RedHat 

CentOS 

RockyLinux 

AmazonLinux 

Ubuntu 

Debian 

macOS 

Linux

 Windows(旧版本是支持的,新版不支持Windows系统,官网没有下载链接了

前期测试阶段,华为云、阿里云、腾讯云这些平台都有免费试用的服务器,可以领取一个月ECS服务器试用测试。

也可以自己本地Windows电脑上搭建服务器,这个我在另一篇文章里有详细介绍,还有对应的视频(Windows系统搭建)。

搭建自己的MQTT服务器、实现设备上云(windows+EMQX)


我下面是领取了华为云一个月的ECS服务器搭建此项目的服务器用于进行整体项目的测试。


2.1 登录官网

https://www.huaweicloud.com/


2.2 领取一个月的服务器

【1】选择服务器

配置过程中,系统选择ubuntu18.04



【2】返回弹性服务器的控制台

服务器领取配置好之后回到主界面。


【3】点击服务器名字,可以进入到详情页面。



2.3 配置安全组

要确保MQTT服务器常用的几个端口已经开放出出来。


2.4 安装FinalShell

Windows下安装 FinalShell 终端,方便使用SSH协议远程登录到云服务器。(当然,使用其他方式登录也是一样的)



2.5 远程登录到云服务器终端

【1】新建连接,选择SSH连接。


【2】填入IP地址、用户名、密码

这里的主机就是填服务器的公网IP地址,密码就是创建服务器输入的密码,用户名直接用root。



【3】点击连接服务器


【4】第一次登录会弹出提示框,选择接受并保存



【5】接下来可以看到服务器已经登录成功了。




三、Linux下安装EMQX

本章节将介绍如何在 Ubuntu 系统中下载安装并启动 EMQX。

支持的 Ubuntu 版本:

  • Ubuntu 22.04

  • Ubuntu 20.04

  • Ubuntu 18.04

2.1 官网地址

链接:https://www.emqx.io/docs/zh/v5.2/deploy/install-ubuntu.html

2.2 通过Apt源安装

EMQX 支持通过 Apt 源安装,免除了用户需要手动处理依赖关系和更新软件包等的困扰,具有更加方便、安全和易用等优点。

在命令行终端,复制下面的命令过去,按下回车键。

【1】通过以下命令配置 EMQX Apt 源:

curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash

【2】运行以下命令安装 EMQX:

sudo apt-get install emqx

【3】运行以下命令启动 EMQX:

sudo systemctl start emqx


过程如下:




2.3 EMQX常用的命令

sudo systemctl emqx start    启动
sudo systemctl emqx stop 停止
sudo systemctl emqx restart 重启




四、配置EMQX-MQTT服务器

4.1 登录EMQX内置管理控制台

EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。

在浏览器里输入: http://122.112.225.194:18083 就可以访问EMQX的后台管理页面。可以管理以连接的客户端或检查运行状态。

这里面的IP地址,就是自己ECS云服务器的公网IP地址。


打开浏览器后,输入地址后打开的效果:


默认用户名和密码:

用户名:admin
密码:public

第一次登录会提示你修改新密码,如果不想设置,也可以选择跳过(公网服务器部署,还是要修改密码安全些)。


下面修改新密码:

登录成功的页面显示如下:



4.2 MQTT配置

这里可以配置MQTT的一些参数,根据自己的需求进行配置。



4.3 测试MQTT通信

新建一个客户端,点击连接。


连接之后,然后点击订阅,和发布,如果下面消息能正常的接收。说明MQTT服务器通信是已经正常,没问题了。

并且在这个页面也可以看到主题发布主题订阅的格式。



4.4 MQTT客户端登录服务器测试

接下来就打开我们自己的MQTT客户端登录MQTT服务器进行测试数据的通信。

端口选择: 1883

根据软件参数填入参数,登录,进行主题的发布和订阅。



说明:目前还没有配置客户端认证,现在只要IP和端口输入正确,MQTT三元组可以随便输入,都可以登录上服务器的,服务器没有对三元组做校验。

EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。


然后打开EMQX的管理后台,可以看到我们的设备已经登录服务器了,名字为test1



在订阅主题的页面也可以看到我们客户端设备订阅的主题。



4.5 客户端认证配置

EMQX 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。

在正式产品里肯定是要启用认证的,不然任何设备都能接入。

下面就介绍如何配置 客户端认证。

【1】打开客户端认证页面


【2】选择密码认证


【3】选择内置数据库


【4】设置认证方式(都可以默认,不用改),直接点击创建。


【5】创建成功后,点击用户管理


【6】添加用户



【7】添加成功


【8】添加完毕之后,打开MQTT客户端可以进行测试。

登录的时候,MQTT用户名和密码必须输入正确,按照上一步添加的信息进行如实填写,否则是无法登录服务器的。



4.6 客户端授权配置

客户端授权页面可以配置每个客户端(设备)的主题发布,订阅权限。限制它是否可以发布主题,订阅主题。如果有需要就可以进行配置。


http://127.0.0.1:18083/#/authorization/detail/built_in_database?tab=users

【1】创建数据源


【2】选择内置数据库


【3】完成创建


【4】点击权限管理


【5】选择客户端ID,点击添加


【6】配置权限


4.7 数据转发(集成)

在集成选项里,可以对设备数据处理。比如:转发到自己的HTTP服务器,转发到自己其他的MQTT服务器,创建规则,某些事件触发某些动作等等。


选择数据桥接。

可以把数据发送端自己的HTTP服务器,或者发送到其他的MQTT服务器。


选择HTTP服务 (如果自己有HTTP服务器,可以将数据转发给自己的HTTP服务器)。



五、MQTT客户端消息互发测试

5.1 添加2个设备

为了方便测试设备间互相订阅主题,数据收发,在客户端认证页面至少添加2个设备。我这里分别添加了test1test2


5.2 设备间测试

设备A订阅设备B的主题,设备B订阅设备A的主题,实现数据互发。




设备A的MQTT信息:

MQTT服务器地址:122.112.225.194
MQTT服务器端口号:1883
MQTT客户端IDAAA
MQTT用户名:test1
MQTT登录密码:12345678

订阅主题:BBB/#
发布主题:AAA/1
发布的消息:{ "msg": "我是AAA设备" }


设备B的MQTT信息:

MQTT服务器地址:122.112.225.194
MQTT服务器端口号:1883
MQTT客户端IDBBB
MQTT用户名:test2
MQTT登录密码:12345678

订阅主题:AAA/#
发布主题:BBB/1
发布的消息:{ "msg": "我是BBB设备" }



五、单片机设备上云

只要是MQTT客户端能正常上云通信了,那么单片机也是一样的。

上位机也可以采用MQTT协议接入服务器,订阅设备的主题,就可以实时接收设备的消息(当然,也可以采用HTTP协议接入)。


六、数据桥接

EMQX支持将设备上传的数据转发到其他地方,比如,自己的HTTP服务器。方便自己服务器进行其他的处理。

通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。而 EMQX Dashboard 提供了可视化创建数据桥接的能力,只需在页面中配置相关资源即可。

本章节就介绍如何搭建自己的HTTP服务器。配置EMQX转发数据到自己的HTTP服务器,保存处理数据。

6.1 搭建HTTP服务器

我这里直接使用python写代码搭建一个HTTP服务器。ECS服务器上默认没有安装python3,需要先安装一下。

【1】安装python3

root@emqx:~/emqx# apt install python3 
Reading package lists... Done
Building dependency tree
Reading state information... Done
python3 is already the newest version (3.6.7-1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 1 not upgraded.


【2】编写代码

from flask import Flask, json, request

app = Flask(__name__)

@app.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200

if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)

将以上代码保存到一个名为 server.py 的文件中。

这段代码创建了一个使用 Flask 框架的 Web 服务器,可以接收根路径的 POST 请求。当接收到 POST 请求时,服务器会将请求的数据打印到终端,并返回一个 JSON 格式的响应给客户端。服务器将在本地运行,并监听默认的 8000 端口。


【3】运行程序

# 安装 flask 依赖
pip install flask
pip3 install flask

# 启动服务
python3 server.py

在命令行中执行 python3 server.py,就可以启动一个简单的HTTP服务器,可以接收并处理POST请求。当有POST请求发生时,服务器将返回收到的POST数据。可以根据自己的需要,进一步扩展处理POST请求的逻辑。


运行示例: (代码可以在本地写好上传到服务器,也可以直接 vim server.py 打开编辑器直接编写 )


可以通过发送 POST 请求到 http://your-server-ip:8000/ 的方式来测试这个服务器。

比如:

http://122.112.225.194:8000/


6.2 数据转发配置

【1】在集成选项里,可以对设备数据处理,将数据转发到自己的HTTP服务器。


【2】选择Webhook。

Webhook,使用 Webhook 来转发数据到 HTTP 服务;

使用 Webhook 其实就是将 EMQX 接收并处理后的数据发送到一个 HTTP 服务上,再根据预设好的 HTTP 服务来处理和集成业务数据。

同样用户需要有一个预先搭建好的 HTTP 服务,需要在配置信息页面填写 HTTP 请求的服务地址,选择一个请求方法 POST、GET、PUT 或 DELETE,配置请求头,将需要发送的数据使用模板语法填写到请求体(body)中即可。


【3】选配置Webhook

触发器选择所有消息和事件,URL里填自己的服务器地址。



【4】点击测试。测试服务器是否OK。



【6】没问题就直接点击保存



【7】创建成功


【8】保存之后。会自动创建规则数据桥接。非常方便。


6.3 测试转发效果

【1】打开MQTT客户端,发送数据测试。


【2】看python服务器的终端,可以看到收到了EMQX服务器转发过来的数据。

如果自己接下来想要进行其他的操作,服务器写代码进行对应的处理即可。


【3】 点击这个服务可以看到已经触发转发的详情。




七、API接口说明

一般开发一套完整的物联网产品。一般会分为设备端,服务器,上位机部分。

设备端:就是硬件端。采集本地传感器的数据上传到服务器,或者接收服务器下发的指令完成某些控制。

比如:STM32 + ESP8266 + 各种传感器 就是一个硬件设备端。可以通过ESP8266联网上传数据。

服务器:也就是MQTT服务器端,比如: 自己采用EMQX搭建的MQTT服务器,或者采用阿里云、华为云、OneNet这些平台的IOT服务器。

上位机:上位机就是给用户使用的,用于远程控制设备,查看设备。比如:微信小程序、手机APP、电脑上位机、web网页端等等。

那么这个章节,就介绍利用EMQX提供的API接口与MQTT客户端设备进行通信,完成数据上传,命令下发等功能,可以利用此接口完成上位机的开发。

7.1 查看全部的API接口

帮助文档地址: https://www.emqx.io/docs/zh/v5.0/admin/api.html#%E8%AE%A4%E8%AF%81

EMQX 提供了管理监控 REST API,这些 API 遵循 OpenAPI (Swagger) 3.0 规范。 EMQX 在 REST API 上做了版本控制,EMQX 5.0.0 以后的所有 API 调用均以 /api/v5 开头。

EMQX 服务启动后,可以访问 http://localhost:18083/api-docs/index.html (opens new window)来查看 API 的文档。还可以直接在 Swagger UI 上尝试执行一些 API。

比如: 我的EMQX服务器是在华为云ECS服务器上搭建,公网IP是: 122.112.225.194

那我访问API文档的地址就是下面这样的格式: 在浏览器里打开即可。

http://122.112.225.194:18083/api-docs/index.html


访问效果如下:



7.2 创建API密匙

【1】登录EMQX的后台管理页面: http://122.112.225.194:18083/

【2】找到菜单里的 系统设置选项-->API密匙。


【3】创建密匙。


【4】填写密匙名称


【5】创建成功


【6】得到API Key 和 Secret Key

API Key    :  f072a6e9758b8cdf
Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG


7.3 测试API: 获取节点信息

上一步已经创建好API的访问密匙,这里就以 获取节点信息为例,调用获取节点信息的API接口,获取节点 信息。


接口在API文档里的介绍:


根据前面的API访问路径规则说明;那么,获取节点信息的API完整访问路径为:

http://122.112.225.194:18083/api/v5/nodes


接下来就用python写一份代码,测试一下接口是否可以正常访问。python代码直接放服务器运行(主要是我本地没有安装python环境,云服务器的环境是已经安装OK的,测试方便)。



【1】在云服务器上创建一个python文件,方便测试代码


【2】创建之后FinaShell自动上传到服务器


【3】编辑代码

在这里双击要编辑的文件,就可以打开文件进行编辑。默认采用内置的编辑器,也可以选择自己电脑上的外置编辑器。


【4】代码编辑完成,按下键盘快捷键Ctrl + S 保存文件内容,保存之后文件内容会自动同步到服务器。


保存后提示,自动上传。



写入的代码如下:

import urllib.request
import json
import base64

username = 'f072a6e9758b8cdf'
password = 'LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG'

url = 'http://122.112.225.194:18083/api/v5/nodes'

req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json')

auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()
req.add_header('Authorization', auth_header)

with urllib.request.urlopen(req) as response:
data = json.loads(response.read().decode())

print(data)


【5】执行代码,返回结果

通过返回信息来看,节点信息获取是没有问题的。

root@emqx:~/emqx# python3 http_api_test.sh 
[{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '612.59M', 'node': 'emqx@127.0.0.1', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84000040, 'version': '5.3.1-alpha.1'}]
root@emqx:~/emqx# python3 http_api_test.sh
[{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '613.23M', 'node': 'emqx@127.0.0.1', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84008046, 'version': '5.3.1-alpha.1'}]


7.4 在线调试(获取主题列表)

在编写代码之前,可以先测试下API接口的效果,可以直接在Swagger UI界面直接调试API。

地址: http://122.112.225.194:18083/api-docs/index.html#/


例如:以获取以订阅主题列表的API接口为例。


【1】在Swagger UI界面上找到对应的API接口。


【2】点击API说明,展开详情


【3】点击右边的试试看按钮。


【4】点击执行


【5】然后会弹出提示框,让你填入用户名和密码。

这个用户名和密码就是前面创建API密匙生成的API Key(用户名) 和 Secret Key(密码) 。

API Key    :  f072a6e9758b8cdf
Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG



【6】根据提示输入用户名和密码,再点击登录。


【7】再次点击执行,就可以看到接口返回的数据了。

并且在页面上也写出了,请求的信息。使用curl命令行给出详细的请求过程,参考这个就可以自己写代码了。


【8】API调用,curl命令行执行代码如下:

curl -X 'GET' \
'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50' \
-H 'accept: application/json'


【9】python代码实现

import requests  

url = 'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50'
headers = {'accept': 'application/json'}

response = requests.get(url, headers=headers)

if response.status_code == 200:
data = response.json()
# 在这里处理返回的数据
print(data)
else:
print("请求失败,状态码:", response.status_code)



7.5 在线调试(发布主题)

API里也支持发布主题,利用HTTP协议发布主题消息,如果设备端订阅了该主题,就可以收到API接口发布的消息。


【1】先找到发布主题的API接口


【2】点击API名字,展开详情


【3】点击右边的Try it out按钮。


【4】参数填写说明

因为这个接口是发送主题的,需要填参数,填自己需要发布什么主题,什么消息。

出来的框框里就是发布信息,根据自己需要修改。


topic就是发布的主题。 payload 就是发布的消息内容。只要MQTT客户端订阅了这个主题,就可以收到发布的消息。

这个主题的名字可以随便改的。我这里就用默认的名字和内容测试。

{
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": false
}


【5】MQTT客户端登录。

打开MQTT客户端,登录服务器,订阅api/example/topic主题。

【6】在API调试页面,点击执行


【7】执行之后,在MQTT客户端的就可以收到API下发的消息了。


【8】API接口调用,curl命令行执行的代码如下:

curl -X 'POST' \
'http://122.112.225.194:18083/api/v5/publish' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": false
}'


【9】Python代码实现

import requests  
import json

url = 'http://122.112.225.194:18083/api/v5/publish'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": False
}

response = requests.post(url, headers=headers, data=json.dumps(data))

if response.status_code == 200:
print("消息发布成功")
else:
print("消息发布失败,状态码:", response.status_code)


DS小龙哥 嵌入式技术资讯
不定时更新STM32、物联网、linux驱动、QT等技术文章;打造嵌入式开发相关知识分享、技术交流平台
 最新文章