ZooKeeper Java客户端与分布式应用实战

1. ZooKeeper Java客户端实战

ZooKeeper应用开发主要通过Java客户端API连接和操作ZooKeeper集群,有官方和第三方两种客户端选择。

1.1 ZooKeeper原生Java客户端

依赖引入
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency>

注意:客户端版本需与服务端保持一致,避免兼容性问题

基本使用
public class ZkClientDemo {private static final String CLUSTER_CONNECT_STR = "192.168.22.156:2181,192.168.22.190:2181,192.168.22.200:2181";public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None) {countDownLatch.countDown();System.out.println("连接建立");}}});countDownLatch.await();System.out.println(zooKeeper.getState()); // CONNECTED// 创建持久节点zooKeeper.create("/user", "fox".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
}
原生API的局限性
  • Watcher监测为一次性,需重复注册
  • 无自动重连机制
  • 异常处理复杂
  • 仅提供byte[]接口,缺少POJO序列化支持
  • 需手动检查节点存在性
  • 不支持级联删除
常用方法
  • create(path, data, acl, createMode):创建节点
  • delete(path, version):删除节点
  • exists(path, watch):判断节点存在性
  • getData(path, watch):获取节点数据
  • setData(path, data, version):设置节点数据
  • getChildren(path, watch):获取子节点列表
  • sync(path):同步客户端与leader节点

所有方法都提供同步和异步两个版本,且支持条件更新(通过version参数控制)。

同步创建节点
@Test
public void createTest() throws KeeperException, InterruptedException {String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}", path);
}
异步创建节点
@Test
public void createAsyncTest() throws InterruptedException {zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name),"context");
}
修改节点数据
@Test
public void setTest() throws KeeperException, InterruptedException {Stat stat = new Stat();byte[] data = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改前: {}", new String(data));zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改后: {}", new String(dataAfter));
}

1.2 Curator开源客户端(常用)

依赖引入
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
客户端创建
// 方式一:使用newClient方法
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();// 方式二:使用builder模式(推荐)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base") // 命名空间隔离.build();
client.start();
重试策略类型
  • ExponentialBackoffRetry:重试间隔按指数增长
  • RetryNTimes:最大重试次数
  • RetryOneTime:只重试一次
  • RetryUntilElapsed:在指定时间内重试
基本操作
// 创建节点
@Test
public void testCreate() throws Exception {String path = curatorFramework.create().forPath("/curator-node");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{} successfully.", path);
}// 创建层级节点
@Test
public void testCreateWithParent() throws Exception {String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.", path);
}// 获取数据
@Test
public void testGetData() throws Exception {byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.", new String(bytes));
}// 更新数据
@Test
public void testSetData() throws Exception {curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.", new String(bytes));
}// 删除节点
@Test
public void testDelete() throws Exception {String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
异步接口
@Test
public void testAsync() throws Exception {// 默认在EventThread中执行curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}).forPath(ZK_NODE);// 指定自定义线程池ExecutorService executorService = Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}, executorService).forPath(ZK_NODE);
}
监听器机制

Curator提供了三种Cache监听模式:

  1. NodeCache - 监听单个节点
public class NodeCacheTest {public static final String NODE_CACHE = "/node-cache";@Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(() -> {log.info("{} path nodeChanged: ", NODE_CACHE);printNodeData();});nodeCache.start();}
}
  1. PathChildrenCache - 监听子节点(不包含二级子节点)
public class PathCacheTest {public static final String PATH = "/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener((client, event) -> {log.info("event: {}", event);});pathChildrenCache.start(true);}
}
  1. TreeCache - 监听当前节点及所有递归子节点
public class TreeCacheTest {public static final String TREE_CACHE = "/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener((client, event) -> {log.info("tree cache: {}", event);});treeCache.start();}
}

2. ZooKeeper在分布式命名服务中的实战

2.1 分布式API目录

Dubbo框架使用ZooKeeper实现分布式JNDI功能:

  • 服务提供者在启动时向/dubbo/${serviceName}/providers节点写入API地址
  • 服务消费者订阅该节点下的URL地址,获取所有服务提供者的API

2.2 分布式节点命名

动态节点命名方案:

  1. 使用数据库自增ID特性
  2. 使用ZooKeeper持久顺序节点的顺序特性

ZooKeeper方案流程:

  • 启动服务,连接ZooKeeper,检查/创建根节点
  • 在根节点下创建临时顺序节点,取回编号作为NodeId
  • 根据需要删除临时顺序节点

2.3 分布式ID生成器

方案对比
  1. Java UUID
  2. Redis INCR/INCRBY操作
  3. Twitter SnowFlake算法
  4. ZooKeeper顺序节点
  5. MongoDB ObjectId
基于ZooKeeper的实现
public class IDMaker extends CuratorBaseOperations {private String createSeqNode(String pathPefix) throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String destPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String makeId(String path) throws Exception {String str = createSeqNode(path);if (null != str) {int index = str.lastIndexOf(path);if (index >= 0) {index += path.length();return index <= str.length() ? str.substring(index) : "";}}return str;}
}
基于SnowFlake算法的实现
public class SnowflakeIdGenerator {private static final long START_TIME = 1483200000000L;private static final int WORKER_ID_BITS = 13;private static final int SEQUENCE_BITS = 10;private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;private static final long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;private long workerId;private long lastTimestamp = -1L;private long sequence = 0L;public synchronized void init(long workerId) {if (workerId > MAX_WORKER_ID) {throw new IllegalArgumentException("worker Id wrong: " + workerId);}this.workerId = workerId;}private synchronized long generateId() {long current = System.currentTimeMillis();if (current < lastTimestamp) {return -1; // 时钟回拨}if (current == lastTimestamp) {sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == MAX_SEQUENCE) {current = this.nextMs(lastTimestamp);}} else {sequence = 0L;}lastTimestamp = current;long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;long workerId = this.workerId << WORKER_ID_SHIFT;return time | workerId | sequence;}
}

3. ZooKeeper实现分布式队列

3.1 设计思路

  1. 创建持久节点作为队列根节点
  2. 入队:在根节点下创建临时有序节点
  3. 出队:获取最小序号节点,读取数据后删除

3.2 Curator实现

public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 序列化器QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建队列(可指定锁路径保证原子性)DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).lockPath("/orderlock") // 可选:分布式锁路径.buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}

3.3 注意事项

  • ZooKeeper不适合大数据量存储,官方不推荐作为队列使用
  • 在吞吐量不高的小型系统中较为适用
  • 使用锁路径(lockPath)可保证操作的原子性和顺序性
  • 不指定锁路径可提高性能,但可能面临并发问题

总结

ZooKeeper提供了强大的分布式协调能力,通过原生API或Curator客户端可以实现多种分布式场景下的解决方案。在选择方案时需要根据具体需求权衡性能、一致性和复杂性,特别是在高并发场景下需要考虑ZooKeeper的适用性和局限性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/96704.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/96704.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0303 【软考高项】项目管理概述 - 组织系统(项目型组织、职能型组织、矩阵型组织)

0303 【软考高项】项目管理概述 - 组织系统&#xff08;项目型组织、职能型组织、矩阵型组织&#xff09; 目录0303 【软考高项】项目管理概述 - 组织系统&#xff08;项目型组织、职能型组织、矩阵型组织&#xff09;一、基本概念二、职能型组织二、项目型组织三、矩阵型组织3…

计算机视觉与模式识别前沿一览:2025年8月arXiv 热点研究趋势解析

本推文分析了arXiv中Computer Vision and Patteren Recognition(计算机视觉与模式识别)领域2025年8月发布的近50篇论文的研究热点&#xff0c;旨在帮助读者快速了解近期领域内的前沿技术与研究方向。arXiv是全球最具影响力的开放电子预印本平台之一&#xff0c;由美国国家科学基…

vim复制本地到linux服务器上,换行缩进过大,不对的问题

所搜的试了:setlocal shiftwidth? :setlocal tabstop? :setlocal expandtab? :setlocal softtabstop?" 设置为 4 个空格缩进 :setlocal shiftwidth4" 通常你会希望 tabstop 和 softtabstop 也保持一致 :setlocal tabstop4 :setlocal softtabstop4尝试完不起作用&…

【小程序】微信小程序九宫格抽奖动画(完整版)

这是一个微信小程序九宫格抽奖页面的完整代码&#xff0c;包括 WXML、WXSS、JS 和 JSON。 效果 九宫格抽奖功能说明&#xff1a; 静态页面布局&#xff1a; 3x3 九宫格&#xff0c;中间是“立即抽奖”按钮&#xff0c;周围是奖品金额。抽奖动画&#xff1a; 点击“立即抽奖”…

java类冲突

一、为什么会发生类冲突&#xff1f; 在 Java 的类加载机制中&#xff0c;类的唯一性是由“类加载器类的全限定名”共同决定的。当你的项目依赖了多个 jar 包&#xff0c;这些 jar 包里有同名的类&#xff08;包名和类名完全一样&#xff09;&#xff0c;但实现却不同。类加载器…

GIT客户端配置支持中文

环境&#xff1a;windows10、Git-2.42.0.2-64-bit.exe1. 问题描述客户端安装后&#xff0c;默认是不支持中文显示的&#xff0c;中文名的文件显示乱码&#xff0c;提交时打的标签内容也不支持中文显示。2. 解决新建Git全局配置文件&#xff0c;文件名为.gitconfig&#xff0c;内…

Teable vs NocoDB 开源、在线协同 多维表格大PK

文章目录 Teable 简介 特性 docker-compose部署 功能截图 NocoDB 简介 docker-compose部署 功能截图 总结 Teable 简介 Teable 是一款企业级高性能多维表格解决方案,通过无代码方式快速构建业务管理系统,支持私有部署和精细权限管理。 官方文档 特性 🚀 卓越性能 轻松处…

SQL专家云能做哪些事儿?

背景数据库是信息化的基石&#xff0c;支撑着整个业务系统&#xff0c;发挥着非常重要的作用&#xff0c;被喻为“IT的心脏”。因此&#xff0c;让数据库安全、稳定、高效地运行已经成为IT管理者必须要面对的问题。但是很多组织没有专业的DBA&#xff0c;数据库运维面临着极大的…

Python 高效实现 Word 转 PDF:告别 Office 依赖

在工作中&#xff0c;经常会遇到需要把 Word 文档转换成 PDF 的情况。比如生成报表、分发文档、或者做归档保存&#xff0c;PDF 格式在排版和跨平台显示上更稳定。传统的做法往往依赖 Microsoft Office 或 LibreOffice 等软件来完成转换&#xff0c;但在自动化环境&#xff08;…

SQL优化简单思路

1. 背景 在实际生产中&#xff0c;因为SQL较慢、SQL关联不合理、不了解索引的性质、不熟悉mysql执行计划分析&#xff0c;可能会出现一些生产事故&#xff0c;本文会简单说明SQL通常的优化分析思路。 基本的优化原则&#xff1a; 先优化SQL再优化mysql server最后优化硬件 2. 优…

软考 系统架构设计师系列知识点之杂项集萃(144)

接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(143) 第268题 甲、乙、丙、丁4人加工A、B、C、D四种工件所需工时如下表所示。指派每人加工一种工件,四人加工四种工件其总工时最短的最优方案中,工件B应由()加工。 A B C D 甲

P1168 中位数

题目描述给定一个长度为 N 的非负整数序列 A&#xff0c;对于前奇数项求中位数。输入格式第一行一个正整数 N。第二行 N 个正整数 A1…N​。输出格式共 ⌊2N1​⌋ 行&#xff0c;第 i 行为 A1…2i−1​ 的中位数。输入输出样例输入 #1复制7 1 3 5 7 9 11 6输出 #11 3 5 6输入 #…

【CE】图形化CE游戏教程通关手册

【CE】图形化CE游戏教程通关手册 文章目录【CE】图形化CE游戏教程通关手册导读需求1️⃣ 第一关提示操作总结2️⃣ 第二关&#xff08;代码共享&#xff09;提示操作验证3️⃣ 第三关提示提示总结导读 需求 除了Tutorial-x86_64.exe教程外&#xff0c;CE还提供了图形化教程gtu…

leetcode 2785. 将字符串中的元音字母排序 中等

给你一个下标从 0 开始的字符串 s &#xff0c;将 s 中的元素重新 排列 得到新的字符串 t &#xff0c;它满足&#xff1a;所有辅音字母都在原来的位置上。更正式的&#xff0c;如果满足 0 < i < s.length 的下标 i 处的 s[i] 是个辅音字母&#xff0c;那么 t[i] s[i] 。…

支付子系统架构及常见问题

支付流程对于支付系统来说&#xff0c;它最重要的其实是安全&#xff0c;所以整个支付流程采用秘钥加签的方式进行操作&#xff0c;一共四对秘钥&#xff0c;以支付宝在线支付为例子&#xff0c;首先通过RSA2算法生成商户公钥以及商户私钥&#xff0c;同时支付宝平台会提供支付…

内存传输速率MT/s

1 0 0 0 0 0 0 0 0 010 9 8 7 6 5 4 3 2 1十 亿 千 百 十 万 千 百 十 个亿 万 万 万传输速率 …

.env文件的作用和使用方法

目录 什么是 .env 文件&#xff1f; 为什么要使用 .env 文件&#xff1f;&#xff08;好处&#xff09; 如何使用 .env 文件&#xff1f; 通用步骤&#xff1a; 具体技术栈中的实现&#xff1a; 最佳实践和注意事项 总结 什么是 .env 文件&#xff1f; .env 文件&#x…

深度拆解 Python 装饰器参数传递:从装饰器生效到参数转交的每一步

在 Python 装饰器的学习中&#xff0c;“被装饰函数的参数如何传递到装饰器内层函数”是一个高频疑问点。很多开发者能写出装饰器的基本结构&#xff0c;却对参数传递的底层逻辑一知半解。本文将以一段具体代码为例&#xff0c;把参数传递过程拆成“装饰器生效→调用触发→参数…

【Vue2 ✨】Vue2 入门之旅 · 进阶篇(七):Vue Router 原理解析

在前几篇文章中&#xff0c;我们介绍了 Vue 的性能优化机制、组件缓存等内容。本篇将深入解析 Vue Router 的原理&#xff0c;了解 Vue 如何管理路由并进行导航。 目录 Vue Router 的基本概念路由模式&#xff1a;hash 和 history路由匹配原理导航守卫Vue Router 的路由过渡动…

Linux磁盘级文件/文件系统理解

Linux磁盘级文件/文件系统理解 1. 磁盘的物理结构 磁盘的核心是一个利用磁性介质和机械运动进行数据读写的、非易失性的存储设备。 1.1 盘片 盘片是传统机械硬盘中最核心的部件&#xff0c;它是数据存储的物理载体。盘片是一个坚硬的、表面极度光滑的圆形碟片&#xff0c;被安装…