MQ介绍:
简介:
MQ,一种提供消息队列服务的中间件,也称为消息中间件,是一套提供消息生产,存储,消费的全过程API软件系统。消息就是数据。
用途:
1.限流削峰:先把系统超量的请求暂存在MQ中,以便系统后期慢慢处理,避免请求的丢失或系统被压垮。
2.异步解耦:由同步转化为异步,在两层之间添加一个MQ,提高系统的吞吐量和并发度,解耦。
3.数据收集:分布式系统会产生海量级数据流,如:业务日志,监控数据,用户行为等,对数据进行实时或批量采集汇总,进行大数据分析,MQ是很好的选择。
MQ产品:
1.ActiveMQ:java语言开发,社区活跃度底,现在很少用。
2.RabbitMQ:Erlang语言开发,吞吐量较Kafka和RocketMQ低,因为不是java语言开发,使用它定制开发难度较大。
3.Kafka:Scala/java语言开发,最大特点高吞吐率,常用于大数据领域,没有遵循常见的MQ协议,使用的是自研协议。对于Spring Cloud Netflix,其仅支持RabbitMQ和Kafka。
4.RocketMQ:java语言开发,经过数年阿里双11考验,性能与稳定性非常高,没有遵循常见的MQ协议,使用的是自研协议。对于Spring Cloud Alibaba,其支持RocketMQ,Kafka,但提倡使用RocketMQ。
MQ常见协议:
1.JMS
2.STOMP
3.AMQP
4.MQTT
常识:
MOM:面向消息的中间件
PO:面向过程
OO:面向对象
AO:面向切面
RocketMQ介绍:
简介:
一个统一消息引擎,轻量级数据处理平台。阿里巴巴开源的消息中间件。
官网:
https://rocketmq.apache.org/
RocketMQ安装与启动:
基本概念:
1.消息(Message):消息系统所传输信息的物理载体,生成和消费数据的最小单位,每条消息必须属于一个主题。
2.主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,主题是RocketMQ进行消息订阅的基本单位。(主题可以看作是一个种类,消息是某个种类下的一种物品)
Topic:message = 1:n message:topic = 1:1
producer:topic = 1:n consumer:topic = 1:1
3.标签(Tag):为消息设置的标签,用于同一主题下区分不同类型的消息。消费者可以根据标签实现对不同子主题的不同消费逻辑,实现更好的扩展性。
4.队列(Queue):存储消息的物理实体。一个主题中可以包含多个队列,每个队列中存放的就是该主题的消息。一个主题的队列也被称为分区。一个主题的队列中的消息只能被一个消费者组中的一个消费者消费。一个队列中的消息不允许同一个消费者租中的多个消费者同时消费。
分片:分片不同于分区。在RocketMQ中,分片是指存放相应主题的Broker。每个分片中会创建相应数量的分区,每个分区大小都是相同的。
5.消息标识(MessageId/key):RocketMQ中每个消息都拥有唯一的MessageId,且可以携带具有业务标识的key,以便对消息的查询。注:MessageId有两个,在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId,offsetMsgId和key都称为消息标识。
msgId:producer端生成,producerIp+进程pid+MessageClientIDSetter类的ClassLoader的hashCode+当前时间+AutomaticInteger自增计数器。
offsetMsgId:broker端生成,brokerIp+物理分区的offset(Queue中的偏移量)。
key:用户指定的业务相关的唯一标识。
系统架构:
1.生产者(Producer):消息生产者,负责生产消息。例如,把秒杀请求写入到MQ的过程,就是消息生产的过程。RocketMQ中的消息生产者都是以生产者组的形式出现的。生产者组是同一类生产者的集合,这类生产者发送相同主题的消息。一个生产者组可以同时发送多个主题的消息。
2.消费者(Consumer):消息消费者,负责消费消息。例如,业务系统从MQ中读取到秒杀请求,并对请求处理的过程就是消息消费的过程。一个消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。RocketMQ中的消息消费者都是以消费者组形式出现。消费者组是同一类消费者的集合,这类消费者消费的是同一个主题的消息。实现负载均衡(将一个主题中的不同队列平均分配给同一个消费者组的不同消费者消费,并不是将消息负载均衡,是将队列负载均衡)和容错(一个消费者挂了,该消费者组中其它消费者可以接着消费挂了的消费者的剩下的队列。)
3.注册中心(Name Server):一个Broker和主题路由的注册中心,支持Broker的动态注册与发现。
包含两个功能,Broker管理,路由信息管理。
Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据,提供心跳检测机制,检查Broker是否还存活。
路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。生产者和消费者通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
路由注册:
与zk、Eureka、Nacos等注册中心不同的地方:NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着一个Broker列表,用来动态存储Broker的信息。
NameServer的无状态方式:优点:NameServer集群搭建简单,扩容简单。缺点:对于Broker,必须明确指出所有NameServer地址。否则未指出的将不会去注册。也正因为如此,NameServer并不能随便扩容。因为,若Broker不重新配置,新增的NameServer对于Broker来说是不可见的,其不会向这个NameServer进行注册。
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每 30 秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。
路由剔除:
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。
NameServer中有一个定时任务,每隔 10 秒就会扫描一次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过 120 秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。
扩展:对于RocketMQ日常运维工作,例如Broker升级,需要停掉Broker的工作。需要怎么做?
将Broker的读写权限禁掉。一旦客户端(消费者或生产者)向broker发送请求,都会收到broker的没有权限的响应,然后客户端会进行对其它Broker的重试。
当这个Broker没有流量后,再关闭它,就实现Broker从NameServer的移除。
路由发现:
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每 30 秒会拉取一次最新的路由。
扩展:
Push模型:推送模型。其实时性较好,是一个“发布-订阅”模型,需要维护一个长连接。而长连接的维护是需要资源成本的。该模型适合于的场景:实时性要求较高;客户端数量不多,服务端数据变化较频繁 。(客户端和服务端一直连接,当服务端路由改变了,就马上告诉客户端)
Pull模型:拉取模型。存在的问题是,实时性较差。 (客户端每 30 秒拉取一次服务端的路由,如果服务端路由改变了,客户端就改变)
Long Polling模型:长轮询模型。其是对Push与Pull模型的整合,充分利用了这两种模型的优势,屏蔽了它们的劣势。(客户端每 30 秒拉取一次服务端的路由,访问时间保留一段时间,如果服务端路由改变了,客户端就改变,没有改变时,如果保留的时间结束,就断开连接,30秒后再访问连接)
客户端NameServer选择策略:
客户端指的是Producer与Consumer。客户端在配置时必须要写上NameServer集群的地址,那么客户端到底连接的是哪个NameServer节点呢?客户端首先会生产一个随机数,然后再与NameServer节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用round-robin策略,逐个尝试着去连接其它节点。
总结就是,首先采用的是随机策略进行的选择,失败后采用的是轮询策略。
Zookeeper Client选择Zookeeper Server:
简单来说就是,经过两次(打散)Shufæe,然后选择第一台Zookeeper Server。
详细说就是,将配置文件中的Zookeeper server地址进行第一次(打散)shufæe,然后随机选择一个。这个选择出的一般都是一个hostname。然后获取到该hostname对应的所有ip,再对这些ip进行第二次(打散)shufæe,从(打散)shufæe过的结果中取第一个server地址进行连接。
4.Broker:Broker充当着消息中转角色,负责存储消息、转发消息。Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。
Kafka 0.8版本之后,offset是存放在Broker中的,之前版本是存放在Zookeeper中的。
Broker Server的功能模块示意图。
Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。
Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息
Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。
HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。
为了增强Broker性能与吞吐量,Broker一般都是以集群形式出现的。各集群节点中可能存放着相同Topic的不同Queue。不过,这里有个问题,如果某Broker节点宕机,如何保证数据不丢失呢?其解决方案是,将每个Broker集群节点进行横向扩展,即将Broker节点再建为一个HA集群,解决单点问题。Broker节点集群是一个主从集群,即集群中具有Master与Slave两种角色。Master负责处理读写操作请求,Slave负责对Master中的数据进行备份。当Master挂掉了,Slave则会自动切换为Master去工作。所以这个Broker集群是主备集群。一个Master可以包含多个Slave,但一个Slave只能隶属于一个Master。Master与Slave 的对应关系是通过指定相同的BrokerName、不同的BrokerId 来确定的。BrokerId为 0 表示Master,非 0 表示Slave。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。集群部署如下:
5.工作流程:
1 )启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。
2 )启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每 30 秒向NameServer定时发送心跳包。
3 )发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。
手动创建Topic时,有两种模式:
集群模式:该模式下创建的Topic在该集群中,所有Broker中的Queue数量是相同的。
Broker模式:该模式下创建的Topic在该集群中,每个Broker中的Queue数量可以不同。
自动创建Topic时:
默认采用的是Broker模式,会为每个Broker默认创建 4 个Queue。
4 )Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每 30 秒从NameServer更新一次路由信息。
5 )Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每 30 秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。
读/写队列:
从物理上来讲,读/写队列是同一个队列。所以,不存在读/写队列数据同步问题。读/写队列是逻辑上进行区分的概念。一般情况下,读/写队列数量是相同的。但是读写队列也可能不一致:
例如,创建Topic时设置的写队列数量为 8 ,读队列数量为 4 ,此时系统会创建 8 个队列,分别是0 1 2 3 4 5 6 7。生产者会将消息写入到这 8 个队列,但消费者只会消费0 1 2 3这 4 个队列中的消息,4 5 6 7 中的消息是不会被消费到的。
再如,创建Topic时设置的写队列数量为 4 ,读队列数量为 8 ,此时系统会创建 8 个队列,分别是0 1 2 3 4 5 6 7。生产者会将消息写入到0 1 2 3 这 4 个队列,但消费者会消费0 1 2 3 4 5 6 7这 8个队列中的消息,但是4 5 6 7中是没有消息的。此时假设消费者组中包含两个消费者,消费者1消费0 1 2 3,而消费者2消费4 5 6 7。但实际情况是,消费者2是没有消息可消费的。也就是说,当读/写队列数量设置不同时,总是有问题的。读/写队列数量设置不同时目的:方便Topic的队列的缩容。
例如,原来创建的Topic中包含 16 个队列,如何能够使其队列缩容为 8 个,还不会丢失消息?可以动态修改写队列数量为 8,读队列数量不变。此时新的消息只能写入到前 8 个队列,而消费者消费的却是16 个队列中的数据。当发现后 8个队列中的消息消费完毕后,就可以再将读队列数量动态设置为 8 。整个缩容过程,没有丢失任何消息。perm用于设置对当前创建Topic的操作权限: 2 表示只写, 4 表示只读, 6 表示读写。
window下载安装RocketMQ:
官网:
https://rocketmq.apache.org/
说明文档:
下载:
下载好binary包后,解压到一个无空格无中文的目录下:
启动:
java的默认启动参数中,启动时堆内存的大小为2g,太大了,调整如下:打开bin目录下的runserver.cmd
把Xms,Xmx,-Xmn都设置成512m如下:
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改成
set "JAVA_OPT=%JAVA_OPT% -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
同理设置runbroker.cmd:
打开bin目录下的runbroker.cmd,修改如下:
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g"
修改成
set "JAVA_OPT=%JAVA_OPT% -server -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98 -Xms512m -Xmx512m -Xmn512m"
-Drocketmq.broker.diskSpaceWarningLevelRatio=0.98:表示把Broker磁盘空间利用率设置为98%,默认是85%。
正确如下:
配置环境变量:
ROCKETMQ_HOME=D:\product\rocketmq-all-4.8.0-bin-release
变量名:ROCKETMQ_HOME
变量值:RocketMQ的bin目录的上一级目录
启动Name Server:
直接点击bin目录下的mqnamesrv.cmd
或 在bin目录下打开cmd,输入
start mqnamesrv.cmd
出现下图表示启动成功
启动Broker:
直接点击bin目录下的mqbroker.cmd
或 在bin目录下打开cmd,输入
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
出现下图表示启动成功
RocketMQ插件部署:
地址:
https://github.com/apache/rocketmq-externals
下载项目,可以使用git目录下载,也可以用压缩包下载方式下载,如果以上地址下载不了,可以使用下面地址百度网盘下载
链接: https://pan.baidu.com/s/1U1MGsXaumOEQ2X2A8j8gjQ?pwd=8daq 提取码: 8daq 复制这段内容后打开百度网盘手机App,操作更方便哦
下载成功目录如下:
进入D:\product\rocketmq-externals-master\rocketmq-console\src\main\resources目录打开application.properties进行配置如下:
配置如下:
编译插件,cmd进入rocketmq-externals-master\rocketmq-console执行以下命令进行maven打包并跳过测试:
mvn clean package -Dmaven.test.skip=true
成功如下,编译成功,maven会将插件编译成一个jar包,放到target文件夹下面:
启动:
在cmd进入target目录执行如下命令,启动rocketmq-console-ng-1.0.0.jar:
java -jar rocketmq-console-ng-1.0.1.jar
输入成功:
在浏览器输入127.0.0.1:8088,出现如下命令表示安装成功:
发送消息和接收消息:
配置环境变量:
ROCKETMQ_HOME="D:\product\rocketmq-all-4.8.0-bin-release"
NAMESRV_ADDR="localhost:9876"
ROCKETMQ_HOME=“D:\product\rocketmq-all-4.8.0-bin-release” RocketMq的安装目录,bin目录的上一个目录。
NAMESRV_ADDR=“localhost:9876” rocketMq的地址。
按照上面方式启动Name Server 和 Broker;
发送消息(cmd到bin目录的上一个目录,执行以下命令进行发送消息):
.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Producer
成功如图:
接收消息(cmd到bin目录的上一个目录,执行以下命令进行接收消息):
.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Consumer
成功如图:
具体操作也可以看官网:
https://rocketmq.apache.org/docs/quick-start/