六、RabbitMQ死信队列
1、过期消息
过期消息也叫TTL消息(TTL:Time To Live)
消息的过期时间有两种设置方式:(过期消息)
单条消息过期
单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;
队列属性设置所有消息过期
队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;
如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。
6.1.1、设置消息过期时间
模块:rabbitmq-06-ttl-01
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: ttl-learn1
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import com.sun.source.tree.ImportTree;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.MessageProperties;
import java.util.Date;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 设置单条消息的过期时间
*/
public void sendMsg() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("35000"); //过期的毫秒
Message message = MessageBuilder.withBody("hello world".getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_KEY, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
定义队列/交换机
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 1、定义交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 2、定义队列
*
* @return
*/
@Bean
public Queue queue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_TNAME).build();
}
/**
* 3、绑定交换机与队列
*
* @param directExchange
* @param queue
* @return
*/
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
public static final String EXCHANGE_NAME = "exchange.ttl.a";
public static final String QUEUE_TNAME = "queue.ttl.a";
public static final String ROUTING_KEY = "info";
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Ttl01Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Ttl01Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
发送消息后查看
35秒之后查看
6.1.2、设置队列消息过期时间
模块:rabbitmq-06-ttl-02
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: ttl-learn2
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_KEY, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
定义队列/交换机
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 1、定义交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 2、定义队列
*
* @return
*/
@Bean
public Queue queue() {
//方式1 new Queue 的方式
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 15000); //消息过期时间
return new Queue(RabbitMQConstant.QUEUE_TNAME, true, false, false, arguments);
/*//方式2 建造者
return QueueBuilder
.durable(RabbitMQConstant.QUEUE_TNAME)
.withArguments(arguments).
build();*/
}
/**
* 3、绑定交换机与队列
*
* @param directExchange
* @param queue
* @return
*/
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
public static final String EXCHANGE_NAME = "exchange.ttl.b";
public static final String QUEUE_TNAME = "queue.ttl.b";
public static final String ROUTING_KEY = "info";
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Ttl02Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Ttl02Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
先后发送两条消息
第一条消息到期后消失
第二条消息到期后消失
2、死信队列
死信队列也叫死信交换机、死信邮箱等(DLX: Dead-Letter-Exchange)
6.2.1、死信原因
死信是由于以下几种情况造成的
消息到达消息设置的过期时间后仍没有被消费
可能是因为消息积压太多,消息消费不过来
也可能是没有对应的消费者
消息到达了队列的过期时间后仍没有被消费
队列的消息数量超过了队列的长度,先到达的消息仍没有被消费
消费者手动确认消息后没有让消息重新入队
消费者拒绝接收该消息,也没有让消息重新入队
6.2.2、死信示例
如下情况下一个消息会进入DLX(Dead Letter Exchange)死信交换机
(1)、单条消息过期
消息设置了过期时间,在到达过期时间后消息没有被消费
可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者
这里演示没有消费者消费,消息到达了过期时间变成死信的情况
测试模块rabbitmq-06-dlx-01
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: dlx-learn1
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
发送消息时给消息添加过期时间
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生产者发送消息
*/
public void sendMsg() {
//消息属性
MessageProperties messageProperties = new MessageProperties();
//设置单条消息的过期时间,单位为毫秒,数据类型为字符串
messageProperties.setExpiration("15000");
Message message = MessageBuilder.withBody("hello world".getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}
定义队列/交换机
给正常队列绑定死信交换机和设置死信路由
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.1";
// 正常队列,没有消费者,设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.1";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.1";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.1";
// 正常路由key
public static final String NORMAL_KEY = "order1";
// 死信路由key
public static final String DLX_KEY = "error1";
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Dlx01Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Dlx01Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
发送消息
过期后查看死信队列
(2)、队列设置过期
队列设置了过期时间,在到达过期时间后消息没有被消费
可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者
这里演示没有消费者消费,消息到达了过期时间变成死信的情况
测试模块rabbitmq-06-dlx-02
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: dlx-learn2
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import org.springframework.amqp.core.MessageProperties;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生产者发送消息
*/
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}
定义队列/交换机
定义队列时设置队列的过期时间
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
/**
* 正常交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
//设置队列的过期时间为20秒
arguments.put("x-message-ttl", 20000);
// 设置这两个参数
// 重点:给正常队列绑定死信交换机和设置死信路由的key
// 也就是消息过期后发送到哪个死信交换机,发送时设置死信路由的key
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME); //设置对列的死信交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY); //设置死信路由key,要和死信交换机和死信队列绑定key一模一样
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
}
/**
* 死信交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.2";
// 正常队列:没有消费者、设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.2";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.2";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.2";
// 正常路由key
public static final String NORMAL_KEY = "order2";
// 死信路由key
public static final String DLX_KEY = "error2";
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Dlx02Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Dlx02Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
先发送消息查看队列消息
消息超时后查看死信队列
(3)、队列到达最大长度
先入队的消息会被发送到DLX
测试模块rabbitmq-06-dlx-03
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: dlx-learn3
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import org.springframework.amqp.core.MessageProperties;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生产者发送消息
*/
public void sendMsg() {
for (int i = 1; i <= 8; i++) {
String str = "hello world " + i;
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
}
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}
定义队列/交换机
定义队列时设置队列的最大长度
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
//设置对列的最大长度
arguments.put("x-max-length", 5);
// 重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列过期时间
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
}
/**
* 死信交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.3";
// 正常队列:没有消费者、设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.3";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.3";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.3";
// 正常路由key
public static final String NORMAL_KEY = "order3";
// 死信路由key
public static final String DLX_KEY = "error3";
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Dlx03Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Dlx03Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
(4)、消费消息不进行重新投递
- 从正常的队列接收消息,但是对消息不进行确认并且不对消息进行重新投递,此时消息就进入死信队列
- 业务处理过程中出现异常也会变成死信,因为消费者没有进行确认
测试模块rabbitmq-06-dlx-04
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
需要开启消费者手动确认模式
否则默认是消息消费后自动确认的
server:
port: 8080
spring:
application:
name: dlx-learn4
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
# 开启消费者手动确认
listener:
simple:
acknowledge-mode: manual
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生产者发送消息
*/
public void sendMsg() {
String str = "hello world";
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}
定义队列/交换机
需要设置队列的过期时间,这样消费者消费后不确认消息,超时后才能进入死信队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 15000); //设置对列的过期时间
// 重点:设置这两个参数
//设置对列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
}
/**
* 死信交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.4";
// 正常队列,没有消费者,设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.4";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.4";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.4";
// 正常路由key
public static final String NORMAL_KEY = "order4";
// 死信路由key
public static final String DLX_KEY = "error4";
}
定义消费者
消费者消费时开启消息手动确认模式,消费消息后手动确认后不回复确认消息,也不对消息进行重新投递
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.io.IOException;
@Service
@Slf4j
public class ReceiveMessageService {
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
* <p>
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})
public void receiveMsg(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
//对消息不确认,ack单词是 确认 的意思
// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
// deliveryTag:消息的一个数字标签
// multiple:如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认;false表示只对当前deliveryTag标签的消息Nack
// requeue:如果是true表示消息被Nack后重新发送到队列;如果是false表示消息被Nack后不会重新发送到队列
//获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
//获取消息的唯一标识,类似身份证或者学号
long deliveryTag = messageProperties.getDeliveryTag();
try {
byte[] body = message.getBody();
String str = new String(body);
log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
//TODO 业务逻辑处理
// 这里模拟一个异常,出现异常后进行手动不确认并且不重新投递设置
int a = 1 / 0;
//消费者的手动确认:false表示只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("接收着出现问题:{}", e.getMessage());
try {
//消费者的手动不确认:参数3为false表示不重新入队(不重新投递),就会变成死信;为true表示是重新入队
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Dlx04Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Dlx04Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
(5)、消费者拒绝消息
开启手动确认模式并拒绝消息,不重新投递,则进入死信队列
测试模块rabbitmq-06-dlx-05
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
需要开启消费者手动确认模式
server:
port: 8080
spring:
application:
name: dlx-learn5
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
# 开启消费者手动确认模式
listener:
simple:
acknowledge-mode: manual
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生产者发送消息
*/
public void sendMsg() {
String str = "hello world";
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}
定义队列/交换机
这里的队列无需设置队列的过期时间,因为消费者拒绝后如果不重新投递就直接进入死信队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
//设置对列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.QUEUE_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
}
/**
* 死信交换机 type=direct
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.5";
// 正常队列,没有消费者,设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.5";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.5";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.5";
// 正常路由key
public static final String NORMAL_KEY = "order5";
// 死信路由key
public static final String DLX_KEY = "error5";
}
消费者
消费者拒绝消息后不进行重新投递
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.Message;
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
@Slf4j
@Service
public class ReceiveMessageService {
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
* <p>
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})
public void receiveMsg(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
//获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
//获取消息的唯一标识,类似身份证或者学号
long deliveryTag = messageProperties.getDeliveryTag();
try {
byte[] body = message.getBody();
String str = new String(body);
log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
//TODO 业务逻辑处理
//这里模拟一个业务异常,出现异常后进入消费者拒绝设置,不进行重新投递
int a = 1 / 0;
//消费者的手动确认:false是只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("接收着出现问题:{}", e.getMessage());
try {
// 拒绝消息:参数1是消息的标识,参数2是否重新入队(false表示拒绝后不重新入队),不可以批量处理
channel.basicReject(deliveryTag, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Dlx05Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Dlx05Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
6.2.3、死信应用场景
监听死信消息以便查找问题
//获取消息的唯一标识,类似身份证或者学号
long deliveryTag = messageProperties.getDeliveryTag();try { byte[] body = message.getBody(); String str = new String(body); log.info("接收到的消息为:{},接收时间为:{}", str, new Date()); //TODO 业务逻辑处理 //这里模拟一个业务异常,出现异常后进入消费者拒绝设置,不进行重新投递 int a = 1 / 0; //消费者的手动确认:false是只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("接收着出现问题:{}", e.getMessage()); try { // 拒绝消息:参数1是消息的标识,参数2是否重新入队(false表示拒绝后不重新入队),不可以批量处理 channel.basicReject(deliveryTag, false); } catch (IOException ex) { throw new RuntimeException(ex); } throw new RuntimeException(e); }
}
}
`发送消息`
```java
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq06Dlx05Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq06Dlx05Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
6.2.3、死信应用场景
- 监听死信消息以便查找问题
- 可以实现延时队列业务