文章目录
前言
在分布式系统中,协调服务是构建高可用架构的基石。经过前九篇对Zookeeper基础原理、应用场景和API的深入探讨,我们终于迎来核心源码解析的关键篇章。本文将深入Zookeeper最核心的运行时脉络,揭开服务启动、请求处理、网络通信和一致性协议四大核心模块的实现奥秘。
一、服务端启动流程
启动流程图:
核心源码解析:
1.1 启动入口类:QuorumPeerMain
public class QuorumPeerMain {
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
// 解析命令行参数(通常是zoo.cfg路径)
main.initializeAndRun(args);
} catch (Exception e) {
LOG.error("Unexpected exception during startup", e);
System.exit(2);
}
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException {
// 1. 解析配置文件
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]); // 解析zoo.cfg文件
}
// 2. 启动数据清理守护线程
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
config.getDataDir(),
config.getDataLogDir(),
config.getSnapRetainCount(), // 保留的快照数量
config.getPurgeInterval() // 清理间隔(小时)
);
purgeMgr.start();
// 3. 判断启动模式
if (config.isDistributed()) {
// 集群模式启动
runFromConfig(config);
} else {
// 单机模式启动(省略)
}
}
}
1.2 集群模式启动核心:runFromConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException {
// === 1. 初始化网络通信层 ===
ServerCnxnFactory cnxnFactory = null;
if (config.getClientPortAddress() != null) {
// 使用反射创建通信工厂(默认NIOServerCnxnFactory)
cnxnFactory = ServerCnxnFactory.createFactory();
// 配置端口和最大连接数(核心方法)
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
}
// === 2. 初始化数据存储 ===
// 创建事务日志和快照文件管理器
FileTxnSnapLog txnLog = new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())
);
// === 3. 创建QuorumPeer实例(核心线程) ===
QuorumPeer quorumPeer = new QuorumPeer();
// 3.1 基础配置注入
quorumPeer.setTxnFactory(txnLog); // 事务日志管理器
quorumPeer.setQuorumPeers(config.getServers()); // 集群节点列表
quorumPeer.setElectionType(config.getElectionAlg()); // 选举算法
quorumPeer.setMyid(config.getServerId()); // 当前节点ID
quorumPeer.setTickTime(config.getTickTime()); // 心跳间隔(ms)
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
// 3.2 配置网络层
if (cnxnFactory != null) {
quorumPeer.setServerCnxnFactory(cnxnFactory);
}
// 3.3 配置数据存储
quorumPeer.setZKDatabase(new ZKDatabase(txnLog));
// 3.4 恢复数据
quorumPeer.setLastLoggedZxid(txnLog.restore(quorumPeer.zkDb, quorumPeer));
// === 4. 启动QuorumPeer线程 ===
quorumPeer.start(); // 启动线程(进入run()方法)
}
1.3 QuorumPeer线程核心逻辑:run()
public void run() {
while (running) {
switch (getPeerState()) {
case LOOKING: // 选举状态
try {
// 1. 执行Leader选举
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception during election", e);
// 异常处理...
}
break;
case FOLLOWING: // Follower状态
try {
// 2. 启动Follower服务
follower = new Follower(this, new FollowerZooKeeperServer(...));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception in follower", e);
} finally {
follower.shutdown();
}
break;
case LEADING: // Leader状态
try {
// 3. 启动Leader服务
leader = new Leader(this, new LeaderZooKeeperServer(...));
leader.lead();
} catch (Exception e) {
LOG.warn("Unexpected exception in leader", e);
} finally {
leader.shutdown("Unexpected exception");
}
}
}
}
1.4 关键子流程:数据恢复
// FileTxnSnapLog.java
public long restore(DataTree dt, Map<Long, Integer> sessions) {
// 1. 从快照恢复
long deserializeResult = snapLog.deserialize(dt, sessions);
// 2. 从事务日志恢复
FileTxnLog txnLog = new FileTxnLog(dataDir);
long highestZxid = fastForwardFromEdits(dt, sessions);
// 返回最大的ZXID
return highestZxid;
}
// 快照恢复核心方法
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// 找到最新的快照文件
File snapShot = findMostRecentSnapshot();
if (snapShot == null) {
return -1L; // 无快照
}
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snapShot))) {
// 反序列化快照
InputArchive ia = BinaryInputArchive.getArchive(snapIS);
deserialize(dt, sessions, ia); // 将快照加载到DataTree
return dt.lastProcessedZxid; // 返回快照对应的ZXID
}
}
1.5 关键设计要点
分层初始化架构:
数据恢复策略:
- 先加载最新快照(snapshot.xxx文件)
- 再重放快照之后的所有事务日志(log.xxx文件)
- 使用CRC32校验数据完整性
状态机设计:
- LOOKING:选举状态,执行FastLeaderElection
- FOLLOWING:启动Follower服务,连接Leader
- LEADING:启动Leader服务,维护集群
资源清理机制:
- DatadirCleanupManager:定期清理旧快照和日志
- 按保留策略(默认3个快照)自动删除历史文件
启动流程中的关键对象
对象名 | 作用描述 | 生命周期 |
---|---|---|
QuorumPeer | 集群节点主线程 | 整个运行期间 |
ServerCnxnFactory | 网络通信服务 | 整个运行期间 |
FileTxnSnapLog | 事务日志和快照管理 | 整个运行期间 |
ZKDatabase | 内存数据库(DataTree) | 整个运行期间 |
Follower/Leader | 角色特定行为实现 | 状态持续期间 |
二、请求处理链(责任链模式)
2.1 Leader服务器处理链
// LeaderZooKeeperServer.java
protected void setupRequestProcessors() {
// 创建最终处理器(实际执行操作)
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// 创建待应用处理器(记录待提交提案)
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader());
// 创建提交处理器(保证请求顺序性)
CommitProcessor commitProcessor = new CommitProcessor(
toBeAppliedProcessor,
"CommitProcessor",
getZooKeeperServer().isMatchSyncs()
);
// 创建提案处理器(广播提案)
RequestProcessor proposalProcessor = new ProposalRequestProcessor(
this,
commitProcessor
);
// 创建准备处理器(请求预处理)
PrepRequestProcessor prepProcessor = new PrepRequestProcessor(
this,
proposalProcessor
);
// 构建完整处理链
firstProcessor = new LeaderRequestProcessor(this, prepProcessor);
// 启动所有处理器线程
startProcessors(new RequestProcessor[] {
prepProcessor,
proposalProcessor,
commitProcessor,
finalProcessor
});
}
2.2 Follower服务器处理链
// FollowerZooKeeperServer.java
protected void setupRequestProcessors() {
// 创建最终处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// 创建提交处理器
commitProcessor = new CommitProcessor(
finalProcessor,
"CommitProcessor",
true
);
// 创建同步处理器(持久化事务日志)
syncProcessor = new SyncRequestProcessor(
this,
new SendAckRequestProcessor(getFollower())
);
// 构建处理链
firstProcessor = new FollowerRequestProcessor(this, syncProcessor);
// 启动处理器线程
startProcessors(new RequestProcessor[] {
firstProcessor,
syncProcessor,
commitProcessor
});
}
2.3 核心处理器
- PrepRequestProcessor:请求预处理
public void run() {
try {
while (true) {
// 1. 从队列获取请求
Request request = submittedRequests.take();
// 2. 预处理请求(核心方法)
pRequest(request);
}
} catch (Exception e) {
handleException(this, e);
}
}
protected void pRequest(Request request) throws RequestProcessorException {
// 请求类型检查(1-21为合法操作码)
if (request.type < 0 || request.type > OpCode.maxOp) {
throw new RequestProcessorException("Invalid op type");
}
try {
// 3. 反序列化请求
ByteBufferInputStream bbis = new ByteBufferInputStream(request.request);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis);
Record record = null;
// 4. 根据操作类型反序列化不同请求
switch (request.type) {
case OpCode.create:
record = new CreateRequest();
break;
case OpCode.delete:
record = new DeleteRequest();
break;
// 其他操作类型处理...
}
record.deserialize(bia, "request");
// 5. 权限检查
if (request.authInfo != null) {
checkACL(request, record);
}
// 6. 生成事务头
request.hdr = new TxnHeader(
request.sessionId,
request.cxid,
zks.getZKDatabase().getNextZxid(), // 分配全局唯一ZXID
Time.currentWallTime(),
request.type
);
// 7. 传递到下一处理器
nextProcessor.processRequest(request);
} catch (Exception e) {
// 异常处理...
}
}
- SyncRequestProcessor:事务持久化
public void run() {
try {
int logCount = 0;
while (true) {
Request request = queuedRequests.take();
// 1. 持久化到事务日志
if (request != null) {
// 写事务日志
zks.getZKDatabase().append(request);
// 写快照(按阈值触发)
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
zks.takeSnapshot();
logCount = 0;
}
}
// 2. 传递给下一处理器
if (nextProcessor != null) {
nextProcessor.processRequest(request);
}
}
} catch (Exception e) {
// 异常处理...
}
}
- ProposalRequestProcessor:提案广播(仅Leader)
public void processRequest(Request request) {
// 1. 读请求直接传递
if (!Request.isValid(request.type)) {
nextProcessor.processRequest(request);
return;
}
// 2. 创建提案对象
Proposal p = new Proposal();
p.packet = new QuorumPacket();
p.request = request;
// 3. 将提案加入待发送队列
synchronized (leader) {
leader.addProposal(p);
}
// 4. 传递给下一处理器
nextProcessor.processRequest(request);
}
// Leader.addProposal实现
public void addProposal(Proposal p) {
synchronized (toBeProposed) {
// 添加到待提案队列
toBeProposed.add(p);
// 唤醒发送线程
toBeProposed.notifyAll();
}
}
- CommitProcessor:提交调度器
public void run() {
try {
Request nextPending = null;
while (true) {
// 1. 检查是否有新请求
if (nextPending == null) {
nextPending = queuedRequests.take();
}
// 2. 处理提交请求
if (nextPending.type == OpCode.commit) {
// 按ZXID顺序提交
commit(nextPending.zxid);
nextPending = null;
}
// 3. 处理本地读请求
else if (nextPending.type == OpCode.getData) {
nextProcessor.processRequest(nextPending);
nextPending = null;
}
// 4. 写请求放入等待队列
else {
synchronized (queuedWriteRequests) {
queuedWriteRequests.add(nextPending);
nextPending = null;
}
}
// 5. 检查可提交的写请求
while (!queuedWriteRequests.isEmpty()) {
Request writeReq = queuedWriteRequests.peek();
// 如果该请求的ZXID已被提交
if (writeReq.zxid <= lastCommitted) {
queuedWriteRequests.poll();
nextProcessor.processRequest(writeReq);
} else {
break;
}
}
}
} catch (Exception e) {
// 异常处理...
}
}
- FinalRequestProcessor:最终执行
public void processRequest(Request request) {
// 1. 会话有效性检查
if (request.sessionId != 0) {
Session session = zks.sessionTracker.getSession(request.sessionId);
if (session == null) {
return; // 会话已过期
}
}
try {
// 2. 执行请求操作
switch (request.type) {
case OpCode.create:
processCreate(request);
break;
case OpCode.delete:
processDelete(request);
break;
case OpCode.getData:
processGetData(request);
break;
// 其他操作类型处理...
}
} catch (Exception e) {
// 异常处理...
}
// 3. 发送响应
if (request.cnxn != null) {
request.cnxn.sendResponse(hdr, rsp, "response");
}
}
private void processCreate(Request request) {
CreateRequest createReq = (CreateRequest)request.request;
// 在DataTree中创建节点
rsp = zks.getZKDatabase().createNode(
createReq.getPath(),
createReq.getData(),
createReq.getAcl(),
createReq.getFlags(),
request.hdr.getZxid()
);
}
处理链工作流程图:
处理器功能对比表:
处理器 | 所属角色 | 核心职责 | 关键数据结构 |
---|---|---|---|
PrepRequestProcessor | Leader/Follower | 请求反序列化/ACL检查 | RequestQueue |
SyncRequestProcessor | Leader/Follower | 事务日志持久化 | TransactionLog |
ProposalRequestProcessor | 仅Leader | 提案广播 | ProposalQueue |
CommitProcessor | Leader/Follower | 请求提交调度 | QueuedWriteRequests |
FinalRequestProcessor | Leader/Follower | 内存数据库操作 | DataTree/ZKDatabase |
典型问题排查:
- 请求卡住:
- 检查CommitProcessor是否堆积大量请求
- 确认集群是否达到多数派(网络分区?)
- ACL权限拒绝:
- PrepRequestProcessor中checkACL()抛出异常
- 检查客户端认证信息
- 事务日志写入失败:
- SyncRequestProcessor捕获IO异常
- 检查磁盘空间和权限
- 提案丢失:
- ProposalRequestProcessor未成功加入提案队列
- 检查Leader选举状态
三、网络通信层(NIOServerCnxnFactory为例)
3.1 核心类结构与初始化
- 服务启动入口:NIOServerCnxnFactory
public class NIOServerCnxnFactory extends ServerCnxnFactory {
// 核心组件
private SelectorThread selectorThread; // 主选择器线程
private AcceptThread acceptThread; // 接收连接线程
private final ConnectionExpirer expirer; // 连接过期管理器
// 配置参数
private int maxClientCnxns = 60; // 最大连接数
private int sessionlessCnxnTimeout; // 无会话连接超时
// 初始化方法
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
// 1. 初始化接收线程
acceptThread = new AcceptThread(
serverSock = ServerSocketChannel.open(),
addr,
selectorThread.getSelector()
);
// 2. 配置端口参数
serverSock.socket().setReuseAddress(true);
serverSock.socket().bind(addr);
serverSock.configureBlocking(false);
// 3. 启动线程
acceptThread.start();
selectorThread.start();
}
}
3.2 核心处理流程源码解析
- 连接接收线程:AcceptThread
class AcceptThread extends Thread {
public void run() {
while (!stopped) {
try {
// 1. 等待新连接
SocketChannel sc = serverSock.accept();
if (sc != null) {
// 2. 配置连接参数
sc.configureBlocking(false);
sc.socket().setTcpNoDelay(true);
// 3. 创建连接对象
NIOServerCnxn cnxn = createConnection(sc);
// 4. 注册到选择器
selectorThread.addCnxn(cnxn);
}
} catch (IOException e) {
LOG.warn("AcceptThread exception", e);
}
}
}
private NIOServerCnxn createConnection(SocketChannel sock) {
// 初始化连接对象
return new NIOServerCnxn(
NIOServerCnxnFactory.this,
sock,
selectorThread.getSelector(),
selectorThread.getNextWorker()
);
}
}
- 选择器线程:SelectorThread
class SelectorThread extends Thread {
private final Selector selector;
private final Set<NIOServerCnxn> cnxns = new HashSet<>();
private final WorkerService workerPool; // I/O工作线程池
public void run() {
while (!stopped) {
try {
// 1. 选择就绪事件
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
// 2. 处理所有就绪事件
for (SelectionKey k : selected) {
if (k.isReadable() || k.isWritable()) {
// 3. 获取连接对象
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
// 4. 提交给IOWorker处理
c.getWorker().schedule(c);
}
}
selected.clear();
} catch (Exception e) {
LOG.warn("SelectorThread error", e);
}
}
}
// 添加新连接
void addCnxn(NIOServerCnxn cnxn) {
synchronized (cnxns) {
// 1. 检查连接数限制
if (cnxns.size() >= maxClientCnxns) {
cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_REJECTED);
return;
}
// 2. 注册读事件
cnxn.register(selector);
cnxns.add(cnxn);
}
}
}
- I/O工作线程:IOWorkRequest
class IOWorkRequest extends WorkerService.WorkRequest {
private final NIOServerCnxn cnxn;
public void doWork() throws InterruptedException {
// 1. 处理读事件
if (cnxn.sockKey.isReadable()) {
// 从通道读取数据
int rc = cnxn.sock.read(cnxn.recvBuffer);
if (rc > 0) {
// 反序列化请求
cnxn.recvBuffer.flip();
processRequest(cnxn.recvBuffer);
} else if (rc < 0) {
// 连接关闭
cnxn.close(ServerCnxn.DisconnectReason.CLIENT_CLOSED);
}
}
// 2. 处理写事件
if (cnxn.sockKey.isWritable()) {
// 获取待发送响应
ByteBuffer bb = cnxn.outgoingQueue.poll();
if (bb != null) {
// 写入通道
cnxn.sock.write(bb);
// 如果队列还有数据,保持写事件注册
if (!cnxn.outgoingQueue.isEmpty()) {
cnxn.enableWrite();
}
}
}
}
private void processRequest(ByteBuffer buffer) {
try {
// 1. 反序列化请求头
BinaryInputArchive bia = BinaryInputArchive.getArchive(
new ByteBufferInputStream(buffer)
);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// 2. 创建请求对象
Request req = new Request(
cnxn,
h.getSessionId(),
h.getXid(),
h.getType(),
buffer,
cnxn.getAuthInfo()
);
// 3. 提交给处理链
cnxn.zkServer.processRequest(req);
} catch (Exception e) {
LOG.error("Request processing error", e);
}
}
}
- 连接对象:NIOServerCnxn
class NIOServerCnxn extends ServerCnxn {
final SocketChannel sock; // 底层Socket通道
final SelectionKey sockKey; // 选择键
final IOWorker worker; // 分配的I/O工作线程
// 缓冲区管理
ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096);
final Queue<ByteBuffer> outgoingQueue = new ConcurrentLinkedQueue<>();
// 注册选择器
void register(Selector selector) throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_READ, this);
}
// 发送响应
public void sendResponse(ReplyHeader h, Record r, String tag) {
// 1. 序列化响应
ByteBuffer bb = serializeResponse(h, r, tag);
// 2. 加入发送队列
outgoingQueue.add(bb);
// 3. 注册写事件
enableWrite();
}
private void enableWrite() {
int i = sockKey.interestOps();
if ((i & SelectionKey.OP_WRITE) == 0) {
sockKey.interestOps(i | SelectionKey.OP_WRITE);
}
}
// 关闭连接
public void close(DisconnectReason reason) {
try {
// 1. 取消选择键
if (sockKey != null) sockKey.cancel();
// 2. 关闭通道
sock.close();
// 3. 清理会话
zkServer.removeCnxn(this);
} catch (IOException e) {
LOG.debug("Error closing connection", e);
}
}
}
核心流程时序图:
3.3 性能优化技术
- I/O工作线程池
workerPool = new WorkerService(
"NIOWorker",
numWorkerThreads, // 默认2*CPU核心数
true // 守护线程
);
避免Selector线程被阻塞。
并行处理多个连接的I/O。
- 智能事件注册:减少不必要的Selector唤醒
// 只在有数据要写时注册写事件
void enableWrite() {
int i = sockKey.interestOps();
if ((i & SelectionKey.OP_WRITE) == 0) {
sockKey.interestOps(i | SelectionKey.OP_WRITE);
}
}
- 缓冲区复用
// 接收缓冲区复用
if (!recvBuffer.hasRemaining()) {
recvBuffer = ByteBuffer.allocateDirect(
recvBuffer.capacity() * 2);
}
动态扩容避免频繁分配。
大连接使用大缓冲区。
- 批量响应发送:单次系统调用发送多个响应包
void doWrite() {
int batchSize = 10;
while (batchSize-- > 0 && !outgoingQueue.isEmpty()) {
ByteBuffer bb = outgoingQueue.poll();
sock.write(bb);
}
}
关键参数调优:
参数名 | 默认值 | 作用 | 调优建议 |
---|---|---|---|
maxClientCnxns | 60 | 单IP最大连接数 | 根据客户端类型调整 |
clientPortAddress | 0.0.0.0:2181 | 监听地址 | 生产环境绑定内网IP |
nioWorkerThreads | 2 * CPU核心 | I/O工作线程数 | 高并发场景增加 |
sessionlessCnxnTimeout | 10000ms | 无会话连接超时 | 防止恶意连接 |
maxResponseCacheSize | 400 | 响应缓存大小 | 根据内存调整 |
四、Leader选举(FastLeaderElection)
算法核心:ZAB协议的选举阶段
选举流程:
- 自增epoch(logicalclock++)
- 初始化投票:vote = (myid, zxid, epoch)
- 广播NOTIFICATION消息
- 接收投票并统计:
// FastLeaderElection#totalOrderPredicate()
if (new_zxid > current_zxid) return true; // 优先选zxid大的
if (new_zxid == current_zxid && new_id > current_id) return true; // zxid相同时选serverId大的
- 超过半数支持则成为Leader
节点状态转换:
// QuorumPeer#run()
switch (getPeerState()) {
case LOOKING:
leaderElector.lookForLeader(); // 选举中
case FOLLOWING:
follower.followLeader(); // 跟随状态
case LEADING:
leader.lead(); // 领导状态
}
五、Zab协议实现
Zab协议流程图解:
5.1 主要流程源码
- 协议状态机:QuorumPeer
public void run() {
while (running) {
switch (getPeerState()) {
case LOOKING: // 选举阶段
setCurrentVote(makeLEStrategy().lookForLeader());
break;
case FOLLOWING: // Follower状态
Follower follower = new Follower(this, ...);
follower.followLeader(); // 包含Discovery和Sync阶段
break;
case LEADING: // Leader状态
Leader leader = new Leader(this, ...);
leader.lead(); // 包含Broadcast阶段
break;
}
}
}
- 发现阶段(Discovery)- Follower实现
// Follower.java
void followLeader() throws InterruptedException {
// 1. 连接Leader
connectToLeader(leaderAddr);
// 2. 发送FOLLOWERINFO
QuorumPacket fInfoPacket = new QuorumPacket(Leader.FOLLOWERINFO, ...);
writePacket(fInfoPacket, true);
// 3. 接收Leader的LeaderInfo
QuorumPacket lInfoPacket = readPacket();
if (lInfoPacket.getType() != Leader.LEADERINFO) {
throw new IOException("First packet should be LEADERINFO");
}
// 4. 解析epoch
long newEpoch = lInfoPacket.getEpoch();
if (newEpoch < self.getAcceptedEpoch()) {
throw new IOException("Epoch less than accepted epoch");
}
// 5. 发送ACKEPOCH
QuorumPacket ackEpochPacket = new QuorumPacket(Leader.ACKEPOCH, ...);
writePacket(ackEpochPacket, true);
// 6. 进入同步阶段
syncWithLeader(newEpoch);
}
- 同步阶段(Synchronization)
// Follower.java
protected void syncWithLeader(long newEpoch) throws Exception {
// 1. 接收Leader的NEWLEADER包
QuorumPacket newLeaderPacket = readPacket();
if (newLeaderPacket.getType() != Leader.NEWLEADER) {
throw new IOException("First packet should be NEWLEADER");
}
// 2. 检查是否需要同步
if (self.getLastLoggedZxid() != leaderLastZxid) {
// 3. 执行数据同步
boolean needSnap = syncStrategy.determineSyncMethod();
if (needSnap) {
// 全量快照同步
syncWithSnapshot(leader);
} else {
// 增量事务日志同步
syncWithLogs(leader);
}
}
// 4. 发送ACK给Leader
writePacket(new QuorumPacket(Leader.ACK, ...), true);
// 5. 等待Leader的UPTODATE包
QuorumPacket uptodatePacket = readPacket();
if (uptodatePacket.getType() != Leader.UPTODATE) {
throw new IOException("Did not receive UPTODATE packet");
}
// 6. 进入广播阶段
startFollowerThreads();
}
- 广播阶段(Broadcast)- Leader实现
// Leader.java
void lead() throws IOException, InterruptedException {
// 1. 启动ZK服务
startZkServer();
// 2. 等待Follower连接
waitForEpochAck(self.getId(), leaderStateSummary);
// 3. 发送NEWLEADER包
sendNewLeader();
// 4. 等待多数Follower的ACK
waitForNewLeaderAck(self.getId());
// 5. 发送UPTODATE包
sendUptodate();
// 6. 进入广播循环
while (running) {
// 7. 从队列获取提案
Proposal p = pendingProposals.take();
// 8. 广播提案
broadcastProposal(p);
// 9. 等待ACK
waitForAckQuorum(p);
// 10. 提交提案
commit(p);
}
}
// 广播提案方法
private void broadcastProposal(Proposal p) {
// 构造提案包
QuorumPacket proposal = new QuorumPacket(
Leader.PROPOSAL,
p.request.zxid,
p.request.serialize(),
null
);
// 发送给所有Follower
for (LearnerHandler f : followers) {
f.queuePacket(proposal);
}
// 本地记录
outstandingProposals.put(p.request.zxid, p);
}
- 提案提交与ACK处理
// Leader.java
private void waitForAckQuorum(Proposal p) {
synchronized (p) {
while (!p.hasAllQuorums()) {
// 等待ACK
p.wait(rpcTimeout);
}
}
}
// ACK处理
public void processAck(long sid, long zxid, SocketAddress followerAddr) {
// 1. 获取对应提案
Proposal p = outstandingProposals.get(zxid);
if (p == null) return;
// 2. 添加ACK
p.ackSet.add(sid);
// 3. 检查是否达到多数
if (isQuorumSynced(p.ackSet)) {
synchronized (p) {
// 4. 满足条件则唤醒等待线程
p.notifyAll();
}
}
}
// 提交提案
private void commit(Proposal p) {
// 1. 创建提交包
QuorumPacket commitPacket = new QuorumPacket(
Leader.COMMIT,
p.request.zxid,
null,
null
);
// 2. 广播COMMIT
for (LearnerHandler f : followers) {
f.queuePacket(commitPacket);
}
// 3. 本地提交
commitProcessor.commit(p.request);
// 4. 从未完成提案中移除
outstandingProposals.remove(p.request.zxid);
}
- 崩溃恢复实现
// Leader.java
protected void recovery() {
// 1. 获取最大ZXID
long maxCommittedLog = getMaxCommittedLog();
// 2. 获取未提交提案列表
List<Proposal> outstanding = getOutstandingProposals();
// 3. 重建提案状态
for (Proposal p : outstanding) {
// 4. 检查提案是否在多数派中持久化
if (isCommittedInQuorum(p)) {
// 重新提交
commit(p);
} else {
// 丢弃提案
outstandingProposals.remove(p.request.zxid);
}
}
// 5. 重新建立与Follower的连接
waitForEpochAck(self.getId(), leaderStateSummary);
}
5.2 关键数据结构
- 提案对象(Proposal)
class Proposal {
long zxid; // 事务ID
Request request; // 原始请求
Set<Long> ackSet = new HashSet<>(); // ACK集合
boolean committed = false; // 提交状态
// 检查是否达到多数
boolean hasAllQuorums() {
return ackSet.size() >= getQuorumSize();
}
}
- Leader状态跟踪
class Leader {
// 未完成提案表
ConcurrentHashMap<Long, Proposal> outstandingProposals =
new ConcurrentHashMap<>();
// 已提交提案表
ConcurrentSkipListSet<Long> committedLog = new ConcurrentSkipListSet<>();
// Follower列表
List<LearnerHandler> followers = Collections.synchronizedList(new ArrayList<>());
}
5.3 Zab协议特性实现
- 全序性保证
// 为每个提案分配全局唯一ZXID
public long getNextZxid() {
// 高32位是epoch,低32位是计数器
return (epoch << 32) | (counter++);
}
- 可靠性保证
// 等待多数ACK
while (!p.hasAllQuorums()) {
p.wait(timeout);
}
Zab协议通过精心设计的四个阶段(选举、发现、同步、广播)实现了分布式系统的强一致性,其源码实现展示了以下核心思想:
- 状态机驱动:通过明确的状态转换管理协议流程
- 多数派原则:所有关键操作需获得多数节点确认
- 幂等设计:提案处理可安全重试
- 顺序保障:ZXID全局排序确保操作有序性
- 增量恢复:优先使用事务日志同步,减少全量传输
总结
通过对Zookeeper五大核心模块的源码级剖析,我们揭开了这个分布式协调服务的神秘面纱:
核心设计哲学总结
- 分层架构
从QuorumPeerMain启动入口到FinalRequestProcessor的请求终结,Zookeeper通过清晰的层级划分(网络层→处理链→存储层→协议层)实现了复杂功能的优雅解耦。 - 状态机驱动范式
通过LOOKING→FOLLOWING→LEADING三态转换,将分布式系统最复杂的共识问题转化为确定性的状态迁移,源码中QuorumPeer.run()的状态机实现堪称经典。 - 流水线性能优化
请求处理链的责任链模式(如Prep→Sync→Proposal→Commit的分段处理)与网络层的SelectorThread→IOWorker协作机制,共同构建了高吞吐量的处理流水线。
分布式共识的精髓实现
- Zab协议的四步流程:选举(Election)→发现(Discovery)→同步(Sync)→广播(Broadcast)的精密协作,在Leader.lead()和Follower.followLeader()中得以完美呈现。
- 崩溃恢复的智慧:通过epoch+ZXID的全局唯一标识(getNextZxid()实现)和提案重放机制,解决了分布式系统最棘手的脑裂问题。
- 数据一致性保障:CommitProcessor的顺序提交控制与outstandingProposals的多数派确认机制,共同守护了状态机的线性一致性。
源码阅读的价值
当我们在3万行源码中追踪一个create /node请求的完整生命周期:
- 从NIOServerCnxn的字节反序列化开始
- 穿越PrepRequestProcessor的ACL检查
- 经历SyncRequestProcessor的磁盘持久化
- 通过Zab协议的提案广播
- 最终在DataTree落地生根
这种全景式跟踪带来的认知深度,远超过任何理论描述。
本篇虽已深入核心流程,但Zookeeper的精华远不止此:会话管理的神秘时间轮、Watch机制的跨节点传播、动态配置的切换… 这些留给读者探索的宝藏,正是分布式领域永不枯竭的技术魅力。