SpringBoot 整合 RabbitMQ

发布于:2024-04-17 ⋅ 阅读:(53) ⋅ 点赞:(0)

流程图概括

1,在项目中引入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2,配置RabbitMQ连接

application.propertiesapplication.yml中配置RabbitMQ服务器的连接参数:

3,DirectExchange

创建消费者

直连交换机消费者

import com.beiyou.model.OrderingOk;


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;

//@Configuration
@Slf4j
public class DirectConsumer {
    //注册一个队列
    @Bean  //启动多次为什么不报错?启动的时候,它会根据这个名称Direct_Q01先去查找有没有这个队列,如果有什么都不做,如果没有创建一个新的
    public Queue queue1(){
      return   QueueBuilder.durable("Direct_Q01").maxLength(100).build();
    }
    //注册交换机
    @Bean
    public DirectExchange exchange(){
        //1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的
        return  ExchangeBuilder.directExchange("Direct_E01").build();
    }
    @Bean //交换机与队列关系
    public Binding binding1(Queue queue1,DirectExchange exchange){
        return BindingBuilder.bind(queue1).to(exchange).with("RK01");
    }

    @RabbitListener(queues = "Direct_Q01")
    public void receiveMessage(OrderingOk msg) {
        log.info("消费者1 收到消息:"+ msg );
        int  i= 5/0;
    }
}

广播交换机消费者

import com.beiyou.model.OrderingOk;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConsumer {
    @Bean
    public Queue fanoutQueue1(){
      return   QueueBuilder.durable("Fanout_Q01").build();
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return  ExchangeBuilder.fanoutExchange("Fanout_E01").build();
    }
    @Bean //交换机与队列关系
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @RabbitListener(queues = "Fanout_Q01")
    public void receiveMessage(OrderingOk msg){
        System.out.println("FanoutConsumer1 消费者1 收到消息:"+msg);
    }
    @RabbitListener(queues = "Fanout_Q01")
    public void receiveMessage32(OrderingOk msg){
        System.out.println("FanoutConsumer1 消费者2 收到消息:"+msg);
    }

}

主题交换机消费者

import com.beiyou.model.OrderingOk;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//@Configuration
public class TopicConsumer {
    @Bean
    public TopicExchange topicExchange(){
        return  ExchangeBuilder.topicExchange("Topic_E01").build();
    }
    @Bean
    public Queue topicQueue1(){
      return   QueueBuilder.durable("小龙").build();
    }
    @Bean
    public Queue topicQueue2(){
        return   QueueBuilder.durable("海洋").build();
    }
    @Bean
    public Queue topicQueue3(){
        return   QueueBuilder.durable("文超").build();
    }
    @Bean //交换机与队列关系
    public Binding TopicBinding1(Queue topicQueue1,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#");
    }
    @Bean //交换机与队列关系
    public Binding TopicBinding2(Queue topicQueue2,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("1.6.*");
    }
    @Bean //交换机与队列关系
    public Binding TopicBinding3(Queue topicQueue3,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("1.8.*");
    }
    @RabbitListener(queues = "小龙")
    public void receiveMessage(OrderingOk msg){
        System.out.println("小龙 收到消息:"+msg);
    }
    @RabbitListener(queues = "海洋")
    public void receiveMessage2(OrderingOk msg){
        System.out.println("海洋 收到消息:"+msg);
    }
    @RabbitListener(queues = "文超")
    public void receiveMessage3(OrderingOk msg){
        System.out.println("文超 收到消息:"+msg);
    }
}

死信交换机消费者

import com.beiyou.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
@Slf4j
public class DeadConsumer {
    //死信交换机
    @Bean
    public DirectExchange deadExchange(){
        return  ExchangeBuilder.directExchange("Dead_E01").build();
    }
    //死信队列
    @Bean
    public Queue deadQueue1(){
        return   QueueBuilder.durable("Dead_Q01").build();
    }
    //死信交换机与死信队列的绑定
    @Bean
    public Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){
        return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
    }
    //业务队列
    @Bean
    public Queue queue1(){
      return   QueueBuilder
              .durable("Direct_Q01")
              .deadLetterExchange("Dead_E01")
              .deadLetterRoutingKey("RK_DEAD")
              //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
              //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
              .build();
    }

    //业务交换机
    @Bean
    public DirectExchange exchange(){
        return  ExchangeBuilder.directExchange("Direct_E01").build();
    }
    //业务交换机与队列的绑定
    @Bean
    public Binding binding1(Queue queue1,DirectExchange exchange){
        return BindingBuilder.bind(queue1).to(exchange).with("RK01");
    }

    //@RabbitListener(queues = "Direct_Q01")
    public void receiveMessage(OrderingOk msg) {
        log.info("消费者1 收到消息:"+ msg );
        int  i= 5/0;
    }
}

延迟交换机消费者

import cn.hutool.core.map.MapUtil;
import com.beiyou.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

//@Configuration
@Slf4j
public class DelayConsumer {
    @Bean
    public Queue delayQueue1(){
      return   QueueBuilder.durable("Delay_Q01").lazy().build();
    }
    @Bean
    public CustomExchange delayExchange(){
        //参数x-delayed-type
        Map<String, Object> map = MapUtil.of("x-delayed-type","direct");
        return new CustomExchange("Delay_E01","x-delayed-message",true,false,map);
    }
    @Bean
    public Binding binding1(Queue delayQueue1,CustomExchange delayExchange){
        return BindingBuilder.bind(delayQueue1).to(delayExchange).with("RK01").noargs();
    }

   // @RabbitListener(queues = "Delay_Q01")
    public void receiveMessage(OrderingOk msg) {
        log.info("消费者1 收到消息:"+ msg );
    }


}

3.2, 创建生产者

3.1.1,创建直连交换机生产者

import com.beiyou.model.RegisterOk;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DirectProvider {
//导入一个直连交换机
    @Autowired
    public RabbitTemplate rabbitTemplate;
//"Direct_E01":这是交换机(Direct)的名字,采用Direct类型交换机
    public void send(Object message){
        rabbitTemplate.convertAndSend("Direct_E01","R01",message);
    }
}

3.1.2,创建广播交换机生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class FanoutProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;
//"Fanout_E01":这是交换机(Exchange)的名字,采用Fanout类型交换机
    public void send(Object message) {
        rabbitTemplate.convertAndSend("Fanout_E01", "", message);
    }
}

3.1.3创建主题交换机生产者

import com.beiyou.model.Girl;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class TopicProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;
//"Topic_E01":这是Topic类型的交换机名称,Topic交换机会根据Routing Key与队列绑定关系决定消息投递给哪些队列。
    public void send(Girl girl) {
        rabbitTemplate.convertAndSend("Topic_E01",girl.getHeight(), girl);
    }
}

为了解决消息被丢失的情况我们要使用特殊的交换机,比如: 死信交换机,延迟交换机

3.1.4创建死信交换机生产者

import com.beiyou.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DeadProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;
//  这里重写了一个方法,自定义了一个死信交换机和要做的业务
    public void send(OrderingOk orderingOk) {
        rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                int id  = orderingOk.getId();
                int expiration = 0;
                if(id == 1){
                    expiration = 50*1000;
                }else if(id == 2){
                    expiration = 40*1000;
                }else if(id ==3){
                    expiration = 30*1000;
                }else if(id ==4){
                    expiration = 20*1000;
                }else if(id ==5){
                    expiration = 10*1000;
                }
//这句话是重点
                //为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费
                message.getMessageProperties().setExpiration(String.valueOf(expiration));
                return message;
            }
        });
    }
}

3.1.5创建延迟交换机生产者

import com.beiyou.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DelayProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;
//这里我们重写了一个方法,自定义了一个延迟交换机,和要做的业务
    public void send(OrderingOk orderingOk) {
        rabbitTemplate.convertAndSend("Delay_E01", "RK01", orderingOk,new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                int id  = orderingOk.getId();
                int ttl = 0;
                if(id == 1){
                    ttl = 50*1000;
                }else if(id == 2){
                    ttl = 40*1000;
                }else if(id ==3){
                    ttl = 30*1000;
                }else if(id ==4){
                    ttl = 20*1000;
                }else if(id ==5){
                    ttl = 10*1000;
                }
                //延迟交换机使用的delay参数,设置消息的延期时长,单位是微妙
                message.getMessageProperties().setDelay(ttl);
                //延迟交换机消息默认是持久化的
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                return message;
            }
        });
    }
}

一个交换机对多个队列的特点:

一个队列对多个消费者特点:


网站公告

今日签到

点亮在社区的每一天
去签到