今天咱们聊点稍微硬核一点的内容:Spring Boot
配合事务钩子函数!
要知道,平时写业务代码,事务处理就是“家常便饭”。
可当我们要在一个支付系统中搞资金流水记录,又想顺便把消息推送到 Kafka
里,事情就不那么简单了。
这里,我就说说最近鼓捣出来的一个方案,用 Spring Boot
搞个二方库,还真省了不少事儿!
案例背景
场景是这样的:假设有个支付系统,业务需要记录每一笔资金流水,还得及时归档。为了能稳定地记录和推送这些数据,咱们计划做一个二方库来帮忙处理。
库的核心任务就是确保每条流水在事务成功后发送到 Kafka
,这样消费者就能从 Kafka
中接收到消息,并安全地存档。
哎,不用说,这事听起来就有点小复杂,不过我们可以用 Spring Boot
+ 事务钩子函数来轻松搞定。
方案确定
要实现上述需求,咱们来聊聊二方库的设计。主要考虑以下几点:
二方库采用 Spring Boot starter 方式:这样,使用方可以直接在 pom.xml
中引入依赖,轻松上手,扩展性还高。直接使用 Kafka API 而不是 Spring KafkaTemplate:虽然 KafkaTemplate
是个不错的选择,但直接用Kafka
原生 API 可以更灵活,还能避开一些模板的限制。简单易用的 API:库的 API 要简洁,让业务方用起来轻松自如,不会有一堆配置和初始化代码。 支持事务,且尽量不影响主业务:这个是重头戏。毕竟,资金流水的记录需要强一致性,我们必须保证事务提交后消息才能被发出,不能随随便便影响到主业务。
下面就来细说下用到的“神器”——TransactionSynchronizationManager
!
TransactionSynchronizationManager 的使用
这个类真是个宝,它可以帮我们检测事务状态、注册事务同步器,从而在事务提交后自动触发自定义逻辑。整个流程都可以异步化,真是为我们这类场景量身定做的。
1. 判断是否存在事务
首先,我们得判断当前是否处在事务中,防止“还没开始就开跑”。使用 TransactionSynchronizationManager.isSynchronizationActive()
就能轻松搞定,它会返回一个布尔值,告诉我们是否有事务激活:
if (TransactionSynchronizationManager.isSynchronizationActive()) {
System.out.println("当前有事务活跃!");
} else {
System.out.println("没有事务,别乱动!");
}
这里要特别注意,它依赖于线程变量 ThreadLocal
。因为事务状态是绑定在线程上的,所以这个方法特别适合咱们的二方库逻辑。只要事务开了,我们的代码就能稳妥地判断并执行。
2. 在事务提交后触发自定义逻辑
既然知道事务在跑,那接下来的任务就是在事务提交后执行咱们的“额外操作”——把流水消息发送到 Kafka
。这时候,咱们需要用到 TransactionSynchronizationManager.registerSynchronization()
方法。
这方法允许我们注册一个同步器,比如 TransactionSynchronizationAdapter
,并重写 afterCompletion
方法。当事务提交或回滚后,该方法就会被触发。
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == STATUS_COMMITTED) {
// 事务提交成功后执行的逻辑
sendMessageToKafka();
} else {
// 事务回滚,打个日志吧
System.out.println("事务回滚了,消息不发送");
}
}
});
在上面的代码中,我们注册了 TransactionSynchronizationAdapter
,并在 afterCompletion
方法中检查事务的状态。如果事务成功提交,我们就调用 sendMessageToKafka()
把消息发出去。
这里为什么不在事务内直接发消息呢?
事务内发消息的后果可能是:事务还没提交,Kafka
消息却已经出去了,这样数据状态就不一致了。比如,用户钱扣了,消息还没发;或者消息发出去了,钱没扣。这种情况对金融系统就是“灾难级别”的!所以,通过在事务提交后触发钩子函数,可以确保数据和消息一致性,大大降低业务风险。
代码示例:二方库的主流程
为了让二方库尽量简单好用,可以设计一个类似于 TransactionManager
的工具类来包装这些逻辑。比如:
public class CustomTransactionManager {
public static void executeWithTransaction(Runnable mainBusinessLogic, Runnable afterCommitLogic) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 执行主业务逻辑
mainBusinessLogic.run();
// 注册事务同步器,在事务提交后执行额外逻辑
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == STATUS_COMMITTED) {
afterCommitLogic.run();
}
}
});
} else {
throw new IllegalStateException("当前没有事务,不要贸然执行!");
}
}
}
调用的时候,只需要传入两个参数:主业务逻辑(比如扣钱)、事务提交后的逻辑(比如发 Kafka
消息):
CustomTransactionManager.executeWithTransaction(
() ->扣钱业务代码,
() -> sendMessageToKafka()
);
用法上是不是很简单?传两个 Runnable
对象,扣钱和发送消息的逻辑就顺利接上了。
事务和钩子函数的实际运用
我们在上面的代码示例中,通过 TransactionSynchronizationManager
确保了事务和消息一致性。这种模式的好处是什么呢?简单总结几个要点:
避免重复发消息:使用 ThreadLocal
变量,可以保证即使在多线程环境下,也不会发生重复消息发送的情况。事务一致性:事务成功后才发消息,保证了业务和消息的一致性,尤其在支付系统中,这点非常重要。 灵活扩展:如果以后要扩展更多的逻辑,只需多注册几个同步器,不影响主业务代码。
通过这样的设计,二方库不仅符合“好用”的原则,还具备了灵活性和高可靠性。对于支付系统、订单系统等需要保证高一致性的业务场景,简直就是个宝藏设计。
最后的一些感悟
这一波搞下来,我感到 Spring
的事务管理真是无敌了。想起当年刚入门 Spring
,看见事务总觉得有点头大,没想到现在可以用事务钩子搞出这么实用的二方库。
而 Kafka
和事务钩子结合更是让业务逻辑变得简洁优雅,再也不用担心事务里一不小心的“越界操作”了。
至于代码中的例子,大家可以根据自己的场景稍微改一下。比如,有些场景可能需要事务失败时的补偿机制,这种情况下,可以再加个逻辑处理 STATUS_ROLLED_BACK
状态。
总之,灵活运用 Spring Boot
和 TransactionSynchronizationManager
,事务钩子的威力就全释放了。
-END-
以上,就是今天的分享了,看完文章记得右下角给何老师点赞,也欢迎在评论区写下你的留言。