消息队列(Message Queue, MQ)作为现代分布式系统中非常重要的组件之一,能帮助我们解耦、削峰、提高系统吞吐量。今天,我们将用SpringBoot快速集成RabbitMQ,实现消息队列的基础模型和常用的发布订阅模式,让你轻松玩转MQ!
什么是消息队列?它能帮我们做什么?
消息队列就像一个中间人,负责接收消息并转发给需要它的服务。它的主要作用包括:
异步处理:将耗时任务放到后台执行,前端快速响应。 削峰填谷:高峰期的请求被缓存在队列中,按服务的处理能力逐步消化。 服务解耦:不同模块之间通过消息通信,减少直接依赖。
简单来说,消息队列就像发邮件:发件人把邮件交给邮局(消息队列),邮局再分发给收件人。这样发件人无需等待收件人立即接收邮件。
SpringBoot快速集成RabbitMQ
1. 引入依赖
首先,我们需要在项目中添加RabbitMQ的依赖。打开你的pom.xml
,添加以下内容:
ounter(lineounter(lineounter(lineounter(line
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
温馨提示:确保你的RabbitMQ服务已安装并运行。如果没有安装,可以参考官方文档或使用Docker快速启动。
2. 配置RabbitMQ
接下来,我们需要配置RabbitMQ的连接信息。可以通过Java配置类实现:
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line
@Configuration
public class RabbitMQConfig {
private final String host = "localhost";
private final int port = 5672;
private final String virtualHost = "/";
private final String username = "guest";
private final String password = "guest";
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory(host);
factory.setPort(port);
factory.setVirtualHost(virtualHost);
factory.setUsername(username);
factory.setPassword(password);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
}
这里我们配置了RabbitMQ的地址、端口、虚拟主机、用户名和密码。CachingConnectionFactory
负责管理与RabbitMQ的连接,而RabbitTemplate
是我们发送消息的工具。
温馨提示:以上配置也可以直接写到
application.yml
中,更简单!
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(line
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
3. 消息发送与接收
(1) 编写消息发送方
假设我们需要上传一首音乐,并将上传任务交由后台处理。我们可以将消息发送到RabbitMQ队列:
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line
@RestController
@RequestMapping("/music")
public class MusicController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/upload")
public String uploadMusic(@RequestParam String name,
@RequestParam MultipartFile coverFile,
@RequestParam MultipartFile musicFile) throws IOException {
// 构造消息对象
MusicUploadMessage message = new MusicUploadMessage(name, coverFile.getBytes(), musicFile.getBytes());
// 发送消息到队列
rabbitTemplate.convertAndSend("music_queue", message);
return "音乐上传请求已提交,等待后台处理!";
}
}
这里我们定义了一个/upload
接口,将音乐文件打包成消息对象发送到名为music_queue
的队列中。
(2) 编写消息接收方
接收方监听队列中的消息,并处理音乐上传任务:
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line
@Component
@Slf4j
public class MusicUploadListener {
@RabbitListener(queues = "music_queue")
public void handleMusicUpload(MusicUploadMessage message) {
try {
// 模拟文件上传和数据库写入
String coverPath = "上传的封面路径";
String musicPath = "上传的音乐路径";
log.info("音乐上传成功:封面路径={},音乐路径={}", coverPath, musicPath);
} catch (Exception e) {
log.error("处理音乐上传失败:", e);
}
}
}
@RabbitListener
注解用来监听指定队列的消息。一旦有消息到达队列,方法handleMusicUpload
就会被触发。
4. 使用Fanout模式(广播)
有时,我们希望一条消息能够被多个消费者同时消费。例如,一首歌上传成功后,我们可能需要:
发送通知给用户。 更新音乐推荐列表。
这时可以使用Fanout模式(广播)。消息会被发送到交换机,交换机会将消息分发给所有绑定的队列。
(1) 配置交换机和队列
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line
@Configuration
public class RabbitMQFanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("music_exchange");
}
@Bean
public Queue notificationQueue() {
return new Queue("notification_queue");
}
@Bean
public Queue recommendationQueue() {
return new Queue("recommendation_queue");
}
@Bean
public Binding bindNotificationQueue(FanoutExchange fanoutExchange, Queue notificationQueue) {
return BindingBuilder.bind(notificationQueue).to(fanoutExchange);
}
@Bean
public Binding bindRecommendationQueue(FanoutExchange fanoutExchange, Queue recommendationQueue) {
return BindingBuilder.bind(recommendationQueue).to(fanoutExchange);
}
}
这里我们创建了一个Fanout交换机,以及两个队列notification_queue
和recommendation_queue
。
(2) 修改发送方
将消息发送到交换机,而不是队列:
ounter(line
rabbitTemplate.convertAndSend("music_exchange", "", message);
(3) 修改接收方
分别处理两个队列的消息:
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line
@Component
@Slf4j
public class NotificationListener {
@RabbitListener(queues = "notification_queue")
public void sendNotification(MusicUploadMessage message) {
log.info("发送通知:音乐《{}》上传成功!", message.getName());
}
}
@Component
@Slf4j
public class RecommendationListener {
@RabbitListener(queues = "recommendation_queue")
public void updateRecommendation(MusicUploadMessage message) {
log.info("更新推荐列表:音乐《{}》已加入推荐!", message.getName());
}
}
总结
消息队列的价值:解耦、异步、削峰等。 核心步骤:
配置RabbitMQ连接。 定义队列、交换机。 使用 RabbitTemplate
发送消息。使用 @RabbitListener
接收消息。
Fanout(广播):消息分发给多个队列。 Direct(定向):根据路由键精准投递。 Topic(通配符):支持模糊匹配的消息投递。
消息队列虽然强大,但使用时要注意消息可靠性、重复消费、消息堆积等问题。希望今天的内容能让你对SpringBoot与RabbitMQ的整合有一个清晰的认知,赶快动手试试吧!