大家好,我是鹏磊。
一、案例背景
二、确定方案
技术栈使用的springboot,因此,这里最好以starter的方式提供 二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的 kafkaTemplate
,因为集成方有可能已经使用了kafkaTemplate
。不能与集成方造成冲突。如果你近期准备面试跳槽,建议在ddkk.com在线刷题,涵盖 一万+ 道 Java 面试题,几乎覆盖了所有主流技术面试题,还有市面上最全的技术五百套,精品系列教程,免费提供。 减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。 发送消息这个操作需要支持事务,尽量不影响主业务
三、TransactionSynchronizationManager显神威
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void sendLog() {
// 判断当前是否存在事务
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
// 无事务,异步发送消息给kafka
executor.submit(() -> {
// 发送消息给kafka
try {
// 发送消息给kafka
} catch (Exception e) {
// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
}
});
return;
}
// 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事务提交后,再异步发送消息给kafka
executor.submit(() -> {
try {
// 发送消息给kafka
} catch (Exception e) {
// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
}
});
}
}
});
}
TransactionSynchronizationManager
的使用。3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive()
方法显神威
// TransactionSynchronizationManager.java类内部的部分代码
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization
,其源码如下所示:/**
* Activate transaction synchronization for the current thread.
* Called by a transaction manager on transaction begin.
* @throws IllegalStateException if synchronization is already active
*/
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}
isSynchronizationActive
返回true,则代表当前有事务。因此,结合这两个方法我们是指能解决我们最开始提出的疑问:要如何判断当前是否存在事务3.2、如何在事务提交后触发自定义逻辑?TransactionSynchronizationManager.registerSynchronization()
方法显神威
/**
* Register a new transaction synchronization for the current thread.
* Typically called by resource management code.
* <p>Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
synchronizations
线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?synchronizations
内部添加了一个TransactionSynchronizationAdapter
,内部并重写了afterCompletion
方法,其代码如下所示:TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事务提交后,再异步发送消息给kafka
executor.submit(() -> {
try {
// 发送消息给kafka
} catch (Exception e) {
// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
}
});
}
}
});
registerSynchronization
的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet
添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”。这是什么意思呢?invokeAfterCommit
和invokeAfterCompletion
这两个方法来选了。TransactionSynchronization
的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter
)。但是要注意一点:invokeAfterCommit
只能拿到集合,invokeAfterCompletion
除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。invokeAfterCompletion
方法,我们除了能拿到集合外,还能拿到当前事务的状态。因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。四、总结
上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。
来源:juejin.cn/post/6984574787511123999