RabbitMQ

发布于:2025-03-14 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、MQ相关的概念

1.1、MQ的基本概念

什么是MQ

MQ(Message Queue,消息队列)是一种应用程序对应用程序的通信方法。应用程序通过写入和检索出入队列的针对性消息来通信,这些消息可以存储在内存或磁盘中。消息队列允许应用程序独立地运行,并以可靠的方式相互通信。

为啥要用MQ
  1. 解耦: 允许系统独立开发、部署和运行,减少系统间的直接依赖
  2. 异步处理: 非阻塞操作,请求处理与响应分离
  3. 削峰填谷: 缓冲突发请求,防止系统过载
  4. 可靠通信: 确保消息传递的可靠性,支持持久化
  5. 扩展性: 便于系统水平扩展
  6. 顺序保证: 某些MQ可确保消息按特定顺序处理
常用的MQ
  1. RabbitMQ: 基于Erlang开发,实现AMQP协议,轻量级,易于部署
  2. Kafka: 高吞吐量的分布式消息系统,适合大数据场景
  3. RocketMQ: 阿里开源的分布式消息中间件,高性能、高可靠
  4. ActiveMQ: 老牌消息中间件,实现JMS规范
  5. ZeroMQ: 更像是网络通信库,无需单独的消息服务器

1.2、消息队列协议

什么是协议

协议是通信双方约定的规则集合,定义了通信格式、通信方式、数据传输方式等。

网络协议的三要素
  1. 语法: 数据与控制信息的结构或格式
  2. 语义: 解释控制信息每个部分的含义
  3. 时序: 事件发生的顺序
常用消息中间件协议
  1. AMQP (Advanced Message Queuing Protocol): 高级消息队列协议,是一个开放标准的应用层协议,为面向消息的中间件设计。RabbitMQ实现了AMQP。
  2. MQTT (Message Queuing Telemetry Transport): 轻量级的发布/订阅消息传输协议,适用于物联网场景。
  3. STOMP (Simple/Streaming Text Oriented Messaging Protocol): 简单的面向文本的消息传递协议。
  4. JMS (Java Message Service): Java消息服务,是Java平台中关于面向消息中间件的API。
  5. OpenWire: ActiveMQ的原生协议。

1.3、消息队列持久化

消息队列持久化是指将消息存储到磁盘上,确保在消息代理重启后消息不会丢失。

持久化方式:

  • 内存持久化: 速度快但不可靠,服务重启后数据丢失
  • 文件持久化: 将消息存储在文件系统中
  • 数据库持久化: 将消息存储在数据库中,便于管理和查询

RabbitMQ持久化需要:

  1. 队列持久化:声明队列时设置为持久化
  2. 消息持久化:发送消息时设置持久化属性

1.4、消息的分发策略

消息队列分发策略决定了消息如何分发给多个消费者:

  1. 轮询分发 (Round-Robin): 依次将消息发送给下一个消费者,实现负载均衡
  2. 公平分发 (Fair Dispatch): 根据消费者的处理能力分发消息,通常基于确认机制
  3. 一致性哈希: 相同特征的消息发送到同一消费者
  4. 随机分发: 随机选择消费者
  5. 优先级分发: 根据消息优先级分发

1.5、消息队列的高可用和高可靠

高可用:确保服务在部分节点故障时仍能继续工作

实现方式:

  • 集群模式
  • 镜像队列
  • 主备模式
  • 负载均衡

高可靠:确保消息不丢失

实现方式:

  • 消息持久化
  • 发送方确认机制
  • 消费方确认机制
  • 消息重试机制

二、RabbitMQ安装启动

Linux安装

# 安装Erlang环境
apt-get install erlang

# 安装RabbitMQ
apt-get install rabbitmq-server

# 启动RabbitMQ服务
systemctl start rabbitmq-server

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

# 创建用户
rabbitmqctl add_user admin password

# 设置用户角色
rabbitmqctl set_user_tags admin administrator

# 设置用户权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

访问管理界面: http://server-ip:15672

Docker安装

# 拉取RabbitMQ镜像(带管理插件)
docker pull rabbitmq:3-management

# 启动容器
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

三、RabbitMQ快速入门

3.1、RabbitMQ的概念

RabbitMQ是实现AMQP(高级消息队列协议)的开源消息代理软件。它接收、存储和转发消息数据。RabbitMQ用Erlang语言编写,集群和故障转移非常容易。

3.2、AMQP协议

AMQP(高级消息队列协议)是一个开放标准的应用层协议,为面向消息的中间件设计。AMQP定义了消息传递所需的各种元素,如交换机、队列、绑定、路由键等。

AMQP的主要特点:

  • 可互操作性:不同供应商实现的AMQP可互操作
  • 安全性:支持TLS加密和SASL认证
  • 可靠性:事务支持,确认机制
  • 标准化:开放标准

3.3、RabbitMQ架构组成

RabbitMQ的核心架构由以下部分组成:

  • Broker: 接收和分发消息的消息中间件服务节点
  • Virtual Host: 虚拟主机,用于隔离不同的消息环境
  • Connection: 客户端与RabbitMQ Broker的TCP连接
  • Channel: 信道,建立在Connection之上的虚拟连接
  • Exchange: 交换机,接收消息并转发到绑定的队列
  • Queue: 队列,存储消息
  • Binding: 绑定,交换机与队列之间的关联
  • Routing Key: 路由键,交换机根据路由键决定消息转发到哪个队列

3.4、四大核心概念

  1. 生产者(Producer): 发送消息的应用程序
  2. 交换机(Exchange): 接收生产者发送的消息,并根据路由键将消息路由到一个或多个队列
  3. 队列(Queue): 存储消息的缓冲区,直到消费者接收它们
  4. 消费者(Consumer): 接收消息的应用程序

3.5、RabbitMQ角色分类

RabbitMQ中的用户角色:

  1. 超级管理员(administrator): 可以管理用户、策略、虚拟主机等
  2. 监控者(monitoring): 可以查看节点的状态
  3. 策略制定者(policymaker): 可以管理策略和参数
  4. 管理者(management): 可以管理虚拟主机中的资源
  5. 普通用户(none): 只能使用已配置的虚拟主机和资源

3.6、RabbitMQ消息模式

简单模式

一个生产者、一个队列、一个消费者的最简单模式。

工作模式

一个生产者、一个队列、多个消费者,消息在多个消费者间分发。

发布/订阅模式(fanout)

生产者将消息发送到fanout交换机,交换机将消息广播到所有绑定的队列。

路由模式(direct)

生产者将消息发送到direct交换机,交换机根据消息的路由键将消息路由到特定队列。

主题模式(topic)

生产者将消息发送到topic交换机,交换机根据路由键的模式匹配将消息路由到队列。

参数模式

生产者将消息发送到headers交换机,交换机根据消息头信息而非路由键进行路由。

四、简单模式——Hello Word

4.1、导入rabbitmq依赖

Maven依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

4.2、编写消息生产者 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 队列名称
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ地址
        factory.setHost("localhost");
        // 设置用户名和密码
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 发送消息
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        
        // 关闭通道和连接
        channel.close();
        connection.close();
    }
}

4.3、编写消息消费者 

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.co