在日常项目开发中,我们常常遇到类似的业务场景:“如果发生……,则……”,“当完成……后,请通知……”,“发生……时,需要……” 等。为了解耦业务流程并避免对主流程的影响,通常我们会采用领域事件的方式。在主业务流程完成后,发送一条消息,由消息监听者负责处理后续业务流程。最常见的实现方式是通过ApplicationEventPublisher
发送消息,使用@EventListener
注解来监听并处理相应的业务逻辑。
然而,在最近的代码走查中,我发现项目中发送消息的方式各不相同,问题颇多。有的同事直接将数据库DO对象作为消息体发送;有的则使用一个公共消息体对象,并通过类型进行区分,最终在消费逻辑中根据类型选择不同的处理逻辑。这样的实现方式存在很多问题。那么,我们该如何设计一个通用的领域事件发送流程呢?
本文将对此进行探讨。
基类设计
在领域事件的业务场景中,出现 “如果发生…,则…” 时,通常是领域对象发生了变化,常见的操作有创建、更新和删除。
我们可以首先定义一个枚举类,表示事件变更的类型:
public enum EventTypeEnum {
DELETE,
CREATE,
UPDATE;
/**
* Acquires an EventTypeEnum by its name. * @param name
* @return
*/
public static EventTypeEnum acquireByName(String name){
return Arrays.stream(EventTypeEnum.values())
.filter(e -> Objects.equals(e.name(), name))
.findFirst()
.orElseThrow(() -> new RuntimeException(String.format(" this EventType can not support %s", name)));
}
}
事件对象通常需要包含事件唯一编号、事件发生时间等基本属性,并且具有不可变性。为了统一时间的定义过程,我们可以对事件对象进行抽象,提炼出事件基类BaseEvent
:
@Getter
public abstract class BaseEvent implements Serializable {
private final String eventId;
private final LocalDateTime eventTime;
public BaseEvent() {
this.eventId = UUID.randomUUID().toString();
this.eventTime = LocalDateTime.now();
}
}
在发送事件时,我们可以直接将对象发送。结合这一点,可以设计一个泛型包装类,代表领域事件的基类DomainEvent
。在变更操作时,系统需要记录原始对象和变更后的对象。在DomainEvent
中,可以使用source
和after
属性来表示这些对象,同时提供了一个buildContext
方法用于构建对象变更说明,可用于记录日志。
@Getter
public class DomainEvent<T> extends BaseEvent{
private final T source;
private final T after;
private final EventTypeEnum eventType;
public DomainEvent(final T source, final T after, final EventTypeEnum eventType) {
this.eventType = eventType;
this.source = source;
this.after = after;
}
public String beforeSnapshot(){
return Objects.toString(source, "before unknown");
}
public String afterSnapshot() {
return Objects.toString(after, "after unknown");
}
/**
* Builds the context of the event change.
*/
public String buildContext() {
return String.format("%s changed(%s)[%s = > %s]", source.getClass().getTypeName(), eventTypeEnum.toString(), beforeSnapshot(), afterSnapshot());
}
}
至此,我们完成了领域事件基类的设计,类图如下所示:
接口实现设计
在项目中,如果是进程内通信,可以基于ApplicationEventPublisher
发布事件;而对于跨进程交互,则需要借助消息队列。无论何种实现方式,我们都应设计一个通用的领域事件发送接口。该接口提供onCreated
、onUpdated
和onDeleted
方法,分别对应事件的创建、更新和删除逻辑。由于并非所有业务场景都需要实现这三种事件变更,这里我们使用default
修饰符,以便业务按需实现。
public interface DataChangedEventPublisher<T> {
/**
* 处理新数据实体的创建。
*/
default void onCreated(final T data){
DomainEvent<T> event = new DomainEvent<>(data, null, EventTypeEnum.CREATE);
publish(event);
}
/**
* 处理数据实体的删除。
*/
default void onDeleted(final T data){
DomainEvent<T> event = new DomainEvent<>(data, null, EventTypeEnum.DELETE);
publish(event);
}
/**
* 处理数据实体的更新。
*/
default void onUpdated(final T data, final T before){
DomainEvent<T> event = new DomainEvent<>(data, before, EventTypeEnum.UPDATE);
publish(event);
}
/**
* Publishes the given DomainEvent.
*/
void publish(DomainEvent<T> domainEvent);
}
消息发送示例
一旦定义了DataChangedEventPublisher
接口,我们就可以实现消息发送逻辑。假设我们有一个业务场景:当用户创建或更新时需要发送消息。
定义用户对象
@Data
public class Account {
private String name;
private String nickName;
private Long id;
private Integer age;
}
创建变更消息对象
public class AccountChangedEvent extends DomainEvent<Account> {
public AccountChangedEvent(Account source, Account before, OperationType operationType) {
super(source, before, operationType);
}
@Override
public String buildContext() {
final Account source = getSource();
if (Objects.isNull(getAfter())) {
return String.format("the account [%s] is %s", source.getName(), StringUtils.lowerCase(getOperationType().toString()));
}
return String.format("the account [%s] is %s : %s", source.getName(), StringUtils.lowerCase(getOperationType().toString()), contrast());
}
/**
* 字段对比
*/
private Object contrast() {
final Account before = getSource();
Objects.requireNonNull(before);
final Account after = getAfter();
Objects.requireNonNull(after);
if (Objects.equals(before, after)) {
return "it no change";
}
final StringBuilder builder = new StringBuilder();
if (!Objects.equals(before.getName(), after.getName())) {
builder.append(String.format("name[%s => %s] ", before.getName(), after.getName()));
}
if (!Objects.equals(before.getUniqueName(), after.getNickName())) {
builder.append(String.format("uniqueName[%s => %s] ", before.getUniqueName(), after.getUniqueName()));
}
if (!Objects.equals(before.getAge(), after.getAge())) {
builder.append(String.format("age[%s => %s] ", before.getAge(), after.getAge()));
}
if (!Objects.equals(before.getNickName(), after.getNickName())) {
builder.append(String.format("nickName[%s => %s] ", before.getNickName(), after.getNickName()));
}
return builder.toString();
}
}
重写buildContext()
方法用于构建用户变更的详细信息,根据业务需要自由实现。
实现事件变更接口
@Slf4j
@Service
public class AccountChangedPublisher implements DataChangedEventPublisher<Account> {
private final ApplicationEventPublisher applicationEventPublisher;
public AccountChangedPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void onCreated(Account data) {
AccountChangedEvent changedEvent = new AccountChangedEvent(data, null, OperationType.CREATE);
publish(changedEvent);
}
@Override
public void onUpdated(Account data, Account after) {
AccountChangedEvent changedEvent = new AccountChangedEvent(data, after, OperationType.UPDATE);
publish(changedEvent);
}
@Override
public void publish(DomainEvent<Account> domainEvent) {
String context = domainEvent.buildContext();
log.info(context);
applicationEventPublisher.publishEvent(domainEvent);
}
}
注入并使用Publisher进行消息发送
业务逻辑中可以注入AccountChangedPublisher
进行消息发送,同时监听AccountChangedEvent
:
@RestController
@RequestMapping("/api/demo/account")
@RequiredArgsConstructor
public class AccountController {
private final AccountChangedPublisher publisher;
@GetMapping("/message")
public void demo() {
Account account = new Account();
account.setAge(23);
account.setName("Jam");
account.setNickName("张张");
Account account2 = new Account();
account2.setNickName("赵赵");
account2.setAge(24);
account2.setName("Jam");
//发送变更消息
publisher.onUpdated(account,account2);
}
}
@Component
public class AccountEventConsumer {
@EventListener
public void handleSyncErrorEvent(AccountChangedEvent event) {
// Handle the event here
System.out.println("Received event: " + event.buildContext());
}
}
日志输出
当消息发送后,可以在日志中看到以下信息:
INFO c.j.d.m.t.e.a.AccountChangedPublisher - the account [Jam] is update : age[23 => 24] nickName[张张 => 赵赵]
Received event: the account [Jam] is update : age[23 => 24] nickName[张张 => 赵赵]
通过以上步骤,我们完成了事件消息的发送流程,整体类图如下所示:
设计扩展
该设计可以进行进一步扩展。如果系统要求存储所有发送过的消息以便后期检索和分析,我们可以创建一个抽象类AbstractLoggingEventPublisher
,用于处理此逻辑,后续事件发送器可直接继承此类。
@Slf4j
public abstract class AbstractLoggingEventPublisher<T> implements DataChangedEventPublisher<T>{
@Autowired
private LogService logService;
protected AbstractLoggingEventPublisher(){
}
/**
* Publishes the given DomainEvent.
* Logs the event details and delegates the actual publishing to the subclass.
*/
@Override
public void publish(DomainEvent<T> domainEvent) {
logEvent(domainEvent);
publishEvent(domainEvent);
}
/**
* Logs the details of the given DomainEvent.
* @param domainEvent the event to be logged
*/
private void logEvent(DomainEvent<T> domainEvent) {
logService.saveLog(domainEvent);
}
/**
* Abstract method for publishing the given DomainEvent.
* @param domainEvent the event to be published
*/
protected abstract void publishEvent(DomainEvent<T> domainEvent) ;
}
此时,AccountChangedPublisher
可以直接继承AbstractLoggingEventPublisher
,代码如下:
@Slf4j
@Service
public class AccountChangedEventPublisher extends AbstractLoggingEventPublisher<Account> {
private final ApplicationEventPublisher applicationEventPublisher;
public AccountChangedEventPublisher3(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void onUpdated(Account data, Account after) {
AccountChangedEvent changedEvent = new AccountChangedEvent(data, after, OperationType.UPDATE);
publish(changedEvent);
}
@Override
public void onCreated(Account data) {
AccountChangedEvent changedEvent = new AccountChangedEvent(data, null, OperationType.CREATE);
publish(changedEvent);
}
@Override
protected void publishEvent(DomainEvent<Account> domainEvent) {
applicationEventPublisher.publishEvent(domainEvent);
}
}
总结
应用开发中,解耦业务流程至关重要,领域事件模式提供了一种有效的解决方案。通过设计通用的事件发布接口DataChangedEventPublisher
,我们能够实现对创建、更新和删除等业务操作的统一处理。借助DomainEvent
类封装事件信息,确保了事件的完整性和可追溯性。
- End-
DailyMart是一个基于 DDD 和Spring Cloud Alibaba的微服务商城系统,采用SpringBoot3.x以及JDK17。旨在为开发者提供集成式的学习体验,并将其无缝地应用于实际项目中。该专栏包含领域驱动设计(DDD)、Spring Cloud Alibaba企业级开发实践、设计模式实际应用场景解析、分库分表战术及实用技巧等内容。如果你对这个系列感兴趣,可在本公众号回复关键词 DDD 获取完整文档以及相关源码。