Zookeeper学习专栏(十):核心流程剖析之服务启动、请求处理与选举协议

发布于:2025-07-27 ⋅ 阅读:(18) ⋅ 点赞:(0)


前言

在分布式系统中,协调服务是构建高可用架构的基石。经过前九篇对Zookeeper基础原理、应用场景和API的深入探讨,我们终于迎来核心源码解析的关键篇章。本文将深入Zookeeper最核心的运行时脉络,揭开服务启动、请求处理、网络通信和一致性协议四大核心模块的实现奥秘。


一、服务端启动流程

启动流程图:
流程图
核心源码解析:

1.1 启动入口类:QuorumPeerMain

public class QuorumPeerMain {
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // 解析命令行参数(通常是zoo.cfg路径)
            main.initializeAndRun(args);
        } catch (Exception e) {
            LOG.error("Unexpected exception during startup", e);
            System.exit(2);
        }
    }

    protected void initializeAndRun(String[] args) 
        throws ConfigException, IOException {
        
        // 1. 解析配置文件
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]); // 解析zoo.cfg文件
        }

        // 2. 启动数据清理守护线程
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(), 
            config.getDataLogDir(),
            config.getSnapRetainCount(),  // 保留的快照数量
            config.getPurgeInterval()     // 清理间隔(小时)
        );
        purgeMgr.start();

        // 3. 判断启动模式
        if (config.isDistributed()) {
            // 集群模式启动
            runFromConfig(config);
        } else {
            // 单机模式启动(省略)
        }
    }
}

1.2 集群模式启动核心:runFromConfig

public void runFromConfig(QuorumPeerConfig config) throws IOException {
    // === 1. 初始化网络通信层 ===
    ServerCnxnFactory cnxnFactory = null;
    if (config.getClientPortAddress() != null) {
        // 使用反射创建通信工厂(默认NIOServerCnxnFactory)
        cnxnFactory = ServerCnxnFactory.createFactory();
        
        // 配置端口和最大连接数(核心方法)
        cnxnFactory.configure(config.getClientPortAddress(), 
                             config.getMaxClientCnxns());
    }

    // === 2. 初始化数据存储 ===
    // 创建事务日志和快照文件管理器
    FileTxnSnapLog txnLog = new FileTxnSnapLog(
        new File(config.getDataLogDir()), 
        new File(config.getDataDir())
    );

    // === 3. 创建QuorumPeer实例(核心线程) ===
    QuorumPeer quorumPeer = new QuorumPeer();
    
    // 3.1 基础配置注入
    quorumPeer.setTxnFactory(txnLog);            // 事务日志管理器
    quorumPeer.setQuorumPeers(config.getServers()); // 集群节点列表
    quorumPeer.setElectionType(config.getElectionAlg()); // 选举算法
    quorumPeer.setMyid(config.getServerId());     // 当前节点ID
    quorumPeer.setTickTime(config.getTickTime()); // 心跳间隔(ms)
    quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    
    // 3.2 配置网络层
    if (cnxnFactory != null) {
        quorumPeer.setServerCnxnFactory(cnxnFactory);
    }
    
    // 3.3 配置数据存储
    quorumPeer.setZKDatabase(new ZKDatabase(txnLog));
    
    // 3.4 恢复数据
    quorumPeer.setLastLoggedZxid(txnLog.restore(quorumPeer.zkDb, quorumPeer));

    // === 4. 启动QuorumPeer线程 ===
    quorumPeer.start(); // 启动线程(进入run()方法)
}

1.3 QuorumPeer线程核心逻辑:run()

public void run() {
    while (running) {
        switch (getPeerState()) {
            case LOOKING: // 选举状态
                try {
                    // 1. 执行Leader选举
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception during election", e);
                    // 异常处理...
                }
                break;
                
            case FOLLOWING: // Follower状态
                try {
                    // 2. 启动Follower服务
                    follower = new Follower(this, new FollowerZooKeeperServer(...));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception in follower", e);
                } finally {
                    follower.shutdown();
                }
                break;
                
            case LEADING: // Leader状态
                try {
                    // 3. 启动Leader服务
                    leader = new Leader(this, new LeaderZooKeeperServer(...));
                    leader.lead();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception in leader", e);
                } finally {
                    leader.shutdown("Unexpected exception");
                }
        }
    }
}

1.4 关键子流程:数据恢复

// FileTxnSnapLog.java
public long restore(DataTree dt, Map<Long, Integer> sessions) {
    // 1. 从快照恢复
    long deserializeResult = snapLog.deserialize(dt, sessions);
    
    // 2. 从事务日志恢复
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    long highestZxid = fastForwardFromEdits(dt, sessions);
    
    // 返回最大的ZXID
    return highestZxid;
}

// 快照恢复核心方法
public long deserialize(DataTree dt, Map<Long, Integer> sessions) 
    throws IOException {
    
    // 找到最新的快照文件
    File snapShot = findMostRecentSnapshot();
    if (snapShot == null) {
        return -1L; // 无快照
    }
    
    try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snapShot))) {
        // 反序列化快照
        InputArchive ia = BinaryInputArchive.getArchive(snapIS);
        deserialize(dt, sessions, ia); // 将快照加载到DataTree
        return dt.lastProcessedZxid;   // 返回快照对应的ZXID
    }
}

1.5 关键设计要点

分层初始化架构:
分层架构
数据恢复策略:

  • 先加载最新快照(snapshot.xxx文件)
  • 再重放快照之后的所有事务日志(log.xxx文件)
  • 使用CRC32校验数据完整性

状态机设计:

  • LOOKING:选举状态,执行FastLeaderElection
  • FOLLOWING:启动Follower服务,连接Leader
  • LEADING:启动Leader服务,维护集群

资源清理机制:

  • DatadirCleanupManager:定期清理旧快照和日志
  • 按保留策略(默认3个快照)自动删除历史文件

启动流程中的关键对象

对象名 作用描述 生命周期
QuorumPeer 集群节点主线程 整个运行期间
ServerCnxnFactory 网络通信服务 整个运行期间
FileTxnSnapLog 事务日志和快照管理 整个运行期间
ZKDatabase 内存数据库(DataTree) 整个运行期间
Follower/Leader 角色特定行为实现 状态持续期间

二、请求处理链(责任链模式)

2.1 Leader服务器处理链

// LeaderZooKeeperServer.java
protected void setupRequestProcessors() {
    // 创建最终处理器(实际执行操作)
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    
    // 创建待应用处理器(记录待提交提案)
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
        finalProcessor, getLeader());
    
    // 创建提交处理器(保证请求顺序性)
    CommitProcessor commitProcessor = new CommitProcessor(
        toBeAppliedProcessor, 
        "CommitProcessor", 
        getZooKeeperServer().isMatchSyncs()
    );
    
    // 创建提案处理器(广播提案)
    RequestProcessor proposalProcessor = new ProposalRequestProcessor(
        this, 
        commitProcessor
    );
    
    // 创建准备处理器(请求预处理)
    PrepRequestProcessor prepProcessor = new PrepRequestProcessor(
        this, 
        proposalProcessor
    );
    
    // 构建完整处理链
    firstProcessor = new LeaderRequestProcessor(this, prepProcessor);
    
    // 启动所有处理器线程
    startProcessors(new RequestProcessor[] {
        prepProcessor,
        proposalProcessor,
        commitProcessor,
        finalProcessor
    });
}

2.2 Follower服务器处理链

// FollowerZooKeeperServer.java
protected void setupRequestProcessors() {
    // 创建最终处理器
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    
    // 创建提交处理器
    commitProcessor = new CommitProcessor(
        finalProcessor, 
        "CommitProcessor", 
        true
    );
    
    // 创建同步处理器(持久化事务日志)
    syncProcessor = new SyncRequestProcessor(
        this, 
        new SendAckRequestProcessor(getFollower())
    );
    
    // 构建处理链
    firstProcessor = new FollowerRequestProcessor(this, syncProcessor);
    
    // 启动处理器线程
    startProcessors(new RequestProcessor[] {
        firstProcessor, 
        syncProcessor, 
        commitProcessor
    });
}

2.3 核心处理器

  1. PrepRequestProcessor:请求预处理
public void run() {
    try {
        while (true) {
            // 1. 从队列获取请求
            Request request = submittedRequests.take();
            
            // 2. 预处理请求(核心方法)
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this, e);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    // 请求类型检查(1-21为合法操作码)
    if (request.type < 0 || request.type > OpCode.maxOp) {
        throw new RequestProcessorException("Invalid op type");
    }
    
    try {
        // 3. 反序列化请求
        ByteBufferInputStream bbis = new ByteBufferInputStream(request.request);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis);
        Record record = null;
        
        // 4. 根据操作类型反序列化不同请求
        switch (request.type) {
            case OpCode.create:
                record = new CreateRequest();
                break;
            case OpCode.delete:
                record = new DeleteRequest();
                break;
            // 其他操作类型处理...
        }
        record.deserialize(bia, "request");
        
        // 5. 权限检查
        if (request.authInfo != null) {
            checkACL(request, record);
        }
        
        // 6. 生成事务头
        request.hdr = new TxnHeader(
            request.sessionId,
            request.cxid,
            zks.getZKDatabase().getNextZxid(), // 分配全局唯一ZXID
            Time.currentWallTime(),
            request.type
        );
        
        // 7. 传递到下一处理器
        nextProcessor.processRequest(request);
    } catch (Exception e) {
        // 异常处理...
    }
}
  1. SyncRequestProcessor:事务持久化
public void run() {
    try {
        int logCount = 0;
        while (true) {
            Request request = queuedRequests.take();
            
            // 1. 持久化到事务日志
            if (request != null) {
                // 写事务日志
                zks.getZKDatabase().append(request);
                
                // 写快照(按阈值触发)
                if (logCount > (snapCount / 2 + randRoll)) {
                    randRoll = r.nextInt(snapCount/2);
                    zks.takeSnapshot();
                    logCount = 0;
                }
            }
            
            // 2. 传递给下一处理器
            if (nextProcessor != null) {
                nextProcessor.processRequest(request);
            }
        }
    } catch (Exception e) {
        // 异常处理...
    }
}
  1. ProposalRequestProcessor:提案广播(仅Leader)
public void processRequest(Request request) {
    // 1. 读请求直接传递
    if (!Request.isValid(request.type)) {
        nextProcessor.processRequest(request);
        return;
    }
    
    // 2. 创建提案对象
    Proposal p = new Proposal();
    p.packet = new QuorumPacket();
    p.request = request;
    
    // 3. 将提案加入待发送队列
    synchronized (leader) {
        leader.addProposal(p);
    }
    
    // 4. 传递给下一处理器
    nextProcessor.processRequest(request);
}

// Leader.addProposal实现
public void addProposal(Proposal p) {
    synchronized (toBeProposed) {
        // 添加到待提案队列
        toBeProposed.add(p);
        // 唤醒发送线程
        toBeProposed.notifyAll();
    }
}
  1. CommitProcessor:提交调度器
public void run() {
    try {
        Request nextPending = null;
        while (true) {
            // 1. 检查是否有新请求
            if (nextPending == null) {
                nextPending = queuedRequests.take();
            }
            
            // 2. 处理提交请求
            if (nextPending.type == OpCode.commit) {
                // 按ZXID顺序提交
                commit(nextPending.zxid);
                nextPending = null;
            } 
            // 3. 处理本地读请求
            else if (nextPending.type == OpCode.getData) {
                nextProcessor.processRequest(nextPending);
                nextPending = null;
            }
            // 4. 写请求放入等待队列
            else {
                synchronized (queuedWriteRequests) {
                    queuedWriteRequests.add(nextPending);
                    nextPending = null;
                }
            }
            
            // 5. 检查可提交的写请求
            while (!queuedWriteRequests.isEmpty()) {
                Request writeReq = queuedWriteRequests.peek();
                // 如果该请求的ZXID已被提交
                if (writeReq.zxid <= lastCommitted) {
                    queuedWriteRequests.poll();
                    nextProcessor.processRequest(writeReq);
                } else {
                    break;
                }
            }
        }
    } catch (Exception e) {
        // 异常处理...
    }
}
  1. FinalRequestProcessor:最终执行
public void processRequest(Request request) {
    // 1. 会话有效性检查
    if (request.sessionId != 0) {
        Session session = zks.sessionTracker.getSession(request.sessionId);
        if (session == null) {
            return; // 会话已过期
        }
    }
    
    try {
        // 2. 执行请求操作
        switch (request.type) {
            case OpCode.create:
                processCreate(request);
                break;
            case OpCode.delete:
                processDelete(request);
                break;
            case OpCode.getData:
                processGetData(request);
                break;
            // 其他操作类型处理...
        }
    } catch (Exception e) {
        // 异常处理...
    }
    
    // 3. 发送响应
    if (request.cnxn != null) {
        request.cnxn.sendResponse(hdr, rsp, "response");
    }
}

private void processCreate(Request request) {
    CreateRequest createReq = (CreateRequest)request.request;
    // 在DataTree中创建节点
    rsp = zks.getZKDatabase().createNode(
        createReq.getPath(), 
        createReq.getData(),
        createReq.getAcl(),
        createReq.getFlags(),
        request.hdr.getZxid()
    );
}

处理链工作流程图:
处理链工作流程

处理器功能对比表:

处理器 所属角色 核心职责 关键数据结构
PrepRequestProcessor Leader/Follower 请求反序列化/ACL检查 RequestQueue
SyncRequestProcessor Leader/Follower 事务日志持久化 TransactionLog
ProposalRequestProcessor 仅Leader 提案广播 ProposalQueue
CommitProcessor Leader/Follower 请求提交调度 QueuedWriteRequests
FinalRequestProcessor Leader/Follower 内存数据库操作 DataTree/ZKDatabase

典型问题排查:

  1. 请求卡住:
    • 检查CommitProcessor是否堆积大量请求
    • 确认集群是否达到多数派(网络分区?)
  2. ACL权限拒绝:
    • PrepRequestProcessor中checkACL()抛出异常
    • 检查客户端认证信息
  3. 事务日志写入失败:
    • SyncRequestProcessor捕获IO异常
    • 检查磁盘空间和权限
  4. 提案丢失:
    • ProposalRequestProcessor未成功加入提案队列
    • 检查Leader选举状态

三、网络通信层(NIOServerCnxnFactory为例)

3.1 核心类结构与初始化

  1. 服务启动入口:NIOServerCnxnFactory
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // 核心组件
    private SelectorThread selectorThread;    // 主选择器线程
    private AcceptThread acceptThread;        // 接收连接线程
    private final ConnectionExpirer expirer;  // 连接过期管理器
    
    // 配置参数
    private int maxClientCnxns = 60;          // 最大连接数
    private int sessionlessCnxnTimeout;       // 无会话连接超时
    
    // 初始化方法
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        // 1. 初始化接收线程
        acceptThread = new AcceptThread(
            serverSock = ServerSocketChannel.open(),
            addr,
            selectorThread.getSelector()
        );
        
        // 2. 配置端口参数
        serverSock.socket().setReuseAddress(true);
        serverSock.socket().bind(addr);
        serverSock.configureBlocking(false);
        
        // 3. 启动线程
        acceptThread.start();
        selectorThread.start();
    }
}

3.2 核心处理流程源码解析

  1. 连接接收线程:AcceptThread
class AcceptThread extends Thread {
    public void run() {
        while (!stopped) {
            try {
                // 1. 等待新连接
                SocketChannel sc = serverSock.accept();
                if (sc != null) {
                    // 2. 配置连接参数
                    sc.configureBlocking(false);
                    sc.socket().setTcpNoDelay(true);
                    
                    // 3. 创建连接对象
                    NIOServerCnxn cnxn = createConnection(sc);
                    
                    // 4. 注册到选择器
                    selectorThread.addCnxn(cnxn);
                }
            } catch (IOException e) {
                LOG.warn("AcceptThread exception", e);
            }
        }
    }
    
    private NIOServerCnxn createConnection(SocketChannel sock) {
        // 初始化连接对象
        return new NIOServerCnxn(
            NIOServerCnxnFactory.this, 
            sock, 
            selectorThread.getSelector(),
            selectorThread.getNextWorker()
        );
    }
}
  1. 选择器线程:SelectorThread
class SelectorThread extends Thread {
    private final Selector selector;
    private final Set<NIOServerCnxn> cnxns = new HashSet<>();
    private final WorkerService workerPool;  // I/O工作线程池
    
    public void run() {
        while (!stopped) {
            try {
                // 1. 选择就绪事件
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                
                // 2. 处理所有就绪事件
                for (SelectionKey k : selected) {
                    if (k.isReadable() || k.isWritable()) {
                        // 3. 获取连接对象
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        
                        // 4. 提交给IOWorker处理
                        c.getWorker().schedule(c);
                    }
                }
                selected.clear();
            } catch (Exception e) {
                LOG.warn("SelectorThread error", e);
            }
        }
    }
    
    // 添加新连接
    void addCnxn(NIOServerCnxn cnxn) {
        synchronized (cnxns) {
            // 1. 检查连接数限制
            if (cnxns.size() >= maxClientCnxns) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_REJECTED);
                return;
            }
            
            // 2. 注册读事件
            cnxn.register(selector);
            cnxns.add(cnxn);
        }
    }
}
  1. I/O工作线程:IOWorkRequest
class IOWorkRequest extends WorkerService.WorkRequest {
    private final NIOServerCnxn cnxn;
    
    public void doWork() throws InterruptedException {
        // 1. 处理读事件
        if (cnxn.sockKey.isReadable()) {
            // 从通道读取数据
            int rc = cnxn.sock.read(cnxn.recvBuffer);
            
            if (rc > 0) {
                // 反序列化请求
                cnxn.recvBuffer.flip();
                processRequest(cnxn.recvBuffer);
            } else if (rc < 0) {
                // 连接关闭
                cnxn.close(ServerCnxn.DisconnectReason.CLIENT_CLOSED);
            }
        }
        
        // 2. 处理写事件
        if (cnxn.sockKey.isWritable()) {
            // 获取待发送响应
            ByteBuffer bb = cnxn.outgoingQueue.poll();
            if (bb != null) {
                // 写入通道
                cnxn.sock.write(bb);
                
                // 如果队列还有数据,保持写事件注册
                if (!cnxn.outgoingQueue.isEmpty()) {
                    cnxn.enableWrite();
                }
            }
        }
    }
    
    private void processRequest(ByteBuffer buffer) {
        try {
            // 1. 反序列化请求头
            BinaryInputArchive bia = BinaryInputArchive.getArchive(
                new ByteBufferInputStream(buffer)
            );
            RequestHeader h = new RequestHeader();
            h.deserialize(bia, "header");
            
            // 2. 创建请求对象
            Request req = new Request(
                cnxn, 
                h.getSessionId(), 
                h.getXid(), 
                h.getType(), 
                buffer,
                cnxn.getAuthInfo()
            );
            
            // 3. 提交给处理链
            cnxn.zkServer.processRequest(req);
        } catch (Exception e) {
            LOG.error("Request processing error", e);
        }
    }
}
  1. 连接对象:NIOServerCnxn
class NIOServerCnxn extends ServerCnxn {
    final SocketChannel sock;          // 底层Socket通道
    final SelectionKey sockKey;        // 选择键
    final IOWorker worker;             // 分配的I/O工作线程
    
    // 缓冲区管理
    ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096);
    final Queue<ByteBuffer> outgoingQueue = new ConcurrentLinkedQueue<>();
    
    // 注册选择器
    void register(Selector selector) throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_READ, this);
    }
    
    // 发送响应
    public void sendResponse(ReplyHeader h, Record r, String tag) {
        // 1. 序列化响应
        ByteBuffer bb = serializeResponse(h, r, tag);
        
        // 2. 加入发送队列
        outgoingQueue.add(bb);
        
        // 3. 注册写事件
        enableWrite();
    }
    
    private void enableWrite() {
        int i = sockKey.interestOps();
        if ((i & SelectionKey.OP_WRITE) == 0) {
            sockKey.interestOps(i | SelectionKey.OP_WRITE);
        }
    }
    
    // 关闭连接
    public void close(DisconnectReason reason) {
        try {
            // 1. 取消选择键
            if (sockKey != null) sockKey.cancel();
            
            // 2. 关闭通道
            sock.close();
            
            // 3. 清理会话
            zkServer.removeCnxn(this);
        } catch (IOException e) {
            LOG.debug("Error closing connection", e);
        }
    }
}

核心流程时序图:
核心流程时序图

3.3 性能优化技术

  1. I/O工作线程池
workerPool = new WorkerService(
    "NIOWorker", 
    numWorkerThreads,  // 默认2*CPU核心数
    true               // 守护线程
);

避免Selector线程被阻塞。
并行处理多个连接的I/O。

  1. 智能事件注册:减少不必要的Selector唤醒
// 只在有数据要写时注册写事件
void enableWrite() {
    int i = sockKey.interestOps();
    if ((i & SelectionKey.OP_WRITE) == 0) {
        sockKey.interestOps(i | SelectionKey.OP_WRITE);
    }
}
  1. 缓冲区复用
// 接收缓冲区复用
if (!recvBuffer.hasRemaining()) {
    recvBuffer = ByteBuffer.allocateDirect(
        recvBuffer.capacity() * 2);
}

动态扩容避免频繁分配。
大连接使用大缓冲区。

  1. 批量响应发送:单次系统调用发送多个响应包
void doWrite() {
    int batchSize = 10;
    while (batchSize-- > 0 && !outgoingQueue.isEmpty()) {
        ByteBuffer bb = outgoingQueue.poll();
        sock.write(bb);
    }
}

关键参数调优:

参数名 默认值 作用 调优建议
maxClientCnxns 60 单IP最大连接数 根据客户端类型调整
clientPortAddress 0.0.0.0:2181 监听地址 生产环境绑定内网IP
nioWorkerThreads 2 * CPU核心 I/O工作线程数 高并发场景增加
sessionlessCnxnTimeout 10000ms 无会话连接超时 防止恶意连接
maxResponseCacheSize 400 响应缓存大小 根据内存调整

四、Leader选举(FastLeaderElection)

算法核心:ZAB协议的选举阶段
选举流程

  1. 自增epoch(logicalclock++)
  2. 初始化投票:vote = (myid, zxid, epoch)
  3. 广播NOTIFICATION消息
  4. 接收投票并统计:
// FastLeaderElection#totalOrderPredicate()
if (new_zxid > current_zxid) return true; // 优先选zxid大的
if (new_zxid == current_zxid && new_id > current_id) return true; // zxid相同时选serverId大的
  1. 超过半数支持则成为Leader

节点状态转换

// QuorumPeer#run()
switch (getPeerState()) {
    case LOOKING:
        leaderElector.lookForLeader(); // 选举中
    case FOLLOWING:
        follower.followLeader(); // 跟随状态
    case LEADING:
        leader.lead(); // 领导状态
}

五、Zab协议实现

Zab协议流程图解:
Zab协议流程图

5.1 主要流程源码

  1. 协议状态机:QuorumPeer
public void run() {
    while (running) {
        switch (getPeerState()) {
            case LOOKING: // 选举阶段
                setCurrentVote(makeLEStrategy().lookForLeader());
                break;
                
            case FOLLOWING: // Follower状态
                Follower follower = new Follower(this, ...);
                follower.followLeader(); // 包含Discovery和Sync阶段
                break;
                
            case LEADING: // Leader状态
                Leader leader = new Leader(this, ...);
                leader.lead(); // 包含Broadcast阶段
                break;
        }
    }
}
  1. 发现阶段(Discovery)- Follower实现
// Follower.java
void followLeader() throws InterruptedException {
    // 1. 连接Leader
    connectToLeader(leaderAddr);
    
    // 2. 发送FOLLOWERINFO
    QuorumPacket fInfoPacket = new QuorumPacket(Leader.FOLLOWERINFO, ...);
    writePacket(fInfoPacket, true);
    
    // 3. 接收Leader的LeaderInfo
    QuorumPacket lInfoPacket = readPacket();
    if (lInfoPacket.getType() != Leader.LEADERINFO) {
        throw new IOException("First packet should be LEADERINFO");
    }
    
    // 4. 解析epoch
    long newEpoch = lInfoPacket.getEpoch();
    if (newEpoch < self.getAcceptedEpoch()) {
        throw new IOException("Epoch less than accepted epoch");
    }
    
    // 5. 发送ACKEPOCH
    QuorumPacket ackEpochPacket = new QuorumPacket(Leader.ACKEPOCH, ...);
    writePacket(ackEpochPacket, true);
    
    // 6. 进入同步阶段
    syncWithLeader(newEpoch);
}
  1. 同步阶段(Synchronization)
// Follower.java
protected void syncWithLeader(long newEpoch) throws Exception {
    // 1. 接收Leader的NEWLEADER包
    QuorumPacket newLeaderPacket = readPacket();
    if (newLeaderPacket.getType() != Leader.NEWLEADER) {
        throw new IOException("First packet should be NEWLEADER");
    }
    
    // 2. 检查是否需要同步
    if (self.getLastLoggedZxid() != leaderLastZxid) {
        // 3. 执行数据同步
        boolean needSnap = syncStrategy.determineSyncMethod();
        if (needSnap) {
            // 全量快照同步
            syncWithSnapshot(leader);
        } else {
            // 增量事务日志同步
            syncWithLogs(leader);
        }
    }
    
    // 4. 发送ACK给Leader
    writePacket(new QuorumPacket(Leader.ACK, ...), true);
    
    // 5. 等待Leader的UPTODATE包
    QuorumPacket uptodatePacket = readPacket();
    if (uptodatePacket.getType() != Leader.UPTODATE) {
        throw new IOException("Did not receive UPTODATE packet");
    }
    
    // 6. 进入广播阶段
    startFollowerThreads();
}
  1. 广播阶段(Broadcast)- Leader实现
// Leader.java
void lead() throws IOException, InterruptedException {
    // 1. 启动ZK服务
    startZkServer();
    
    // 2. 等待Follower连接
    waitForEpochAck(self.getId(), leaderStateSummary);
    
    // 3. 发送NEWLEADER包
    sendNewLeader();
    
    // 4. 等待多数Follower的ACK
    waitForNewLeaderAck(self.getId());
    
    // 5. 发送UPTODATE包
    sendUptodate();
    
    // 6. 进入广播循环
    while (running) {
        // 7. 从队列获取提案
        Proposal p = pendingProposals.take();
        
        // 8. 广播提案
        broadcastProposal(p);
        
        // 9. 等待ACK
        waitForAckQuorum(p);
        
        // 10. 提交提案
        commit(p);
    }
}

// 广播提案方法
private void broadcastProposal(Proposal p) {
    // 构造提案包
    QuorumPacket proposal = new QuorumPacket(
        Leader.PROPOSAL, 
        p.request.zxid,
        p.request.serialize(), 
        null
    );
    
    // 发送给所有Follower
    for (LearnerHandler f : followers) {
        f.queuePacket(proposal);
    }
    
    // 本地记录
    outstandingProposals.put(p.request.zxid, p);
}
  1. 提案提交与ACK处理
// Leader.java
private void waitForAckQuorum(Proposal p) {
    synchronized (p) {
        while (!p.hasAllQuorums()) {
            // 等待ACK
            p.wait(rpcTimeout);
        }
    }
}

// ACK处理
public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // 1. 获取对应提案
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) return;
    
    // 2. 添加ACK
    p.ackSet.add(sid);
    
    // 3. 检查是否达到多数
    if (isQuorumSynced(p.ackSet)) {
        synchronized (p) {
            // 4. 满足条件则唤醒等待线程
            p.notifyAll();
        }
    }
}

// 提交提案
private void commit(Proposal p) {
    // 1. 创建提交包
    QuorumPacket commitPacket = new QuorumPacket(
        Leader.COMMIT, 
        p.request.zxid, 
        null, 
        null
    );
    
    // 2. 广播COMMIT
    for (LearnerHandler f : followers) {
        f.queuePacket(commitPacket);
    }
    
    // 3. 本地提交
    commitProcessor.commit(p.request);
    
    // 4. 从未完成提案中移除
    outstandingProposals.remove(p.request.zxid);
}
  1. 崩溃恢复实现
// Leader.java
protected void recovery() {
    // 1. 获取最大ZXID
    long maxCommittedLog = getMaxCommittedLog();
    
    // 2. 获取未提交提案列表
    List<Proposal> outstanding = getOutstandingProposals();
    
    // 3. 重建提案状态
    for (Proposal p : outstanding) {
        // 4. 检查提案是否在多数派中持久化
        if (isCommittedInQuorum(p)) {
            // 重新提交
            commit(p);
        } else {
            // 丢弃提案
            outstandingProposals.remove(p.request.zxid);
        }
    }
    
    // 5. 重新建立与Follower的连接
    waitForEpochAck(self.getId(), leaderStateSummary);
}

5.2 关键数据结构

  1. 提案对象(Proposal)
class Proposal {
    long zxid;                  // 事务ID
    Request request;            // 原始请求
    Set<Long> ackSet = new HashSet<>(); // ACK集合
    boolean committed = false;  // 提交状态
    
    // 检查是否达到多数
    boolean hasAllQuorums() {
        return ackSet.size() >= getQuorumSize();
    }
}
  1. Leader状态跟踪
class Leader {
    // 未完成提案表
    ConcurrentHashMap<Long, Proposal> outstandingProposals = 
        new ConcurrentHashMap<>();
    
    // 已提交提案表
    ConcurrentSkipListSet<Long> committedLog = new ConcurrentSkipListSet<>();
    
    // Follower列表
    List<LearnerHandler> followers = Collections.synchronizedList(new ArrayList<>());
}

5.3 Zab协议特性实现

  1. 全序性保证
// 为每个提案分配全局唯一ZXID
public long getNextZxid() {
    // 高32位是epoch,低32位是计数器
    return (epoch << 32) | (counter++);
}
  1. 可靠性保证
// 等待多数ACK
while (!p.hasAllQuorums()) {
    p.wait(timeout);
}

Zab协议通过精心设计的四个阶段(选举、发现、同步、广播)实现了分布式系统的强一致性,其源码实现展示了以下核心思想:

  1. 状态机驱动:通过明确的状态转换管理协议流程
  2. 多数派原则:所有关键操作需获得多数节点确认
  3. 幂等设计:提案处理可安全重试
  4. 顺序保障:ZXID全局排序确保操作有序性
  5. 增量恢复:优先使用事务日志同步,减少全量传输

总结

通过对Zookeeper五大核心模块的源码级剖析,我们揭开了这个分布式协调服务的神秘面纱:
核心设计哲学总结

  • 分层架构
    从QuorumPeerMain启动入口到FinalRequestProcessor的请求终结,Zookeeper通过清晰的层级划分(网络层→处理链→存储层→协议层)实现了复杂功能的优雅解耦。
  • 状态机驱动范式
    通过LOOKING→FOLLOWING→LEADING三态转换,将分布式系统最复杂的共识问题转化为确定性的状态迁移,源码中QuorumPeer.run()的状态机实现堪称经典。
  • 流水线性能优化
    请求处理链的责任链模式(如Prep→Sync→Proposal→Commit的分段处理)与网络层的SelectorThread→IOWorker协作机制,共同构建了高吞吐量的处理流水线。

分布式共识的精髓实现

  • Zab协议的四步流程:选举(Election)→发现(Discovery)→同步(Sync)→广播(Broadcast)的精密协作,在Leader.lead()和Follower.followLeader()中得以完美呈现。
  • 崩溃恢复的智慧:通过epoch+ZXID的全局唯一标识(getNextZxid()实现)和提案重放机制,解决了分布式系统最棘手的脑裂问题。
  • 数据一致性保障:CommitProcessor的顺序提交控制与outstandingProposals的多数派确认机制,共同守护了状态机的线性一致性。

源码阅读的价值
当我们在3万行源码中追踪一个create /node请求的完整生命周期:

  1. 从NIOServerCnxn的字节反序列化开始
  2. 穿越PrepRequestProcessor的ACL检查
  3. 经历SyncRequestProcessor的磁盘持久化
  4. 通过Zab协议的提案广播
  5. 最终在DataTree落地生根

这种全景式跟踪带来的认知深度,远超过任何理论描述。

本篇虽已深入核心流程,但Zookeeper的精华远不止此:会话管理的神秘时间轮、Watch机制的跨节点传播、动态配置的切换… 这些留给读者探索的宝藏,正是分布式领域永不枯竭的技术魅力。


网站公告

今日签到

点亮在社区的每一天
去签到