新选择!基于Spring Boot监听MySQL日志Binlog实现数据实时同步

文摘   2024-11-22 08:02   新疆  

最新实战案例锦集:《Spring Boot3实战案例合集》持续更新,每天至少更新一篇文章,订阅后将赠送文章最后展示的所有MD文档(学习笔记)以及 合集全部源码。

环境:SpringBoot3.2.5



1. 简介

MySQL与Redis数据实时同步是将MySQL数据库中的数据变化实时地反映到Redis缓存系统中的过程。MySQL是一款稳定的关系型数据库,适合做持久化存储;而Redis是一个高性能的内存数据库,适合做缓存和实时数据处理。将两者结合使用,可以充分发挥各自的优势,提升系统性能和稳定性。

实现MySQL与Redis数据实时同步有多种方法,如使用MySQL的二进制日志(Binlog)配合Canal或Debezium等工具。此外,还可以在应用层进行双写操作或使用消息队列实现数据同步。

MySQL与Redis数据实时同步的主要目的是优化性能和保持数据一致性。通过将热点数据存储在Redis中,可以大大提高系统的访问速度,同时确保MySQL中的数据变化能够实时反映到Redis中,避免数据不一致的问题。这种同步机制在电商、社交等需要高并发访问和实时数据更新的场景中尤为重要。

在之前的二篇文章中介绍了关于MySQL与Redis同步的实现方案,有需要学习的可以查看下面的链接:

SpringBoot整合Flink CDC,实时追踪数据变动,无缝同步至Redis

在SpringBoot中通过Canal实现MySQL与Redis的数据同步

本篇文章我将介绍另外一款非常不错开源的组件mysql-binlog-connector-java。通过名称就能知道他是通过连接MySQL binlog日志来实现数据监听的。该组件不仅仅是能够实时监听binlog的变化,而且你还可以直接去读取binlog日志文件解析其内容。

该组件具备以下特性:

  • 自动解析二进制日志文件名/位置 | GTID 解析

  • 断开连接可恢复

  • 件化的故障转移策略

  • 支持 binlog_checksum=CRC32(适用于 MySQL 5.6.2+ 用户)

  • 过 TLS 进行安全通信

  • 友好的Java管理扩展(JMX)

  • 实时统计

  • Maven Central 上可用

  • 无第三方依赖,跨不同版本的 MySQL 发行版的测试套件

接下来,我将通过如下几方面介绍该组件在项目中的使用:

  • 编程解析binlog日志

  • 实时监听binlog日志

  • 通过JMX暴露binlog客户端

     

2. 实战案例

环境准备

<dependency>  <groupId>com.zendesk</groupId>  <artifactId>mysql-binlog-connector-java</artifactId>  <version>0.30.1</version></dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

当前mysql-binlog-connector-java最新版本为0.30.1。你可以通过下面地址查看仓库版本情况:

https://mvnrepository.com/artifact/com.zendesk/mysql-binlog-connector-java

注意:你的确定你开启了binlog日志

SHOW VARIABLES LIKE '%log_bin%'

通过上面的命令查看状态。

2.1 编程读取binlog日志

public static void main(String[] args) throws Exception {
File binlogFile = new File("C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000032") ; EventDeserializer eventDeserializer = new EventDeserializer() ;  // 设置兼容性模式 eventDeserializer.setCompatibilityMode(    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY); BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer); try { for (Event event; (event = reader.readEvent()) != null;) { EventData data = event.getData() ;      // 判断事件的类型 if (data instanceof WriteRowsEventData ed) { List<Serializable[]> rows = ed.getRows() ; rows.forEach(row -> { for (Serializable s : row) { if (s instanceof byte[] bs) { System.err.print(new String(bs) + "\t") ; } else { System.err.print(s + "\t") ; } } System.out.println() ; });      } else if (data instanceof QueryEventData ed) { System.out.printf("查询事件:%s%n", ed.getSql()) ; } else if (data instanceof DeleteRowsEventData ed) { System.err.println("删除事件") ; } else if (data instanceof TableMapEventData ed) { String database = ed.getDatabase() ; String table = ed.getTable() ; System.out.printf("数据库: %s, 表名: %s%n", database, table) ; } } } finally { reader.close(); }}

该组件定义了如下的事件类型

上面程序输出结果

查询事件:BEGIN数据库: testjpa, 表名: t_person删除事件查询事件:BEGIN数据库: testjpa, 表名: t_person2520  30  姓名 - 30  查询事件:BEGIN数据库: testjpa, 表名: t_person2521  44  姓名 - 44  查询事件:BEGIN数据库: testjpa, 表名: t_person2522  92  姓名 - 92  查询事件:BEGIN数据库: testjpa, 表名: t_person2523  71  姓名 - 71

正确的读取binlog日志中的信息。

2.2 实时监听Binlog日志

@Componentpublic class MySQLToRedisComponent implements CommandLineRunner {
public void listener() { BinaryLogClient client = new BinaryLogClient("118.24.111.33", 3307, "test", "root", "123123"); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); client.registerEventListener(new EventListener() { @Override public void onEvent(Event event) { EventHeader header = event.getHeader() ; switch(header.getEventType()) { case EXT_WRITE_ROWS: WriteRowsEventData writeData = event.getData() ; List<Serializable[]> rows = writeData.getRows() ; for (Serializable row : rows) { if (row.getClass().isArray()) { printRow(row); } } break ; case EXT_UPDATE_ROWS: UpdateRowsEventData updateData = event.getData() ; BitSet columns = updateData.getIncludedColumns() ; System.err.printf("更新列: %s%n", columns) ; List<Entry<Serializable[], Serializable[]>> updateRows = updateData.getRows() ; for (Entry<Serializable[], Serializable[]> entry : updateRows) { printRow(entry.getKey()) ; System.out.println(">>>>>>>>>>>>>>>>>>>>>before") ; printRow(entry.getValue()) ; System.out.println(">>>>>>>>>>>>>>>>>>>>>after") ; } break ; case EXT_DELETE_ROWS: DeleteRowsEventData deleteData = event.getData() ; List<Serializable[]> deleteRow = deleteData.getRows() ; for (Serializable row : deleteRow) { if (row.getClass().isArray()) { printRow(row); } } break ; case TABLE_MAP: TableMapEventData data = event.getData() ; System.out.printf("变更表: %s.%s%n", data.getDatabase(), data.getTable()) ; break ; default: break ; } } private void printRow(Serializable row) { Serializable[] ss = (Serializable[]) row ; for (Serializable s : ss) { if (s.getClass().isArray()) { System.out.print(new String((byte[])s) + "\t") ; } else { System.out.print(s + "\t") ; } } System.out.println() ; } });    client.connect();  } public void run(String... args) throws Exception { this.listener() ; }}

以上监听程序,我们仅对部分事件进行了监听处理。当数据发生变化后,输出如下:

变更表: test.t_person更新列: {0, 1, 2}1  张三  66  >>>>>>>>>>>>>>>>>>>>>before1  张三  22  >>>>>>>>>>>>>>>>>>>>>after

更序列使用了BitSet表示,所以如果你要与具体的列想对应,你还应该执行如下的语句来确定具体的列名:

mysql> describe t_person;+-------+--------------+------+-----+---------+----------------+| Field | Type         | Null | Key | Default | Extra          |+-------+--------------+------+-----+---------+----------------+| id    | int          | NO   | PRI | NULL    | auto_increment || name  | varchar(255) | YES  |     | NULL    |                || age   | int          | YES  |     | NULL    |                |+-------+--------------+------+-----+---------+----------------+

你可以通过JDBC的方式执行该语句获取对应的列,也可以通过Socket方式发送命令获取结果。推荐还是JDBC。

2.3 JMX暴露Binlog客户端

在Spring Boot中,我们可以非常方便的通过JMX暴露binlog客户端的相关操作,如下示例:

@BeanBinaryLogClient client() {  return new BinaryLogClient("118.24.111.33", 3307, "test", "root", "123123") ;}
@BeanMBeanExporter exporterClient(BinaryLogClient client) { MBeanExporter exporter = new MBeanExporter(); exporter.setBeans(Map.of("mysql.binlog:type=BinaryLogClient", client)) ; return exporter;}
@BeanMBeanExporter exporterClientStatistics(BinaryLogClient client) { MBeanExporter exporter = new MBeanExporter(); BinaryLogClientStatistics stats = new BinaryLogClientStatistics(client); exporter.setBeans(Map.of("mysql.binlog:type=BinaryLogClientStatistics", stats)) ; return exporter;}

启动应用后,通过JConsole查看

本案例源码已完整实现监听数据变化转对象,最后写入Redis。有需要的请留言!!!

以上是本篇文章的全部内容,如对你有帮助帮忙点赞+转发+收藏

推荐文章

不写一行代码通过UI界面配置HTTP接口

解锁Spring资源Resource的强大功能,提升开发效率

AOP高级应用,优雅实现异常重试机制

@Qualifier高级技巧,你会几个?深入底层原理

必学!Spring Boot结合MDC全方位的日志跟踪(支持跨线程)

Spring强大的URI操作工具类太方便了

使用Vault保护SpringBoot配置文件中的敏感数据

你的项目中是否运用了@ResponseStatus注解?你真的会用?

【Spring】来试试这道面试题你能答上来吗?

是否还记得SpringMVC中的@MatrixVariable注解?

详解基于SpringBoot实现多数据源动态切换及原理

基于Spring Boot给所有Controller接口添加统一前缀的5种方式

Spring全家桶实战案例源码
spring, springboot, springcloud 案例开发详解
 最新文章