第 1 章
MQ的介绍
MQ,全称为Message Queue,即消息队列。在微服务和分布式系统中,使用MQ可以实现服务间的异步通信、流量削峰、业务解耦和数据分发等功能,从而提升系统的响应速度和并发处理能力。
第 2 章
摘要
通过将Feign同步远程调用转变为MQ异步处理,用户数据成功存储到数据库后,立即发送用户ID至register.success.topic交换机,无需等待其他服务响应,即可即时反馈用户注册成功的结果,实现快速响应并提升系统整体性能。
第 3 章
同步通信和异步通信
在微服务架构中,服务间的数据交互是系统运行的核心环节。在实际开发中,为了实现高效且可靠的服务间交互,目前主要有同步通信与异步通信这两种数据交互方式。
3.1、同步通信
3.2、异步通信
第 4 章
MQ技术对比
常见的MQ实现有:RabbitMQ,ActiveMQ,RocketMQ和Kafka。
第 5 章
Docker安装RabbitMQ
//拉取镜像
docker pull rabbitmq:3.8-management
//安装RabbitMQ
docker volume create mq-plugins;
docker run \
-e RABBITMQ_DEFAULT_USER=yxclass \
-e RABBITMQ_DEFAULT_PASS=******* \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
第 6 章
RabbitMQ控制台
6.1、控制台
http://ip:15672/可以访问RabbitMQ控制台。
6.2、数据隔离
第 7 章
SpringAMQP
7.1、SpringAMQP起步依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
7.2、消费者方配置文件
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
level:
net.yxclass: debug
spring:
rabbitmq:
host: yxclass.net
port: 5672
virtual-host: /yxcourse # 虚拟主机
username: yxcourse # 虚拟主机的用户名
password: **********
listener:
simple:
prefetch: 1 # 消费者一次只能获取1条消息
acknowledge-mode: manual
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
7.3、生产者方配置文件
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
level:
net.yxclass: debug
spring:
rabbitmq:
host: yxclass.net
port: 5672
virtual-host: /yxcourse
username: yxcourse
password: **********
connection-timeout: 1s
# 开启publisher confirm机制,并设置confirm类型
publisher-confirm-type: correlated
# 开启publisher return机制
publisher-returns: true
# 开启consumer手动确认模式
acknowledge-mode: manual
template:
retry:
retry:
enabled: true # 开启超时重连机制重试机制
initial-interval: 1000ms # 失败后的初始等待时间
max-attempts: 3 # 最大重试次数
multiplier: 1 # 失败后下次的等待时长倍数
7.4、RabbitTemplate
RabbitTemplate是Spring AMQP框架中的一个关键组件,它封装了AMQP协议的细节,并提供了简单的API来进行消息发送和接收。
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
RabbitTemplate提供的convertAndSend() 方法允许将对象直接转换为消息体,并发布到指定的交换机和路由键。Spring AMQP 将自动处理消息转换成 JSON 或其他序列化格式。
第 8 章
SpringAMQP五种消息模型
SpringAMQP五种消息模型分别是:
1.简单队列模型:一个生产者对应一个消费者;
2.任务队列模型:一个生产者对应多个消费者;
3.发布订阅广播模型:所有绑定到交换机的队列将收到每一条消息;
4.发布订阅路由模型:交换机基于RountingKey转发消息给订阅了消息的队列;
5.发布订阅主题模型:与路由模型相似,只不过RountingKey可以使用通配符。
8.1、Basic Queue简单队列模型
Basic queue,简单队列模型。简单来说就是让一个消费者绑定到一个队列消费队列中的消息。
发送消息和监听队列前,必须先创建队列,如果队列不存在则会报错。
消息发送成功后,RabbitMQ控制台将展示队列中新增的一条信息,确保消息已经投递到对应的队列中。
当队列中有新消息到达时,这些消息会被自动路由到相应的消费者方法中进行处理。消费者方法会根据消息的内容和类型执行相应的操作,确保消息得到及时处理。一旦消息被成功消费,它将会从队列中立即移除,确保消息不会滞留或泄露,实现“阅后即焚”的效果。
8.2、Work Queue任务队列
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
这种分布式消费的方式可以有效地提高消息处理的并发性和效率,因为消息可以并行地被多个消费者处理,而不是仅仅依赖一个消费者。
1.多个消费者绑定到一个队列,同一条消息只会被一个消费者处理;
2.通过设置prefetch来控制消费者预取的消息数量。
8.3、交换机介绍
前面简单队列模型和任务队列模型都是生产者直接发送消息到队列,在开发中通常不会这样使用。而是采用发布订阅模型,引入交换机,将消息投放到交换机中,由交换机将消息投递到绑定的队列中。
Exchange(交换机)只负责转发消息,不具备存储消息的能力。因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
8.4、Fanout发布订阅的广播模型
Fanout:广播模型允许交换机中的相同消息被多个队列消费,交换机把生产者的消息复制成若干个副本发送到交换机绑定的队列中。
我在日常开发中是使用基于@RabbitListener注解的方式声明,这种方式不仅代码简洁,而且能够直接映射到具体的业务逻辑中,使得消息队列的配置更加直观和易于维护。
@QueueBinding绑定队列;
@Queue声明队列名称;durable开启持久化策略;
@Exchange声明交换机,type是交换机类型,key是路由键。
8.5、Direct发布订阅的路由模型
Direct发布订阅路由模型,基于RoutingKey发送给订阅了消息的队列。
8.6、Topic发布订阅的主题模型
Topic发布订阅主题模型,与Direct类似,只不过RoutingKey可以使用通配符。Topic交换机接收的消息RoutingKey必须是多个单词,以 `**.**` 分割。
通配符省略规则:
#:代表0个或多个词。
*:代表1个词。
8.7、消息转换器
Spring会把生产者发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。默认情况下Spring采用的是JDK序列化方式。
众所周知,JDK序列化存在数据体积过大,有安全漏洞以及可读性差等问题。
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入jackson-dataformat-xml依赖。
配置消息转换器,在publisher和consumer两个服务的启动类中都添加下面的配置。
@Bean
public MessageConverter jacksonMessageConvertor(){
// 1.设置消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.确保业务幂等性
// 3.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断消息是否重复
jjmc.setCreateMessageIds(true);
return jjmc;
}
在使用 RabbitMQ 发送和接收自定义对象时,确实需要确保生产者和消费者都拥有相同的类定义,以便正确地序列化和反序列化消息。如果方法上的形参类型与发送的对象类型不匹配,消息将不会被正确消费,并且可能导致消息丢失或消费者处理失败。
当生产者发送消息到 RabbitMQ 之后,在 RabbitMQ 的控制台可以看到对应的消息。
第 9 章
DelayExchange延迟交换机
9.1、下载延迟交换机插件
下载网址:
9.2、上传及安装延迟插件
docker volume inspect mq-plugins
插件目录被挂载到了`/var/lib/docker/volumes/mq-plugins/_data`这个目录后就可以执行安装命令。
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
看到上图就是安装成功了,同时RabbitMQ控制台的交换机这里可以添加延迟交换机。
9.3、声明延迟交换机
9.4、发送延迟消息
第 10 章
MQ常见问题及解决方案
10.1、如何保证消息可靠性
消息从生产者到消费者的每一个步骤都可能导致消息丢失,常见的丢失原因有以下几种:
1.消息发送时丢失,生产者发送的消息未送达Exchange,或者是消息到达Exchange未到达Queue。
2.MQ宕机了,导致队列里的消息丢失。
3.消费者收到消息未消费完成程序就宕机,导致消息丢失了。
步骤1:在配置文件中开启confirm模式:
对于MQ宕机导致消息丢失这个问题,可以通过开启消息持久化机制来解决。我在项目中使用SpringAMQP框架,将 durable 的值设置为 true 表示开启了交换机,队列和消息持久化。
对于消费者收到消息后未成功处理完成,因为程序宕机等原因导致消息丢失了的问题,可以通过采用消费者确认模式来解决。消费者确认模式有自动确认(Automatic Acknowledgement)和手动确认(Manual Acknowledgement)两种方式。
我在项目中是使用手动确认的方式来保证消息可靠性的,因为自动确认模式会有重复消费的问题,而且消费者接收到消息,RabbitMQ会立即视为该消息已成功消费并将其从队列中移除,即使消费者在处理消息过程出现问题也不管。因此,在对可靠性要求较高的场景下,自动确认并不适合用来防止因消费者中途故障导致的消息丢失。
手动ack实现步骤:
步骤1:配置失败重试机制,我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试机制,而不是无限次投递到队列中。
具体做法是修改consumer服务的application.yml文件,添加下面retry层级的内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
步骤2:定义消费者接收消息。
我是习惯使用@RabbitListener注解声明的交换机队列,默认情况下@RabbitListener注解中的ackMode属性值为AUTO,这意味着消息在消费者方法执行完毕后会自动发送确认信号给RabbitMQ,告知消息已被成功处理。在这种模式下,如果消费者在处理消息时发生异常,消息可能会丢失,因为RabbitMQ会认为消息已经被成功处理。
为了确保消息在消费者处理过程中出现异常时能够被重新投递,我们需要将ackMode属性的值设置为MANUAL。这样,消费者在处理完消息后需要手动发送确认信号给RabbitMQ。
当消费者成功处理消息并完成所有相关业务逻辑后,它会发送一个确认信息(ack)给RabbitMQ的Broker。一旦Broker接收到这个确认信息,它就会从相应的队列中移除该消息,确保消息只被处理一次,避免重复处理。
在消息处理过程中,如果发生异常或错误导致消息丢失,可以在消费者服务中配置重试机制,尝试重新接收和处理该消息。
channel.basicAck(deliveryTag,multiple,requeue)这个API可以实现手动确认,deliveryTag是交付的版本号,multiple参数定义是否批量确认消息,requeue参数定义是否重复投递。
上图代码模拟了异常情况发生,RabbitMQ会按照配置的重试策略,持续向消费者发送消息,直到达到预设的重试次数上限。如果重试后消息仍然无法成功处理,则消息将被丢弃,确保系统的稳定性和可靠性。
10.2、消息可靠性兜底方案
RabbitMQ作为一个中间件,确实在流量削峰和业务解耦方面发挥着重要作用,有效提升了系统的响应速度和并发处理能力。然而,我们也必须认识到,任何中间件的引入都可能会增加系统的复杂性和潜在风险。为了确保系统的稳定性和可靠性,我们需要提前准备异常情况下的预案。
因此,在引入RabbitMQ的同时,我们也应该考虑如何降低对其的依赖。一种有效的策略是采用SpringTask定时任务框架结合RestTemplate或Feign客户端进行同步通信。当RabbitMQ出现故障或不可用时,这些定时任务可以定期触发,通过RestTemplate或Feign客户端远程调用生产者服务,确保消息的传递和业务的正常运行。
10.3、惰性队列
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
1.消费者宕机或出现网络故障。
2.消息发送量激增,超过了消费者处理速度。
3.消费者处理业务发生阻塞。
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为称为PageOut。PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
1.接收到消息后直接存入磁盘而非内存。
2.消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)。
3.支持数百万条的消息存储。
4.在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
1.基于配置类声明惰性队列:
2.基于@RabbitListener注解声明惰性队列:
3.将所有包含lazy.queue的队列设置为惰性队列:
//连接上RabbitMQ客户端执行下面的命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
//docker容器安装需要先进入容器内部
docker exec -it mq
docker exec -it mq rabbitmqctl list_policies
由下图可知:策略 Lazy 已经被成功设置,并且它的正则表达式模式为 ^lazy-queue$,应用于 queues,定义(definition)为 {"queue-mode":"lazy"},优先级(priority)为 0。这表明该策略会应用于所有名称与 ^lazy-queue$ 匹配的新队列,并将这些队列的模式设置为 Lazy。
如果需要将其他所有的队列都改为惰性队列,就将^lazy-queue$改为^queue$。
10.4、如何避免消息堆积的问题
产生消息堆积主要是生产者和消费者不平衡导致的。出现消费堆积有很多种情况:
第1种是服务器的cpu和内存跟不上。
第2种是消费者消费速度跟不上,比如数据库的IO遇到了瓶颈,或者是消费者出现了其他的原因导致消费速度变慢。
避免消息堆积情况发生核心处理思路还是提高消费者的消费速率,保证消费者不出现消费故障。
如果是服务器的cpu和内存跟不上,就增加机器就好了。
如果是消费者消费能力原因的话,解决消息堆积问题我知道的有下述方案:
1.增加更多的消费者,提高消费速度。
2.在消费者内使用线程池加快线程处理速度。
3.优化sql语句和代码,比如使用乐观锁的方式替代悲观锁。
4.扩大队列容积,提高堆积上限。
5.使用惰性队列解决消息堆积,RabbitMQ的3.6版本开始,就增加了Lazy Queue的概念,惰性队列可以将接收到消息后直接存储到磁盘中,而不是占用系统内存。消费者需要消费消息时才会从磁盘中读取并加载到内存中。
10.5、如何保证消息的顺序性
1. 一个queue,有多个consumer去消费,这样就会造成顺序的错误。
2. 一个queue对应一个consumer,但是consumer的方法使用多线程接收消息,这样也会造成消息消费顺序错误。
1. 对于一个queue有多个consumer造成顺序错乱的问题,可以通过拆分多个queue,每个queue对应一个consumer,就是多一些queue,这样增加了队列可能会导致性能下降,但是可以在consumer的业务层使用线程池来提升消费者的消费速度。
2. 对于在消费者方法中使用多线程导致消息顺序错乱问题的解决方案是将consumer方法的线程池移到service业务层的方法,这样就可以解决多线程环境下消息顺序错乱的问题。
第 11 章
注册业务性能优化
11.1、同步通信注册业务流程
当用户打开注册页面时,前端会主动发起一个请求到后端服务器生成短信验证码的图片。为了确保业务的幂等性,前端会采用UUID(Universally Unique Identifier)来生成一个唯一的短验证码发送给后端。后端拿到这个UUID将作为键(key),而生成的验证码将作为值(value)存储在Redis缓存中。
在用户点击发送注册短信之前,系统首先要求用户输入图片验证码以进行验证。只有当用户成功通过验证码验证后,系统才会发送一个请求来查询用户输入的手机号码是否已经注册。
如果查询结果显示该手机号码尚未注册,系统才会进一步发送注册短信验证码至用户的手机。这一过程旨在确保注册流程的安全性,并防止恶意注册或滥用服务。
在提交注册表单时,为了保障用户密码的安全性,前端需要对用户输入的密码进行MD5加密,并在加密过程中加入随机生成的“盐值”(即一段随机字符串)。这种“加盐”的MD5加密方式可以有效防止密码被轻易破解或盗用。加密后的密码将被安全地传输至后端服务器进行存储和处理,从而确保用户信息的安全性。
11.2、变同步为异步提升业务性能
为了提升业务性能,我们可以将原本的同步操作转变为异步处理。在现有的业务流程中,用户服务、积分服务、优惠券服务、课程服务和短信服务都是同步执行的,每个服务假设耗时100毫秒,总共需要500毫秒才能完成整个注册流程,这在没有并发请求的情况下仍然显得较长。
为了改进这一状况,我们可以采用消息队列(MQ)的方式实现异步处理。当用户数据成功存储到用户服务数据库的用户表中后,我们并不等待其他服务的响应,而是立即将用户的ID发送到MQ的register.success.topic交换机。这样,系统可以在极短的时间内(减少了3个服务)响应用户注册成功的消息,让用户能够快速进行登录操作。
而积分服务、优惠券服务、课程服务和短信服务则会监听MQ中的register.success.topic交换机,一旦收到新用户注册成功的消息,它们会在后台异步地处理相应任务:积分服务为用户添加积分,优惠券服务发放优惠券,课程服务为用户添加低价课程,短信服务发送注册成功通知。由于这些操作是异步进行的,它们不会影响到用户注册的响应时间,从而大大提升了用户体验。
通过这种改造,我们不仅将原本500毫秒的注册响应时间缩短到了200毫秒以内,还提高了系统的整体吞吐量和并发处理能力。
11.3、注册业务兜底方案
为了确保注册业务的稳定性和可靠性,防止当RabbitMQ出现故障或者不可用时导致注册业务的不可用的问题,我使用SpringTask定时任务框架与Feign客户端进行同步服务调用。
当生产者成功发送消息至RabbitMQ后,会立即将用户表中的mq_status字段更新为1,表示消息已经成功投递至消息队列。
为了应对消息队列不可用的情况,我设置了一个每30秒执行一次的定时任务。这个任务会扫描用户表,查找那些mq_status字段值仍为0的用户。对于这些用户,定时任务会通过Feign客户端同步调用相应的服务(如积分服务、优惠券服务等),确保这些用户的注册流程得到正确处理。
此外,我还考虑到了消息发送到RabbitMQ,消费者处理消息失败的情况。为此,我设计了一个用于记录消息投递失败情况的表。定时任务会定期扫描这张表,对于投递失败的消息,同样通过Feign客户端进行同步处理,确保消息最终能够被正确处理。
通过上述兜底方案的设计与实施,确保了即使在RabbitMQ不可用的情况下,系统的注册业务也能得到妥善处理,从而保证了业务的稳定性和可靠性。
第 12 章
作者介绍
吴灿锦,泰伯一百零一世孙,明朝开国名将安陆侯吴复的后代,毕业于吉林财经大学;
第九届中国国际“互联网+”创新创业大赛吉林省赛区金奖项目第一主持人;
第十三届“挑战杯”中国大学生创业计划大赛吉林省赛区特等奖,国家级铜奖项目第一主持人;
2022年荣获吉林财经大学创业实践国家级立项第一名项目主持人。
· 往期回顾 ·