今天我们来聊聊如何用 Spring Boot 整合 Canal 和 RabbitMQ 来监听 MySQL 的 binlog 数据变更。
这种架构其实在很多大数据系统或者微服务架构中都被广泛应用,尤其是在需要实时同步数据库变更的场景中,能大大提升数据一致性和系统性能。
一、架构设计概述
在这个场景中,Canal 作为 MySQL binlog 的监听者,能够捕获数据库表的所有变化(比如插入、更新、删除)。这些变化会通过 RabbitMQ 发送到消息队列中,其他服务可以异步消费这些数据并进行处理,比如更新缓存、触发后续业务逻辑等。这个架构有很多好处,尤其是在解耦和异步处理上,系统的稳定性和扩展性都能得到有效提升。
二、环境搭建
首先,想要让这个架构运作起来,我们需要在本地搭建一些基本环境:MySQL、RabbitMQ 和 Canal。
1. MySQL 配置
MySQL 要开启 binlog,且建议采用 ROW
格式,这样才能捕获表中每一行数据的变动,而不是简单的语句变动。可以通过以下命令来检查 MySQL 是否已经启用了 binlog:
SHOW VARIABLES LIKE 'log_bin'; -- 确保返回的值是 ON
SHOW VARIABLES LIKE 'binlog_format'; -- 确保是 ROW 格式
2. RabbitMQ 配置
RabbitMQ 是我们的消息队列,用于传输数据变更记录。在 Docker 中启动 RabbitMQ 非常简单,下面是一个 docker-compose.yml
示例:
version: '3.1'
services:
mysql:
image: mysql:5.7
environment:
MYSQL_ROOT_PASSWORD: root
ports:
- "3306:3306"
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
canal:
image: canal/canal-server
ports:
- "11111:11111"
environment:
- CANAL_ADAPTERS=canal-adapter-rabbitmq
3. Canal 配置
Canal 监听 MySQL 的 binlog 并将变化发送到 RabbitMQ。首先,我们需要配置 Canal 的 canal.properties
文件:
canal.serverMode = rabbitMQ
rabbitmq.host = rabbitmq
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode = 2 # 阻塞模式,持久化消息
然后,配置 instance.properties
文件来连接 MySQL:
canal.instance.master.address=host.docker.internal:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root
三、Spring Boot 整合 Canal 客户端
在 Spring Boot 中,整合 Canal 客户端比较简单。我们通过 Maven 添加必要的依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
四、监听 MySQL binlog 变更
接下来,我们编写 Canal 客户端代码来监听 MySQL 的 binlog 变更。通过 CanalConnector,我们可以连接到 Canal 服务器,订阅 MySQL 的所有表,并持续获取数据变更。
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements CommandLineRunner {
private final static int BATCH_SIZE = 1000;
@Override
public void run(String... args) throws Exception {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "canal", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有表
connector.rollback(); // 回滚到未确认的位置
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 没有数据,休眠
Thread.sleep(2000);
} else {
// 有数据,处理数据
processEntries(message.getEntries());
}
// 确认消息
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("Error parsing binlog entry", e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
System.out.println("DELETE: " + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
System.out.println("INSERT: " + rowData.getAfterColumnsList());
} else {
System.out.println("UPDATE: Before: " + rowData.getBeforeColumnsList() + ", After: " + rowData.getAfterColumnsList());
}
}
}
}
}
五、将数据推送到 RabbitMQ
一旦 Canal 客户端成功监听到 binlog 的变化,我们可以通过 RabbitMQ 将数据推送到消息队列。在 Spring Boot 中集成 RabbitMQ 非常简单,只需要添加 spring-boot-starter-amqp
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后在 application.yml
中配置 RabbitMQ 的连接信息:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
创建一个简单的消息发送类,用来将 binlog 变化推送到 RabbitMQ:
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend("canal-exchange", "canal-routing-key", message);
System.out.println("Sent message: " + message);
}
}
在 Canal 客户端中,监听到变更后发送消息到 RabbitMQ:
@Autowired
private RabbitMqSender rabbitMqSender;
private void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
// Process event types and send message to RabbitMQ
rabbitMqSender.send("EventType: " + entry.getEventType() + ", Data: " + entry.getRowData());
}
}
六、启动与测试
最后,启动所有服务(MySQL、Canal、RabbitMQ),然后启动 Spring Boot 应用程序。Canal 会监听 MySQL 的 binlog 变更,并将变更数据通过 RabbitMQ 发送出去。
这样,我们就完成了通过 Canal 和 RabbitMQ 实现 MySQL 数据库变更监听和异步处理的功能。🎉
这个方案很适合做一些实时数据处理的应用,比如缓存更新、异步日志处理等。而且,借助消息队列的异步特性,整个系统能应对更高的并发量,避免了数据流通的瓶颈。希望大家能从中获得一些灵感,快来试试吧!
-END-
以上,就是今天的分享了,看完文章记得右下角给何老师点赞,也欢迎在评论区写下你的留言。