如何使用RocketMQ实现可靠事件模式

科技   2024-10-16 17:35   北京  

Seata 框架本身并没有内置针对可靠事件模式的解决方案,但我们可以使用另一款已经介绍过的框架来实现这一目标,就是 RocketMQ。

RocketMQ 为开发人员提供了事务消息这一消息类型,专门用来应对分布式环境下的数据一致性问题。

事务消息的基本概念

事务消息是RocketMQ提供的一种高级消息类型,支持在分布式场景下消息生产和本地事务的最终一致性。我们可以分别从生产者消费者维度出发来分析可靠事件实现上的需求。

  • 消息发送方:对于消息发送方而言,我们需要解决执行本地事务与发送消息的原子性问题,即保证本地事务执行成功,消息一定发送成功。
  • 消息接收方:对于消息接收方而言,我们需要解决接收消息与本地事务的原子性问题,即保证接收消息成功后,本地事务也一定执行成功。

事务消息的出现完美解决了可靠事件模式执行过程中可能出现的问题。事务消息提供了类似X/Open XA的分布事务功能,通过事务消息能达到分布式事务的最终一致性。

那么,RocketMQ 是如何做到这一点的呢?关键就在于它所提供的半消息机制。

所谓半消息(Half Message),是指暂不能投递的消息。发送方已经将消息成功发送到了服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成暂不能投递状态,处于该种状态下的消息就是半消息。

介绍完半消息的概念,我们再来明确什么是半消息回查。

我们知道由于网络闪断、生产者应用重启等原因,可能会导致某条事务消息的二次确认丢失。RocketMQ 服务端通过扫描发现某条消息长期处于半消息状态时,就会主动向消息生产者询问该消息的最终状态(Commit 或 Rollback),这一过程就是半消息回查。图 1 展示了 RocketMQ 中事务消息的整体架构。

图1 RocketMQ 事务消息架构

进一步,我们梳理 RocketMQ 事务消息的执行过程,如图2所示。

图2 RocketMQ 事务消息执行过程

可以看到,图 2 存在服务A服务B这两个微服务。其中服务 A 是消息发布者,而服务 B 是消息消费者,我们需要确保两者之间数据的一致性。这里有 7 个步骤。

  1. 服务 A 向 RocketMQ 服务端发送事务消息。
  2. RocketMQ 将消息持久化成功之后,向服务A确认消息已经发送成功,此时消息为半消息。
  3. 服务 A 开始执行本地事务逻辑。
  4. 服务 A 根据本地事务执行结果向 RocketMQ 提交二次确认(Commit 或是 Rollback)。如果 RocketMQ 收到 Commit 结果,则将半消息标记为可投递,服务 B 最终将收到该消息。而如果 RocketMQ 收到 Rollback 结果,则删除半消息,服务 B 将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,步骤 4 提交的二次确认最终未到达 RocketMQ,经过一定时间后 RocketMQ 将基于该消息向服务 A 发起消息回查。
  6. 服务 A 收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 服务 A 根据检查得到的本地事务的最终状态再次提交二次确认,RocketMQ 仍按照步骤 4 对半消息进行操作。

图 2 更多是站在消息发布者的角度看待事务消息的发布流程。而针对消息消费而言,如果消费者处理事务消息时出现异常,RocketMQ 会进行重试操作,直到消息消费和本地事务处理都成功。这是一种回调机制,会被 RocketMQ 自动调用。

事务消息开发模式

介绍完 RocketMQ 事务消息的基本概念和执行流程之后,我们接着介绍它的开发模式。

实现消息发布者

当我们在微服务架构中引入事务消息之前,需要创建一张事务执行记录表。事务执行记录表的作用有两个:一个是实现事务回查,另一个则是实现业务层幂等控制。

事务执行记录表的创建脚本如以下代码所示。

代码清单1 事务执行记录表 SQL 定义代码

CREATE TABLE `tx_record` (
    `tx_no` varchar(64) NOT NULL COMMENT '事务Id',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    PRIMARY KEY (`tx_no`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事务记录表'

接下来我们要引入 RocketMQ 内置的TransactionListener接口。

为了实现事务消息,开发人员的主要开发工作量就体现在对这个接口的实现过程中。TransactionListener接口的定义如下所示。

代码清单2 TransactionListener接口定义代码

public interface TransactionListener {
    //当发送事务消息成功之后,该方法会被触发,本地事务将被执行
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    //当没有收到事务消息的响应时,服务器会发送确认消息来检查事务状态,该方法会被触发并获取本地事务状态
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

可以看到,TransactionListener接口的两个方法分别完成了本地事务执行本地事务回查这两个核心操作。那么我们应该如何实现这两个方法呢?这里给出这两个方法的执行伪代码。

代码清单3 TransactionListener接口两个方法实现伪代码

executeLocalTransaction {
 执行本地事务
 如果失败就选择回滚事务,反之提交事务
}

checkLocalTransaction {
 实现事务回查
 根据事务执行记录判断,已执行则提交事务
}

注意:这两个方法需要消息的发布者来实现,但调用方是 RocketMQ 自身,而且这个调用过程是自动触发的,不需要开发做任何干预。

图 3 围绕消息发布者展示了其所需要实现的各个核心步骤。

图3 事务消息中消息发布者实现过程

如果我们使用 Spring 框架来集成 RocketMQ,那么图 3 中的业务服务实现类的实现过程可以参考如下代码示例。

代码清单3 消息发布端业务服务实现类示例代码

@Service
public class CustomerTicketServiceImpl implements ICustomerTicketService {
    @Autowired
    TxRecordMapper txRecordMapper;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Override
    public void generateTicket(AddCustomerTicketReqVO addCustomerTicketReqVO) {
        //从VO中创建TicketGeneratedEvent
        TicketGeneratedEvent ticketGeneratedEvent = createTicketGeneratedEvent(addCustomerTicketReqVO);

        //将Event转化为JSON对象
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("ticketGeneratedEvent",ticketGeneratedEvent);
        String jsonString = jsonObject.toJSONString();

        //生成消息对象
        Message<String> message = MessageBuilder.withPayload(jsonString).build();

        //发送事务消息
        rocketMQTemplate.sendMessageInTransaction("producer_group_ticket","topic_ticket",message,null);
    }

    @Override
    @Transactional
    public void doGenerateTicket(TicketGeneratedEvent ticketGeneratedEvent) {
        //幂等判断
        if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){
            return ;
        }

        //插入工单
        CustomerTicket customerTicket = CustomerTicketConverter.INSTANCE.convertEvent(ticketGeneratedEvent);
        customerTicket.setStatus(1);
        save(customerTicket);

        //添加事务日志
        txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());
    }
    ...
}

上述代码展示的是一个插入客服工单(CustomerTicket)的过程,generateTicketdoGenerateTicket方法分别对应图 3 中的发送消息执行本地事务这两个环节。

注意:这里使用了RocketMQTemplatesendMessageInTransaction方法来发送事务消息。同时,我们也看到了事务执行记录表的一种应用场景,即实现业务层幂等控制。

接下来继续实现图3所示的TransactionListener接口,示例代码如下。

代码清单4 TransactionListener接口实现类示例代码

@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_ticket")
public class ProducerListener implements RocketMQLocalTransactionListener {
    @Autowired
    ICustomerTicketService customerTicketService;

    @Autowired
    TxRecordMapper txRecordMapper;

    //事务消息发送后的回调方法,当消息发送给MQ成功,此方法被回调
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //解析消息,转成Event对象
            TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);

            //执行本地事务
            customerTicketService.doGenerateTicket(ticketGeneratedEvent);

            //当返回RocketMQLocalTransactionState.COMMIT,自动向MQ发送commit消息,MQ将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            //如果本地事务执行失败,就将消息设置为回滚状态
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //事务状态回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析消息,转成Event对象
        TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);

        //根据事务Id判断是否存在已执行的事务
        Boolean isTxNoExisted = Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()));

        //如果事务已执行则返回COMMIT,反之返回UNKNOWN状态
        if(isTxNoExisted){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    ...
}

这段代码清晰地展示了TransactionListener接口中两个核心方法的实现过程。在executeLocalTransaction方法中,我们通过调用CustomerTicketService业务服务类的doGenerateTicket方法完成了本地事务;而在checkLocalTransaction方法中,我们则实现了事务回查机制。这里同样展示了事务执行记录表的另一种应用场景,即实现事务回查

实现消息消费者

类似,当使用事务消息时,消息消费者的实现过程同样遵循一定的开发规范,如图 4 所示。

图4 事务消息中消息消费者实现过程

可以看到,相比于消息发布者,消息消费者的实现过程要简单很多。

代码清单5 消息消费实现类示例代码

@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_ticket",topic = "topic_ticket")
public class Consumer implements RocketMQListener<String> {
    @Autowired
    IChatRecordService chatRecordService;

    //接收消息
    @Override
    public void onMessage(String message) {
        log.info("开始消费消息:{}",message);

        //解析消息
        JSONObject jsonObject = JSONObject.parseObject(message);
        String ticketGeneratedEventString = jsonObject.getString("ticketGeneratedEvent");

        //转成TicketGeneratedEvent
        TicketGeneratedEvent ticketGeneratedEvent = JSONObject.parseObject(ticketGeneratedEventString, TicketGeneratedEvent.class);

        //添加本地聊天记录
        chatRecordService.generateChatRecord(ticketGeneratedEvent);
    }
}

可以看到,这个消息消费者的实现过程没有任何特殊之处,我们只需要实现RocketMQListener接口的onMessage方法,并在该方法中调用业务服务实现类中的业务方法即可。

消费者端的业务服务实现类的实现过程如下。

代码清单6 消息消费端业务服务实现类示例代码

@Service
public class ChatRecordServiceImpl implements IChatRecordService {
    @Autowired
    TxRecordMapper txRecordMapper;

    @Override
    @Transactional
    public void generateChatRecord(TicketGeneratedEvent ticketGeneratedEvent) {
        //幂等判断
        if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){
            return ;
        }

        //插入聊天记录
        ChatRecord chatRecord = ChatRecordConverter.INSTANCE.convertEvent(ticketGeneratedEvent);
        save(chatRecord);

        //添加事务日志
        txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());
    }
}

这里同样通过事务执行记录表实现了业务层幂等控制,并最终完成本地事务的提交。

作为总结,我们使用时序图来详细展示事务消息发送和消费过程,如图 5 所示。


图5 事务消息发送和消费时序图



文末送书


老规矩,星球抽奖送两本《SpringCloud Alibaba 微服务架构设计与开发实战》,免费包邮到家,还没进的同学通过下面连接进入。

星球连接~

也可通过文末卡片购买享受五折优惠!!!


JAVA日知录
写代码的架构师,做架构的程序员! 实战、源码、数据库、架构...只要你来,你想了解的这里都有!
 最新文章