Docker 部署RabbitMQ

发布于:2025-03-19 ⋅ 阅读:(17) ⋅ 点赞:(0)

镜像

# Floating tag: the `management` variant of the image without a pinned version.
docker pull rabbitmq:management
# Pinned tag: the `management` variant fixed at version 4.0.7.
docker pull rabbitmq:4.0.7-management

docker-compose.yml

# Single RabbitMQ container using the management-enabled image.
services:
  rabbitmq:
    # Same pinned tag as the `docker pull` step, so the image that was pulled is the one that runs.
    image: rabbitmq:4.0.7-management
    container_name: rabbitmq
    restart: always
    # Host networking: the container shares the host's network stack, so no `ports:` mapping is declared.
    network_mode: "host"
    volumes:
      # Share the host's local time zone file with the container.
      - /etc/localtime:/etc/localtime
      # Keep broker data on the host, relative to this compose file.
      - ./rabbitmq/data:/var/lib/rabbitmq
    environment:
      # Initial user, password and virtual host; the Spring Boot client settings must match these.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: Test123
      RABBITMQ_DEFAULT_VHOST: /test
  • 部署 RabbitMQ
# Start only the `rabbitmq` service, detached (in the background).
docker-compose up -d rabbitmq

访问控制台:浏览器打开 http://<服务器IP>:15672,使用 docker-compose.yml 中配置的账号(admin / Test123)登录

Spring Boot

  • pom.xml
<!-- Spring Boot AMQP starter; provides the org.springframework.amqp classes imported by the Java code. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • application.yml
# RabbitMQ client settings; user, password and virtual host mirror the values in docker-compose.yml.
spring:
  rabbitmq:
    host: 192.168.0.140
    port: 5672
    username: admin
    password: Test123
    virtual-host: /test
    listener:
      simple:
        # Acknowledge mode (none, auto, manual; default is auto)
        acknowledge-mode: auto
        # Listener retry settings
        retry:
          # Whether the retry mechanism is enabled
          enabled: true
          # Maximum number of attempts
          max-attempts: 5
          # Initial interval between attempts (ms)
          initial-interval: 5000

批量声明队列

  • 常量类 MQConstants
/**
 * Queue-name constants.
 *
 * <p>{@code RabbitMQConfig} reads the static final fields of this class reflectively and
 * creates one queue per value, so only queue names belong here.
 */
public final class MQConstants {

    public static final String TEST_QUEUE_1 = "test-queue-1";
    public static final String TEST_QUEUE_2 = "test-queue-2";

    /** Not instantiable: this class only holds constants. */
    private MQConstants() {
    }

}
  • RabbitMQConfig
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/**
 * Declares the application's RabbitMQ queues while the Spring context starts.
 *
 * <p>Queue names are collected reflectively from {@link MQConstants}, so adding a
 * {@code String} constant there is enough to get another queue declared.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Creates the {@link RabbitAdmin} used to declare queues on the broker.
     *
     * @param connectionFactory connection factory the admin works through
     * @return a new {@code RabbitAdmin}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Builds one durable queue per {@code String} constant in {@link MQConstants} and
     * declares each of them through the given admin.
     *
     * @param rabbitAdmin admin used to declare the queues
     * @return the queues that were declared
     * @throws IllegalStateException if a constant cannot be read or is {@code null}
     */
    @Bean
    public List<Queue> createQueues(RabbitAdmin rabbitAdmin) {
        List<String> queueNameList = getConstantsValueList(MQConstants.class);
        List<Queue> queueList = new ArrayList<>(queueNameList.size());
        for (String queueName : queueNameList) {
            Queue queue = new Queue(queueName, true);
            // Have RabbitAdmin declare the queue explicitly.
            rabbitAdmin.declareQueue(queue);
            queueList.add(queue);
        }
        return queueList;
    }

    /**
     * Collects the values of all {@code static final String} fields declared by a class.
     *
     * @param clazz class whose constants are read
     * @return the constant values; empty when the class declares none
     * @throws IllegalStateException if a constant is not accessible from here or is
     *         {@code null}; failing fast avoids a queue silently going undeclared
     */
    private List<String> getConstantsValueList(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        List<String> valueList = new ArrayList<>(fields.length);
        for (Field field : fields) {
            int modifiers = field.getModifiers();
            boolean isConstant = Modifier.isStatic(modifiers) && Modifier.isFinal(modifiers);
            // Only String constants can be queue names; this keeps an unrelated
            // constant (e.g. an int) from being turned into a queue.
            if (!isConstant || field.getType() != String.class) {
                continue;
            }
            Object value;
            try {
                // Static field, so no instance is needed.
                value = field.get(null);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(
                        "Cannot read constant " + clazz.getName() + "." + field.getName(), e);
            }
            if (value == null) {
                throw new IllegalStateException(
                        "Constant " + clazz.getName() + "." + field.getName() + " is null");
            }
            valueList.add((String) value);
        }
        return valueList;
    }

}
  • 生产者
public Test {
	
	@Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(MQConstants.TEST_QUEUE_1, "test");
    }
}
  • 消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Example consumer for {@link MQConstants#TEST_QUEUE_1}.
 *
 * <p>Prints each message body. A body equal to {@code "error"} simulates a processing
 * failure: the exception is rethrown so the listener retry configured in
 * {@code application.yml} can take over.
 */
@Component
public class TestConsumer implements ChannelAwareMessageListener {

    /**
     * Handles one delivered message.
     *
     * @param message delivered message; its body is decoded as UTF-8 text
     * @param channel channel the message arrived on; only used by the manual ack/nack
     *                alternatives shown in the comments
     * @throws Exception when processing fails, so the failure is not swallowed here
     */
    @RabbitListener(queues = MQConstants.TEST_QUEUE_1, concurrency = "10")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode explicitly as UTF-8 rather than with the JVM's platform-default charset.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Received: " + body);
        try {
            // Simulated business logic
            if ("error".equals(body)) {
                throw new Exception("处理失败");
            }

            // Processing succeeded. With acknowledge-mode manual, ack here:
            // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println(" [✔] Done");
        } catch (Exception e) {
            System.out.println(" [✖] Error: " + e.getMessage());
            // Rethrow so the failure triggers the automatic retry.
            throw e;

            // Manual-ack alternative: requeue the failed message instead of rethrowing.
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

}