RabbitMQ 官网教程地址:https://www.rabbitmq.com/tutorials
下面介绍 RabbitMQ 官网教程中的七种使用方式在 Spring Boot 中如何实现。
以下是基于 Spring Boot 使用 RabbitMQ 实现这七种模式的示例代码。假设已经引入了以下依赖:
Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. Hello World! 模式
生产者直接向队列发送消息,消费者从同一个队列接收消息。
配置类
/**
 * Configuration for the "Hello World" example: defines the single queue
 * that the example's producer and consumer refer to.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue used by the "Hello World" example. */
    public static final String QUEUE_NAME = "hello";

    /**
     * Creates the queue bean for the example.
     *
     * @return a {@link Queue} named {@link #QUEUE_NAME}
     */
    @Bean
    public Queue helloQueue() {
        final Queue hello = new Queue(QUEUE_NAME);
        return hello;
    }
}
生产者
/**
 * REST controller that publishes a fixed greeting for the "Hello World" example.
 */
@RestController
@RequestMapping("/hello")
public class HelloProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /hello/send}: sends a fixed text message, passing
     * {@link RabbitConfig#QUEUE_NAME} as the routing key.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("/send")
    public String send() {
        final String payload = "Hello RabbitMQ!";
        rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, payload);
        return "Message sent!";
    }
}
消费者
/**
 * Message listener for the "Hello World" example.
 */
@Component
public class HelloConsumer {

    /**
     * Prints every message received from {@link RabbitConfig#QUEUE_NAME}
     * to standard output.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receive(String message) {
        final String line = "Received: " + message;
        System.out.println(line);
    }
}
2. Work Queues(工作队列)
多个消费者从同一个队列中获取任务,进行任务分发。
配置类
/**
 * Configuration for the "Work Queues" example: defines the queue that the
 * example's producer and worker refer to.
 */
@Configuration
public class WorkQueueConfig {

    /** Name of the queue used by the "Work Queues" example. */
    public static final String WORK_QUEUE = "work_queue";

    /**
     * Creates the work queue bean.
     *
     * @return a {@link Queue} named {@link #WORK_QUEUE}
     */
    @Bean
    public Queue workQueue() {
        final Queue tasks = new Queue(WORK_QUEUE);
        return tasks;
    }
}
生产者
/**
 * REST controller that publishes caller-supplied tasks for the "Work Queues" example.
 */
@RestController
@RequestMapping("/work")
public class WorkQueueProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /work/send/{msg}}: sends the path segment as the
     * message, passing {@link WorkQueueConfig#WORK_QUEUE} as the routing key.
     *
     * @param msg the text taken from the request path
     * @return a fixed confirmation string
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        final String routingKey = WorkQueueConfig.WORK_QUEUE;
        rabbitTemplate.convertAndSend(routingKey, msg);
        return "Work message sent!";
    }
}
消费者(多个:可启动多个应用实例,或通过 @RabbitListener 的 concurrency 属性配置多个并发消费者,它们共同消费同一个队列)
/**
 * Worker for the "Work Queues" example.
 */
@Component
public class WorkConsumer {

    /**
     * Prints the received task and then sleeps for one second.
     *
     * @param message the task payload read from {@link WorkQueueConfig#WORK_QUEUE}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
    public void receive(String message) throws InterruptedException {
        final long simulatedWorkMillis = 1000L;
        final String line = "Worker received: " + message;
        System.out.println(line);
        // Simulate a time-consuming task.
        Thread.sleep(simulatedWorkMillis);
    }
}
3. Publish/Subscribe(发布/订阅)
使用 Fanout Exchange 实现广播。
配置类
/**
 * Configuration for the "Publish/Subscribe" example: one fanout exchange
 * with two queues bound to it.
 */
@Configuration
public class FanoutConfig {

    /** Name of the fanout exchange used by the example. */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    /**
     * Creates the fanout exchange bean.
     *
     * @return a {@link FanoutExchange} named {@link #FANOUT_EXCHANGE}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
        return exchange;
    }

    /**
     * Creates the first subscriber queue.
     *
     * @return a {@link Queue} named {@code fanout.queue1}
     */
    @Bean
    public Queue queue1() {
        final Queue first = new Queue("fanout.queue1");
        return first;
    }

    /**
     * Creates the second subscriber queue.
     *
     * @return a {@link Queue} named {@code fanout.queue2}
     */
    @Bean
    public Queue queue2() {
        final Queue second = new Queue("fanout.queue2");
        return second;
    }

    /**
     * Binds {@link #queue1()} to {@link #fanoutExchange()}.
     *
     * @return the binding for the first queue
     */
    @Bean
    public Binding binding1() {
        final Queue source = queue1();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

    /**
     * Binds {@link #queue2()} to {@link #fanoutExchange()}.
     *
     * @return the binding for the second queue
     */
    @Bean
    public Binding binding2() {
        final Queue source = queue2();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }
}
生产者
/**
 * REST controller that publishes a fixed message for the "Publish/Subscribe" example.
 */
@RestController
@RequestMapping("/fanout")
public class FanoutProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /fanout/send}: sends a fixed text message to
     * {@link FanoutConfig#FANOUT_EXCHANGE} with an empty routing key.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("/send")
    public String send() {
        final String exchange = FanoutConfig.FANOUT_EXCHANGE;
        final String routingKey = "";
        final String payload = "Fanout message!";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        return "Fanout message sent!";
    }
}
4. Routing(路由模式)
使用 Direct Exchange 和路由键实现定向投递。
配置类
@Configuration
public class DirectConfig {
public static final String DIRECT_EXCHANGE = "direct_exchange";
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Queue errorQueue() {
return new Queue("direct.error");
}
@Bean
public Queue infoQueue() {
return new Queue("direct.info");
}
@Bean
public Binding errorBinding() {
return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");
}
@Bean
public Binding infoBinding() {
return BindingBuilder.bind(infoQueue()).to(directExchange()).with("info");
}
}
5. Topics(主题模式)
使用 Topic Exchange 实现多级路由。
配置类
/**
 * Configuration for the "Topics" example: one topic exchange with two
 * queues bound under wildcard routing patterns.
 *
 * <p>The binding beans carry explicit bean names. Without them the default
 * bean names would be {@code binding1} and {@code binding2}, which are the
 * same names produced by {@code FanoutConfig}. Loading both configuration
 * classes into one application context would then either fail at startup
 * (bean definition overriding is disabled by default since Spring Boot 2.1)
 * or silently replace one pair of bindings with the other.
 */
@Configuration
public class TopicConfig {

    /** Name of the topic exchange used by the example. */
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    /**
     * Creates the topic exchange bean.
     *
     * @return a {@link TopicExchange} named {@link #TOPIC_EXCHANGE}
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * Creates the queue bound under the {@code *.error} pattern.
     *
     * @return a {@link Queue} named {@code topic.queue1}
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    /**
     * Creates the queue bound under the {@code log.#} pattern.
     *
     * @return a {@link Queue} named {@code topic.queue2}
     */
    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }

    /**
     * Binds {@link #topicQueue1()} to {@link #topicExchange()} with pattern
     * {@code *.error}. In a topic exchange {@code *} stands for exactly one
     * dot-separated word.
     *
     * @return the binding for the first topic queue
     */
    @Bean(name = "topicBinding1") // unique name: avoids clashing with FanoutConfig's "binding1"
    public Binding binding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.error");
    }

    /**
     * Binds {@link #topicQueue2()} to {@link #topicExchange()} with pattern
     * {@code log.#}. In a topic exchange {@code #} stands for zero or more
     * dot-separated words.
     *
     * @return the binding for the second topic queue
     */
    @Bean(name = "topicBinding2") // unique name: avoids clashing with FanoutConfig's "binding2"
    public Binding binding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("log.#");
    }
}
6. RPC(远程过程调用)
客户端(生产者)发送请求,服务端(消费者)处理后返回响应。
RPC 服务端
@Component
public class RpcServer {
@RabbitListener(queues = "rpc_queue")
public String process(String message) {
return "Processed: " + message;
}
}
RPC 客户端
/**
 * Client side of the RPC example: sends a request and returns the reply to
 * the HTTP caller.
 */
@RestController
@RequestMapping("/rpc")
public class RpcClient {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Handles {@code GET /rpc/send/{msg}}: sends the path segment with routing
     * key {@code rpc_queue} and waits for the reply.
     *
     * <p>NOTE(review): the reply is presumably {@code null} when none arrives
     * in time, which would render as "RPC response: null" - confirm.
     *
     * @param msg the text taken from the request path
     * @return {@code "RPC response: "} followed by the reply object
     */
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        final String routingKey = "rpc_queue";
        final Object reply = rabbitTemplate.convertSendAndReceive(routingKey, msg);
        final String body = "RPC response: " + reply;
        return body;
    }
}
7. Publisher Confirms(发布者确认)
确保消息成功发送到 RabbitMQ 服务器。
配置类
@Configuration
public class ConfirmConfig {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
factory.setPublisherConfirms(true);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("Message confirmed");
} else {
System.err.println("Message failed: " + cause);
}
});
return template;
}
}