消息队列(RocketMQ+Kafka)

发布于:2025-02-27 ⋅ 阅读:(13) ⋅ 点赞:(0)

基础

什么是消息队列:具备生产者,消费者,消息队列的场景

应用场景:

  • 异步(电商订单的创建、支付、发货流程)
  • 解耦
  • 削峰填谷(淘宝的双十一)

需解决的问题:

  • 消息重复(唯一ID,幂等)
  • 消息丢失(ack确认机制,死信队列)
  • 消息堆积(增加消费者,增加消费能力,增加集群分担)
  • 高可用(集群,主从,多副本)
  • 高性能(集群,分区,多机部署,负载均衡)

RocketMQ

整体架构视频:小白debug的视频

面试题:

官方文档**【中文】**【写的比较好的官方文档】

什么是RocketMQ

是阿里云开发,后并入Apche的开源分布式消息队列框架。支持高并发,低延迟,以及集群部署能力。

有哪些结构

Producer:

生产者,可以是生产者集群,想消息队列中发送消息。需要通过NameServer建立连接。发送消息时,需要配置目标Broker的IP,以及Topic名称,Tag标记(相当于Topic的二级标记)。

发送方式:

  • 普通发送
  • 批量发送
  • 顺序发送
  • 延迟返送
  • 事务发送

Broker:

消息数据的物理节点,负责消息的持久化以及高可用。

支持的集群部署方式

  • 一个Master,多个Slave
  • 多个Master,多个Slave

也就是Master和Slave之间可以多对多,一对多的关系

Broker节点创建是,需要像NameServer注册,并发送所有的Topic信息

Consumer:

消费者,消费者需要分组,不同的分组有不同的消费策略

  • 集群消费:一个Group只有一个Consumer可以接收消息,因此有负载均衡策略。
  • 广播消费:每个Group的每个Consumer都会接收消息。

NameServer:

注册中心,负责Broker、Producer、Consumer之间的协调。

Broker: 向NameServer注册节点信息,并且同步所有Topic信息。

Producer: 向NameServer获取Broker和Topic信息,并于对应的Broker中的Master建立链接,发送消息。

Consumer: 向NameServer获取目标Topic的Broker地址信息,用于接收消息。

Consumer的负载均衡策略

注意: Consumer有两种模式,集群和广播模式,其中只有集群模式有均衡策略的必要,因为广播模式会向Group中的所有Consumer发送消息。

策略有:

  • 平均策略(默认)
  • Broker选择策略,也就是机器选择策略
  • 一致性hash(避免增删Broker时数据的迁移)

消息堆积如何处理

方式1:增加消费者

如果Topic的数量大于Consumer,可以适当增加消费者提高消费的吞吐量。

但不是无脑增加消费者就能解决,因为如果消费者超过Topic数量,再怎么增加也没有用,因为多个消费者会变成资源争抢。

方式2:增加消费者的消费能力

如果不能增加消费者,那可以提高消费效率来减少消息堆积,例如 优化代码提高代码性能

方式3:辅助措施

  • 增加系统监控,提前发现,提前做出响应
  • 增加Broker节点,专门增加一个消费者,只是将消息搬运到新的Broker,不做任何处理

有哪些消费方式

两种:Push、Pull

Push:

类似监听注册机制,监听某个Topic绑定一个消费函数,当收到消息就会触发函数。

缺点:如果消费能力不足,可能导致消息堆积

Pull:

主动绑定某个Topic获取消息

优点:可以主动控制消息的消费,减少消息堆积

如果让你考虑实现一个消息队列中间件,你会怎么实现

  1. 考虑实现多个主题的消息队列
  2. 为了提升并发性能,将每个主题的消息队列拆分到多个服务器,提高并发量
  3. 为了提高可用性,对每个主题增加主从备份,并设置一致性策略,例如Raft,Quorum,Zookeeper等
  4. 为了避免节点宕机问题,需要进行持久化,将每个队列存储到文件,并定期刷到磁盘中
  5. 为了解决多节点随机写的效率问题,集中将多个队列数据放到一个文件,对这个文件进行一次性刷盘,利用顺序写提高刷盘性能
  6. 由于增加了集群,因此需要增加一个注册发现服务,也就是注册中心,可以使用Zookeeper或者直接类似RocketMQ的自研NameServer
  7. 需要考虑和处理一些问题,例如
    1. 消息堆积问题
    2. 集群的监控问题
    3. 消息的消费策略
    4. 负载均衡问题
    5. 容错机制

什么是零拷贝,RocketMQ使用了什么方式

知识点:什么是零拷贝

零拷贝是一个相对的概念,针对用户空间于内核空间的交互,使用mmap (内存映射)+write操作让用户空间和内核空间实现零拷贝,但是内核空间依然有3次拷贝,因此这里的零拷贝相对的是用户和内核空间。

第二种方式是使用 sendfile,此方法也是内核方法,可以直接在内核空间发起数据拷贝,从磁盘拷贝到内核空间在拷贝到IO,并且这里的拷贝不会有CPU参与,而是使用DMA控制器(Direct Memory Access)。这里指的是CPU零拷贝。

其中Kafka使用了Sendfile,RocketMQ使用了mmap+write方式

因此kafka的性能要比RocketMQ高一些

为什么RocketMQ不使用sendfile呢?

因为sendfile返回值是一个count

而mmap可以返回数据映射,可以方便当出现消息无法消费时,放入死信队列,或者其他操作

Kafka

简单说明和部署文档

为什么kafka要弃用zookeeper

抖音视频:为什么kafka要弃用zookeeper

为什么kafka要弃用zookeeper

  1. Zookeeper需要单独部署,维护麻烦
  2. Kafka只用到Zookeeper部分功能,因此过于臃肿
  3. 关键原因,Kafka遇到性能瓶颈,使用Zookeeper的进行元数据管理,在数据写入,以及集群管理上导致性能相对较差。

RocketMQ和Kafka

共同点和不同点

架构基本相同:

  • 都有Broker节点
  • 都有Topic主题,Queue和Prtitions(本质都是队列)
  • 都使用中间层进行Broker集群关联,RockerMQ使用自研的NameServer,Kafka使用Zookeeper(最新已弃用Zookeeper,使用Kraft一致性协议)

不同点:

  • Broker的协调方式不同,Kraft一致性协议具有去中心化的特点,更加简洁轻量,部署更方便。而RocketMQ使用自研的NameServer进行协调。
  • 底层的拷贝方式不同,Kafka使用sendfile进行零拷贝,相比RocketMQ使用的mmap+write的方式,效率更高。但是RocketMQ正式基于mmap内存映射的方式,能够提高对消息的操作性,因此提供了更多可用功能,例如死信队列。
  • 持久化方式不同,kafka使用segment的方式对每个队列进行持久化,这样如果队列很多,会有随机写的问题。而RocketMQ利用CommitLog的方式,通过顺序写,提高持久化性能。

怎么选择(各自的使用场景和优缺点)

维护频率:

RabbitMQ相对于Kafka和RocketMQ更新频率更慢,因此在后续bug的修复以及维护上后两者更具有优势。

可维护性:

RabbitMQ使用Erlang进行开发,维护更麻烦。而Kafka和RocketMQ都是基于Java开发(kafka底层是scala开发),由于Java市场更广因此更易于维护。

分布式架构:

RabbitMQ只有主从架构,中间有一个Exchang交换器,架构更简单,性能和可用性都不错。对于小公司来说,更适合。

Kafka和RocketMQ都是分布式架构,针对的都是高性能,高可用,因此如果没有那么大的流量需求会有大材小用的感觉,也会增加维护成本。

性能上:

RabbitMQ是万/秒级别的吞吐量,kafka和RocketMQ是10万/秒级别,并且由于kafka使用sendfile技术,甚至能达到17万/秒,因此可以根据自己的业务情况进行选择。

做技术就是一种权衡,简单易维护的性能可能欠佳,性能高的可能较难维护,要量力而行,不要一味追求某一种。

功能上:

RabbitMQ Kafka RocketMQ
消息协议 AMQP, MQTT, STOMP, HTTP 自定义协议 (Kafka协议) 自定义协议 (RocketMQ协议)
消息持久化 支持(消息和队列) 支持(默认持久化日志存储) 支持(存储在磁盘上的主题日志)
消息确认机制 支持(同步/异步确认) 支持(消费者确认消息) 支持(消费消息确认机制)
消息路由 支持多种路由模式(Direct, Fanout, Topic, Headers) 基于消息键(分区) 支持标签(Tag)与消息队列的绑定
高可用性和容错 集群模式、镜像队列 多副本机制(分区副本) 主从模式,支持跨地域部署
负载均衡 支持(基于消费者的公平分发) 基于分区的负载均衡 基于消息队列分配的负载均衡
延迟 支持(但通常较高,适用于低延迟场景) 较低的消息传递延迟 较低的消息传递延迟
消息顺序 按队列保证顺序(但不跨队列) 保证分区内的顺序 保证队列内的顺序
吞吐量 较低(适合较小规模应用) 非常高(适用于大规模数据流) 高吞吐量(适合大规模分布式系统)
消息过滤 支持(基于路由键和主题) 不支持(消息只能基于分区进行选择) 支持(基于Tag和MessageKey过滤)
集群与分布式支持 支持(集群和高可用队列) 支持(跨数据中心的分布式集群) 支持(多集群支持,跨地域高可用)
消息重复消费 支持(通过消息确认机制) 支持(但不能完全避免,取决于消费端实现) 支持(可以配置消息消费的重复策略)
消息过期与死信队列(DLQ) 支持(死信队列功能) 支持(过期消息可删除,但不支持死信队列 支持(有死信队列功能)
流量控制与背压 支持(基于消息队列的长度) 支持(基于内存/磁盘容量的压力控制) 支持(基于生产者和消费者的流控)
监控和管理工具 提供管理控制台、API、Prometheus 集成 提供JMX、Prometheus、Kafka Manager 等工具 提供管理控制台、Prometheus 集成
客户端支持 支持多语言客户端(Java, Python, Ruby, Go等) 支持多语言客户端(Java, Python, Go等) 支持多语言客户端(Java, Python, Go等)
集成与扩展 插件支持(如延迟消息、Shovel、Federation等) 支持多种消费者和生产者接口 支持多种消费者和生产者接口
主要用途 实时消息处理,任务队列,分布式系统中的通信 大数据流处理,日志收集,数据传输 分布式消息传递,金融、电商系统中的消息通信

Docker部署RocketMQ

保姆级教程

Docker部署Kafka

使用kraft部署kafka

使用zookeeper部署kafka