个人认为:
与普通的routing模式不同的是,在消息产生者和交换机之间,多一个延迟队列来缓存需要延迟发送的消息,
消息生产者将消息发送到延迟队列后,由延迟队列缓存消息,并且计算延迟发送的时间,在到时间后发送给交换机,再由交换机发往消费者监听的队列
需要创建的东西:
死信队列
死信交换机
延迟队列
步骤:
创建死信队列
//配置一个队列,队列名称为boot.deadQueue @Bean public Queue deadQueue(){ return new Queue("boot.deadQueue"); }
创建死信交换机
//配置一个直连交换机,交换机名称为boot.deadEx @Bean public DirectExchange deadEx(){ return new DirectExchange("boot.deadEx"); }
将死信队列与死信交换机绑定在一起
//将队列boot.deadQueue和交换机boot.deadEx绑定,同时给队列boot.deadQueue定义路由key为dk @Bean public Binding deadQueueBindK(){ return BindingBuilder.bind(deadQueue()).to(deadEx()).with("dk"); }
创建延迟队列
//配置一个延时队列,队列名称为boot.msQueue @Bean public Queue msQueue(){ //map中配置了延迟队列的配置项 Map<String, Object> map = new HashMap<>(); //延时时间(1分钟) map.put("x-message-ttl", 1000*60); //时间到后走哪个交换机 map.put("x-dead-letter-exchange", "boot.deadEx"); //时间到后走哪个路由key绑定的队列 map.put("x-dead-letter-routing-key", "dk"); /** 参数一:延迟队列名 参数二:该队列是否为持久的 参数三:队列是否为唯一的 参数四:是否启动队列消息自动删除 参数五:配置项 */ return new Queue("boot.msQueue", true, false, false, map); }
消费者监听死信队列
/* @RabbitListener(queues = "boot.deadQueue") 从名称为boot.deadQueue的队列中消费消息,即从死信队列中消费消息; */ @RabbitListener(queues = "boot.deadQueue") public void handlerMsg(Message message, Channel channel){ System.out.println("消费者:"+new String(message.getBody())); System.out.println("收到消息的时间:"+new Date()); //签收消息 try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } }
生产者往延迟队列中传入内容
//注入RabbitMQ模板 @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { //向延时队列boot.msQueue发送消息 rabbitTemplate.convertAndSend("boot.msQueue","这是延时消息"); System.out.println("发送消息时间:"+new Date()); }
补充:由消息生产者传入指定的延迟时间
@Test void contextLoads() { //向延时队列boot.msQueue发送消息 //使用三个参数的重载方法,来往消息队列中传递信息,第三个参数为回调函数,设置消息的延迟时间 rabbitTemplate.convertAndSend("boot.msQueue","这是延时消息", message -> { //指定消息发送的演示时间为20秒 message.getMessageProperties().setExpiration("20000"); return message; }); System.out.println("发送消息时间:"+new Date()); }