流程图概括
1,在项目中引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2,配置RabbitMQ连接
在application.properties
或application.yml
中配置RabbitMQ服务器的连接参数:
3,DirectExchange
创建消费者
直连交换机消费者
import com.beiyou.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
//@Configuration
@Slf4j
public class DirectConsumer {
//注册一个队列
@Bean //启动多次为什么不报错?启动的时候,它会根据这个名称Direct_Q01先去查找有没有这个队列,如果有什么都不做,如果没有创建一个新的
public Queue queue1(){
return QueueBuilder.durable("Direct_Q01").maxLength(100).build();
}
//注册交换机
@Bean
public DirectExchange exchange(){
//1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的
return ExchangeBuilder.directExchange("Direct_E01").build();
}
@Bean //交换机与队列关系
public Binding binding1(Queue queue1,DirectExchange exchange){
return BindingBuilder.bind(queue1).to(exchange).with("RK01");
}
@RabbitListener(queues = "Direct_Q01")
public void receiveMessage(OrderingOk msg) {
log.info("消费者1 收到消息:"+ msg );
int i= 5/0;
}
}
广播交换机消费者
import com.beiyou.model.OrderingOk;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConsumer {
@Bean
public Queue fanoutQueue1(){
return QueueBuilder.durable("Fanout_Q01").build();
}
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("Fanout_E01").build();
}
@Bean //交换机与队列关系
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@RabbitListener(queues = "Fanout_Q01")
public void receiveMessage(OrderingOk msg){
System.out.println("FanoutConsumer1 消费者1 收到消息:"+msg);
}
@RabbitListener(queues = "Fanout_Q01")
public void receiveMessage32(OrderingOk msg){
System.out.println("FanoutConsumer1 消费者2 收到消息:"+msg);
}
}
主题交换机消费者
import com.beiyou.model.OrderingOk;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//@Configuration
public class TopicConsumer {
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange("Topic_E01").build();
}
@Bean
public Queue topicQueue1(){
return QueueBuilder.durable("小龙").build();
}
@Bean
public Queue topicQueue2(){
return QueueBuilder.durable("海洋").build();
}
@Bean
public Queue topicQueue3(){
return QueueBuilder.durable("文超").build();
}
@Bean //交换机与队列关系
public Binding TopicBinding1(Queue topicQueue1,TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#");
}
@Bean //交换机与队列关系
public Binding TopicBinding2(Queue topicQueue2,TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("1.6.*");
}
@Bean //交换机与队列关系
public Binding TopicBinding3(Queue topicQueue3,TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue3).to(topicExchange).with("1.8.*");
}
@RabbitListener(queues = "小龙")
public void receiveMessage(OrderingOk msg){
System.out.println("小龙 收到消息:"+msg);
}
@RabbitListener(queues = "海洋")
public void receiveMessage2(OrderingOk msg){
System.out.println("海洋 收到消息:"+msg);
}
@RabbitListener(queues = "文超")
public void receiveMessage3(OrderingOk msg){
System.out.println("文超 收到消息:"+msg);
}
}
死信交换机消费者
import com.beiyou.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
@Slf4j
public class DeadConsumer {
//死信交换机
@Bean
public DirectExchange deadExchange(){
return ExchangeBuilder.directExchange("Dead_E01").build();
}
//死信队列
@Bean
public Queue deadQueue1(){
return QueueBuilder.durable("Dead_Q01").build();
}
//死信交换机与死信队列的绑定
@Bean
public Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){
return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
}
//业务队列
@Bean
public Queue queue1(){
return QueueBuilder
.durable("Direct_Q01")
.deadLetterExchange("Dead_E01")
.deadLetterRoutingKey("RK_DEAD")
//.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
.build();
}
//业务交换机
@Bean
public DirectExchange exchange(){
return ExchangeBuilder.directExchange("Direct_E01").build();
}
//业务交换机与队列的绑定
@Bean
public Binding binding1(Queue queue1,DirectExchange exchange){
return BindingBuilder.bind(queue1).to(exchange).with("RK01");
}
//@RabbitListener(queues = "Direct_Q01")
public void receiveMessage(OrderingOk msg) {
log.info("消费者1 收到消息:"+ msg );
int i= 5/0;
}
}
延迟交换机消费者
import cn.hutool.core.map.MapUtil;
import com.beiyou.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
//@Configuration
@Slf4j
public class DelayConsumer {
@Bean
public Queue delayQueue1(){
return QueueBuilder.durable("Delay_Q01").lazy().build();
}
@Bean
public CustomExchange delayExchange(){
//参数x-delayed-type
Map<String, Object> map = MapUtil.of("x-delayed-type","direct");
return new CustomExchange("Delay_E01","x-delayed-message",true,false,map);
}
@Bean
public Binding binding1(Queue delayQueue1,CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue1).to(delayExchange).with("RK01").noargs();
}
// @RabbitListener(queues = "Delay_Q01")
public void receiveMessage(OrderingOk msg) {
log.info("消费者1 收到消息:"+ msg );
}
}
3.2, 创建生产者
3.1.1,创建直连交换机生产者
import com.beiyou.model.RegisterOk;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DirectProvider {
//导入一个直连交换机
@Autowired
public RabbitTemplate rabbitTemplate;
//"Direct_E01":这是交换机(Direct)的名字,采用Direct类型交换机
public void send(Object message){
rabbitTemplate.convertAndSend("Direct_E01","R01",message);
}
}
3.1.2,创建广播交换机生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class FanoutProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
//"Fanout_E01":这是交换机(Exchange)的名字,采用Fanout类型交换机
public void send(Object message) {
rabbitTemplate.convertAndSend("Fanout_E01", "", message);
}
}
3.1.3创建主题交换机生产者
import com.beiyou.model.Girl;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class TopicProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
//"Topic_E01":这是Topic类型的交换机名称,Topic交换机会根据Routing Key与队列绑定关系决定消息投递给哪些队列。
public void send(Girl girl) {
rabbitTemplate.convertAndSend("Topic_E01",girl.getHeight(), girl);
}
}
为了解决消息被丢失的情况我们要使用特殊的交换机,比如: 死信交换机,延迟交换机
3.1.4创建死信交换机生产者
import com.beiyou.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DeadProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
// 这里重写了一个方法,自定义了一个死信交换机和要做的业务
public void send(OrderingOk orderingOk) {
rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
int id = orderingOk.getId();
int expiration = 0;
if(id == 1){
expiration = 50*1000;
}else if(id == 2){
expiration = 40*1000;
}else if(id ==3){
expiration = 30*1000;
}else if(id ==4){
expiration = 20*1000;
}else if(id ==5){
expiration = 10*1000;
}
//这句话是重点
//为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费
message.getMessageProperties().setExpiration(String.valueOf(expiration));
return message;
}
});
}
}
3.1.5创建延迟交换机生产者
import com.beiyou.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DelayProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
//这里我们重写了一个方法,自定义了一个延迟交换机,和要做的业务
public void send(OrderingOk orderingOk) {
rabbitTemplate.convertAndSend("Delay_E01", "RK01", orderingOk,new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
int id = orderingOk.getId();
int ttl = 0;
if(id == 1){
ttl = 50*1000;
}else if(id == 2){
ttl = 40*1000;
}else if(id ==3){
ttl = 30*1000;
}else if(id ==4){
ttl = 20*1000;
}else if(id ==5){
ttl = 10*1000;
}
//延迟交换机使用的delay参数,设置消息的延期时长,单位是微妙
message.getMessageProperties().setDelay(ttl);
//延迟交换机消息默认是持久化的
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
}
});
}
}