RabbitMQ
一、RabbitMQ概述
RabbitMQ
RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布式系统和微服务架构中
异步消息处理:RabbitMQ 允许系统组件通过消息传递异步交互,提高性能和响应速度。
消息持久化:支持将消息保存到磁盘,确保消息不会因服务器故障而丢失。
灵活的路由:通过交换器(Exchanges)和队列(Queues)的组合,可以灵活地路由和分发消息。
高可用性:支持镜像队列和集群,确保消息系统的高可用性。
多种协议支持:支持 AMQP 0-9-1、STOMP、MQTT 等多种消息协议。
管理界面:提供易于使用的管理界面,方便监控和管理消息队列。
基本特点:
支持多语言客户端:RabbitMQ几乎支持所有常用的语言,比如java、Ruby、.NET等。
提供跟踪机制:RabbitMQ提供消息跟踪机制,如果消息异常,使用者可以查出发生了什么情况。
提供插件机制:RabbitMQ提供了许多插件,从多方面进行扩展,也可以编写自己的插件
Broker:就是 RabbitMQ 服务,用于接收和分发消息,接受客户端的连接,实现 AMQP 实体服务。
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange 或 queue 等。
Connection:连接,生产者/消费者与 Broker 之间的 TCP 网络连接。
Channel:网络信道,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立连接的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。
Message:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host:虚拟节点,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,是 message 到达 broker 的第一站,用于根据分发规则、匹配查询表中的 routing key,分发消息到 queue 中去,不具备消息存储的功能。常用的类型有:direct、topic、fanout。
Bindings:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
Queue:消息队列,保存消息并将它们转发给消费者进行消费。
消息路由
直连交换机
扇形交换机
主题交换机
头交换机
二、RabbitMQ的工作模式
2.1. 简单模式
- 描述:在这种模式下,生产者(P)将消息发送到一个队列中,消费者(C)从该队列中接收消息。
- 特点:只有一个队列,一个生产者和一个消费者。消息直接从生产者传递给消费者。
2.2. 工作模式
- 描述:生产者(P)将消息发送到一个队列中,多个消费者(C1, C2)可以订阅这个队列。
- 特点:多个消费者可以共享同一个队列,消息会被轮流分配给不同的消费者,实现负载均衡。
2.3. 发布订阅模式
- 描述:生产者(P)将消息发送到一个交换机(X),交换机根据路由键将消息分发到多个队列中,每个队列由一个或多个消费者(C1, C2)订阅。
- 特点:一个消息可以被多个消费者接收,适用于广播场景。
2.4. 路由模式
- 描述:生产者(P)将消息发送到一个交换机(X),交换机根据路由键将消息分发到特定的队列中,每个队列由一个或多个消费者(C1, C2)订阅。
- 特点:通过路由键来决定消息应该发送到哪个队列,支持更灵活的消息路由。
2.5. 主题模式
- 描述:生产者(P)将消息发送到一个交换机(X),交换机根据主题(通常是通配符匹配)将消息分发到多个队列中,每个队列由一个或多个消费者(C1, C2, C3)订阅。
- 特点:使用通配符(如
*
和#
)进行灵活的主题匹配,支持更复杂的路由规则。
2.6. RPC模式
- 描述:客户端(Client)发送一个请求到服务器(Server),服务器处理请求后返回响应。客户端通过一个临时队列(rpc_queue)接收响应。
- 特点:模拟远程过程调用(RPC),客户端等待服务器的响应,适用于需要同步处理的场景。
三、RabbitMQ工作原理
连接与信道建立:
生产者和消费者分别与RabbitMQ服务器建立连接,并创建通信信道。
队列声明:
生产者声明一个或多个队列,用于存储消息。
消息发送:
生产者通过信道将消息发送到交换机。
交换机根据预定义的路由规则和绑定,将消息路由到一个或多个队列中。
消息接收与处理:
消费者订阅一个或多个队列,并从这些队列中接收消息进行处理。
消费者处理完消息后,向RabbitMQ服务器发送确认消息(ACK)。
消息删除:
RabbitMQ服务器在接收到消费者的确认消息后,从队列中删除该消息。
四、RabbitMQ应用
4.1 RabbitMQ安装和配置
官网地址https://www.rabbitmq.com/,选择版本4.1.0
下载Erlang
地址https://www.erlang.org/downloads
RabbitMQ4.1.0与Erlang的版本兼容
4.2 安装Erlang
双击点开,接着选取要安装的路径,然后一路傻瓜式安装 next 下一步,安装即可。
**【注意】**不要安装在中文或带空格的文件路径下
4.3 安装RabbitMQ
右键管理员运行,然后选择安装路径,接着一路 next 下一步,遇到弹窗点允许,没有弹窗则无视。
**【注意】**不要安装在中文或带空格的文件路径下
打开cmd,命令移动到sbin目录下
执行命令
rabbitmq-plugins enable rabbitmq_management
启动rabbitmq
使用管理员打开cmd
net start RabbitMQ
访问RabbitMQ
地址:http://127.0.0.1:15672
输入用户名guest和密码guest,进去之后如图:
4.3 Java访问RabbitMq工程
第一步,引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.24.3</version>
</dependency>
第二步,编写生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
// 设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setVirtualHost("/");
// 建立到代理服务器的连接
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "testRoutingKey";
// 发布消息
byte[] messageBodyBytes = "jx".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
第三步,编写消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException
{
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
// 设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setVirtualHost("/");
// 建立到代理服务器的连接
Connection conn = factory.newConnection();
// 创建信道
final Channel channel = conn.createChannel();
// 声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
// 声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "testRoutingKey";
// 绑定队列,通过键 testRoutingKey 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while (true) {
// 消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键: " + routingKey);
System.out.println("消费的内容类型: " + contentType);
long deliveryTag = envelope.getDeliveryTag();
// 确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息内容: ");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
4.4 Spring Boot 整合RabbitMQ工程
第一步,引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.1</version>
<relativePath/> <!-- lookup parent in repository -->
</parent>
<artifactId>spring_boot_rabbitmq_demo</artifactId>
<packaging>jar</packaging>
<name>spring_boot_rabbitmq_demo Maven Webapp</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
第二步,编写配置
在resources下编写application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated # 启用发布确认
publisher-return: true # 启用发布返回
listener:
simple:
acknowledge-mode: manual # 手动确认消息
prefetch: 1 # 每次只处理一个消息
第三步 编写配置类
@Configuration
public class RabbitMQConfig {
// 定义交换器名称
public static final String EXCHANGE_NAME = "hello-exchange";
// 定义队列名称
public static final String QUEUE_NAME = "hello-queue";
// 定义路由键
public static final String ROUTING_KEY = "testRoutingKey";
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME, true, false);
}
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);
}
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
}
}
第四步,编写生产者
@Service
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
// 构建消息内容
Message msg = new Message(message.getBytes(), new MessageProperties());
// 发送消息到指定的交换器和路由键
rabbitTemplate.send(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg);
System.out.println("已发送消息: " + message);
}
@PostConstruct
public void init() {
// 测试发送消息
sendMessage("Hello, RabbitMQ!");
}
}
第五步,编写消费者
@Component
public class RabbitMQConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void processMessage(byte[] message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
@Header("amqp_consumerTag") String consumerTag,
Channel channel) throws IOException {
try {
// 处理消息
String messageStr = new String(message);
System.out.println("接收到的消息: " + messageStr);
// 手动确认消息
channel.basicAck(deliveryTag, false); //是否批量确认
} catch (Exception e) {
e.printStackTrace();
// 如果处理失败,可以选择拒绝消息或者重新入队
channel.basicNack(deliveryTag, false, true);//第二是否批量 第三个是是否重新入队
}
}
}
第六步, 编写启动类
@SpringBootApplication
public class RabbitMQApplication{
@Autowired
private RabbitMQProducer producer;
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
}
4.5 实现消息向页面推送(基于4.4内容)
第一步,引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.1</version>
<relativePath/> <!-- lookup parent in repository -->
</parent>
<artifactId>spring_boot_rabbitmq_demo</artifactId>
<packaging>jar</packaging>
<name>spring_boot_rabbitmq_demo Maven Webapp</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
第二步,修改配置文件application.yml 添加server配置
server:
port: 8080
context-path: /
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated # 启用发布确认
publisher-return: true # 启用发布返回
listener:
simple:
acknowledge-mode: manual # 手动确认消息
prefetch: 1 # 每次只处理一个消息
第三步,增加stomp配置类
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic"); // 启用简单消息代理,订阅路径为/topic
}
}
第四步,编写测试controller
@Controller
@RequestMapping("test")
public class StompController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@RequestMapping("/send")
@ResponseBody
public void send() {
messagingTemplate.convertAndSend("/topic/greetings", "Hello World!");
}
}
第五步,提供前端html页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>STOMP Example</title>
<script src="https://cdn.bootcdn.net/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<h1>STOMP Web Message Push</h1>
<button onclick="connect()">Connect</button>
<button onclick="disconnect()">Disconnect</button>
<script type="text/javascript">
var stompClient = null;
function connect() {
var socket = new SockJS('http://localhost:8080/ws');// 连接到注册的STOMP端点
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/greetings', function (greeting) {
alert(greeting);
});
});
}
function disconnect() {
if (stompClient !== null) {
stompClient.disconnect();
}
console.log("Disconnected");
}
</script>
</body>
</html>
运行效果