一、前言
在众多物联网通信协议中,MQTT(Message Queuing Telemetry Transport)因其轻量、高效的特点而被广泛应用于各种物联网场景。它不仅能够满足设备低功耗的需求,还支持设备与云端之间稳定可靠的消息传递,是实现设备上云的理想选择之一。本文介绍利用EMQX服务器搭建自己私有的MQTT服务器,构建一套完整的物联网解决方案,涵盖从设备接入到数据存储,再到数据转发及上位机开发等多个关键环节。
EMQX,全称为Erlang/Enterprise Middleware MQTT Broker,是一款基于Erlang/OTP平台开发的开源物联网消息中间件。它专为大规模物联网应用设计,能够处理海量并发连接,并提供稳定的消息发布/订阅服务。作为一款高性能的MQTT协议服务器,EMQX不仅支持标准的MQTT v3.1、v3.1.1以及最新的v5.0版本协议,还提供了丰富的扩展功能来满足不同场景下的需求。
EMQX的核心优势在于其卓越的性能表现和高度可伸缩性。单个EMQX集群可以轻松管理数百万级别的设备连接,同时保持低延迟的消息传递能力。这使得EMQX成为构建大型物联网系统时的理想选择之一。此外,通过灵活配置规则引擎,用户可以根据业务逻辑定制化处理接收到的数据流,实现复杂事件处理、数据转换等功能。例如,当特定条件被触发时,可以自动执行预设的动作或将信息转发给其他系统进行进一步分析。
安全性方面,EMQX支持多种认证机制如用户名密码验证、客户端证书验证等,以确保只有授权用户才能访问敏感资源;同时也提供了TLS/SSL加密通信能力,保障了数据传输过程中的安全性和完整性。对于需要严格遵守行业标准的企业来说,这些特性尤为重要。
在集成度方面,EMQX展现了极高的灵活性与兼容性。无论是与其他数据库系统的对接(如MySQL, PostgreSQL, MongoDB等),还是与各种云服务提供商(如阿里云、AWS)的无缝衔接,EMQX都能很好地适应并促进整个生态系统的健康发展。EMQX还配备了详细的文档资料和技术支持服务,帮助开发者快速上手并解决遇到的问题。
本文章里用到的全部工具软件都可以在这里下载(放在网盘里了)。
https://pan.quark.cn/s/145a9b3f7f53
二、准备服务器
EMQX-服务器部署目前支持以下操作系统:
RedHat
CentOS
RockyLinux
AmazonLinux
Ubuntu
Debian
macOS
Linux
Windows(旧版本是支持的,新版不支持Windows系统,官网没有下载链接了
)
前期测试阶段,华为云、阿里云、腾讯云这些平台都有免费试用的服务器,可以领取一个月ECS服务器试用测试。
也可以自己本地Windows电脑上搭建服务器,这个我在另一篇文章里有详细介绍,还有对应的视频(Windows系统搭建)。
搭建自己的MQTT服务器、实现设备上云(windows+EMQX)
我下面是领取了华为云一个月的ECS服务器搭建此项目的服务器用于进行整体项目的测试。
2.1 登录官网
https://www.huaweicloud.com/
2.2 领取一个月的服务器
【1】选择服务器
配置过程中,系统选择ubuntu18.04
。
【2】返回弹性服务器的控制台
服务器领取配置好之后回到主界面。
【3】点击服务器名字,可以进入到详情页面。
2.3 配置安全组
要确保MQTT服务器常用的几个端口已经开放出出来。
2.4 安装FinalShell
Windows下安装 FinalShell 终端,方便使用SSH协议远程登录到云服务器。(当然,使用其他方式登录也是一样的)
2.5 远程登录到云服务器终端
【1】新建连接,选择SSH连接。
【2】填入IP地址、用户名、密码
这里的主机就是填服务器的公网IP地址,密码就是创建服务器输入的密码,用户名直接用root。
【3】点击连接服务器
【4】第一次登录会弹出提示框,选择接受并保存
【5】接下来可以看到服务器已经登录成功了。
三、Linux下安装EMQX
本章节将介绍如何在 Ubuntu 系统中下载安装并启动 EMQX。
支持的 Ubuntu 版本:
Ubuntu 22.04
Ubuntu 20.04
Ubuntu 18.04
2.1 官网地址
链接:https://www.emqx.io/docs/zh/v5.2/deploy/install-ubuntu.html
2.2 通过Apt源安装
EMQX 支持通过 Apt 源安装,免除了用户需要手动处理依赖关系和更新软件包等的困扰,具有更加方便、安全和易用等优点。
在命令行终端,复制下面的命令过去,按下回车键。
【1】通过以下命令配置 EMQX Apt 源:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
【2】运行以下命令安装 EMQX:
sudo apt-get install emqx
【3】运行以下命令启动 EMQX:
sudo systemctl start emqx
过程如下:
2.3 EMQX常用的命令
sudo systemctl emqx start 启动
sudo systemctl emqx stop 停止
sudo systemctl emqx restart 重启
四、配置EMQX-MQTT服务器
4.1 登录EMQX内置管理控制台
EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。
在浏览器里输入: http://122.112.225.194:18083
就可以访问EMQX的后台管理页面。可以管理以连接的客户端或检查运行状态。
这里面的IP地址,就是自己ECS云服务器的公网IP地址。
打开浏览器后,输入地址后打开的效果:
默认用户名和密码:
用户名:admin
密码:public
第一次登录会提示你修改新密码,如果不想设置,也可以选择跳过(公网服务器部署,还是要修改密码安全些)。
下面修改新密码:
登录成功的页面显示如下:
4.2 MQTT配置
这里可以配置MQTT的一些参数,根据自己的需求进行配置。
4.3 测试MQTT通信
新建一个客户端,点击连接。
连接之后,然后点击订阅,和发布,如果下面消息能正常的接收。说明MQTT服务器通信是已经正常,没问题了。
并且在这个页面也可以看到主题发布
和主题订阅
的格式。
4.4 MQTT客户端登录服务器测试
接下来就打开我们自己的MQTT客户端登录MQTT服务器进行测试数据的通信。
端口选择: 1883
根据软件参数填入参数,登录,进行主题的发布和订阅。
说明:目前还没有配置客户端认证,现在只要IP和端口输入正确,MQTT三元组可以随便输入,都可以登录上服务器的,服务器没有对三元组做校验。
EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。
然后打开EMQX
的管理后台,可以看到我们的设备已经登录服务器了,名字为test1
。
在订阅主题的页面也可以看到我们客户端设备订阅的主题。
4.5 客户端认证配置
EMQX 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。
在正式产品里肯定是要启用认证的,不然任何设备都能接入。
下面就介绍如何配置 客户端认证。
【1】打开客户端认证页面
【2】选择密码认证
【3】选择内置数据库
【4】设置认证方式(都可以默认,不用改),直接点击创建。
【5】创建成功后,点击用户管理
【6】添加用户
【7】添加成功
【8】添加完毕之后,打开MQTT客户端可以进行测试。
登录的时候,MQTT用户名和密码必须输入正确,按照上一步添加的信息进行如实填写,否则是无法登录服务器的。
4.6 客户端授权配置
客户端授权页面可以配置每个客户端(设备)的主题发布,订阅权限。限制它是否可以发布主题,订阅主题。如果有需要就可以进行配置。
http://127.0.0.1:18083/#/authorization/detail/built_in_database?tab=users
【1】创建数据源
【2】选择内置数据库
【3】完成创建
【4】点击权限管理
【5】选择客户端ID,点击添加
【6】配置权限
4.7 数据转发(集成)
在集成选项里,可以对设备数据处理。比如:转发到自己的HTTP服务器,转发到自己其他的MQTT服务器,创建规则,某些事件触发某些动作等等。
选择数据桥接。
可以把数据发送端自己的HTTP服务器,或者发送到其他的MQTT服务器。
选择HTTP服务 (如果自己有HTTP服务器,可以将数据转发给自己的HTTP服务器)。
五、MQTT客户端消息互发测试
5.1 添加2个设备
为了方便测试设备间互相订阅主题,数据收发,在客户端认证页面至少添加2个设备。我这里分别添加了test1
和test2
。
5.2 设备间测试
设备A订阅设备B的主题,设备B订阅设备A的主题,实现数据互发。
设备A的MQTT信息:
MQTT服务器地址:122.112.225.194
MQTT服务器端口号:1883
MQTT客户端ID:AAA
MQTT用户名:test1
MQTT登录密码:12345678
订阅主题:BBB/#
发布主题:AAA/1
发布的消息:{ "msg": "我是AAA设备" }
设备B的MQTT信息:
MQTT服务器地址:122.112.225.194
MQTT服务器端口号:1883
MQTT客户端ID:BBB
MQTT用户名:test2
MQTT登录密码:12345678
订阅主题:AAA/#
发布主题:BBB/1
发布的消息:{ "msg": "我是BBB设备" }
五、单片机设备上云
只要是MQTT客户端能正常上云通信了,那么单片机也是一样的。
上位机也可以采用MQTT协议接入服务器,订阅设备的主题,就可以实时接收设备的消息(当然,也可以采用HTTP协议接入)。
六、数据桥接
EMQX支持将设备上传的数据转发到其他地方,比如,自己的HTTP服务器。方便自己服务器进行其他的处理。
通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。而 EMQX Dashboard 提供了可视化创建数据桥接的能力,只需在页面中配置相关资源即可。
本章节就介绍如何搭建自己的HTTP服务器。配置EMQX转发数据到自己的HTTP服务器,保存处理数据。
6.1 搭建HTTP服务器
我这里直接使用python写代码搭建一个HTTP服务器。ECS服务器上默认没有安装python3
,需要先安装一下。
【1】安装python3
root@emqx:~/emqx# apt install python3
Reading package lists... Done
Building dependency tree
Reading state information... Done
python3 is already the newest version (3.6.7-1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 1 not upgraded.
【2】编写代码
from flask import Flask, json, request
app = Flask(__name__)
@app.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
将以上代码保存到一个名为 server.py
的文件中。
这段代码创建了一个使用 Flask 框架的 Web 服务器,可以接收根路径的 POST 请求。当接收到 POST 请求时,服务器会将请求的数据打印到终端,并返回一个 JSON 格式的响应给客户端。服务器将在本地运行,并监听默认的 8000
端口。
【3】运行程序
# 安装 flask 依赖
pip install flask
pip3 install flask
# 启动服务
python3 server.py
在命令行中执行 python3 server.py
,就可以启动一个简单的HTTP服务器,可以接收并处理POST请求。当有POST请求发生时,服务器将返回收到的POST数据。可以根据自己的需要,进一步扩展处理POST请求的逻辑。
运行示例: (代码可以在本地写好上传到服务器,也可以直接 vim server.py
打开编辑器直接编写 )
可以通过发送 POST 请求到 http://your-server-ip:8000/
的方式来测试这个服务器。
比如:
http://122.112.225.194:8000/
6.2 数据转发配置
【1】在集成选项里,可以对设备数据处理,将数据转发到自己的HTTP服务器。
【2】选择Webhook。
Webhook,使用 Webhook 来转发数据到 HTTP 服务;
使用 Webhook 其实就是将 EMQX 接收并处理后的数据发送到一个 HTTP 服务上,再根据预设好的 HTTP 服务来处理和集成业务数据。
同样用户需要有一个预先搭建好的 HTTP 服务,需要在配置信息页面填写 HTTP 请求的服务地址,选择一个请求方法 POST、GET、PUT 或 DELETE,配置请求头,将需要发送的数据使用模板语法填写到请求体(body)中即可。
【3】选配置Webhook
触发器选择所有消息和事件,URL里填自己的服务器地址。
【4】点击测试。测试服务器是否OK。
【6】没问题就直接点击保存
【7】创建成功
【8】保存之后。会自动创建规则
和数据桥接
。非常方便。
6.3 测试转发效果
【1】打开MQTT客户端,发送数据测试。
【2】看python服务器的终端,可以看到收到了EMQX服务器转发过来的数据。
如果自己接下来想要进行其他的操作,服务器写代码进行对应的处理即可。
【3】 点击这个服务可以看到已经触发转发的详情。
七、API接口说明
一般开发一套完整的物联网产品。一般会分为设备端,服务器,上位机部分。
设备端:就是硬件端。采集本地传感器的数据上传到服务器,或者接收服务器下发的指令完成某些控制。
比如:STM32 + ESP8266 + 各种传感器 就是一个硬件设备端。可以通过ESP8266联网上传数据。
服务器:也就是MQTT服务器端,比如: 自己采用EMQX搭建的MQTT服务器,或者采用阿里云、华为云、OneNet这些平台的IOT服务器。
上位机:上位机就是给用户使用的,用于远程控制设备,查看设备。比如:微信小程序、手机APP、电脑上位机、web网页端等等。
那么这个章节,就介绍利用EMQX提供的API接口与MQTT客户端设备进行通信,完成数据上传,命令下发等功能,可以利用此接口完成上位机的开发。
7.1 查看全部的API接口
帮助文档地址: https://www.emqx.io/docs/zh/v5.0/admin/api.html#%E8%AE%A4%E8%AF%81
EMQX 提供了管理监控 REST API,这些 API 遵循 OpenAPI (Swagger) 3.0 规范。 EMQX 在 REST API 上做了版本控制,EMQX 5.0.0 以后的所有 API 调用均以 /api/v5 开头。
EMQX 服务启动后,可以访问 http://localhost:18083/api-docs/index.html (opens new window)来查看 API 的文档。还可以直接在 Swagger UI 上尝试执行一些 API。
比如: 我的EMQX服务器是在华为云ECS服务器上搭建,公网IP是: 122.112.225.194
那我访问API文档的地址就是下面这样的格式: 在浏览器里打开即可。
http://122.112.225.194:18083/api-docs/index.html
访问效果如下:
7.2 创建API密匙
【1】登录EMQX的后台管理页面: http://122.112.225.194:18083/
【2】找到菜单里的 系统设置选项-->API密匙。
【3】创建密匙。
【4】填写密匙名称
【5】创建成功
【6】得到API Key 和 Secret Key
API Key : f072a6e9758b8cdf
Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG
7.3 测试API: 获取节点信息
上一步已经创建好API的访问密匙,这里就以 获取节点信息为例,调用获取节点信息的API接口,获取节点 信息。
接口在API文档里的介绍:
根据前面的API访问路径规则说明;那么,获取节点信息的API完整访问路径为:
http://122.112.225.194:18083/api/v5/nodes
接下来就用python写一份代码,测试一下接口是否可以正常访问。python代码直接放服务器运行(主要是我本地没有安装python环境,云服务器的环境是已经安装OK的,测试方便)。
【1】在云服务器上创建一个python文件,方便测试代码
【2】创建之后FinaShell
自动上传到服务器
【3】编辑代码
在这里双击
要编辑的文件,就可以打开文件进行编辑。默认采用内置的编辑器,也可以选择自己电脑上的外置编辑器。
【4】代码编辑完成,按下键盘快捷键Ctrl + S
保存文件内容,保存之后文件内容会自动同步到服务器。
保存后提示,自动上传。
写入的代码如下:
import urllib.request
import json
import base64
username = 'f072a6e9758b8cdf'
password = 'LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG'
url = 'http://122.112.225.194:18083/api/v5/nodes'
req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json')
auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()
req.add_header('Authorization', auth_header)
with urllib.request.urlopen(req) as response:
data = json.loads(response.read().decode())
print(data)
【5】执行代码,返回结果
通过返回信息来看,节点信息获取是没有问题的。
root@emqx:~/emqx# python3 http_api_test.sh
[{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '612.59M', 'node': 'emqx@127.0.0.1', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84000040, 'version': '5.3.1-alpha.1'}]
root@emqx:~/emqx# python3 http_api_test.sh
[{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '613.23M', 'node': 'emqx@127.0.0.1', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84008046, 'version': '5.3.1-alpha.1'}]
7.4 在线调试(获取主题列表)
在编写代码之前,可以先测试下API接口的效果,可以直接在Swagger UI
界面直接调试API。
地址: http://122.112.225.194:18083/api-docs/index.html#/
例如:以获取以订阅主题列表的API接口为例。
【1】在Swagger UI
界面上找到对应的API接口。
【2】点击API说明,展开详情
【3】点击右边的试试看
按钮。
【4】点击执行
【5】然后会弹出提示框,让你填入用户名和密码。
这个用户名和密码就是前面创建API密匙生成的API Key(用户名)
和 Secret Key(密码)
。
API Key : f072a6e9758b8cdf
Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG
【6】根据提示输入用户名和密码,再点击登录。
【7】再次点击执行,就可以看到接口返回的数据了。
并且在页面上也写出了,请求的信息。使用curl
命令行给出详细的请求过程,参考这个就可以自己写代码了。
【8】API调用,curl
命令行执行代码如下:
curl -X 'GET' \
'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50' \
-H 'accept: application/json'
【9】python代码实现
import requests
url = 'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50'
headers = {'accept': 'application/json'}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
# 在这里处理返回的数据
print(data)
else:
print("请求失败,状态码:", response.status_code)
7.5 在线调试(发布主题)
API里也支持发布主题,利用HTTP协议发布主题消息,如果设备端订阅了该主题,就可以收到API接口发布的消息。
【1】先找到发布主题的API接口
【2】点击API名字,展开详情
【3】点击右边的Try it out
按钮。
【4】参数填写说明
因为这个接口是发送主题的,需要填参数,填自己需要发布什么主题,什么消息。
出来的框框里就是发布信息,根据自己需要修改。
topic
就是发布的主题。 payload
就是发布的消息内容。只要MQTT客户端订阅了这个主题,就可以收到发布的消息。
这个主题的名字可以随便改的。我这里就用默认的名字和内容测试。
{
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": false
}
【5】MQTT客户端登录。
打开MQTT客户端,登录服务器,订阅api/example/topic
主题。
【6】在API调试页面,点击执行
【7】执行之后,在MQTT客户端的就可以收到API下发的消息了。
【8】API接口调用,curl
命令行执行的代码如下:
curl -X 'POST' \
'http://122.112.225.194:18083/api/v5/publish' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": false
}'
【9】Python代码实现
import requests
import json
url = 'http://122.112.225.194:18083/api/v5/publish'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": False
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print("消息发布成功")
else:
print("消息发布失败,状态码:", response.status_code)