Table of Contents
- Preface
- 1. Server Startup Flow
- 1.1 Entry point: QuorumPeerMain
- 1.2 Cluster-mode startup core: runFromConfig
- 1.3 QuorumPeer thread main loop: run()
- 1.4 Key sub-flow: data recovery
- 1.5 Key design points
- 2. Request Processing Chain (Chain of Responsibility)
- 2.1 Leader processing chain
- 2.2 Follower processing chain
- 2.3 Core processors
- 3. Network Communication Layer (NIOServerCnxnFactory)
- 3.1 Core classes and initialization
- 3.2 Core processing flow
- 3.3 Performance optimizations
- 4. Leader Election (FastLeaderElection)
- 5. Zab Protocol Implementation
- 5.1 Main flow source
- 5.2 Key data structures
- 5.3 Zab protocol guarantees
- Summary
Preface
In distributed systems, a coordination service is the cornerstone of highly available architectures. After nine previous articles covering ZooKeeper's fundamentals, application scenarios, and APIs, we finally reach the key installment on its core source code. This article walks ZooKeeper's most important runtime paths and uncovers the implementation of four core modules: server startup, request processing, network communication, and the consistency protocol.
1. Server Startup Flow
Startup flowchart: (figure omitted)
Core source walkthrough:
1.1 Entry point: QuorumPeerMain
public class QuorumPeerMain {
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // Parse command-line arguments (usually the zoo.cfg path)
            main.initializeAndRun(args);
        } catch (Exception e) {
            LOG.error("Unexpected exception during startup", e);
            System.exit(2);
        }
    }

    protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        // 1. Parse the configuration file
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]); // parse zoo.cfg
        }
        // 2. Start the data-dir cleanup daemon
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(), config.getDataLogDir(),
            config.getSnapRetainCount(), // number of snapshots to retain
            config.getPurgeInterval()    // purge interval (hours)
        );
        purgeMgr.start();
        // 3. Choose the startup mode
        if (config.isDistributed()) {
            // cluster mode
            runFromConfig(config);
        } else {
            // standalone mode (omitted)
        }
    }
}
1.2 Cluster-mode startup core: runFromConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    // === 1. Initialize the network communication layer ===
    ServerCnxnFactory cnxnFactory = null;
    if (config.getClientPortAddress() != null) {
        // Create the connection factory via reflection (NIOServerCnxnFactory by default)
        cnxnFactory = ServerCnxnFactory.createFactory();
        // Configure the listen address and max client connections (core method)
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    }

    // === 2. Initialize data storage ===
    // Manager for transaction log and snapshot files
    FileTxnSnapLog txnLog = new FileTxnSnapLog(
        new File(config.getDataLogDir()), new File(config.getDataDir()));

    // === 3. Create the QuorumPeer instance (the core thread) ===
    QuorumPeer quorumPeer = new QuorumPeer();
    // 3.1 Inject basic configuration
    quorumPeer.setTxnFactory(txnLog);                    // transaction log manager
    quorumPeer.setQuorumPeers(config.getServers());      // cluster member list
    quorumPeer.setElectionType(config.getElectionAlg()); // election algorithm
    quorumPeer.setMyid(config.getServerId());            // this node's id
    quorumPeer.setTickTime(config.getTickTime());        // heartbeat interval (ms)
    quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    // 3.2 Wire in the network layer
    if (cnxnFactory != null) {
        quorumPeer.setServerCnxnFactory(cnxnFactory);
    }
    // 3.3 Wire in data storage
    quorumPeer.setZKDatabase(new ZKDatabase(txnLog));
    // 3.4 Restore data from disk
    quorumPeer.setLastLoggedZxid(txnLog.restore(quorumPeer.zkDb, quorumPeer));

    // === 4. Start the QuorumPeer thread (enters run()) ===
    quorumPeer.start();
}
1.3 QuorumPeer thread main loop: run()
public void run() {
    while (running) {
        switch (getPeerState()) {
        case LOOKING: // election state
            try {
                // 1. Run leader election
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception during election", e);
                // exception handling...
            }
            break;
        case FOLLOWING: // follower state
            try {
                // 2. Start the Follower service
                follower = new Follower(this, new FollowerZooKeeperServer(...));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception in follower", e);
            } finally {
                follower.shutdown();
            }
            break;
        case LEADING: // leader state
            try {
                // 3. Start the Leader service
                leader = new Leader(this, new LeaderZooKeeperServer(...));
                leader.lead();
            } catch (Exception e) {
                LOG.warn("Unexpected exception in leader", e);
            } finally {
                leader.shutdown("Unexpected exception");
            }
            break;
        }
    }
}
1.4 Key sub-flow: data recovery
// FileTxnSnapLog.java
public long restore(DataTree dt, Map<Long, Integer> sessions) {
    // 1. Restore from the latest snapshot
    long deserializeResult = snapLog.deserialize(dt, sessions);
    // 2. Replay transaction logs written after the snapshot
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    long highestZxid = fastForwardFromEdits(dt, sessions);
    // Return the highest ZXID seen
    return highestZxid;
}

// Core snapshot-restore method
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // Find the most recent snapshot file
    File snapShot = findMostRecentSnapshot();
    if (snapShot == null) {
        return -1L; // no snapshot available
    }
    try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snapShot))) {
        // Deserialize the snapshot into the DataTree
        InputArchive ia = BinaryInputArchive.getArchive(snapIS);
        deserialize(dt, sessions, ia);
        return dt.lastProcessedZxid; // ZXID the snapshot corresponds to
    }
}
1.5 Key design points
Layered initialization: configuration parsing → network communication layer → data storage → QuorumPeer thread, each layer wired up before the next starts (see runFromConfig above).
Data recovery strategy (a rough sketch follows this list):
- Load the most recent snapshot (snapshot.xxx file) first
- Then replay all transaction logs written after it (log.xxx files)
- Verify data integrity with CRC32 checksums
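To make that restore order concrete, here is a rough, self-contained sketch, with a hypothetical RestoreSketch class and stubbed loading; the file-name parsing mirrors ZooKeeper's snapshot.<hexZxid>/log.<hexZxid> convention, but this is not the actual FileTxnSnapLog code:
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;

// Minimal sketch of the restore order, assuming snapshot.<hexZxid> and
// log.<hexZxid> file names; snapshot loading and record replay are stubbed.
class RestoreSketch {
    long restore(File snapDir, File logDir) {
        // 1. Pick the newest snapshot (largest zxid suffix) and load it
        File[] snaps = snapDir.listFiles((d, n) -> n.startsWith("snapshot."));
        long lastZxid = -1L;
        if (snaps != null && snaps.length > 0) {
            File newest = Arrays.stream(snaps)
                .max(Comparator.comparingLong(f -> zxidOf(f.getName()))).get();
            lastZxid = loadSnapshot(newest);
        }
        // 2. Replay every transaction log that can hold entries after lastZxid
        File[] logs = logDir.listFiles((d, n) -> n.startsWith("log."));
        if (logs != null) {
            Arrays.sort(logs, Comparator.comparingLong(f -> zxidOf(f.getName())));
            for (File log : logs) {
                lastZxid = Math.max(lastZxid, replayVerified(log, lastZxid));
            }
        }
        return lastZxid; // highest zxid now reflected in the in-memory tree
    }

    long zxidOf(String name) { // hex zxid suffix of the file name
        return Long.parseLong(name.substring(name.indexOf('.') + 1), 16);
    }

    long loadSnapshot(File f) { /* deserialize into the DataTree */ return 0L; }

    long replayVerified(File f, long fromZxid) {
        // Each log record stores a CRC32 of its payload; recompute it with
        // new java.util.zip.CRC32().update(payload) and compare before applying.
        return fromZxid;
    }
}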
State machine design:
- LOOKING: electing; runs FastLeaderElection
- FOLLOWING: starts the Follower service and connects to the Leader
- LEADING: starts the Leader service and manages the cluster
Resource cleanup (a retention-policy sketch follows this list):
- DatadirCleanupManager: periodically purges old snapshots and transaction logs
- Deletes historical files per the retention policy (3 snapshots by default)
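A minimal sketch of such a retention policy, using a hypothetical PurgeSketch helper; ZooKeeper's actual purge logic lives in PurgeTxnLog and is more careful:
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;

// Minimal sketch of a "keep the newest N snapshots" retention policy,
// assuming snapshot.<hexZxid> file names. The real cleanup also preserves
// the transaction logs that the retained snapshots still depend on.
class PurgeSketch {
    static void purge(File snapDir, int retainCount) {
        File[] snaps = snapDir.listFiles((d, n) -> n.startsWith("snapshot."));
        if (snaps == null || snaps.length <= retainCount) {
            return; // nothing old enough to delete
        }
        // Newest first, ordered by the hex zxid suffix
        Arrays.sort(snaps, Comparator.comparingLong(
            (File f) -> Long.parseLong(
                f.getName().substring("snapshot.".length()), 16)).reversed());
        for (int i = retainCount; i < snaps.length; i++) {
            snaps[i].delete(); // everything beyond the newest N goes
        }
    }
}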
Key objects in the startup flow:
Object | Role | Lifecycle |
---|---|---|
QuorumPeer | Main thread of a cluster node | Entire runtime |
ServerCnxnFactory | Network communication service | Entire runtime |
FileTxnSnapLog | Transaction log and snapshot management | Entire runtime |
ZKDatabase | In-memory database (DataTree) | Entire runtime |
Follower/Leader | Role-specific behavior | While the role holds |
2. Request Processing Chain (Chain of Responsibility)
2.1 Leader processing chain
// LeaderZooKeeperServer.java
protected void setupRequestProcessors() {
    // Final processor: actually applies the operation
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // To-be-applied processor: tracks proposals awaiting commit
    RequestProcessor toBeAppliedProcessor =
        new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    // Commit processor: enforces request ordering
    CommitProcessor commitProcessor = new CommitProcessor(
        toBeAppliedProcessor, "CommitProcessor", getZooKeeperServer().isMatchSyncs());
    // Proposal processor: broadcasts proposals
    RequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    // Prep processor: request pre-processing
    PrepRequestProcessor prepProcessor = new PrepRequestProcessor(this, proposalProcessor);
    // Assemble the full chain
    firstProcessor = new LeaderRequestProcessor(this, prepProcessor);
    // Start all processor threads
    startProcessors(new RequestProcessor[] {
        prepProcessor, proposalProcessor, commitProcessor, finalProcessor
    });
}
2.2 Follower processing chain
// FollowerZooKeeperServer.java
protected void setupRequestProcessors() {
    // Final processor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // Commit processor
    commitProcessor = new CommitProcessor(finalProcessor, "CommitProcessor", true);
    // Sync processor: persists the transaction log, then ACKs the Leader
    syncProcessor = new SyncRequestProcessor(
        this, new SendAckRequestProcessor(getFollower()));
    // Assemble the chain
    firstProcessor = new FollowerRequestProcessor(this, syncProcessor);
    // Start the processor threads
    startProcessors(new RequestProcessor[] {
        firstProcessor, syncProcessor, commitProcessor
    });
}
2.3 Core processors
- PrepRequestProcessor: request pre-processing
public void run() {
    try {
        while (true) {
            // 1. Take the next request off the queue
            Request request = submittedRequests.take();
            // 2. Pre-process it (core method)
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this, e);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    // Validate the request type (must be a legal opcode)
    if (request.type < 0 || request.type > OpCode.maxOp) {
        throw new RequestProcessorException("Invalid op type");
    }
    try {
        // 3. Deserialize the request body
        ByteBufferInputStream bbis = new ByteBufferInputStream(request.request);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis);
        Record record = null;
        // 4. Pick the record type by opcode
        switch (request.type) {
        case OpCode.create:
            record = new CreateRequest();
            break;
        case OpCode.delete:
            record = new DeleteRequest();
            break;
        // other op types...
        }
        record.deserialize(bia, "request");
        // 5. ACL check
        if (request.authInfo != null) {
            checkACL(request, record);
        }
        // 6. Build the transaction header
        request.hdr = new TxnHeader(
            request.sessionId,
            request.cxid,
            zks.getZKDatabase().getNextZxid(), // allocate a globally unique ZXID
            Time.currentWallTime(),
            request.type);
        // 7. Hand off to the next processor
        nextProcessor.processRequest(request);
    } catch (Exception e) {
        // exception handling...
    }
}
- SyncRequestProcessor: transaction persistence
public void run() {
    try {
        int logCount = 0;
        while (true) {
            Request request = queuedRequests.take();
            if (request != null) {
                // 1. Append to the transaction log
                zks.getZKDatabase().append(request);
                logCount++;
                // Take a snapshot once enough transactions have accumulated
                if (logCount > (snapCount / 2 + randRoll)) {
                    randRoll = r.nextInt(snapCount / 2);
                    zks.takeSnapshot();
                    logCount = 0;
                }
            }
            // 2. Hand off to the next processor
            if (nextProcessor != null) {
                nextProcessor.processRequest(request);
            }
        }
    } catch (Exception e) {
        // exception handling...
    }
}
- ProposalRequestProcessor: proposal broadcast (Leader only)
public void processRequest(Request request) {
    // 1. Read requests bypass the proposal path
    if (!Request.isValid(request.type)) {
        nextProcessor.processRequest(request);
        return;
    }
    // 2. Build the proposal object
    Proposal p = new Proposal();
    p.packet = new QuorumPacket();
    p.request = request;
    // 3. Queue the proposal for broadcast
    synchronized (leader) {
        leader.addProposal(p);
    }
    // 4. Hand off to the next processor
    nextProcessor.processRequest(request);
}

// Leader.addProposal
public void addProposal(Proposal p) {
    synchronized (toBeProposed) {
        // enqueue for proposing
        toBeProposed.add(p);
        // wake the sender thread
        toBeProposed.notifyAll();
    }
}
- CommitProcessor: commit scheduler
public void run() {
    try {
        Request nextPending = null;
        while (true) {
            // 1. Fetch a new request if none is pending
            if (nextPending == null) {
                nextPending = queuedRequests.take();
            }
            // 2. COMMIT messages: commit in ZXID order
            if (nextPending.type == OpCode.commit) {
                commit(nextPending.zxid);
                nextPending = null;
            }
            // 3. Local reads can be served immediately
            else if (nextPending.type == OpCode.getData) {
                nextProcessor.processRequest(nextPending);
                nextPending = null;
            }
            // 4. Writes wait in the queue until their commit arrives
            else {
                synchronized (queuedWriteRequests) {
                    queuedWriteRequests.add(nextPending);
                    nextPending = null;
                }
            }
            // 5. Release every write whose ZXID has been committed
            while (!queuedWriteRequests.isEmpty()) {
                Request writeReq = queuedWriteRequests.peek();
                if (writeReq.zxid <= lastCommitted) {
                    queuedWriteRequests.poll();
                    nextProcessor.processRequest(writeReq);
                } else {
                    break;
                }
            }
        }
    } catch (Exception e) {
        // exception handling...
    }
}
- FinalRequestProcessor: final execution
public void processRequest(Request request) {
    // 1. Session validity check
    if (request.sessionId != 0) {
        Session session = zks.sessionTracker.getSession(request.sessionId);
        if (session == null) {
            return; // session has expired
        }
    }
    try {
        // 2. Execute the operation
        switch (request.type) {
        case OpCode.create:
            processCreate(request);
            break;
        case OpCode.delete:
            processDelete(request);
            break;
        case OpCode.getData:
            processGetData(request);
            break;
        // other op types...
        }
    } catch (Exception e) {
        // exception handling...
    }
    // 3. Send the response
    if (request.cnxn != null) {
        request.cnxn.sendResponse(hdr, rsp, "response");
    }
}

private void processCreate(Request request) {
    CreateRequest createReq = (CreateRequest) request.request;
    // Create the node in the DataTree
    rsp = zks.getZKDatabase().createNode(
        createReq.getPath(),
        createReq.getData(),
        createReq.getAcl(),
        createReq.getFlags(),
        request.hdr.getZxid());
}
Processing-chain flow diagram: (figure omitted)
Processor comparison (a toy sketch of the shared chain pattern follows the table):
Processor | Role | Core responsibility | Key data structure |
---|---|---|---|
PrepRequestProcessor | Leader/Follower | Request deserialization / ACL checks | RequestQueue |
SyncRequestProcessor | Leader/Follower | Transaction log persistence | TransactionLog |
ProposalRequestProcessor | Leader only | Proposal broadcast | ProposalQueue |
CommitProcessor | Leader/Follower | Commit scheduling | QueuedWriteRequests |
FinalRequestProcessor | Leader/Follower | In-memory database operations | DataTree/ZKDatabase |
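To make the chaining mechanics concrete, here is a minimal, self-contained sketch of the pattern these processors share. The names are toys, not ZooKeeper's actual classes; the real RequestProcessor interface also declares a shutdown() method:
// Toy version of the chain-of-responsibility wiring used above.
interface Processor {
    void process(String request);
}

class PrepStage implements Processor {
    private final Processor next;
    PrepStage(Processor next) { this.next = next; }
    public void process(String request) {
        System.out.println("prep: " + request); // validate / enrich the request
        next.process(request);                  // hand off downstream
    }
}

class FinalStage implements Processor {
    public void process(String request) {
        System.out.println("apply: " + request); // terminal stage
    }
}

class ChainDemo {
    public static void main(String[] args) {
        // Wired back to front, exactly like setupRequestProcessors()
        Processor first = new PrepStage(new FinalStage());
        first.process("create /node");
    }
}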
Troubleshooting typical issues:
- Requests hang:
  - Check whether CommitProcessor has a large backlog of requests
  - Confirm the cluster still has a quorum (network partition?)
- ACL permission denied:
  - checkACL() in PrepRequestProcessor throws an exception
  - Verify the client's authentication info
- Transaction log write failures:
  - SyncRequestProcessor catches an IO exception
  - Check disk space and permissions
- Lost proposals:
  - ProposalRequestProcessor failed to enqueue the proposal
  - Check the Leader election state
3. Network Communication Layer (NIOServerCnxnFactory)
3.1 Core classes and initialization
- Service entry point: NIOServerCnxnFactory
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Core components
    private SelectorThread selectorThread;   // main selector thread
    private AcceptThread acceptThread;       // connection-accepting thread
    private final ConnectionExpirer expirer; // connection expiry manager

    // Configuration
    private int maxClientCnxns = 60;         // max connections per client IP
    private int sessionlessCnxnTimeout;      // timeout for session-less connections

    // Initialization
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        // 1. Create the accept thread
        acceptThread = new AcceptThread(
            serverSock = ServerSocketChannel.open(),
            addr,
            selectorThread.getSelector());
        // 2. Configure and bind the listen socket
        serverSock.socket().setReuseAddress(true);
        serverSock.socket().bind(addr);
        serverSock.configureBlocking(false);
        // 3. Start the threads
        acceptThread.start();
        selectorThread.start();
    }
}
3.2 Core processing flow
- Connection-accepting thread: AcceptThread
class AcceptThread extends Thread {
    public void run() {
        while (!stopped) {
            try {
                // 1. Wait for a new connection
                SocketChannel sc = serverSock.accept();
                if (sc != null) {
                    // 2. Configure the socket
                    sc.configureBlocking(false);
                    sc.socket().setTcpNoDelay(true);
                    // 3. Create the connection object
                    NIOServerCnxn cnxn = createConnection(sc);
                    // 4. Register it with the selector
                    selectorThread.addCnxn(cnxn);
                }
            } catch (IOException e) {
                LOG.warn("AcceptThread exception", e);
            }
        }
    }

    private NIOServerCnxn createConnection(SocketChannel sock) {
        // Build the connection object
        return new NIOServerCnxn(
            NIOServerCnxnFactory.this, sock,
            selectorThread.getSelector(),
            selectorThread.getNextWorker());
    }
}
- Selector thread: SelectorThread
class SelectorThread extends Thread {
    private final Selector selector;
    private final Set<NIOServerCnxn> cnxns = new HashSet<>();
    private final WorkerService workerPool; // I/O worker thread pool

    public void run() {
        while (!stopped) {
            try {
                // 1. Wait for ready events
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                // 2. Dispatch every ready key
                for (SelectionKey k : selected) {
                    if (k.isReadable() || k.isWritable()) {
                        // 3. Recover the connection object
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        // 4. Hand the actual I/O to a worker
                        c.getWorker().schedule(c);
                    }
                }
                selected.clear();
            } catch (Exception e) {
                LOG.warn("SelectorThread error", e);
            }
        }
    }

    // Register a new connection
    void addCnxn(NIOServerCnxn cnxn) {
        synchronized (cnxns) {
            // 1. Enforce the connection limit
            if (cnxns.size() >= maxClientCnxns) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_REJECTED);
                return;
            }
            // 2. Register for read events
            cnxn.register(selector);
            cnxns.add(cnxn);
        }
    }
}
- I/O worker: IOWorkRequest
class IOWorkRequest extends WorkerService.WorkRequest {
    private final NIOServerCnxn cnxn;

    public void doWork() throws InterruptedException {
        // 1. Handle readable
        if (cnxn.sockKey.isReadable()) {
            // Read from the channel
            int rc = cnxn.sock.read(cnxn.recvBuffer);
            if (rc > 0) {
                // Deserialize the request
                cnxn.recvBuffer.flip();
                processRequest(cnxn.recvBuffer);
            } else if (rc < 0) {
                // Peer closed the connection
                cnxn.close(ServerCnxn.DisconnectReason.CLIENT_CLOSED);
            }
        }
        // 2. Handle writable
        if (cnxn.sockKey.isWritable()) {
            // Grab a pending response
            ByteBuffer bb = cnxn.outgoingQueue.poll();
            if (bb != null) {
                // Write it to the channel
                cnxn.sock.write(bb);
                // Keep OP_WRITE registered while the queue is non-empty
                if (!cnxn.outgoingQueue.isEmpty()) {
                    cnxn.enableWrite();
                }
            }
        }
    }

    private void processRequest(ByteBuffer buffer) {
        try {
            // 1. Deserialize the request header
            BinaryInputArchive bia = BinaryInputArchive.getArchive(
                new ByteBufferInputStream(buffer));
            RequestHeader h = new RequestHeader();
            h.deserialize(bia, "header");
            // 2. Build the Request object
            Request req = new Request(
                cnxn, h.getSessionId(), h.getXid(), h.getType(), buffer,
                cnxn.getAuthInfo());
            // 3. Submit it to the processing chain
            cnxn.zkServer.processRequest(req);
        } catch (Exception e) {
            LOG.error("Request processing error", e);
        }
    }
}
- Connection object: NIOServerCnxn
class NIOServerCnxn extends ServerCnxn {
    final SocketChannel sock;   // underlying socket channel
    final SelectionKey sockKey; // selection key
    final IOWorker worker;      // assigned I/O worker

    // Buffer management
    ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096);
    final Queue<ByteBuffer> outgoingQueue = new ConcurrentLinkedQueue<>();

    // Register with the selector
    void register(Selector selector) throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_READ, this);
    }

    // Send a response
    public void sendResponse(ReplyHeader h, Record r, String tag) {
        // 1. Serialize the response
        ByteBuffer bb = serializeResponse(h, r, tag);
        // 2. Enqueue it for sending
        outgoingQueue.add(bb);
        // 3. Register interest in OP_WRITE
        enableWrite();
    }

    private void enableWrite() {
        int i = sockKey.interestOps();
        if ((i & SelectionKey.OP_WRITE) == 0) {
            sockKey.interestOps(i | SelectionKey.OP_WRITE);
        }
    }

    // Close the connection
    public void close(DisconnectReason reason) {
        try {
            // 1. Cancel the selection key
            if (sockKey != null) sockKey.cancel();
            // 2. Close the channel
            sock.close();
            // 3. Clean up the session
            zkServer.removeCnxn(this);
        } catch (IOException e) {
            LOG.debug("Error closing connection", e);
        }
    }
}
Core flow sequence diagram: (figure omitted)
3.3 Performance optimizations
- I/O worker thread pool (see the echo-server sketch after this list)
workerPool = new WorkerService(
    "NIOWorker",
    numWorkerThreads, // defaults to 2 * CPU cores
    true              // daemon threads
);
Keeps the selector thread from ever blocking on socket I/O.
Handles I/O for many connections in parallel.
- Selective event registration: avoids unnecessary selector wakeups
// Register OP_WRITE only when there is data to write
void enableWrite() {
    int i = sockKey.interestOps();
    if ((i & SelectionKey.OP_WRITE) == 0) {
        sockKey.interestOps(i | SelectionKey.OP_WRITE);
    }
}
- Buffer reuse
// Grow the receive buffer only when it fills up
if (!recvBuffer.hasRemaining()) {
    recvBuffer = ByteBuffer.allocateDirect(recvBuffer.capacity() * 2);
}
Dynamic growth avoids allocating a fresh buffer per read.
Busy connections end up with larger buffers.
- Batched response writes: several response packets per system call
void doWrite() {
    int batchSize = 10;
    while (batchSize-- > 0 && !outgoingQueue.isEmpty()) {
        ByteBuffer bb = outgoingQueue.poll();
        sock.write(bb);
    }
}
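As a self-contained illustration of the first point, here is a hypothetical echo server (not ZooKeeper code) where the selector thread does nothing but event dispatch while a fixed pool performs the actual reads and writes:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical echo server showing the selector-thread / worker-pool split.
public class SelectorWorkerDemo {
    public static void main(String[] args) throws IOException {
        ExecutorService workers = Executors.newFixedThreadPool(
            2 * Runtime.getRuntime().availableProcessors()); // like nioWorkerThreads
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9999));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select(); // the only place this thread blocks
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = server.accept();
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // Park the key and push the blocking I/O to the pool,
                    // so one slow client never stalls event dispatch
                    key.interestOps(0);
                    SocketChannel sc = (SocketChannel) key.channel();
                    workers.submit(() -> {
                        try {
                            ByteBuffer buf = ByteBuffer.allocate(4096);
                            if (sc.read(buf) < 0) { sc.close(); return; }
                            buf.flip();
                            sc.write(buf);           // echo the bytes back
                            key.interestOps(SelectionKey.OP_READ);
                            key.selector().wakeup(); // re-arm the key
                        } catch (IOException e) { /* close on error */ }
                    });
                }
            }
        }
    }
}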
Key tuning parameters:
Parameter | Default | Purpose | Tuning advice |
---|---|---|---|
maxClientCnxns | 60 | Max connections per client IP | Adjust to the client population |
clientPortAddress | 0.0.0.0:2181 | Listen address | Bind to an internal IP in production |
nioWorkerThreads | 2 * CPU cores | Number of I/O worker threads | Increase under high concurrency |
sessionlessCnxnTimeout | 10000 ms | Timeout for session-less connections | Guards against idle or malicious connections |
maxResponseCacheSize | 400 | Response cache size | Adjust to available memory |
4. Leader Election (FastLeaderElection)
Algorithm core: the election phase of the ZAB protocol.
Election flow:
- Increment the election epoch (logicalclock++)
- Initialize the vote: vote = (myid, zxid, epoch)
- Broadcast NOTIFICATION messages
- Receive votes and compare them (a fuller sketch of the predicate follows this list):
// FastLeaderElection#totalOrderPredicate()
if (new_zxid > current_zxid) return true; // prefer the larger zxid
if (new_zxid == current_zxid && new_id > current_id) return true; // tie-break on server id
- A node supported by more than half the ensemble becomes the Leader
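The real totalOrderPredicate compares election epochs before zxids; a minimal sketch of that three-level ordering (simplified signature, not the exact source):
// Simplified FastLeaderElection#totalOrderPredicate: a candidate vote wins if
// it carries a newer epoch, or the same epoch with a larger zxid, or the same
// zxid with a larger server id.
static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                   long curId, long curZxid, long curEpoch) {
    return (newEpoch > curEpoch)
        || (newEpoch == curEpoch && newZxid > curZxid)
        || (newEpoch == curEpoch && newZxid == curZxid && newId > curId);
}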
Node state transitions:
// QuorumPeer#run()
switch (getPeerState()) {
case LOOKING:
    leaderElector.lookForLeader(); // electing
    break;
case FOLLOWING:
    follower.followLeader(); // following
    break;
case LEADING:
    leader.lead(); // leading
    break;
}
5. Zab Protocol Implementation
Zab protocol flow diagram: (figure omitted)
5.1 Main flow source
- Protocol state machine: QuorumPeer
public void run() {
    while (running) {
        switch (getPeerState()) {
        case LOOKING: // election phase
            setCurrentVote(makeLEStrategy().lookForLeader());
            break;
        case FOLLOWING: // follower role
            Follower follower = new Follower(this, ...);
            follower.followLeader(); // covers the Discovery and Sync phases
            break;
        case LEADING: // leader role
            Leader leader = new Leader(this, ...);
            leader.lead(); // covers the Broadcast phase
            break;
        }
    }
}
- Discovery phase, Follower side
// Follower.java
void followLeader() throws InterruptedException {
    // 1. Connect to the Leader
    connectToLeader(leaderAddr);
    // 2. Send FOLLOWERINFO
    QuorumPacket fInfoPacket = new QuorumPacket(Leader.FOLLOWERINFO, ...);
    writePacket(fInfoPacket, true);
    // 3. Receive the Leader's LEADERINFO
    QuorumPacket lInfoPacket = readPacket();
    if (lInfoPacket.getType() != Leader.LEADERINFO) {
        throw new IOException("First packet should be LEADERINFO");
    }
    // 4. Validate the epoch
    long newEpoch = lInfoPacket.getEpoch();
    if (newEpoch < self.getAcceptedEpoch()) {
        throw new IOException("Epoch less than accepted epoch");
    }
    // 5. Send ACKEPOCH
    QuorumPacket ackEpochPacket = new QuorumPacket(Leader.ACKEPOCH, ...);
    writePacket(ackEpochPacket, true);
    // 6. Enter the synchronization phase
    syncWithLeader(newEpoch);
}
- Synchronization phase
// Follower.java
protected void syncWithLeader(long newEpoch) throws Exception {
    // 1. Receive the Leader's NEWLEADER packet
    QuorumPacket newLeaderPacket = readPacket();
    if (newLeaderPacket.getType() != Leader.NEWLEADER) {
        throw new IOException("First packet should be NEWLEADER");
    }
    // 2. Decide whether synchronization is needed
    if (self.getLastLoggedZxid() != leaderLastZxid) {
        // 3. Synchronize the data
        boolean needSnap = syncStrategy.determineSyncMethod();
        if (needSnap) {
            // full snapshot transfer
            syncWithSnapshot(leader);
        } else {
            // incremental transaction-log replay
            syncWithLogs(leader);
        }
    }
    // 4. ACK the Leader
    writePacket(new QuorumPacket(Leader.ACK, ...), true);
    // 5. Wait for the Leader's UPTODATE packet
    QuorumPacket uptodatePacket = readPacket();
    if (uptodatePacket.getType() != Leader.UPTODATE) {
        throw new IOException("Did not receive UPTODATE packet");
    }
    // 6. Enter the broadcast phase
    startFollowerThreads();
}
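How the sync method gets chosen is worth a closer look. Here is a rough sketch modeled on the DIFF / TRUNC / SNAP decision in LearnerHandler, with a simplified, hypothetical signature; the real logic also consults the leader's in-memory cache of committed proposals:
// Decide how to bring a follower up to date, given its last logged zxid and
// the range of proposals the leader still holds in its committed log.
static String chooseSyncMethod(long peerLastZxid, long minCommittedLog,
                               long maxCommittedLog, long leaderLastZxid) {
    if (peerLastZxid == leaderLastZxid) return "NONE";  // already in sync
    if (peerLastZxid > maxCommittedLog) return "TRUNC"; // follower is ahead: roll back
    if (peerLastZxid >= minCommittedLog) return "DIFF"; // send only the missing txns
    return "SNAP";                                      // too far behind: full snapshot
}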
- Broadcast phase, Leader side
// Leader.java
void lead() throws IOException, InterruptedException {
    // 1. Start the ZK server
    startZkServer();
    // 2. Wait for Followers to connect
    waitForEpochAck(self.getId(), leaderStateSummary);
    // 3. Send the NEWLEADER packet
    sendNewLeader();
    // 4. Wait for ACKs from a majority of Followers
    waitForNewLeaderAck(self.getId());
    // 5. Send the UPTODATE packet
    sendUptodate();
    // 6. Broadcast loop
    while (running) {
        // 7. Take the next proposal from the queue
        Proposal p = pendingProposals.take();
        // 8. Broadcast it
        broadcastProposal(p);
        // 9. Wait for a quorum of ACKs
        waitForAckQuorum(p);
        // 10. Commit it
        commit(p);
    }
}

// Broadcasting a proposal
private void broadcastProposal(Proposal p) {
    // Build the PROPOSAL packet
    QuorumPacket proposal = new QuorumPacket(
        Leader.PROPOSAL,
        p.request.zxid,
        p.request.serialize(), null);
    // Send it to every Follower
    for (LearnerHandler f : followers) {
        f.queuePacket(proposal);
    }
    // Track it locally
    outstandingProposals.put(p.request.zxid, p);
}
- Proposal commit and ACK handling
// Leader.java
private void waitForAckQuorum(Proposal p) {
    synchronized (p) {
        while (!p.hasAllQuorums()) {
            // wait for ACKs to arrive
            p.wait(rpcTimeout);
        }
    }
}

// ACK handling
public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // 1. Look up the proposal
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) return;
    // 2. Record the ACK
    p.ackSet.add(sid);
    // 3. Check for a quorum
    if (isQuorumSynced(p.ackSet)) {
        synchronized (p) {
            // 4. Wake the waiting thread
            p.notifyAll();
        }
    }
}

// Committing a proposal
private void commit(Proposal p) {
    // 1. Build the COMMIT packet
    QuorumPacket commitPacket = new QuorumPacket(
        Leader.COMMIT, p.request.zxid, null, null);
    // 2. Broadcast COMMIT
    for (LearnerHandler f : followers) {
        f.queuePacket(commitPacket);
    }
    // 3. Commit locally
    commitProcessor.commit(p.request);
    // 4. Remove it from the outstanding set
    outstandingProposals.remove(p.request.zxid);
}
- Crash recovery
// Leader.java
protected void recovery() {
    // 1. Highest committed ZXID
    long maxCommittedLog = getMaxCommittedLog();
    // 2. Proposals that never committed
    List<Proposal> outstanding = getOutstandingProposals();
    // 3. Rebuild proposal state
    for (Proposal p : outstanding) {
        // 4. Was the proposal persisted on a majority?
        if (isCommittedInQuorum(p)) {
            // re-commit it
            commit(p);
        } else {
            // discard it
            outstandingProposals.remove(p.request.zxid);
        }
    }
    // 5. Re-establish connections with the Followers
    waitForEpochAck(self.getId(), leaderStateSummary);
}
5.2 Key data structures
- The proposal object (Proposal)
class Proposal {
    long zxid;                          // transaction id
    Request request;                    // the original request
    Set<Long> ackSet = new HashSet<>(); // ACKs received so far
    boolean committed = false;          // commit state

    // True once a majority has ACKed
    boolean hasAllQuorums() {
        return ackSet.size() >= getQuorumSize();
    }
}
- Leader state tracking
class Leader {
    // Proposals still awaiting a quorum of ACKs
    ConcurrentHashMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<>();
    // Committed proposals
    ConcurrentSkipListSet<Long> committedLog = new ConcurrentSkipListSet<>();
    // Connected followers
    List<LearnerHandler> followers = Collections.synchronizedList(new ArrayList<>());
}
5.3 Zab protocol guarantees
- Total ordering
// Assign every proposal a globally unique ZXID
public long getNextZxid() {
    // high 32 bits: epoch; low 32 bits: counter
    return (epoch << 32) | (counter++);
}
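A quick runnable demo of this bit layout (made-up epoch and counter values, purely illustrative):
public class ZxidDemo {
    public static void main(String[] args) {
        long epoch = 5, counter = 42;
        long zxid = (epoch << 32) | counter;  // pack: high 32 bits hold the epoch
        long epochOut = zxid >>> 32;          // unpack the epoch
        long counterOut = zxid & 0xFFFFFFFFL; // unpack the counter
        System.out.printf("zxid=0x%016x epoch=%d counter=%d%n", zxid, epochOut, counterOut);
        // A new epoch resets the counter, so any zxid from a newer leader
        // compares greater than every zxid from an older epoch.
    }
}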
- Reliability
// Wait for a majority of ACKs
while (!p.hasAllQuorums()) {
    p.wait(timeout);
}
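The majority test itself is simple arithmetic; a toy version of what hasAllQuorums() checks (ZooKeeper's default quorum verifier, QuorumMaj, works the same way):
// An ensemble of n servers needs strictly more than n/2 ACKs, i.e. floor(n/2) + 1.
static boolean hasQuorum(int acks, int ensembleSize) {
    return acks > ensembleSize / 2;
}
// hasQuorum(2, 3) == true, hasQuorum(2, 5) == false, hasQuorum(3, 5) == true,
// which is why a 5-node cluster tolerates 2 simultaneous failures.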
Through four carefully designed phases (election, discovery, synchronization, broadcast), Zab delivers strong consistency for the cluster. Its implementation embodies these core ideas:
- State-machine driven: explicit state transitions manage the protocol flow
- Majority principle: every critical operation requires confirmation from a majority of nodes
- Idempotent design: proposal handling can be retried safely
- Ordering guarantee: global ZXID ordering keeps operations sequential
- Incremental recovery: prefer transaction-log sync to avoid full snapshot transfers
Summary
By dissecting ZooKeeper's five core modules at the source level, we have lifted the veil on this distributed coordination service.
Core design philosophy
- Layered architecture
From the QuorumPeerMain entry point to the FinalRequestProcessor at the end of the chain, ZooKeeper decouples complex functionality through clean layering: network layer → processing chain → storage layer → protocol layer.
- State-machine paradigm
The LOOKING → FOLLOWING → LEADING transitions turn consensus, the hardest problem in distributed systems, into deterministic state migration; the state machine in QuorumPeer.run() is a textbook example.
- Pipelined processing
The responsibility chain (the Prep → Sync → Proposal → Commit stages) and the SelectorThread → IOWorker division of labor in the network layer together form a high-throughput pipeline.
The essence of distributed consensus
- Zab's four-step flow: the tight choreography of Election → Discovery → Sync → Broadcast plays out in Leader.lead() and Follower.followLeader().
- Crash-recovery design: the globally unique epoch+ZXID identifier (see getNextZxid()) plus proposal replay resolve split-brain, the thorniest failure mode in distributed systems.
- Consistency guarantees: CommitProcessor's ordered commits and the majority confirmation tracked in outstandingProposals together preserve linearizable updates to the state machine.
The value of reading the source
Tracing the complete life of a create /node request through roughly 30,000 lines of source:
- it starts as bytes deserialized in NIOServerCnxn
- passes the ACL checks in PrepRequestProcessor
- is persisted to disk by SyncRequestProcessor
- is broadcast as a Zab proposal
- and finally takes root in the DataTree
The depth of understanding gained from such an end-to-end trace goes far beyond any theoretical description.
This article has covered the core flows, but ZooKeeper's riches do not end here: the timing wheel behind session management, cross-node propagation of Watches, dynamic reconfiguration... these treasures are left for the reader to explore, and they are what keeps the distributed-systems field endlessly fascinating.