SpringBoot整合Canal+RabbitMQ监听数据变更~

文摘   2024-12-28 12:02   陕西  

今天我们来聊聊如何用 Spring Boot 整合 Canal 和 RabbitMQ 来监听 MySQL 的 binlog 数据变更。

这种架构其实在很多大数据系统或者微服务架构中都被广泛应用,尤其是在需要实时同步数据库变更的场景中,能大大提升数据一致性和系统性能。

一、架构设计概述

在这个场景中,Canal 作为 MySQL binlog 的监听者,能够捕获数据库表的所有变化(比如插入、更新、删除)。这些变化会通过 RabbitMQ 发送到消息队列中,其他服务可以异步消费这些数据并进行处理,比如更新缓存、触发后续业务逻辑等。这个架构有很多好处,尤其是在解耦和异步处理上,系统的稳定性和扩展性都能得到有效提升。

二、环境搭建

首先,想要让这个架构运作起来,我们需要在本地搭建一些基本环境:MySQL、RabbitMQ 和 Canal。

1. MySQL 配置

MySQL 要开启 binlog,且建议采用 ROW 格式,这样才能捕获表中每一行数据的变动,而不是简单的语句变动。可以通过以下命令来检查 MySQL 是否已经启用了 binlog:

SHOW VARIABLES LIKE 'log_bin';  -- 确保返回的值是 ON
SHOW VARIABLES LIKE 'binlog_format';  -- 确保是 ROW 格式

2. RabbitMQ 配置

RabbitMQ 是我们的消息队列,用于传输数据变更记录。在 Docker 中启动 RabbitMQ 非常简单,下面是一个 docker-compose.yml 示例:

version: '3.1'

services:
  mysql:
    image: mysql:5.7
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "3306:3306"
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
  canal:
    image: canal/canal-server
    ports:
      - "11111:11111"
    environment:
      - CANAL_ADAPTERS=canal-adapter-rabbitmq

3. Canal 配置

Canal 监听 MySQL 的 binlog 并将变化发送到 RabbitMQ。首先,我们需要配置 Canal 的 canal.properties 文件:

canal.serverMode = rabbitMQ
rabbitmq.host = rabbitmq
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode = 2 # 阻塞模式,持久化消息

然后,配置 instance.properties 文件来连接 MySQL:

canal.instance.master.address=host.docker.internal:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root

三、Spring Boot 整合 Canal 客户端

在 Spring Boot 中,整合 Canal 客户端比较简单。我们通过 Maven 添加必要的依赖:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.5</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>1.1.5</version>
</dependency>

四、监听 MySQL binlog 变更

接下来,我们编写 Canal 客户端代码来监听 MySQL 的 binlog 变更。通过 CanalConnector,我们可以连接到 Canal 服务器,订阅 MySQL 的所有表,并持续获取数据变更。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements CommandLineRunner {

    private final static int BATCH_SIZE = 1000;

    @Override
    public void run(String... args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost"11111), "canal""canal""canal");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");  // 订阅所有表
            connector.rollback();  // 回滚到未确认的位置

            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    // 没有数据,休眠
                    Thread.sleep(2000);
                } else {
                    // 有数据,处理数据
                    processEntries(message.getEntries());
                }
                // 确认消息
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private void processEntries(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Error parsing binlog entry", e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    System.out.println("DELETE: " + rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    System.out.println("INSERT: " + rowData.getAfterColumnsList());
                } else {
                    System.out.println("UPDATE: Before: " + rowData.getBeforeColumnsList() + ", After: " + rowData.getAfterColumnsList());
                }
            }
        }
    }
}

五、将数据推送到 RabbitMQ

一旦 Canal 客户端成功监听到 binlog 的变化,我们可以通过 RabbitMQ 将数据推送到消息队列。在 Spring Boot 中集成 RabbitMQ 非常简单,只需要添加 spring-boot-starter-amqp 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后在 application.yml 中配置 RabbitMQ 的连接信息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

创建一个简单的消息发送类,用来将 binlog 变化推送到 RabbitMQ:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend("canal-exchange""canal-routing-key", message);
        System.out.println("Sent message: " + message);
    }
}

在 Canal 客户端中,监听到变更后发送消息到 RabbitMQ:

@Autowired
private RabbitMqSender rabbitMqSender;

private void processEntries(List<CanalEntry.Entry> entries) {
    for (CanalEntry.Entry entry : entries) {
        // Process event types and send message to RabbitMQ
        rabbitMqSender.send("EventType: " + entry.getEventType() + ", Data: " + entry.getRowData());
    }
}

六、启动与测试

最后,启动所有服务(MySQL、Canal、RabbitMQ),然后启动 Spring Boot 应用程序。Canal 会监听 MySQL 的 binlog 变更,并将变更数据通过 RabbitMQ 发送出去。

这样,我们就完成了通过 Canal 和 RabbitMQ 实现 MySQL 数据库变更监听和异步处理的功能。🎉


这个方案很适合做一些实时数据处理的应用,比如缓存更新、异步日志处理等。而且,借助消息队列的异步特性,整个系统能应对更高的并发量,避免了数据流通的瓶颈。希望大家能从中获得一些灵感,快来试试吧!

-END-


ok,今天先说到这,老规矩,给大家分享一份不错的副业资料,感兴趣的同学找我领取。

以上,就是今天的分享了,看完文章记得右下角给何老师点赞,也欢迎在评论区写下你的留言

程序员老鬼
10年+老程序员,专注于AI知识普及,已打造多门AI课程,本号主要分享国内AI工具、AI绘画提示词、Chat教程、AI换脸、Chat中文指令、Sora教程等,帮助读者解决AI工具使用疑难问题。
 最新文章