SpringBoot项目应该这样发送事件消息,很优雅!

科技   2024-10-31 08:58   北京  

在日常项目开发中,我们常常遇到类似的业务场景:“如果发生……,则……”“当完成……后,请通知……”“发生……时,需要……” 等。为了解耦业务流程并避免对主流程的影响,通常我们会采用领域事件的方式。在主业务流程完成后,发送一条消息,由消息监听者负责处理后续业务流程。最常见的实现方式是通过ApplicationEventPublisher发送消息,使用@EventListener注解来监听并处理相应的业务逻辑。

然而,在最近的代码走查中,我发现项目中发送消息的方式各不相同,问题颇多。有的同事直接将数据库DO对象作为消息体发送;有的则使用一个公共消息体对象,并通过类型进行区分,最终在消费逻辑中根据类型选择不同的处理逻辑。这样的实现方式存在很多问题。那么,我们该如何设计一个通用的领域事件发送流程呢?

本文将对此进行探讨。

基类设计

在领域事件的业务场景中,出现 “如果发生…,则…” 时,通常是领域对象发生了变化,常见的操作有创建更新删除

我们可以首先定义一个枚举类,表示事件变更的类型:

public enum EventTypeEnum {  
    DELETE,  
    CREATE,  
    UPDATE;
  
    /**  
  * Acquires an EventTypeEnum by its name. * @param name  
  * @return  
  */

    public static EventTypeEnum acquireByName(String name){  
        return Arrays.stream(EventTypeEnum.values())  
                .filter(e -> Objects.equals(e.name(), name))  
                .findFirst()  
                .orElseThrow(() -> new RuntimeException(String.format(" this EventType can not support %s", name)));  
    }  
  
}

事件对象通常需要包含事件唯一编号、事件发生时间等基本属性,并且具有不可变性。为了统一时间的定义过程,我们可以对事件对象进行抽象,提炼出事件基类BaseEvent

@Getter  
public abstract class BaseEvent implements Serializable {  
    private final String eventId;  
    private final LocalDateTime eventTime;  
  
    public BaseEvent() {  
        this.eventId = UUID.randomUUID().toString();  
        this.eventTime = LocalDateTime.now();  
    }  
}

在发送事件时,我们可以直接将对象发送。结合这一点,可以设计一个泛型包装类,代表领域事件的基类DomainEvent。在变更操作时,系统需要记录原始对象和变更后的对象。在DomainEvent中,可以使用sourceafter属性来表示这些对象,同时提供了一个buildContext方法用于构建对象变更说明,可用于记录日志。

@Getter  
public class DomainEvent<Textends BaseEvent{  
    private final T source;   
 private final T after;  
    private final EventTypeEnum eventType;  
  
  
    public DomainEvent(final T source, final T after, final EventTypeEnum eventType) {  
        this.eventType = eventType;  
        this.source = source;  
        this.after = after;  
    }  
  
  
    public String beforeSnapshot(){  
        return Objects.toString(source, "before unknown");  
    }  
  
    public String afterSnapshot() {  
        return Objects.toString(after, "after unknown");  
    }  
  
    /**  
     *  Builds the context of the event change.
     */
    
     public String buildContext() {  
         return String.format("%s changed(%s)[%s = > %s]", source.getClass().getTypeName(), eventTypeEnum.toString(), beforeSnapshot(), afterSnapshot());  
    }  
  
}

至此,我们完成了领域事件基类的设计,类图如下所示:


接口实现设计

在项目中,如果是进程内通信,可以基于ApplicationEventPublisher发布事件;而对于跨进程交互,则需要借助消息队列。无论何种实现方式,我们都应设计一个通用的领域事件发送接口。该接口提供onCreatedonUpdatedonDeleted方法,分别对应事件的创建、更新和删除逻辑。由于并非所有业务场景都需要实现这三种事件变更,这里我们使用default修饰符,以便业务按需实现。

public interface DataChangedEventPublisher<T{
    /**
     * 处理新数据实体的创建。
     */

    default void onCreated(final T data){
        DomainEvent<T> event = new DomainEvent<>(data, null, EventTypeEnum.CREATE);
        publish(event);
    }


    /**
     * 处理数据实体的删除。
     */

    default void onDeleted(final T data){
        DomainEvent<T> event = new DomainEvent<>(data, null, EventTypeEnum.DELETE);
        publish(event);
    }



    /**
     * 处理数据实体的更新。
     */

    default void onUpdated(final T data, final T before){
        DomainEvent<T> event = new DomainEvent<>(data, before, EventTypeEnum.UPDATE);
        publish(event);
    }


    /**
     * Publishes the given DomainEvent.
     */

    void publish(DomainEvent<T> domainEvent);
}

消息发送示例

一旦定义了DataChangedEventPublisher接口,我们就可以实现消息发送逻辑。假设我们有一个业务场景:当用户创建或更新时需要发送消息。

  1. 定义用户对象
@Data
public class Account {
    private String name;
    private String nickName;
    private Long id;
    private Integer age;
}
  1. 创建变更消息对象
public class AccountChangedEvent extends DomainEvent<Account{

    public AccountChangedEvent(Account source, Account before, OperationType operationType) {
        super(source, before, operationType);
    }

    @Override
    public String buildContext() {
        final Account source = getSource();
        if (Objects.isNull(getAfter())) {
            return String.format("the account [%s] is %s", source.getName(), StringUtils.lowerCase(getOperationType().toString()));
        }
        return String.format("the account [%s] is %s : %s", source.getName(), StringUtils.lowerCase(getOperationType().toString()), contrast());
    }


    /**
     * 字段对比
     */

    private Object contrast() {
        final Account before =  getSource();
        Objects.requireNonNull(before);
        final Account after =  getAfter();
        Objects.requireNonNull(after);
        if (Objects.equals(before, after)) {
            return "it no change";
        }
        final StringBuilder builder = new StringBuilder();
        if (!Objects.equals(before.getName(), after.getName())) {
            builder.append(String.format("name[%s => %s] ", before.getName(), after.getName()));
        }
        if (!Objects.equals(before.getUniqueName(), after.getNickName())) {
            builder.append(String.format("uniqueName[%s => %s] ", before.getUniqueName(), after.getUniqueName()));
        }
        if (!Objects.equals(before.getAge(), after.getAge())) {
            builder.append(String.format("age[%s => %s] ", before.getAge(), after.getAge()));
        }
        if (!Objects.equals(before.getNickName(), after.getNickName())) {
            builder.append(String.format("nickName[%s => %s] ", before.getNickName(), after.getNickName()));
        }
        return builder.toString();
    }
}

重写buildContext()方法用于构建用户变更的详细信息,根据业务需要自由实现。

  1. 实现事件变更接口
@Slf4j
@Service
public class AccountChangedPublisher implements DataChangedEventPublisher<Account{

    private final ApplicationEventPublisher applicationEventPublisher;

    public AccountChangedPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Override
    public void onCreated(Account data) {
        AccountChangedEvent changedEvent = new AccountChangedEvent(data, null, OperationType.CREATE);
        publish(changedEvent);
    }

    @Override
    public void onUpdated(Account data, Account after) {
        AccountChangedEvent changedEvent = new AccountChangedEvent(data, after, OperationType.UPDATE);
        publish(changedEvent);
    }


    @Override
    public void publish(DomainEvent<Account> domainEvent) {
        String context = domainEvent.buildContext();
        log.info(context);
        applicationEventPublisher.publishEvent(domainEvent);
    }
}
  1. 注入并使用Publisher进行消息发送

业务逻辑中可以注入AccountChangedPublisher进行消息发送,同时监听AccountChangedEvent

@RestController
@RequestMapping("/api/demo/account")
@RequiredArgsConstructor
public class AccountController {

    private final AccountChangedPublisher publisher;

    @GetMapping("/message")
    public void demo() {
        Account account = new Account();
        account.setAge(23);
        account.setName("Jam");
        account.setNickName("张张");

        Account account2 = new Account();
        account2.setNickName("赵赵");
        account2.setAge(24);
        account2.setName("Jam");

        //发送变更消息
        publisher.onUpdated(account,account2);
    }
}

@Component
public class AccountEventConsumer {
    @EventListener
    public void handleSyncErrorEvent(AccountChangedEvent event) {
        // Handle the event here
        System.out.println("Received event: " + event.buildContext());
    }
}
  1. 日志输出

当消息发送后,可以在日志中看到以下信息:

INFO  c.j.d.m.t.e.a.AccountChangedPublisher - the account [Jam] is update : age[23 => 24] nickName[张张 => 赵赵] 
Received event: the account [Jam] is update : age[23 => 24] nickName[张张 => 赵赵] 

通过以上步骤,我们完成了事件消息的发送流程,整体类图如下所示:


设计扩展

该设计可以进行进一步扩展。如果系统要求存储所有发送过的消息以便后期检索和分析,我们可以创建一个抽象类AbstractLoggingEventPublisher,用于处理此逻辑,后续事件发送器可直接继承此类。


@Slf4j
public abstract class AbstractLoggingEventPublisher<Timplements DataChangedEventPublisher<T>{
    @Autowired
    private LogService logService;

    protected AbstractLoggingEventPublisher(){

    }

    /**
     * Publishes the given DomainEvent.
     * Logs the event details and delegates the actual publishing to the subclass.
     */

    @Override
    public void publish(DomainEvent<T> domainEvent) {
        logEvent(domainEvent);
        publishEvent(domainEvent);
    }

    /**
     * Logs the details of the given DomainEvent.
     * @param domainEvent the event to be logged
     */

    private void logEvent(DomainEvent<T> domainEvent) {
        logService.saveLog(domainEvent);
    }


    /**
     * Abstract method for publishing the given DomainEvent.
     * @param domainEvent the event to be published
     */

    protected abstract void publishEvent(DomainEvent<T> domainEvent) ;
}

此时,AccountChangedPublisher可以直接继承AbstractLoggingEventPublisher,代码如下:

@Slf4j
@Service
public class AccountChangedEventPublisher extends AbstractLoggingEventPublisher<Account{

    private final ApplicationEventPublisher applicationEventPublisher;

    public AccountChangedEventPublisher3(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Override
    public void onUpdated(Account data, Account after) {
        AccountChangedEvent changedEvent = new AccountChangedEvent(data, after, OperationType.UPDATE);
        publish(changedEvent);
    }
    
    @Override
    public void onCreated(Account data) {
        AccountChangedEvent changedEvent = new AccountChangedEvent(data, null, OperationType.CREATE);
        publish(changedEvent);
    }


    @Override
    protected void publishEvent(DomainEvent<Account> domainEvent) {
        applicationEventPublisher.publishEvent(domainEvent);
    }
}

总结

应用开发中,解耦业务流程至关重要,领域事件模式提供了一种有效的解决方案。通过设计通用的事件发布接口DataChangedEventPublisher,我们能够实现对创建、更新和删除等业务操作的统一处理。借助DomainEvent类封装事件信息,确保了事件的完整性和可追溯性。

- End-

DailyMart是一个基于 DDD 和Spring Cloud Alibaba的微服务商城系统,采用SpringBoot3.x以及JDK17。旨在为开发者提供集成式的学习体验,并将其无缝地应用于实际项目中。该专栏包含领域驱动设计(DDD)、Spring Cloud Alibaba企业级开发实践、设计模式实际应用场景解析、分库分表战术及实用技巧等内容。如果你对这个系列感兴趣,可在本公众号回复关键词 DDD 获取完整文档以及相关源码。

JAVA日知录
写代码的架构师,做架构的程序员! 实战、源码、数据库、架构...只要你来,你想了解的这里都有!
 最新文章