目录
一:消息队列
1:什么是消息队列?
消息队列(Message Queue)是一种用于在应用程序或系统组件之间传递消息的通信机制。它通过将消息存储在队列中,实现发送者(生产者)和接收者(消费者)之间的解耦,使得生产者和消费者可以独立运行,无需直接交互。
2:消息队列的特征
(1)存储
与依赖于使用套接字的基本 TCP 和 UDP 协议的传统请求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止。
(2)异步
与请求和响应系统不同,消息队列通过缓冲消息可以在应用程序中实现一定程度的异步性,允许源进程发送消息并在队列中累积消息,而目标进程则可以挑选消息进行处理。这样,应用程序就可以在某些故障情况下运行,例如连接断断续续或源进程或目标进程故障。
路由:消息队列还可以提供路由功能,其中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信模式。
3:为什么需要消息队列
消息队列的主要作用包括:
解耦:将生产者和消费者分离,降低系统组件间的直接依赖,提高灵活性和可维护性。
异步处理:允许非实时处理任务,提高系统的响应速度和吞吐量。
削峰填谷:在流量高峰时缓冲消息,避免系统过载,平滑处理压力。
可靠性:通过持久化和重试机制,确保消息不丢失,提高系统的可靠性。
扩展性:支持分布式部署,便于水平扩展和负载均衡。
二:Kafka基础与入门
1:Kafka基本概念
分布式流处理平台:高吞吐、低延迟、可水平扩展。
核心功能:
消息系统(发布/订阅模式)。
存储系统(数据持久化)。
流处理(实时处理数据流)。
设计目标:高容错、高并发、支持海量数据。
2:Kafka相关术语
Broker:Kafka服务器节点,组成集群。
Topic:消息的逻辑分类,生产者与消费者交互的主题。
Partition:Topic的物理分片,每个分片有序但全局无序。
Replica:分区的副本,保证高可用(Leader处理读写,Follower同步数据)。
Producer:向Topic发布消息的客户端。
Consumer:从Topic订阅消息的客户端。
Consumer Group:一组消费者协同消费,共享分区(组内竞争,组间独立)。
Offset:消息在分区内的唯一标识(消费者通过offset记录消费位置)。
3:Kafka拓扑架构
生产者 → Broker集群 → 消费者:
生产者将消息发送至指定Topic的Partition。
Broker集群管理Topic和Partition的存储与复制。
消费者按组订阅Topic,从Partition拉取消息。
ZooKeeper作用(旧版):
管理Broker元数据(新版Kafka逐步移除ZooKeeper依赖)。
选举Leader副本。
监控Broker存活状态。
4:Topic与partition
Topic:
逻辑单元,支持多生产者/消费者。
通过配置指定分区数和副本数。
Partition:
提升并行度和吞吐量(每个分区独立处理)。
分区策略:
轮询(Round-robin)、键哈希(Key-hash,相同Key到同一分区)、自定义。
分区与消费:一个分区只能被同组内的一个消费者消费。
5:producer生产机制
Producer 是消息和数据的生产者,它发送消息到 broker 时,会根据 Partition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置的合理,所有消息都可以均匀分布到不同的 Partition 里,这样就实现了数据的负载均衡。如果一个 Topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈,而有了 Partition 后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。
6:consumer消费机制
Kafka 发布消息通常有两种模式:队列模式(queuing)和发布 / 订阅模式(publish - subscribe)。在队列模式下,只有一个消费组,而这个消费组有多个消费者,一条消息只能被这个消费组中的一个消费者所消费;而在发布 / 订阅模式下,可有多个消费组,每个消费组只有一个消费者,同一条消息可被多个消费组消费。
Kafka 中的 Producer 和 consumer 采用的是 push、pull 的模式,即 producer 向 broker 进行 push 消息,consumer 从 bork 进行 pull 消息,push 和 pull 对于消息的生产和消费是异步进行的。pull 模式的一个好处是 consumer 可自主控制消费消息的速率,同时 consumer 还可以自己控制消费消息的方式是批量的从 broker 拉取数据还是逐条消费数据。
三:zookeeper概念介绍
1:zookeeper应用举例
(1)什么是的单点故障问题呢?
指系统中只有一个主节点(Master)负责关键任务,若该节点宕机,整个系统将瘫痪,导致服务不可用。
(2)传统的方式是怎么解决单点故障?以及有哪些缺点?
解决方式:
主备架构(Master-Backup):部署备用节点,主节点故障时手动切换。
硬件负载均衡:通过冗余硬件分散请求。
缺点:
手动切换延迟:备节点需人工介入,恢复时间长。
脑裂问题:可能出现多个主节点同时工作,导致数据不一致。
资源浪费:备节点平时闲置,利用率低。
2:zookeeper的工作原理是什么?
(1)master启动
集群启动时,多个节点通过ZAB协议(Zookeeper Atomic Broadcast)竞选Master。
获得超过半数投票的节点成为Master,其余为Follower。
(2)master故障
Master宕机后,Zookeeper检测到会话超时,触发重新选举。
剩余节点重新投票,选出新Master(如原Follower升级)。
(3)master恢复
原Master恢复后,自动降级为Follower,同步新Master的数据,确保集群一致性。
3:zookeeper集群架构
角色分工:
Leader:处理写请求,负责事务提案和协调。
Follower:处理读请求,参与选举和提案投票。
Observer(可选):仅处理读请求,不参与投票,用于扩展读性能。
节点数量:通常为奇数(如3/5台),避免选举时出现票数僵局。
4:zookeeper的工作流程
客户端连接:客户端连接到任意Zookeeper节点(Follower/Observer)。
请求处理:
读请求:直接由当前节点返回数据。
写请求:转发给Leader,通过两阶段提交(2PC)确保集群数据一致性。
数据同步:
Leader广播写操作,Follower确认后提交事务。
所有节点最终保持一致状态(最终一致性)。
会话管理:
客户端与Zookeeper维护Session,超时未心跳则释放临时节点(如Ephemeral Znodes)。
四:zookeeper在Kafka中的作用
1:broker注册
Broker 是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的 Broker 管理起来,此时就使用到了 Zookeeper。在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点:/brokers/ids
2:topic注册
在 Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护,由专门的节点来记录,如:/borkers/topics
3:生产者负载均衡
由于同一个 Topic 消息会被分区并将其分布在多个 Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上,那么如何实现生产者的负载均衡,Kafka 支持传统的四层负载均衡,也支持 Zookeeper 方式实现负载均衡。
4:消费者负载均衡
与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker 服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的 Topic 下面的消息,互不干扰。
5:记录消息分区与消费者的关系
消费组 (Consumer Group) 下有多个 Consumer(消费者)。对于每个消费者组 (Consumer Group),Kafka 都会为其分配一个全局唯一的 Group ID,Group 内部的所有消费者共享该 ID。订阅的 topic 的每个分区只能分配给某个 group 下的一个 consumer(当然该分区还可以被分配给其他 group)。
6:消息消费进度offest记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度 Offset 记录到 Zookeeper 上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。
7:消费者注册
(1)注册到消费者分组
(2)对消费者分组中的消费者的变化注册监听
(3)对 Broker 服务器变化注册监听
(4)进行消费者负载均衡
五:单节点部署Kafka
1:安装zookeeper
#上传安装包zookeeper..和Kafka
#部署环境
dnf -y install java
#解压安装包zookeeper..
#移动安装包zookeeper..
mv zookeeper.. /etc/zookeeper/
cd /etc/zookeeper/
cd conf/
cp zoo_sample.cfg zoo.cfg
#修改zookeeper配置文件,并启动zookeeper-服务
vim zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data
mkdir zookeeper-data
cd bin/
#启动zookeeper-服务
./zkServer.sh start
2:安装Kafka
#解压安装包kafka
#移动安装包kafka
mv kafka.. /etc/kafka/
cd /etc/kafka/
cd config/
#修改kafka配置文件,并启动kafka服务
vim server.properties
log.dirs=/etc/kafka/kafka-logs
mkdir kafka-logs
#启动kafka服务
nohup bin/kafka-server-start.sh config /server.propreties &
3:测试
#检查2个端口的开启状态
netstat -anpt | grep 2181
netstat -anpt | grep 9092
验证:
在Kafka的状态下,
修改脚本:sed -i 's/egrep/grep -E' bin/kafka-run-class.sh
cd bin/
创建topic:
kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic test
列出topic:
kafka-topics.sh --list --zookeeper 127.0.0.1:2181
查看topic:
kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test
生产消息:
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
消费消息(打开另个终端,一边生产消息,一边查看消费消息):
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
删除topic:
kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test
六:集群部署Kafka
1:资源列表
操作系统 | IP | 主机名 | 应用 |
---|---|---|---|
OpenBuler24 | 192.168.10.101 | kafka1 | Zookeeper,kafka |
OpenBuler24 | 192.168.10.102 | kafka2 | Zookeeper,kafka |
OpenBuler24 | 192.168.10.103 | kafka3 | Zookeeper,kafka |
2:基础环境设置
#上传安装包zookeeper..和Kafka
vim /etc/hosts
192.168.10.201 Kafka1
192.168.10.202 Kafka2
192.168.10.203 Kafka3
修改主机名:
hostnamectl set-hostname Kafka1
#关闭防火墙
systemctl stop firewalld
setenforce 0
3:安装zookeeper
#部署环境
dnf -y install java
#解压安装包zookeeper..
#移动安装包zookeeper..
mv zookeeper.. /etc/zookeeper/
cd /etc/zookeeper/
cd conf/
cp zoo_sample.cfg zoo.cfg
#修改zookeeper配置文件
vim zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data
clientPort=2181
server.1=192.168.10.201:2888:3888
server.2=192.168.10.202:2888:3888
server.3=192.168.10.203:2888:3888
cd /etc/zookeeper/
mkdir zookeeper-data
#设置id标识
echo '1' > /etc/zookeeper/zookeeper-data/myid
echo '2' > /etc/zookeeper/zookeeper-data/myid
echo '3' > /etc/zookeeper/zookeeper-data/myid
cd bin/
#启动zookeeper-服务
./zkServer.sh start
4:安装Kafka
#解压安装包kafka
#移动安装包kafka
mv kafka.. /etc/kafka/
cd /etc/kafka/
cd config/
#修改kafka配置文件
vim server.properties
broker.id=1 ##21行,修改(其他2个节点也各自修改)
listeners=PLAINTEXT://192.168.10.201:9092
##31行,修改(其他2个节点也各自修改)
log.dirs=/etc/kafka/kafka-logs ##60行,修改
num.partitions=1 ##65行
zookeeper.connect=192.168.10.201:2181,192.168.10.202:2181,192.168.10.203:2181
##123行,(其他2个节点也进行相同修改)
mkdir kafka-logs
#启动kafka服务
nohup bin/kafka-server-start.sh config /server.propreties &
#检查2个端口的开启状态
netstat -anpt | grep 2181
netstat -anpt | grep 9092
5:测试
在Kafka的状态下,
修改脚本:sed -i 's/egrep/grep -E' bin/kafka-run-class.sh
cd bin/
创建topic:
./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
列出topic:
./kafka-topics.sh --list --zookeeper kafka1:2181
查看topic:
./kafka-topics.sh --describe --zookeeper kafka1:2181 --topic test
生产消息:
./kafka-console-producer.sh --broker-list kafka1:9092 --topic test
消费消息(打开另个终端,一边生产消息,一边查看消费消息):
./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test
删除topic:
./kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test