Spring Boot集成RocketMQ:真实项目应用场景

发布于:2025-02-26 ⋅ 阅读:(14) ⋅ 点赞:(0)

第一部分:基础配置与简单示例

1. 项目初始化

使用Spring Boot创建一个项目,添加RocketMQ依赖。

  • POM依赖(Maven):

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>3.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.3.0</version>
    </dependency>
    
  • application.yml 配置:

    rocketmq:
      name-server: localhost:9876
      producer:
        group: default-producer-group
      consumer:
        group: default-consumer-group
    
2. 简单生产者与消费者
  • 生产者

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SimpleProducerController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send")
        public String sendMessage() {
            rocketMQTemplate.convertAndSend("SimpleTopic", "Hello, RocketMQ with Spring Boot!");
            return "Message sent!";
        }
    }
    
  • 消费者

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RocketMQMessageListener(topic = "SimpleTopic", consumerGroup = "simple-consumer-group")
    public class SimpleConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("Received message: " + message);
        }
    }
    
  • 启动类

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class RocketMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(RocketMQApplication.class, args);
        }
    }
    

启动项目后,访问 http://localhost:8080/send,消费者会打印消息。这是最基础的用法,面试中常被问到如何快速集成。


第二部分:真实项目应用场景

以下是RocketMQ在Spring Boot中的典型应用场景,涵盖面试常见问题。

1. 电商订单系统(异步消息)

场景:用户下单后,异步通知库存扣减和物流系统。

  • 生产者(订单服务):

    @RestController
    public class OrderController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/place-order")
        public String placeOrder() {
            String orderJson = "{\"orderId\":\"12345\",\"item\":\"Laptop\",\"quantity\":1}";
            // 异步发送消息
            rocketMQTemplate.asyncSend("OrderTopic", orderJson, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Order sent successfully: " + sendResult.getMsgId());
                }
    
                @Override
                public void onException(Throwable throwable) {
                    System.err.println("Order send failed: " + throwable.getMessage());
                }
            });
            return "Order placed!";
        }
    }
    
  • 消费者(库存服务):

    @Service
    @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "inventory-consumer-group", selectorExpression = "Inventory")
    public class InventoryConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String orderJson) {
            System.out.println("Processing inventory for: " + orderJson);
            // 假设这里调用库存扣减逻辑
        }
    }
    
  • 配置Tag(application.yml):

    rocketmq:
      name-server: localhost:9876
      producer:
        group: order-producer-group
    

面试问题:如何确保消息不丢失?

  • 回答:使用异步发送时,结合 SendCallback 检查发送结果;在生产者端开启 retries(默认3次重试);Broker端开启持久化。
2. 事务消息(支付系统)

场景:用户支付后,确保订单状态更新和消息发送一致。

  • 生产者(事务消息):

    @RestController
    public class PaymentController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        @Autowired
        private OrderService orderService;
    
        @GetMapping("/pay")
        public String payOrder() {
            String orderId = "12345";
            rocketMQTemplate.sendMessageInTransaction("TransactionTopic", MessageBuilder.withPayload("Payment for " + orderId).build(), orderId);
            return "Payment processed!";
        }
    }
    
    @Service
    @RocketMQTransactionListener
    public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Autowired
        private OrderService orderService;
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String orderId = (String) arg;
            try {
                orderService.updateOrderStatus(orderId, "PAID"); // 本地事务
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String orderId = new String(msg.getPayload()).split(" ")[2];
            String status = orderService.getOrderStatus(orderId);
            return "PAID".equals(status) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Service
    public class OrderService {
        // 模拟数据库操作
        private Map<String, String> orderStatus = new HashMap<>();
    
        public void updateOrderStatus(String orderId, String status) {
            orderStatus.put(orderId, status);
        }
    
        public String getOrderStatus(String orderId) {
            return orderStatus.getOrDefault(orderId, "UNPAID");
        }
    }
    

面试问题:事务消息的实现原理?

  • 回答:分为两阶段提交。Producer先发送Half消息,执行本地事务后提交或回滚;Broker定时检查未决事务,调用 checkLocalTransaction 确认状态。
3. 日志收集(顺序消息)

场景:收集应用日志,确保按时间顺序处理。

  • 生产者

    @RestController
    public class LogController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/log")
        public String sendLog() {
            String log = "{\"timestamp\":\"2025-02-25 10:00:00\",\"message\":\"User login\"}";
            rocketMQTemplate.syncSendOrderly("LogTopic", log, "user123"); // 使用userId作为hashKey保证顺序
            return "Log sent!";
        }
    }
    
  • 消费者

    @Service
    @RocketMQMessageListener(topic = "LogTopic", consumerGroup = "log-consumer-group", messageModel = MessageModel.CLUSTERING)
    public class LogConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String log) {
            System.out.println("Processing log: " + log);
        }
    }
    

面试问题:如何保证消息顺序?

  • 回答:使用 syncSendOrderly,通过hashKey(如用户ID)将消息路由到同一队列,消费者单线程消费该队列。
4. 延迟消息(促销提醒)

场景:订单未支付30分钟后发送提醒。

  • 生产者

    @RestController
    public class ReminderController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/remind")
        public String sendReminder() {
            String reminder = "Order 12345 unpaid";
            rocketMQTemplate.syncSend("ReminderTopic", MessageBuilder.withPayload(reminder).build(), 1000, 18); // 18代表30分钟延迟
            return "Reminder scheduled!";
        }
    }
    
  • 消费者

    @Service
    @RocketMQMessageListener(topic = "ReminderTopic", consumerGroup = "reminder-consumer-group")
    public class ReminderConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String reminder) {
            System.out.println("Sending reminder: " + reminder);
        }
    }
    

面试问题:延迟消息的实现机制?

  • 回答:RocketMQ内置18个延迟级别(1s到2h),消息先存储到延迟队列,到期后投递到目标队列。

第三部分:优化与高可用

1. 高可用性配置
  • 多NameServer

    rocketmq:
      name-server: localhost:9876;localhost:9877
    
  • 多消费者实例:ConsumerGroup内多实例负载均衡。

2. 性能优化
  • 批量发送

    List<Message> messages = Arrays.asList(
        MessageBuilder.withPayload("msg1").build(),
        MessageBuilder.withPayload("msg2").build()
    );
    rocketMQTemplate.syncSend("BatchTopic", messages);
    
  • 调整线程池

    rocketmq:
      consumer:
        pull-batch-size: 32
        consume-thread-max: 64
    
3. 异常处理
  • 消费者重试

    @Service
    @RocketMQMessageListener(topic = "RetryTopic", consumerGroup = "retry-consumer-group")
    public class RetryConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            if (true) { // 模拟失败
                throw new RuntimeException("Processing failed");
            }
        }
    }
    

    默认重试16次,可通过 maxReconsumeTimes 调整。


第四部分:面试常见问题与回答

  1. RocketMQ与Kafka的区别?

    • RocketMQ支持事务消息和延迟消息,Kafka不支持。
    • RocketMQ拉模式和推模式都支持,Kafka主要拉模式。
    • RocketMQ适合业务场景,Kafka更偏大数据处理。
  2. 如何处理消息重复消费?

    • 在消费者端实现幂等性(如数据库唯一约束或Redis去重)。
  3. 如何监控RocketMQ?

    • 使用RocketMQ Dashboard查看Topic、消费进度;集成Prometheus+Grafana监控性能。

网站公告

今日签到

点亮在社区的每一天
去签到