在 Kubernetes 为主流注册发现的今天,给出如何在 Spring Boot 中基于 ZooKeeper 实现服务注册/发现、分布式锁、配置中心以及集群协调的完整代码与最佳实践。所有示例均可直接复制运行。
1. ZooKeeper 架构与核心原理
1.1 角色
- Leader:处理写请求,广播事务(ZAB 协议)。
- Follower / Observer:处理读请求,Follower 参与选举,Observer 仅扩展读能力。
1.2 一致性协议:ZAB(ZooKeeper Atomic Broadcast)
- 所有写请求统一由 Leader 生成全局递增的
zxid
。 - 两阶段提交(Proposal → ACK → Commit)。
- 崩溃恢复阶段:根据
zxid
选举新 Leader,保证已 Commit 的事务不丢失。
1.3 数据模型
/
├── services
│ ├── user-service
│ │ ├── 192.168.1.10#8080 (EPHEMERAL_SEQUENTIAL)
│ │ └── 192.168.1.11#8080
│ └── order-service
├── configs
│ └── application.yml
└── locks├── pay_lock_0000000001 (EPHEMERAL_SEQUENTIAL)└── pay_lock_0000000002
- EPHEMERAL:会话断则节点自动删除,天然适合心跳/服务实例。
- SEQUENTIAL:节点名后缀自增,用于公平锁、队列。
2. Spring Boot 集成 ZooKeeper
场景:K8s 已有 Service 发现,但团队需要异构语言互通、强一致配置、分布式锁,于是引入 ZooKeeper。
2.1 依赖
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.5.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.5.0</version>
</dependency>
2.2 自动配置(Spring Boot 3.x)
@Configuration
@EnableConfigurationProperties(ZkProps.class)
public class ZkConfig {@Bean(initMethod = "start", destroyMethod = "close")public CuratorFramework curator(ZkProps p) {return CuratorFrameworkFactory.builder().connectString(p.getUrl()).sessionTimeoutMs(30_000).connectionTimeoutMs(10_000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();}@Beanpublic ServiceDiscovery<InstanceDetails> discovery(CuratorFramework client) throws Exception {ServiceDiscovery<InstanceDetails> sd = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath("/services").serializer(new JsonInstanceSerializer<>(InstanceDetails.class)).build();sd.start();return sd;}
}
2.3 服务注册(应用启动时自动注册)
@Component
@RequiredArgsConstructor
public class ZkRegistrar implements ApplicationRunner {private final ServiceDiscovery<InstanceDetails> discovery;private final ZkProps props;@Overridepublic void run(ApplicationArguments args) throws Exception {InstanceDetails payload = new InstanceDetails(props.getProfile());ServiceInstance<InstanceDetails> instance = ServiceInstance.<InstanceDetails>builder().name(props.getAppName()).id(props.getPodIp() + ":" + props.getPort()).address(props.getPodIp()).port(props.getPort()).payload(payload).build();discovery.registerService(instance);}
}
2.4 服务发现(负载均衡示例)
@Component
@RequiredArgsConstructor
public class ZkLoadBalancer {private final ServiceDiscovery<InstanceDetails> discovery;public InstanceDetails choose(String serviceName) throws Exception {Collection<ServiceInstance<InstanceDetails>> instances =discovery.queryForInstances(serviceName);if (instances.isEmpty()) throw new IllegalStateException("No instances");// 轮询return instances.stream().skip(ThreadLocalRandom.current().nextInt(instances.size())).findFirst().orElseThrow().getPayload();}
}
3. 分布式锁:Curator Recipes
Curator 提供 InterProcessMutex(可重入)、InterProcessSemaphoreMutex(不可重入)等实现。
3.1 配置
@Bean
public InterProcessMutex payLock(CuratorFramework client) {return new InterProcessMutex(client, "/locks/pay");
}
3.2 业务中使用
@Service
@RequiredArgsConstructor
public class PayService {private final InterProcessMutex payLock;public void pay(String orderId) throws Exception {if (payLock.acquire(10, TimeUnit.SECONDS)) {try {// 幂等扣款逻辑} finally {payLock.release();}} else {throw new RuntimeException("获取锁超时");}}
}
3.3 高级:读写锁
@Bean
public InterProcessReadWriteLock rwLock(CuratorFramework client) {return new InterProcessReadWriteLock(client, "/locks/rw");
}
4. 配置中心(动态刷新)
4.1 存储
/configs/application.yml
4.2 监听与热更新
@Component
@RequiredArgsConstructor
public class ConfigWatcher {private final CuratorFramework client;private final Environment env;@PostConstructpublic void watch() throws Exception {TreeCache cache = TreeCache.newBuilder(client, "/configs").build();cache.getListenable().addListener((cf, event) -> {if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {String path = event.getData().getPath();if (path.endsWith("application.yml")) {byte[] data = event.getData().getData();// 这里触发 Spring Environment 刷新((ConfigurableEnvironment) env).getPropertySources().replace("zk-config", new MapPropertySource("zk-config",new Yaml().load(new String(data))));}}});cache.start();}
}
5. 最佳实践与注意事项
维度 | 建议 |
---|---|
部署 | 3 或 5 节点奇数集群,独立 SSD,JVM 堆 4-8G,开启快照自动清理。 |
会话 | 会话超时 < 客户端 GC 时间;避免长时间 STW。 |
节点 | 数据节点 < 1MB,子节点 < 10 万;使用 Observer 扩展读。 |
锁 | 锁路径独立;锁内逻辑幂等、可重试;设置超时避免死锁。 |
K8s | 用 StatefulSet 部署 ZooKeeper;Headless Service 使 Pod 稳定 DNS。 |
迁移 | 若未来迁到 etcd,可通过 Curator-to-etcd Bridge 逐步替换。 |
6. 小结
功能 | K8s 原生 | ZooKeeper 方案优势 |
---|---|---|
服务发现 | CoreDNS | 跨语言、精细权重、健康检查可扩展 |
分布式锁 | ❌ | 强一致、可重入、读写锁 |
配置中心 | ConfigMap | 监听粒度细、版本化、变更审计 |
集群协调 | ❌ | Leader 选举、队列、屏障(Barrier) |
在 K8s 为主的今天,ZooKeeper 并非过时,而是作为强一致协调层的补充,特别适合金融交易、库存扣减、大规模异构系统。
参考阅读
- Curator 官方文档
- ZooKeeper Internals
如需进一步探讨性能压测脚本或K8s Operator 部署方案,欢迎留言!