RabbitMQ在SpringBoot中的使用详解

发布于:2025-07-08 ⋅ 阅读:(17) ⋅ 点赞:(0)

📋目录

🎯 RabbitMQ基础概念

什么是RabbitMQ?

核心概念详解

1. 队列(Queue)

2. 交换机(Exchange)

3. 绑定(Binding)

🛠️ 环境搭建

1. 安装RabbitMQ

Windows安装

Docker安装(推荐)

2. 访问管理界面

🚀 SpringBoot集成RabbitMQ

1. 添加依赖

2. 配置文件

⚙️ 基础配置

1. RabbitMQ配置类

📨 简单队列模式

1. 队列配置

2. 生产者

3. 消费者

👥 工作队列模式

1. 配置类

2. 生产者

3. 消费者

📡 发布订阅模式

1. 配置类

2. 生产者

3. 消费者

🎯 路由模式

1. 配置类

2. 生产者

3. 消费者

🏷️ 主题模式

1. 配置类

2. 生产者

3. 消费者

✅ 消息确认机制

1. 生产者确认

2. 消费者确认

💼 实际项目应用

1. 订单处理系统

2. 延迟队列实现

🔧 常见问题解决

1. 消息丢失问题

2. 消息重复消费问题

3. 性能优化

4. 监控和日志

📚 总结

使用建议

最佳实践


🎯 RabbitMQ基础概念

什么是RabbitMQ?

RabbitMQ是一个开源的消息队列中间件,就像一个邮局一样:

  • 生产者(Producer):发送邮件的人
  • 队列(Queue):邮箱,存放邮件的地方
  • 消费者(Consumer):收邮件的人
  • 交换机(Exchange):邮局的分拣中心,决定邮件发到哪个邮箱

核心概念详解

1. 队列(Queue)
队列就像一个容器,消息在这里排队等待被处理
[消息1] [消息2] [消息3] → 消费者取走处理
2. 交换机(Exchange)

交换机有4种类型:

  • Direct(直连):根据路由键精确匹配
  • Fanout(扇出):广播到所有绑定的队列
  • Topic(主题):根据路由键模式匹配
  • Headers:根据消息头匹配(较少使用)
3. 绑定(Binding)

绑定是交换机和队列之间的连接规则


🛠️ 环境搭建

1. 安装RabbitMQ

Windows安装
# 使用Chocolatey安装
choco install rabbitmq

# 启动RabbitMQ服务
rabbitmq-server
Docker安装(推荐)
# 拉取并运行RabbitMQ容器
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=123456 \
  rabbitmq:3-management

2. 访问管理界面

  • 地址:http://localhost:15672
  • 用户名:admin
  • 密码:123456

🚀 SpringBoot集成RabbitMQ

1. 添加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

2. 配置文件

# application.yml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: /
    # 发送者开启确认模式
    publisher-confirms: true
    # 发送者开启return确认机制
    publisher-returns: true
    # 设置消费者手动确认
    listener:
      simple:
        acknowledge-mode: manual
        # 限制每次只处理一个消息
        prefetch: 1

⚙️ 基础配置

1. RabbitMQ配置类

@Configuration
@EnableRabbit
public class RabbitConfig {
    
    /**
     * 创建RabbitTemplate,用于发送消息
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        // 设置消息转换器(可选)
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        
        // 设置发送确认回调
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败:" + cause);
            }
        });
        
        return template;
    }
}

📨 简单队列模式

这是最基本的模式:一个生产者发送消息到队列,一个消费者接收消息。

1. 队列配置

@Configuration
public class SimpleQueueConfig {
    
    public static final String SIMPLE_QUEUE = "simple.queue";
    
    /**
     * 声明简单队列
     */
    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable(SIMPLE_QUEUE).build();
    }
}

2. 生产者

@RestController
@RequestMapping("/simple")
public class SimpleProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        // 发送消息到队列
        rabbitTemplate.convertAndSend(SimpleQueueConfig.SIMPLE_QUEUE, message);
        return "消息发送成功:" + message;
    }
}

3. 消费者

@Component
public class SimpleConsumer {
    
    /**
     * 监听简单队列
     */
    @RabbitListener(queues = SimpleQueueConfig.SIMPLE_QUEUE)
    public void receiveMessage(String message, Channel channel, 
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            System.out.println("接收到消息:" + message);
            
            // 模拟业务处理
            Thread.sleep(1000);
            
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            try {
                // 处理失败,拒绝消息并重新入队
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

👥 工作队列模式

多个消费者共同处理一个队列中的消息,实现任务分发。

1. 配置类

@Configuration
public class WorkQueueConfig {
    
    public static final String WORK_QUEUE = "work.queue";
    
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable(WORK_QUEUE).build();
    }
}

2. 生产者

@RestController
@RequestMapping("/work")
public class WorkProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping("/send")
    public String sendWork(@RequestParam String task) {
        for (int i = 1; i <= 10; i++) {
            String message = task + " - 任务" + i;
            rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message);
        }
        return "批量任务发送完成";
    }
}

3. 消费者

@Component
public class WorkConsumer {
    
    /**
     * 工作者1
     */
    @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
    public void worker1(String message, Channel channel, 
                       @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            System.out.println("工作者1处理:" + message);
            Thread.sleep(2000); // 模拟较慢的处理
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            handleError(channel, deliveryTag);
        }
    }

网站公告

今日签到

点亮在社区的每一天
去签到