学会Springboot快速整合MQ,轻松玩转编码

文摘   2024-12-13 23:02   江苏  

消息队列(Message Queue, MQ)作为现代分布式系统中非常重要的组件之一,能帮助我们解耦、削峰、提高系统吞吐量。今天,我们将用SpringBoot快速集成RabbitMQ,实现消息队列的基础模型和常用的发布订阅模式,让你轻松玩转MQ!


什么是消息队列?它能帮我们做什么?

消息队列就像一个中间人,负责接收消息并转发给需要它的服务。它的主要作用包括:

  1. 异步处理:将耗时任务放到后台执行,前端快速响应。
  2. 削峰填谷:高峰期的请求被缓存在队列中,按服务的处理能力逐步消化。
  3. 服务解耦:不同模块之间通过消息通信,减少直接依赖。

简单来说,消息队列就像发邮件:发件人把邮件交给邮局(消息队列),邮局再分发给收件人。这样发件人无需等待收件人立即接收邮件。


SpringBoot快速集成RabbitMQ

1. 引入依赖

首先,我们需要在项目中添加RabbitMQ的依赖。打开你的pom.xml,添加以下内容:

ounter(lineounter(lineounter(lineounter(line<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

温馨提示:确保你的RabbitMQ服务已安装并运行。如果没有安装,可以参考官方文档或使用Docker快速启动。


2. 配置RabbitMQ

接下来,我们需要配置RabbitMQ的连接信息。可以通过Java配置类实现:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line@Configurationpublic class RabbitMQConfig {
    private final String host = "localhost";    private final int port = 5672;    private final String virtualHost = "/";    private final String username = "guest";    private final String password = "guest";
    @Bean    public CachingConnectionFactory connectionFactory() {        CachingConnectionFactory factory = new CachingConnectionFactory(host);        factory.setPort(port);        factory.setVirtualHost(virtualHost);        factory.setUsername(username);        factory.setPassword(password);        return factory;    }
    @Bean    public RabbitTemplate rabbitTemplate() {        return new RabbitTemplate(connectionFactory());    }}

这里我们配置了RabbitMQ的地址、端口、虚拟主机、用户名和密码。CachingConnectionFactory负责管理与RabbitMQ的连接,而RabbitTemplate是我们发送消息的工具。

温馨提示:以上配置也可以直接写到application.yml中,更简单!

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(linespring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest

3. 消息发送与接收

(1) 编写消息发送方

假设我们需要上传一首音乐,并将上传任务交由后台处理。我们可以将消息发送到RabbitMQ队列:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line@RestController@RequestMapping("/music")public class MusicController {
    @Autowired    private RabbitTemplate rabbitTemplate;
    @PostMapping("/upload")    public String uploadMusic(@RequestParam String name,                              @RequestParam MultipartFile coverFile,                              @RequestParam MultipartFile musicFile) throws IOException {        // 构造消息对象        MusicUploadMessage message = new MusicUploadMessage(name, coverFile.getBytes(), musicFile.getBytes());                // 发送消息到队列        rabbitTemplate.convertAndSend("music_queue", message);                return "音乐上传请求已提交,等待后台处理!";    }}

这里我们定义了一个/upload接口,将音乐文件打包成消息对象发送到名为music_queue的队列中。


(2) 编写消息接收方

接收方监听队列中的消息,并处理音乐上传任务:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line@Component@Slf4jpublic class MusicUploadListener {
    @RabbitListener(queues = "music_queue")    public void handleMusicUpload(MusicUploadMessage message) {        try {            // 模拟文件上传和数据库写入            String coverPath = "上传的封面路径";            String musicPath = "上传的音乐路径";
            log.info("音乐上传成功:封面路径={},音乐路径={}", coverPath, musicPath);        } catch (Exception e) {            log.error("处理音乐上传失败:", e);        }    }}

@RabbitListener注解用来监听指定队列的消息。一旦有消息到达队列,方法handleMusicUpload就会被触发。


4. 使用Fanout模式(广播)

有时,我们希望一条消息能够被多个消费者同时消费。例如,一首歌上传成功后,我们可能需要:

  1. 发送通知给用户。
  2. 更新音乐推荐列表。

这时可以使用Fanout模式(广播)。消息会被发送到交换机,交换机会将消息分发给所有绑定的队列。

(1) 配置交换机和队列

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line@Configurationpublic class RabbitMQFanoutConfig {
    @Bean    public FanoutExchange fanoutExchange() {        return new FanoutExchange("music_exchange");    }
    @Bean    public Queue notificationQueue() {        return new Queue("notification_queue");    }
    @Bean    public Queue recommendationQueue() {        return new Queue("recommendation_queue");    }
    @Bean    public Binding bindNotificationQueue(FanoutExchange fanoutExchange, Queue notificationQueue) {        return BindingBuilder.bind(notificationQueue).to(fanoutExchange);    }
    @Bean    public Binding bindRecommendationQueue(FanoutExchange fanoutExchange, Queue recommendationQueue) {        return BindingBuilder.bind(recommendationQueue).to(fanoutExchange);    }}

这里我们创建了一个Fanout交换机,以及两个队列notification_queuerecommendation_queue


(2) 修改发送方

将消息发送到交换机,而不是队列:

ounter(linerabbitTemplate.convertAndSend("music_exchange", "", message);

(3) 修改接收方

分别处理两个队列的消息:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line@Component@Slf4jpublic class NotificationListener {
    @RabbitListener(queues = "notification_queue")    public void sendNotification(MusicUploadMessage message) {        log.info("发送通知:音乐《{}》上传成功!", message.getName());    }}
@Component@Slf4jpublic class RecommendationListener {
    @RabbitListener(queues = "recommendation_queue")    public void updateRecommendation(MusicUploadMessage message) {        log.info("更新推荐列表:音乐《{}》已加入推荐!", message.getName());    }}

总结

  1. 消息队列的价值:解耦、异步、削峰等。
  2. 核心步骤
  • 配置RabbitMQ连接。
  • 定义队列、交换机。
  • 使用RabbitTemplate发送消息。
  • 使用@RabbitListener接收消息。
  • 扩展模式
    • Fanout(广播):消息分发给多个队列。
    • Direct(定向):根据路由键精准投递。
    • Topic(通配符):支持模糊匹配的消息投递。

    消息队列虽然强大,但使用时要注意消息可靠性、重复消费、消息堆积等问题。希望今天的内容能让你对SpringBoot与RabbitMQ的整合有一个清晰的认知,赶快动手试试吧!

    夜半探案
    每日一案,一案一法,一起学习生活中的法律知识。
     最新文章