Spring Boot中如何使用RabbitMQ?

发布于:2025-05-21 ⋅ 阅读:(18) ⋅ 点赞:(0)

前面已经了解了怎么使用RabbitMQ的JDK原生客户端,现在我们来了解Spring Boot中如何使用RabbitMQ,在学习之前,先做好准备工作:

1. 添加依赖

在Spring Boot中使用RabbitMQ,需要使用如下依赖:

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>

2. 添加RabbitMQ相关配置

#配置rabbitmq相关信息
#amqp://username:password@Ip:port/virtual-host
  rabbitmq:
    addresses: amqp://study:study@110.41.17.130:5672/java113

3. 创建好对应的包

完成准备工作后,我们来学习Spring Boot中如何使用RabbitMQ,这里只学习常用的4中工作模式(工作队列、公开/订阅、路由、通配符)


一、Work Queue(工作队列)

一·、在Constants类中编写常量信息

    public static final String WORK_QUEUE = "work.queue";

二、在RabbitMQConfig中填写配置信息

这里我们使用内置交换机,故只需声明队列

    /*
    * 声明队列 */
    @Bean("workQueue")
    public Queue workQueue(){
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    }

durable表示持久化(服务器重启后保留),参数表示的是要声明的队列名是什么。


三、编写生产者代码

 @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/work")
    public String work(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"hello spring amqp: work... " + i);
        }
        return "发送成功";
    }

通过RabbitTemplate中的converAndSend方法发送消息,其中第一个参数表示交换机的名称(使用内置交换机为空字符串)、第二个参数表示routingKey,使用内置交换机默认为队列名、第三个参数表示要发送的消息


四、编写消费者代码

@Component
public class WorkListener {
    //@RabbitListener修饰的方法对应一个消费者
    @RabbitListener(queues = Constants.WORK_QUEUE)//指定消费的四哪一个队列
    public void queueListener1(Message message, Channel channel){
        System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" + message  + ",channel:" + channel);
    }

    @RabbitListener(queues = Constants.WORK_QUEUE)//指定消费的四哪一个队列
    public void queueListener2(String message){
        System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" + message);
    }
}

在实际开发中,一个消费者就应该对应一个类,这里为了方便就写在同一个类中了,在消费者代码中,需要注意的有:

1. 通过@RabbitListener注解指定需要消费的是那个队列

2. 对应方法的参数有3种,分别为Channel、Message、String,其中 String 返回消息内容(生产者发送的消息)、Message 返回原始消息体及消息的属性,如ID、内容、队列信息、

Channel 返回的是RabbitMQ的通道对象,可用于手动确认等。


五、运行程序,发送消息


二、publish/subscribe(公开/订阅)

一、在Constants类中编写常量信息

    public static final  String FANOUT_QUEUE1 = "fanout.queue1";
    public static final  String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";

二、在RabbitMQConfig中填写配置信息


    //公开/订阅模式
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1(){
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    }
    @Bean("fanoutQueue2")
    public Queue fanoutQueue2(){
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    }
    /*
    * 声明交换机*/
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
    }
    /*
    * 声明队列与交换机的绑定关系*/
    @Bean("fanoutQueueBinding1")
    public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean("fanoutQueueBinding2")
    public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

在声明交换机与队列绑定关系时,需要注意使用@Qualifier注解指定要将哪个队列注入到参数queue中,否者会报错,因为spring中管理了多个Queue对象(前面声明了多个队列)。


三、编写生产者代码

    @RequestMapping("/fanout")
    public String fanout(){
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp: fanout...");
        return "发送成功";
    }

发布订阅模式只需要将routingKey指定为空字符串(" ")即可将消息发送到所有与交换机绑定的队列中。


四、编写消费者代码

@Component
public class FanoutListener2 {
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)//指定消费的四哪一个队列
    public void queueListener2(String message){
        System.out.println("队列 ["+Constants.WORK_QUEUE+"] 接收到消息:" + message);
    }

    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列 ["+Constants.WORK_QUEUE+"] 接收到消息:" + message);
    }
}

三、 Routing(路由)

一、在Constants类中编写常量信息

  public static final String DIRECT_QUEUE1 = "direct.queue1";
  public static final String DIRECT_QUEUE2 = "direct.queue2";
  public static final String DIRECT_EXCHANGE = "direct.exchange";

二、在RabbitMQConfig中填写配置信息

 //路由模式
    //1.声明队列
    @Bean
    public Queue directQueue1(){
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }
    @Bean
    public Queue directQueue2(){
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }
    //2.声明交换机
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
    }
    //2.队列与交换机的绑定关系
    @Bean
    public Binding directQueueBinding1(DirectExchange directExchange,@Qualifier("directQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }
    @Bean
    public Binding directQueueBinding2(DirectExchange directExchange,@Qualifier("directQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }
    @Bean
    public Binding directQueueBinding3(DirectExchange directExchange,@Qualifier("directQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("black");
    }

通过with方法设置交换机与队列绑定的bindingKey(routingKey)


三、编写生产者代码

    @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable("routingKey") String routingKey){
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello spring amqp: direct , routingKey is :" + routingKey);
        return "发送成功";
    }

四、编写消费者代码

@Component
public class DirectListener {

    @RabbitListener(queues = Constants.DIRECT_QUEUE1)//指定消费的四哪一个队列
    public void queueListener1(String message){
        System.out.println("队列 ["+Constants.DIRECT_QUEUE1+"] 接收到消息:" + message);
    }


    @RabbitListener(queues = Constants.DIRECT_QUEUE2)//指定消费的四哪一个队列
    public void queueListener2(String message){
        System.out.println("队列 ["+Constants.DIRECT_QUEUE2+"] 接收到消息:" + message);
    }
}

四、Topics(通配符) 

一、在Constants类中编写常量信息

    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";

二、在RabbitMQConfig中填写配置信息

  //通配符模式
    //1.声明队列
    @Bean
    public Queue topicQueue1(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }
    @Bean
    public Queue topicQueue2(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }
    //2.声明交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
    }
    //3.交换机与队列绑定
    @Bean
    public Binding topicQueueBinding1(TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
    }
    @Bean
    public Binding topicQueueBinding2(TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
    }
    @Bean
    public Binding topicQueueBinding3(TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
    }

三、编写生产者代码

    @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable("routingKey") String routingKey){
        //rabbitTemplate内部自动处理了channel的连接和释放
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello spring amqp: topic , routingKey is :" + routingKey);
        return "发送成功";
    }

四、编写消费者代码

@Component
public class TopicListener {

    @RabbitListener(queues = Constants.TOPIC_QUEUE1)//指定消费的是哪一个队列
    public void queueListener1(String message){
        System.out.println("队列 ["+Constants.TOPIC_QUEUE1+"] 接收到消息:" + message);
    }


    @RabbitListener(queues = Constants.TOPIC_QUEUE2)//指定消费的是哪一个队列
    public void queueListener2(String message){
        System.out.println("队列 ["+Constants.TOPIC_QUEUE2+"] 接收到消息:" + message);
    }
}

 五、总结

与JDK相比,在Spring Boot中使用RabbitMQ:

1. 无需手动创建连接(connection)和声明信道(channel);

2. 对于重复的操作,如填写用户名密码、ip、port、虚拟机,都已经在配置文件中配置好,无需手动编写,需要做的只有 声明队列、声明交换机、声明交换机与队列的绑定关系

3. JDK原生客户端通过 queueDeclare、exchangeDeclare、queueBind 声明队列、交换机以及队列与交换机的绑定关系,而Spring Boot 中 通过QueueBuilder、ExchangeBuilder、BindingBuilder来声明队列、交换机以及队列与交换机的绑定关系

4. JDK中使用basicPublish发送消息,bascConsume接收消息,而Spring Boot中使用@RabbitListener注解接收指定队列消息,通过RabbitTemplate对象向队列发送消息


网站公告

今日签到

点亮在社区的每一天
去签到