6.RabbitMQ死信队列

发布于:2025-03-10 ⋅ 阅读:(18) ⋅ 点赞:(0)

六、RabbitMQ死信队列

1、过期消息

过期消息也叫TTL消息(TTL:Time To Live)

消息的过期时间有两种设置方式:(过期消息)

  • 单条消息过期

    单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;

  • 队列属性设置所有消息过期

    队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;

如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。

6.1.1、设置消息过期时间

模块:rabbitmq-06-ttl-01

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: ttl-learn1

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import com.sun.source.tree.ImportTree;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.MessageProperties;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 设置单条消息的过期时间
     */
    public void sendMsg() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("35000"); //过期的毫秒
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_KEY, message);
        log.info("消息发送完毕,发送时间为:{}", new Date());
    }
}

定义队列/交换机

package com.longdidi.config;


import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * 1、定义交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * 2、定义队列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_TNAME).build();
    }

    /**
     * 3、绑定交换机与队列
     *
     * @param directExchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(DirectExchange directExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_NAME = "exchange.ttl.a";
    public static final String QUEUE_TNAME = "queue.ttl.a";
    public static final String ROUTING_KEY = "info";
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Ttl01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Ttl01Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

发送消息后查看

在这里插入图片描述

35秒之后查看

在这里插入图片描述

6.1.2、设置队列消息过期时间

模块:rabbitmq-06-ttl-02

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: ttl-learn2

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     */
    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_KEY, message);
        log.info("消息发送完毕,发送时间为:{}", new Date());
    }
}

定义队列/交换机

package com.longdidi.config;


import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 1、定义交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * 2、定义队列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        //方式1 new Queue 的方式
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 15000); //消息过期时间
        return new Queue(RabbitMQConstant.QUEUE_TNAME, true, false, false, arguments);
        /*//方式2 建造者
        return QueueBuilder
                .durable(RabbitMQConstant.QUEUE_TNAME)
                .withArguments(arguments).
                build();*/
    }

    /**
     * 3、绑定交换机与队列
     *
     * @param directExchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(DirectExchange directExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    public static final String EXCHANGE_NAME = "exchange.ttl.b";
    public static final String QUEUE_TNAME = "queue.ttl.b";
    public static final String ROUTING_KEY = "info";
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Ttl02Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Ttl02Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

先后发送两条消息

在这里插入图片描述

第一条消息到期后消失

在这里插入图片描述

第二条消息到期后消失

在这里插入图片描述

2、死信队列

死信队列也叫死信交换机、死信邮箱等(DLX: Dead-Letter-Exchange)

6.2.1、死信原因

死信是由于以下几种情况造成的

  • 消息到达消息设置的过期时间后仍没有被消费

    可能是因为消息积压太多,消息消费不过来

    也可能是没有对应的消费者

  • 消息到达了队列的过期时间后仍没有被消费

  • 队列的消息数量超过了队列的长度,先到达的消息仍没有被消费

  • 消费者手动确认消息后没有让消息重新入队

  • 消费者拒绝接收该消息,也没有让消息重新入队

在这里插入图片描述

6.2.2、死信示例

如下情况下一个消息会进入DLX(Dead Letter Exchange)死信交换机

(1)、单条消息过期

消息设置了过期时间,在到达过期时间后消息没有被消费

可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者

这里演示没有消费者消费,消息到达了过期时间变成死信的情况

测试模块rabbitmq-06-dlx-01

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: dlx-learn1

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

生产者

发送消息时给消息添加过期时间

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */

    public void sendMsg() {
        //消息属性
        MessageProperties messageProperties = new MessageProperties();
        //设置单条消息的过期时间,单位为毫秒,数据类型为字符串
        messageProperties.setExpiration("15000");
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
        log.info("消息发送完毕:发送时间为:{}", new Date());

    }
}

定义队列/交换机

给正常队列绑定死信交换机和设置死信路由

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 重点:设置这两个参数
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
    }

    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.1";
    // 正常队列,没有消费者,设置过期时间
    public static final String QUEUE_NORMAL_NAME = "queue.normal.1";
    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.dlx.1";
    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.dlx.1";
    // 正常路由key
    public static final String NORMAL_KEY = "order1";
    // 死信路由key
    public static final String DLX_KEY = "error1";
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Dlx01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Dlx01Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

发送消息

在这里插入图片描述

过期后查看死信队列

在这里插入图片描述

(2)、队列设置过期

队列设置了过期时间,在到达过期时间后消息没有被消费

可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者

这里演示没有消费者消费,消息到达了过期时间变成死信的情况

测试模块rabbitmq-06-dlx-02

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: dlx-learn2

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

import org.springframework.amqp.core.MessageProperties;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
        log.info("消息发送完毕:发送时间为:{}", new Date());
    }
}

定义队列/交换机

定义队列时设置队列的过期时间

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机
     *
     * @return
     */
    /**
     * 正常交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        //设置队列的过期时间为20秒
        arguments.put("x-message-ttl", 20000);
        // 设置这两个参数
        // 重点:给正常队列绑定死信交换机和设置死信路由的key
        // 也就是消息过期后发送到哪个死信交换机,发送时设置死信路由的key
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME); //设置对列的死信交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY); //设置死信路由key,要和死信交换机和死信队列绑定key一模一样
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
    }

    /**
     * 死信交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.2";
    // 正常队列:没有消费者、设置过期时间
    public static final String QUEUE_NORMAL_NAME = "queue.normal.2";
    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.dlx.2";
    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.dlx.2";
    // 正常路由key
    public static final String NORMAL_KEY = "order2";
    // 死信路由key
    public static final String DLX_KEY = "error2";
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Dlx02Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Dlx02Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

先发送消息查看队列消息

在这里插入图片描述

消息超时后查看死信队列

在这里插入图片描述

(3)、队列到达最大长度

先入队的消息会被发送到DLX

测试模块rabbitmq-06-dlx-03

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

server:
  port: 8080

spring:
  application:
    name: dlx-learn3

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

import org.springframework.amqp.core.MessageProperties;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        for (int i = 1; i <= 8; i++) {
            String str = "hello world " + i;
            Message message = MessageBuilder.withBody(str.getBytes())
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);
        }

        log.info("消息发送完毕:发送时间为:{}", new Date());
    }
}

定义队列/交换机

定义队列时设置队列的最大长度

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        //设置对列的最大长度
        arguments.put("x-max-length", 5);
        // 重点:设置这两个参数
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
                .withArguments(arguments) // 设置对列过期时间
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
    }

    /**
     * 死信交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.3";
    // 正常队列:没有消费者、设置过期时间
    public static final String QUEUE_NORMAL_NAME = "queue.normal.3";
    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.dlx.3";
    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.dlx.3";
    // 正常路由key
    public static final String NORMAL_KEY = "order3";
    // 死信路由key
    public static final String DLX_KEY = "error3";
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Dlx03Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Dlx03Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

在这里插入图片描述

(4)、消费消息不进行重新投递
  • 从正常的队列接收消息,但是对消息不进行确认并且不对消息进行重新投递,此时消息就进入死信队列
  • 业务处理过程中出现异常也会变成死信,因为消费者没有进行确认

测试模块rabbitmq-06-dlx-04

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

需要开启消费者手动确认模式

否则默认是消息消费后自动确认的

server:
  port: 8080

spring:
  application:
    name: dlx-learn4

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi
    # 开启消费者手动确认
    listener:
      simple:
        acknowledge-mode: manual

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        String str = "hello world";
        Message message = MessageBuilder.withBody(str.getBytes())
                .build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);

        log.info("消息发送完毕:发送时间为:{}", new Date());
    }
}

定义队列/交换机

需要设置队列的过期时间,这样消费者消费后不确认消息,超时后才能进入死信队列

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    /**
     * 正常交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 15000); //设置对列的过期时间
        // 重点:设置这两个参数
        //设置对列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
    }

    /**
     * 死信交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.4";
    // 正常队列,没有消费者,设置过期时间
    public static final String QUEUE_NORMAL_NAME = "queue.normal.4";
    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.dlx.4";
    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.dlx.4";
    // 正常路由key
    public static final String NORMAL_KEY = "order4";
    // 死信路由key
    public static final String DLX_KEY = "error4";
}

定义消费者

消费者消费时开启消息手动确认模式,消费消息后手动确认后不回复确认消息,也不对消息进行重新投递

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.io.IOException;

@Service
@Slf4j
public class ReceiveMessageService {

    /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     * <p>
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})
    public void receiveMsg(Message message, Channel channel) {
        System.out.println("接收到的消息:" + message);
        //对消息不确认,ack单词是 确认 的意思
        // void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        // deliveryTag:消息的一个数字标签
        // multiple:如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认;false表示只对当前deliveryTag标签的消息Nack
        // requeue:如果是true表示消息被Nack后重新发送到队列;如果是false表示消息被Nack后不会重新发送到队列

        //获取消息属性
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息的唯一标识,类似身份证或者学号
        long deliveryTag = messageProperties.getDeliveryTag();

        try {
            byte[] body = message.getBody();
            String str = new String(body);
            log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
            //TODO 业务逻辑处理
            // 这里模拟一个异常,出现异常后进行手动不确认并且不重新投递设置
            int a = 1 / 0;
            //消费者的手动确认:false表示只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("接收着出现问题:{}", e.getMessage());
            try {
                //消费者的手动不确认:参数3为false表示不重新入队(不重新投递),就会变成死信;为true表示是重新入队
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            throw new RuntimeException(e);
        }
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Dlx04Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Dlx04Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

在这里插入图片描述

(5)、消费者拒绝消息

开启手动确认模式并拒绝消息,不重新投递,则进入死信队列

测试模块rabbitmq-06-dlx-05

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ

需要开启消费者手动确认模式

server:
  port: 8080

spring:
  application:
    name: dlx-learn5

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi
    # 开启消费者手动确认模式
    listener:
      simple:
        acknowledge-mode: manual

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 生产者发送消息
     */
    public void sendMsg() {
        String str = "hello world";
        Message message = MessageBuilder.withBody(str.getBytes())
                .build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message);

        log.info("消息发送完毕:发送时间为:{}", new Date());
    }
}

定义队列/交换机

这里的队列无需设置队列的过期时间,因为消费者拒绝后如果不重新投递就直接进入死信队列

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 重点:设置这两个参数
        //设置对列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.QUEUE_DLX_NAME);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
    }

    /**
     * 死信交换机 type=direct
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
    }
}

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.5";
    // 正常队列,没有消费者,设置过期时间
    public static final String QUEUE_NORMAL_NAME = "queue.normal.5";
    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.dlx.5";
    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.dlx.5";
    // 正常路由key
    public static final String NORMAL_KEY = "order5";
    // 死信路由key
    public static final String DLX_KEY = "error5";
}

消费者

消费者拒绝消息后不进行重新投递

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.Message;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.Channel;

@Slf4j
@Service
public class ReceiveMessageService {

    /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     * <p>
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})
    public void receiveMsg(Message message, Channel channel) {
        System.out.println("接收到的消息:" + message);
        //获取消息属性
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息的唯一标识,类似身份证或者学号
        long deliveryTag = messageProperties.getDeliveryTag();

        try {
            byte[] body = message.getBody();
            String str = new String(body);
            log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
            //TODO 业务逻辑处理
            //这里模拟一个业务异常,出现异常后进入消费者拒绝设置,不进行重新投递
            int a = 1 / 0;
            //消费者的手动确认:false是只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("接收着出现问题:{}", e.getMessage());
            try {
                // 拒绝消息:参数1是消息的标识,参数2是否重新入队(false表示拒绝后不重新入队),不可以批量处理
                channel.basicReject(deliveryTag, false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            throw new RuntimeException(e);
        }
    }

}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Dlx05Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Dlx05Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

6.2.3、死信应用场景

  • 监听死信消息以便查找问题
    //获取消息的唯一标识,类似身份证或者学号
    long deliveryTag = messageProperties.getDeliveryTag();

      try {
          byte[] body = message.getBody();
          String str = new String(body);
          log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
          //TODO 业务逻辑处理
          //这里模拟一个业务异常,出现异常后进入消费者拒绝设置,不进行重新投递
          int a = 1 / 0;
          //消费者的手动确认:false是只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
          channel.basicAck(deliveryTag, false);
      } catch (Exception e) {
          log.error("接收着出现问题:{}", e.getMessage());
          try {
              // 拒绝消息:参数1是消息的标识,参数2是否重新入队(false表示拒绝后不重新入队),不可以批量处理
              channel.basicReject(deliveryTag, false);
          } catch (IOException ex) {
              throw new RuntimeException(ex);
          }
          throw new RuntimeException(e);
      }
    

    }

}


`发送消息`

```java
package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq06Dlx05Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq06Dlx05Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

6.2.3、死信应用场景

  • 监听死信消息以便查找问题
  • 可以实现延时队列业务

网站公告

今日签到

点亮在社区的每一天
去签到