NameServer核心组件
Broker核心组件
Broker心跳注册
Producer发送消息过程
- Producer启动时会从NameServer拉取broker信息缓存在本次,发送消息时先从本地缓存找,找不到在从NameServer拉取。启动定时任务定时从NameServer拉取broker的最新信息。
- Producer发送消息时,默认会轮询⽬标Topic下的所有MessageQueue,并采⽤递增取模的⽅式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的⽬的。⽽由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。
- Producer如果发送失败则继续查找与上次失败不同的broker中的MessageQueue发送消息,默认重试次数是2。如果是顺序消息使用了MessageSelector,则使用提供的算法查找MessageQueue去发送消息。
Consumer拉取消息
- 集群模式下,consumer与MessageQueue会通过算法确定拉取关系,即一个MessageQueue只会由消费组中的一个consumer负责拉取。
- ⼴播模式下,每⼀条消息都会投递给订阅了Topic的所有消费者实例,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。
- 推模式与拉模式:都是使用拉的方式去服务端请求。拉模式底层基于 长轮询(Long Polling) 模拟“推送”效果:消费者发起请求后,Broker 主动将消息推送给消费者,Broker 若暂无消息,会保持连接直到有消息或超时。推模式消费端只需要注册监听器,拉模式需显式调用 pull 方法,并处理偏移量(offset)、网络重试等问题。
消息持久化
- CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消息的偏移量为⽂件名。
- ConsumerQueue:ConsumeQueue⽂件主要是加速消费者进⾏消息索引。每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ConsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。⽽消费者在ConsumeQueue⽂件中的消费进度,会保存在config/consumerOffset.json⽂件当中。
- IndexFile:主要是辅助消费者进⾏消息索引。消费者进⾏消息消费时,通过ConsumeQueue⽂件就⾜够完成消息检索了,但是如果消费者指定时间戳进⾏消
费,或者要按照MeessageId或者MessageKey来检索⽂件,⽐如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue⽂件就不够⽤了。IndexFile⽂件就是⽤来辅助这类消息检索的。 - abort:这个⽂件是RocketMQ⽤来判断程序是否正常关闭的⼀个标识⽂件。正常情况下,会在启动时创建,⽽关闭服务时删除。但是如果遇到⼀些服务器宕机,或者kill-9这样⼀些⾮正常关闭服务的情况,这个abort⽂件就不会删除,因此RocketMQ就可以判断上⼀次服务是⾮正常关闭的,后续就会做⼀些数据恢复的操作。比如根据CommitLog恢复ConsumerQueue和IndexFile来保证消息对齐。
- Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue⽂件。然后对超过72⼩时的数据进⾏删除。默认情况下,RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。RocketMQ在删除过期CommitLog⽂件时,并不检查消息是否被消费过。
延迟消息
延迟消息在broker端的processor处理时会先把消息转发到系统Topic下(固定延迟级别转移到SCHEDULE_TOPIC_XXXX Topic中,指定时间转移到rmq_sys_wheel_timer Topic中),延迟消息被转存到系统的Topic下之后,接下来就是要启动⼀系列的定时任务。延迟时间到了后,再将消息转储回到Producer提交的业务Topic和Queue中,这样就可以正常被消费者消费了。