【RabbitMQ】RabbitMQ 在 Spring Boot 中的使用

发布于:2025-03-18 ⋅ 阅读:(22) ⋅ 点赞:(0)

rabbitmq官网地址:https://www.rabbitmq.com/tutorials

下面介绍 RabbitMQ 官网教程中的七种使用方式在 Spring Boot 中如何实现。

下面是基于 Spring Boot 使用 RabbitMQ 实现这七种模式的示例代码。假设已经引入了以下依赖:

Maven 依赖

<!--
  Spring Boot AMQP starter, required by all the examples in this article.
  No <version> element is given; presumably the version is managed by the
  Spring Boot parent POM / BOM of the project - confirm against your pom.xml.
-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

1. Hello World! 模式

生产者和消费者直接发送和接收消息。

配置类

/**
 * Configuration for the "Hello World" example: exposes the single queue that
 * {@code HelloProducer} sends to and {@code HelloConsumer} listens on.
 */
@Configuration
public class RabbitConfig {
    /** Name of the queue used by the "Hello World" example. */
    public static final String QUEUE_NAME = "hello";

    /**
     * Registers the example queue as a Spring bean.
     *
     * @return a durable, non-exclusive, non-auto-delete queue named {@link #QUEUE_NAME}
     */
    @Bean
    public Queue helloQueue() {
        // These are the flags the one-argument Queue constructor applies; spelled out for readers.
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
    }
}

生产者

/**
 * REST endpoint for the "Hello World" example. Each call publishes one fixed
 * text message using {@link RabbitConfig#QUEUE_NAME} as the routing key.
 */
@RestController
@RequestMapping("/hello")
public class HelloProducer {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /hello/send} by publishing the greeting message.
     *
     * @return a fixed confirmation text for the HTTP caller
     */
    @GetMapping("/send")
    public String send() {
        final String routingKey = RabbitConfig.QUEUE_NAME;
        final String payload = "Hello RabbitMQ!";
        rabbitTemplate.convertAndSend(routingKey, payload);

        final String confirmation = "Message sent!";
        return confirmation;
    }
}

消费者

/**
 * Consumer for the "Hello World" example: prints every message delivered from
 * the queue named by {@link RabbitConfig#QUEUE_NAME}.
 */
@Component
public class HelloConsumer {

    /**
     * Listener callback for the "hello" queue.
     *
     * @param message the message payload as text
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receive(String message) {
        final String line = "Received: " + message;
        System.out.println(line);
    }
}

2. Work Queues(工作队列)

多个消费者从同一个队列中获取任务,进行任务分发。

配置类

/**
 * Configuration for the "Work Queues" example: exposes the task queue that the
 * work producer publishes to and the worker listens on.
 */
@Configuration
public class WorkQueueConfig {
    /** Name of the queue that carries the work items. */
    public static final String WORK_QUEUE = "work_queue";

    /**
     * Registers the task queue as a Spring bean.
     *
     * @return a durable, non-exclusive, non-auto-delete queue named {@link #WORK_QUEUE}
     */
    @Bean
    public Queue workQueue() {
        // These are the flags the one-argument Queue constructor applies; spelled out for readers.
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(WORK_QUEUE, durable, exclusive, autoDelete);
    }
}

生产者

/**
 * REST endpoint for the "Work Queues" example. Publishes the text taken from
 * the request path using {@link WorkQueueConfig#WORK_QUEUE} as the routing key.
 */
@RestController
@RequestMapping("/work")
public class WorkQueueProducer {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /work/send/{msg}} by publishing {@code msg} as a work item.
     *
     * @param msg the task text taken from the URL path
     * @return a fixed confirmation text for the HTTP caller
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        final String routingKey = WorkQueueConfig.WORK_QUEUE;
        rabbitTemplate.convertAndSend(routingKey, msg);

        final String confirmation = "Work message sent!";
        return confirmation;
    }
}

消费者(多个)

/**
 * Worker for the "Work Queues" example.
 *
 * <p>The listener is started with two concurrent consumers on the same queue,
 * so tasks published to {@link WorkQueueConfig#WORK_QUEUE} are actually shared
 * between several workers. With a single consumer (the default) every task
 * would be handled by the same worker and no distribution would be visible.
 */
@Component
public class WorkConsumer {

    /**
     * Handles one task taken from the work queue.
     *
     * @param message the task text
     * @throws InterruptedException if the thread is interrupted while simulating the work
     */
    @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE, concurrency = "2")
    public void receive(String message) throws InterruptedException {
        System.out.println("Worker received: " + message);
        Thread.sleep(1000);  // simulate a time-consuming task
    }
}

3. Publish/Subscribe(发布/订阅)

使用 Fanout Exchange 实现广播。

配置类

/**
 * Configuration for the "Publish/Subscribe" example: one fanout exchange with
 * two queues bound to it.
 *
 * <p>The binding beans are given explicit, example-specific bean names.
 * {@code TopicConfig} also has {@code binding1()} / {@code binding2()} methods;
 * with default (method-name) bean names the two configurations would register
 * beans under the same names and clash when loaded into one application context.
 */
@Configuration
public class FanoutConfig {
    /** Name of the fanout exchange the producer publishes to. */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    /**
     * @return the fanout exchange named {@link #FANOUT_EXCHANGE}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * @return the first subscriber queue, {@code fanout.queue1}
     */
    @Bean
    public Queue queue1() {
        return new Queue("fanout.queue1");
    }

    /**
     * @return the second subscriber queue, {@code fanout.queue2}
     */
    @Bean
    public Queue queue2() {
        return new Queue("fanout.queue2");
    }

    /**
     * Binds {@code fanout.queue1} to the fanout exchange (no routing key is given).
     *
     * @return the binding, registered under the bean name {@code fanoutBinding1}
     */
    @Bean("fanoutBinding1")
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }

    /**
     * Binds {@code fanout.queue2} to the fanout exchange (no routing key is given).
     *
     * @return the binding, registered under the bean name {@code fanoutBinding2}
     */
    @Bean("fanoutBinding2")
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

生产者

/**
 * REST endpoint for the "Publish/Subscribe" example. Publishes one fixed text
 * message to the exchange named by {@link FanoutConfig#FANOUT_EXCHANGE}.
 */
@RestController
@RequestMapping("/fanout")
public class FanoutProducer {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /fanout/send} by publishing the broadcast message
     * with an empty routing key.
     *
     * @return a fixed confirmation text for the HTTP caller
     */
    @GetMapping("/send")
    public String send() {
        final String exchange = FanoutConfig.FANOUT_EXCHANGE;
        final String routingKey = "";
        final String payload = "Fanout message!";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);

        final String confirmation = "Fanout message sent!";
        return confirmation;
    }
}

4. Routing(路由模式)

使用 Direct Exchange 和路由键实现定向投递。

配置类

@Configuration
public class DirectConfig {
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("direct.error");
    }

    @Bean
    public Queue infoQueue() {
        return new Queue("direct.info");
    }

    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");
    }

    @Bean
    public Binding infoBinding() {
        return BindingBuilder.bind(infoQueue()).to(directExchange()).with("info");
    }
}

5. Topics(主题模式)

使用 Topic Exchange 按带通配符的路由键实现多级路由(`*` 匹配一个单词,`#` 匹配零个或多个单词)。

配置类

/**
 * Configuration for the "Topics" example: one topic exchange and two queues
 * bound with the wildcard patterns {@code *.error} and {@code log.#}.
 *
 * <p>The binding beans are given explicit, example-specific bean names.
 * {@code FanoutConfig} also has {@code binding1()} / {@code binding2()} methods;
 * with default (method-name) bean names the two configurations would register
 * beans under the same names and clash when loaded into one application context.
 */
@Configuration
public class TopicConfig {
    /** Name of the topic exchange used by this example. */
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    /**
     * @return the topic exchange named {@link #TOPIC_EXCHANGE}
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * @return the queue {@code topic.queue1}
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    /**
     * @return the queue {@code topic.queue2}
     */
    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }

    /**
     * Binds {@code topic.queue1} to the topic exchange with the pattern {@code *.error}.
     *
     * @return the binding, registered under the bean name {@code topicBinding1}
     */
    @Bean("topicBinding1")
    public Binding binding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.error");
    }

    /**
     * Binds {@code topic.queue2} to the topic exchange with the pattern {@code log.#}.
     *
     * @return the binding, registered under the bean name {@code topicBinding2}
     */
    @Bean("topicBinding2")
    public Binding binding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("log.#");
    }
}

6. RPC(远程过程调用)

生产者发送请求,消费者处理后返回响应。

RPC 服务端

@Component
public class RpcServer {

    @RabbitListener(queues = "rpc_queue")
    public String process(String message) {
        return "Processed: " + message;
    }
}

RPC 客户端

/**
 * Client side of the RPC example: sends the request text to {@code rpc_queue}
 * and waits for the reply.
 */
@RestController
@RequestMapping("/rpc")
public class RpcClient {

    /** Template used to send the request and receive the reply; injected by Spring. */
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /rpc/send/{msg}} by performing one request/reply round trip.
     *
     * @param msg the request text taken from the URL path
     * @return the reply prefixed with {@code "RPC response: "}, or an explanatory
     *         message when no reply was received
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        Object response = rabbitTemplate.convertSendAndReceive("rpc_queue", msg);
        if (response == null) {
            // convertSendAndReceive yields null when no reply arrives within the reply timeout;
            // report that explicitly instead of returning the text "RPC response: null".
            return "RPC failed: no reply received before the timeout";
        }
        return "RPC response: " + response;
    }
}

7. Publisher Confirms(发布者确认)

确保消息成功发送到 RabbitMQ 服务器。

配置类

@Configuration
public class ConfirmConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setPublisherConfirms(true);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed");
            } else {
                System.err.println("Message failed: " + cause);
            }
        });
        return template;
    }
}