【消息队列RocketMQ】一、RocketMQ入门核心概念与架构解析

发布于:2025-04-21 ⋅ 阅读:(24) ⋅ 点赞:(0)

在当今互联网技术飞速发展的时代,分布式系统的架构设计愈发复杂。消息队列作为分布式系统中重要的组件,在解耦应用、异步处理、削峰填谷等方面发挥着关键作用。RocketMQ 作为一款高性能、高可靠的分布式消息中间件,被广泛应用于各类互联网场景。之前我们发过中间件kafka,其功能与RocketMQ有些类似。但是毕竟是两个技术的迭代。下面让我们一起走进RocketMQ系列文章。

一、消息队列核心概念​

1.1 什么是消息队列​

消息队列(Message Queue)是一种应用间的通信方式,它允许不同的应用程序通过发送和接收消息来进行交互。简单来说,消息队列就像是一个存放消息的 “容器”,发送方将消息放入队列中,接收方从队列中取出消息进行处理 。在分布式系统中,消息队列可以解决应用程序之间的耦合问题,提高系统的可扩展性和稳定性。

1.2 消息队列的应用场景​

        1、异步处理:在电商系统中,用户下单后,需要进行库存扣减、订单记录、发送短信通知等一系列操作。如果采用同步处理,用户需要等待所有操作完成才能得到响应,体验较差。通过引入消息队列,将发送短信通知等非核心操作异步化处理,用户下单后立即返回响应,后续操作由消息队列异步完成,大大提高了系统的响应速度。​

        2、应用解耦:以一个大型的在线教育平台为例,课程服务、用户服务、支付服务等多个微服务之间存在复杂的调用关系。使用消息队列后,各个服务之间不再直接依赖,而是通过消息进行通信。当课程服务发布新课程时,只需将消息发送到消息队列,用户服务和支付服务根据自身需求从队列中获取消息进行处理,降低了服务之间的耦合度,提高了系统的可维护性和扩展性。​

        3、削峰填谷:在电商大促活动期间,短时间内会有大量的订单请求涌入系统。如果直接将这些请求发送到数据库进行处理,可能会导致数据库负载过高甚至崩溃。通过消息队列,可以将大量的订单请求先暂存起来,然后按照系统的处理能力逐步消费,实现流量的削峰填谷,保证系统的稳定性。

1.3 消息队列的优势​

  • 提高系统性能:通过异步处理,减少了请求的等待时间,提高了系统的响应速度。​
  • 增强系统稳定性:解耦应用程序,降低了系统的复杂性,当某个应用出现故障时,不会影响其他应用的正常运行。​
  • 提升系统扩展性:方便地添加或移除应用程序,适应业务的快速发展。

二、RocketMQ 架构解析​

2.1 RocketMQ 核心组件​

RocketMQ 主要由 NameServer、Broker、Producer 和 Consumer 四大核心组件构成,它们相互协作,共同完成消息的发送、存储和消费。​

        1、NameServer:NameServer 是 RocketMQ 的命名服务,提供轻量级的服务发现和路由功能。它保存着 Topic 与 Broker 的映射关系,以及 Broker 的相关信息,如 Broker 的地址、状态等。Producer 和 Consumer 通过 NameServer 获取 Topic 的路由信息,从而找到对应的 Broker 进行消息的发送和消费。NameServer 是无状态的,各个 NameServer 之间相互独立,不存在数据同步等复杂操作,保证了其高可用性。​

        2、Broker:Broker 是 RocketMQ 的核心组件,负责消息的存储、转发和查询等操作。它接收 Producer 发送的消息,并将消息存储到磁盘上,同时为 Consumer 提供消息的拉取服务。Broker 可以分为 Master 和 Slave 两种角色,Master 负责处理读写请求,Slave 用于备份数据,提高系统的可靠性和可用性。当 Master 出现故障时,Slave 可以切换为 Master 继续提供服务。​

        3、Producer:Producer 即消息生产者,负责将业务系统产生的消息发送到 Broker。Producer 可以根据业务需求选择不同的消息发送模式,如同步发送、异步发送和单向发送。在发送消息时,Producer 会根据 Topic 从 NameServer 获取路由信息,然后将消息发送到对应的 Broker 上。​

        4、Consumer:Consumer 是消息消费者,从 Broker 中拉取消息并进行处理。Consumer 支持两种消费模式:集群消费和广播消费。集群消费模式下,多个 Consumer 实例组成一个消费组,共同消费 Topic 中的消息,每个消息只会被消费组中的一个 Consumer 实例消费;广播消费模式下,Topic 中的消息会被消费组中的所有 Consumer 实例消费。

2.2 组件间的交互流程​

        1、Broker 注册:Broker 启动后,会向所有的 NameServer 发送心跳包,将自己的信息注册到 NameServer 中,包括 Broker 的地址、所属集群、Topic 配置等信息。NameServer 会定期检查 Broker 的心跳,如果超过一定时间没有收到心跳,就认为 Broker 已经下线,将其从路由表中移除。​

        2、Producer 获取路由信息:Producer 启动时,会从 NameServer 获取 Topic 的路由信息,包括该 Topic 对应的所有 Broker 地址和队列信息。Producer 根据一定的负载均衡策略,选择一个 Broker 发送消息。在消息发送过程中,如果发现 Broker 不可用,Producer 会重新从 NameServer 获取路由信息,选择其他可用的 Broker 进行发送。​

        3、消息发送:Producer 根据获取的路由信息,将消息发送到对应的 Broker。Broker 接收到消息后,会将消息存储到 CommitLog 文件中,并更新 ConsumeQueue 和 IndexFile 等索引文件,以便 Consumer 快速查询和消费消息。​

        4、Consumer 获取消息:Consumer 启动时,同样会从 NameServer 获取 Topic 的路由信息。然后,Consumer 根据消费模式(集群消费或广播消费)和负载均衡策略,从对应的 Broker 拉取消息进行消费。在消费过程中,Consumer 会记录消费进度,以便在重启后能够继续从上次消费的位置开始消费。

三、在 CentOS 7 上安装配置 RocketMQ​

3.1 环境准备​

在安装 RocketMQ 之前,需要确保 CentOS 7 系统已经安装了 Java 环境,因为 RocketMQ 是基于 Java 开发的。可以通过以下命令检查 Java 是否安装:

java -version

如果没有安装 Java,可以使用以下命令安装 OpenJDK 8:

sudo yum install -y java-1.8.0-openjdk-devel

3.2 下载 RocketMQ​

访问 RocketMQ 官方 GitHub 仓库(https://github.com/apache/rocketmq),下载最新版本的 RocketMQ 安装包。这里以 RocketMQ 4.9.4 版本为例,使用以下命令下载和解压:

wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-all-4.9.4-bin-release

3.3 配置 NameServer​

  1. 启动 NameServer:进入 RocketMQ 安装目录的 bin 文件夹,使用以下命令启动 NameServer:
nohup sh mqnamesrv &

nohup命令用于在后台运行进程,即使终端关闭,进程也不会停止。启动成功后,可以通过以下命令查看 NameServer 的日志,确认是否启动正常:

tail -f ~/logs/rocketmqlogs/namesrv.log

如果日志中出现 “the name server is started success” 等类似信息,说明 NameServer 启动成功。​

      2. 配置文件说明:NameServer 的配置文件位于 RocketMQ 安装目录的 conf 文件夹下,文件名为broker.conf。在默认情况下,NameServer 不需要进行复杂的配置即可正常工作。不过,如果需要修改 NameServer 的监听端口、日志存储路径等参数,可以在配置文件中进行修改。例如,修改 NameServer 的监听端口为 9877,可以在配置文件中添加以下内容:

listenPort=9877

3.4 配置 Broker​

  1. 修改配置文件:进入 RocketMQ 安装目录的 conf 文件夹,找到broker.conf文件,对其进行修改。以下是一个简单的broker.conf配置示例:
# broker名称,同一集群内名称要唯一
brokerName=broker-a
# 所属集群名称
brokerClusterName=DefaultCluster
# brokerId,0表示Master,大于0表示Slave
brokerId=0
# NameServer地址,多个地址用分号分隔
namesrvAddr=localhost:9876
# 存储路径
storePathRootDir=/home/rocketmq/store
# commitLog存储路径
storePathCommitLog=/home/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/home/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/home/rocketmq/store/index
# checkpoint文件存储路径
storeCheckpoint=/home/rocketmq/store/checkpoint
# abort文件存储路径
abortFile=/home/rocketmq/store/abort

在上述配置中,brokerName、brokerClusterName、brokerId等参数需要根据实际情况进行设置。namesrvAddr要指向正确的 NameServer 地址和端口。存储路径相关参数可以根据磁盘空间等情况进行调整。​

     2. 启动 Broker:使用以下命令启动 Broker:

nohup sh mqbroker -c conf/broker.conf &

-c参数指定配置文件的路径。启动成功后,可以通过以下命令查看 Broker 的日志:

tail -f ~/logs/rocketmqlogs/broker.log

如果日志中出现 “the broker [broker-a, 192.168.1.10:10911] boot success” 等类似信息,说明 Broker 启动成功。​

3.5 验证安装​

  1. 发送消息:进入 RocketMQ 安装目录的 bin 文件夹,使用以下命令发送一条测试消息:
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

该命令会执行 RocketMQ 自带的生产者示例代码,向 Broker 发送一条消息。​

      2. 接收消息:再打开一个终端,进入 RocketMQ 安装目录的 bin 文件夹,使用以下命令接收消息:

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

如果能够正常接收到刚才发送的消息,说明 RocketMQ 的安装和配置是成功的。

后续文章中将持续更新RocketMQ更多内容和功能,可以关注小编,期待后续文章。


网站公告

今日签到

点亮在社区的每一天
去签到