👉 这是一个或许对你有用的社群
🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入「芋道快速开发平台」知识星球。下面是星球提供的部分资料:
《项目实战(视频)》:从书中学,往事中“练” 《互联网高频面试题》:面朝简历学习,春暖花开 《架构 x 系统设计》:摧枯拉朽,掌控面试高频场景题 《精进 Java 学习指南》:系统学习,互联网主流技术栈 《必读 Java 源码专栏》:知其然,知其所以然
👉这是一个或许对你有用的开源项目
国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。
功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:
Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud 视频教程:https://doc.iocoder.cn 【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本
来源:juejin.cn/post/
7265624177775558716
前言
最近开发了一个内部消息组件,逻辑大体是通过定义注解 @MessageHub
,在启动时扫描全部bean中有使用了该注解的方法后台创建一个常驻线程代理消费数据,当线程消费到数据就回写到对应加了注解的方法里。
@Slf4j
@Service
public class RedisConsumerDemo {
@MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
public void consumer(Object message) {
log.info("pubsub info {} ", message);
}
}
实现redis的队列、stream方式实现都很简单,唯独发布订阅方式,网上的demo全都是一个固定套路,通过redis容器注入监听器,而且回写非常死板。那么如何将这块的逻辑统一呢。
之前总结过消息组件的代码设计,这里贴一下链接:
“
https://juejin.cn/post/7204113113699729463
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro 视频教程:https://doc.iocoder.cn/video/
常规写法
常规实现reids的发布订阅模式写法一共三步
1.创建消息监听器
@Bean
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
return new MessageListenerAdapter(messageListener, "onMessage");
}
2.创建订阅器
@Component
public class TestSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("get data :{}", msg);
}
}
3.向redis容器中添加消息监听器
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer container(
RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter smsExpirationListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
return container;
}
}
这样定义非常简单明了,但是有个问题是太代码僵硬了,创建监听者很不灵活,只能指定内部的onMessage方法,那么怎么才能融入到我们的内部消息流转中间件里呢。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud 视频教程:https://doc.iocoder.cn/video/
自定义注解实现
我们内部组件抽象了两个方法,生产和消费,但这两个方法逻辑截然不同,生产方法是暴露给serverice层接口调用,调用方在调用生产方法后能直接知道生产了几条数据和成功与否。而消费方法是配合Spring生命周期函数服务启动时建立常驻消费线程的。
/**
* 生产消息
*/
Integer producer(MessageForm messageForm);
/**
* 消费消息
*/
void consumer(ConsumerAdapterForm adapterForm);
生产消息当然很容易实现,只需要调用已经封装好的convertAndSend
方法。
stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());
消费方法就有说法了,动态生成监听者的场景下使用redis容器用代码挨个注册已经满足不了了,但仔细过一遍源代码就会发现,监听类的构造方法的入参只有两个,第一个需要回调的代理类,第二个消费到数据后回调的方法。
/**
* Create a new {@link MessageListenerAdapter} for the given delegate.
*
* @param delegate the delegate object
* @param defaultListenerMethod method to call when a message comes
* @see #getListenerMethodName
*/
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
this(delegate);
setDefaultListenerMethod(defaultListenerMethod);
}
那么好了好了,方案有了,本质上就是把RedisMessageListenerContainer
注入进来之后,扫描项目里所有加了 @MessageHub
的bean,包装成监听类加载到容器里就完事了。
怎么扫描的代码就不再赘述了,实现Spring的生命周期函数BeanPostProcessor#postProcessAfterInitialization
,在这里用AnnotationUtils
判断是否标注了注解。
MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
continue;
}
标注了后判断如果是发布订阅,进入发布订阅的实现类。
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
@Resource
RedisMessageListenerContainer redisPubSubContainer;
@Override
public void produce(ProducerAdapterForm producerAdapterForm) {
stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
}
@Override
public void consume(ConsumerAdapterForm messageForm) {
MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
adapter.afterPropertiesSet();
redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
}
@Bean
public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
首先先将RedisMessageListenerContainer
注入到Spring容器里,produce方法只需要调用下现程的api。consume方法由于上一步我们获取了bean和对应的method,直接用MessageListenerAdapter
的构造器创建出监听器来,这里有个坑,需要手动调用adapter.afterPropertiesSet()
设置一些必要的属性,这个在常规写法里框架帮忙做了。如果不调用的话会出一些空指针之类的bug。
随后把监听器add到容器就实现了方法代理,背后的线程监听到数据会回调到标注了 @MessageHub
的方法里
欢迎加入我的知识星球,全面提升技术能力。
👉 加入方式,“长按”或“扫描”下方二维码噢:
星球的内容包括:项目实战、面试招聘、源码解析、学习路线。
文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)