【进阶篇-消息队列】——RocketMQ如何实现事务的

发布于:2025-07-04 ⋅ 阅读:(14) ⋅ 点赞:(0)


本文来源:极客时间vip课程笔记

一、RocketMQ如何实现事务的

1.1、普通业务代码实现RocketMQ 的事务大致流程

  • 首先我们一起通过普通业务代码来看 RocketMQ 的事务大致流程。

    public class CreateOrderService {
         
    
      @Inject
      private OrderDao orderDao; // 注入订单表的DAO
      @Inject
      private ExecutorService executorService; //注入一个ExecutorService
    
      private TransactionMQProducer producer;
    
      // 初始化transactionListener 和 producer
      @Init
      public void init() throws MQClientException {
         
        TransactionListener transactionListener = createTransactionListener();
        producer = new TransactionMQProducer("myGroup");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
      }
    
      // 创建订单服务的请求入口
      @PUT
      @RequestMapping(...)
      public boolean createOrder(@RequestBody CreateOrderRequest request) {
         
        // 根据创建订单请求创建一条消息
        Message msg = createMessage(request);
        // 发送事务消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, request);
        // 返回:事务是否成功
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
      }
    
      private TransactionListener createTransactionListener() {
         
        return new TransactionListener() {
         
          @Override
          public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
         
            CreateOrderRequest request = (CreateOrderRequest ) arg;
            try {
         
              // 执行本地事务创建订单
              orderDao.createOrderInDB(request);
              // 如果没抛异常说明执行成功,提交事务消息
              return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Throwable t) {
         
              // 失败则直接回滚事务消息
              return LocalTransactionState.ROLLBACK_MESSAGE;
            }
          }
          // 反查本地事务