Linux下的C/C++开发之操作Zookeeper

ZooKeeper C 客户端简介与安装

ZooKeeper C API 简介

ZooKeeper 官方提供了多语言客户端,C 语言客户端是最底层的实现之一,功能全面且稳定,适合嵌入式开发、系统级组件、C++ 项目集成等场景。

  • zookeeper.h 是 ZooKeeper 提供的 C 语言客户端头文件,包含了所有 API 函数、常量定义与结构体声明。

  • libzookeeper_mt.so 是其线程安全版本的动态链接库,支持多线程调用。

C API 底层通过 异步非阻塞机制 + 回调函数 + Watcher 模型 与服务端交互,结构灵活但稍显底层,因此常用于构建:

  • 高性能系统组件(如 RPC 框架、服务注册中心)

  • 嵌入式服务发现模块

  • 自定义封装(例如封装为 C++ 类)

文件说明
zookeeper.hC API 头文件(函数声明、状态码、结构体)
libzookeeper_mt.so多线程安全版动态链接库
libzookeeper_st.so单线程版本(适用于无并发环境)

 安装 ZooKeeper C 客户端开发库

方法一:通过包管理器安装

sudo apt update
sudo apt install libzookeeper-mt-dev

安装完成后,关键文件位置如下:

文件路径
头文件/usr/include/zookeeper/zookeeper.h
动态库/usr/lib/x86_64-linux-gnu/libzookeeper_mt.so
pkg-config 支持提供 .pc 文件可用于 CMake 自动识别

方法二:从源码编译安装

下载安装包
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4.tar.gz
tar -zxvf apache-zookeeper-3.8.4.tar.gz
cd apache-zookeeper-3.8.4编译 C 客户端库
cd zookeeper-client/zookeeper-client-c
mkdir build && cd build
cmake ..
make -j4
sudo make install

验证安装成功:

pkg-config --cflags --libs zookeeper输出:
-I/usr/include/zookeeper -lzookeeper_mt

安装完成后,你就可以在 CMakeLists.txt 中这样链接 ZooKeeper:

# 查找并加载 PkgConfig 模块,这是使用 pkg-config 工具的前提条件。
# 如果系统中没有找到 pkg-config,CMake 将会报错并停止配置。
find_package(PkgConfig REQUIRED)# 使用 pkg_check_modules 函数通过 pkg-config 查找名为 "zookeeper" 的库。
# ZK 是变量前缀,相关的信息会被存储在以 ZK_ 开头的变量中:
# - ZK_INCLUDE_DIRS:头文件目录
# - ZK_LIBRARY_DIRS:库文件目录
# - ZK_LIBRARIES:需要链接的库名称列表
# REQUIRED 表示这是一个必须存在的库,如果找不到会报错。
pkg_check_modules(ZK REQUIRED zookeeper)# 将 ZooKeeper 的头文件路径添加到编译器的包含路径中,
# 这样在编译时就可以找到 #include <zookeeper/zookeeper.h> 等头文件。
include_directories(${ZK_INCLUDE_DIRS})# 将 ZooKeeper 的库文件路径添加到链接器的搜索路径中,
# 这样链接器才能找到对应的 .so 或 .a 文件。
link_directories(${ZK_LIBRARY_DIRS})# 将 ZooKeeper 库链接到你的目标可执行文件或库(my_target)上。
# ${ZK_LIBRARIES} 包含了所有需要链接的库名称(例如 -lzookeeper)。
target_link_libraries(my_target ${ZK_LIBRARIES})

ZooKeeper.h常用API介绍

会话管理函数

1. 初始化会话 - zookeeper_init

创建与 ZooKeeper 集群的会话连接,返回会话句柄。

zhandle_t *zookeeper_init(  const char *host, // ZooKeeper 服务器地址列表(逗号分隔,如 "host1:2181,host2:2181")  watcher_fn fn,              // 全局监视器回调函数(会话事件通知)  int recv_timeout,           // 会话超时时间(毫秒,建议 ≥ 2000)  const clientid_t *clientid, // 先前会话的 clientid(断连重试用,首次传 NULL)  void *context,              // 用户自定义上下文(传递到监视器回调)  int flags                   // 保留参数(必须为 0)  
);  

返回值:

  • 成功:指向 zhandle_t 的指针(会话句柄)

  • 失败:NULL(需检查 errno 或调用 zoo_state(zh) 获取错误状态)

说明:

  • 会话超时:recv_timeout 是 ZooKeeper 服务器判断客户端存活的心跳间隔,超时后会话失效,临时节点会被删除。

  • clientid:用于断线重连时恢复会话(通过 zoo_client_id(zh) 获取当前会话的 clientid)。

2. 关闭会话 - zookeeper_close

主动关闭会话并释放资源,所有临时节点会被自动删除。

int zookeeper_close(zhandle_t *zh);  

参数:

  • zhzookeeper_init 返回的会话句柄。

返回值:

  • ZOK (0):关闭成功

  • ZBADARGUMENTS (-8):无效句柄

注意:

  1. 线程安全:必须在所有异步操作完成后再调用 zookeeper_close,否则可能导致内存泄漏。

  2. 资源释放:关闭后不可再使用该句柄发起任何操作。

  3. 临时节点:会话关闭后,其创建的所有临时节点(ZOO_EPHEMERAL)会被服务器自动删除。

会话生命周期辅助函数

1. 获取会话状态 - zoo_state

查询当前会话的连接状态(如是否已连接、认证失败等)。

int zoo_state(zhandle_t *zh);  

返回值:

  • ZOO_CONNECTED_STATE:已连接

  • ZOO_EXPIRED_SESSION_STATE:会话已过期

  • ZOO_AUTH_FAILED_STATE:认证失败

2. 获取会话 ID - zoo_client_id

获取当前会话的 clientid,用于断线重连时恢复会话。

clientid_t *zoo_client_id(zhandle_t *zh);  clientid_t *id = zoo_client_id(zh);  
zhandle_t *new_zh = zookeeper_init(..., id, ...);  // 复用旧会话  

监视器(Watcher)机制

Watcher 是 ZooKeeper 提供的事件通知系统,允许客户端在以下场景中接收异步通知:

  1. 节点变更:数据修改、子节点增减、节点创建/删除

  2. 会话状态变化:连接建立、会话过期、认证失败

其核心设计类似于发布-订阅模型,但具有一次性触发和轻量级的特点。

一次性触发

Watcher 被触发后立即自动注销,后续变更不会通知。避免高频事件风暴(例如频繁数据变更导致大量通知)。

// 注册 Watcher 监听节点数据变化
zoo_wget(zh, "/config", watcher_cb, ctx, buffer, &len, NULL);// 当 /config 数据第一次变更时,watcher_cb 被触发
// 后续 /config 再次变更不会触发回调,除非重新注册

事件类型与路径绑定

每个Watcher 关联到特定路径和特定事件类型(如数据变更、子节点变更),通知中会包含触发事件的路径,方便多节点监听时区分来源。Watcher 回调仅告知事件类型和节点路径,不包含具体数据变更内容,需客户端主动查询最新状态(如触发后调用 zoo_get)。

Watcher工作原理

全局监视器原型 - watcher_fn

处理会话事件(连接状态变化)和节点事件(数据变更、子节点变更)。

typedef void (*watcher_fn)(  zhandle_t *zh,    // 会话句柄  int type,         // 事件类型(如 ZOO_SESSION_EVENT)  int state,        // 连接状态(如 ZOO_CONNECTED_STATE)  const char *path, // 触发事件的节点路径(会话事件为 NULL)  void *watcherCtx  // 用户设置的上下文  
);  

type=会话事件(ZOO_SESSION_EVENT)时会检查state去判断具体的连接状态

状态值含义处理建议
ZOO_CONNECTED_STATE连接已建立恢复业务操作
ZOO_EXPIRED_SESSION_STATE会话过期(临时节点已删除)必须重新初始化会话
ZOO_AUTH_FAILED_STATE认证失败检查 ACL 或重新调用 zoo_add_auth

type=节点事件时(type != ZOO_SESSION_EVENT)state无意义,只需要关注type和path

事件类型触发条件典型注册 API常见用途
ZOO_CREATED_EVENT被监视节点被创建zoo_wexists等待节点就绪
ZOO_DELETED_EVENT被监视节点被删除zoo_wexists / zoo_wget资源释放检查
ZOO_CHANGED_EVENT被监视节点数据变更zoo_wget配置热更新
ZOO_CHILD_EVENT子节点列表变更zoo_wget_children服务实例列表维护

Watcher 的注册方式

1. 全局 Watcher

在 zookeeper_init 初始化会话时设置。仅接收会话状态变化事件(如连接断开、会话过期)。

zhandle_t *zh = zookeeper_init("127.0.0.1:2181", global_watcher, 3000, NULL, NULL, 0);void global_watcher(zhandle_t *zh, int type, int state, const char *path, void *ctx) {if (type == ZOO_SESSION_EVENT) {// 处理会话事件}
}

2. 局部 Watcher

通过 zoo_wexistszoo_wget 等带 w 前缀的 API 注册。监听特定节点的特定事件类型。

// 监听节点创建/删除
zoo_wexists(zh, "/service/new", service_watcher, "create_ctx", NULL);// 监听数据变更
zoo_wget(zh, "/config", data_watcher, "config_ctx", buffer, &len, NULL);

节点操作相关函数

同步节点操作

1. 创建节点 - zoo_create

同步创建指定路径的节点,支持临时(Ephemeral)、顺序(Sequential)和持久化类型。

int zoo_create(  zhandle_t *zh,                 // 会话句柄  const char *path,              // 节点路径(如 "/services/node")  const char *value,             // 节点数据(NULL 表示空数据)  int valuelen,                  // 数据长度(若 value=NULL 则传 -1)  const struct ACL_vector *acl,  // 节点权限(如 &ZOO_OPEN_ACL_UNSAFE)  int flags,                     // 节点标志(ZOO_EPHEMERAL | ZOO_SEQUENCE)  char *path_buffer,             // 输出实际节点路径(用于顺序节点)  int path_buffer_len            // 输出缓冲区长度  
);  

返回值:

  • ZOK :创建成功

  • ZNODEEXISTS :节点已存在

  • ZNOCHILDRENFOREPHEMERALS :尝试在临时节点下创建子节点

  • ZINVALIDSTATE :会话已失效

节点创建标志:

标志常量作用
ZOO_EPHEMERAL1临时节点(会话结束自动删除)
ZOO_SEQUENCE2顺序节点(路径自动追加全局唯一序号,如 /lock-0000000001
ZOO_CONTAINER4容器节点(当最后一个子节点被删除后,服务器会异步删除该容器节点)
ZOO_PERSISTENT0默认持久化节点(显式删除或 ACL 清理前永久存在)

注意:

  • flags:

    • ZOO_EPHEMERAL:临时节点(会话结束自动删除)

    • ZOO_SEQUENCE:顺序节点(路径自动追加全局唯一序号,如 /node0000000001

  • path_buffer:若创建顺序节点,此缓冲区返回实际路径(需预分配足够空间)。

2. 删除节点 - zoo_delete

同步删除指定节点(必须无子节点且版本匹配)。

int zoo_delete(  zhandle_t *zh,         // 会话句柄  const char *path,      // 节点路径  int version            // 期望版本号(-1 表示忽略版本检查)  
);  

返回值:

  • ZOK:删除成功

  • ZNONODE:节点不存在

  • ZNOTEMPTY:节点存在子节点

  • ZBADVERSION:版本号不匹配

注意事项:

  • 版本控制:传入 version=-1 可跳过版本检查,否则需与节点当前版本一致。

  • 原子性:删除操作是原子的,若失败则节点状态不变。

3. 检查节点是否存在 - zoo_exists

同步检查节点是否存在,并可注册监视器(Watcher)监听节点创建/删除事件。

int zoo_exists(  zhandle_t *zh,            // 会话句柄  const char *path,         // 节点路径  int watch,                // 是否注册监视器(1=注册,0=不注册)  struct Stat *stat         // 输出节点元数据(可置 NULL)  
);  

返回值:

  • ZOK:节点存在

  • ZNONODE:节点不存在

  • ZNOAUTH :无权访问

Stat 结构体字段(部分关键字段):

struct Stat {  int64_t czxid;          // 创建该节点的事务ID  int64_t mzxid;          // 最后修改的事务ID  int64_t ctime;          // 创建时间(毫秒 epoch)  int32_t version;        // 数据版本号  int32_t cversion;       // 子节点版本号  int32_t aversion;       // ACL版本号  // ... 其他字段省略  
};  

4. 获取节点数据 - zoo_get

同步获取节点数据及元数据,可注册监视器监听数据变更事件。

int zoo_get(  zhandle_t *zh,            // 会话句柄  const char *path,         // 节点路径  int watch,                // 是否注册监视器(1=注册,0=不注册)  char *buffer,             // 输出数据缓冲区  int *buffer_len,          // 输入缓冲区长度,输出实际数据长度  struct Stat *stat         // 输出节点元数据  
);  

返回值:

  • ZOK:获取成功

  • ZNONODE:节点不存在

  • ZNOAUTH:无权访问

数据读取注意事项:

  1. 缓冲区管理:

    • 调用前需预分配 buffer,并通过 buffer_len 传入其长度。

    • 若缓冲区不足,函数返回 ZOK 但 *buffer_len 会被设为实际所需长度。

  2. 数据截断:若数据超过缓冲区长度,会被静默截断(无错误提示)。

5. 更新节点数据 - zoo_set

同步更新节点数据,支持版本检查。

int zoo_set(  zhandle_t *zh,         // 会话句柄  const char *path,      // 节点路径  const char *buffer,    // 新数据缓冲区  int buflen,            // 数据长度(-1 表示以 strlen(buffer) 计算)  int version            // 期望版本号(-1=忽略版本检查)  
);  

返回值:

  • ZOK:更新成功

  • ZBADVERSION (-103):版本号不匹配

  • ZNONODE:节点不存在

版本控制逻辑:

  • 每次数据更新后,节点的 version 会递增。

  • 若传入 version=5,则仅当节点当前版本为 5 时才会更新。

6. 获取直接子节点列表 - zoo_get_children

同步获取指定节点的所有直接子节点名称列表,并可选择注册监视器监听直接子节点变更事件(创建/删除子节点)。

int zoo_get_children(  zhandle_t *zh,               // 会话句柄  const char *path,            // 父节点路径(如 "/services")  int watch,                   // 是否注册监视器(1=注册,0=不注册)  struct String_vector *children // 输出子节点名称数组,需手动调用 deallocate_String_vector 释放
);  

返回值:

  • ZOK:获取成功

  • ZNONODE:父节点不存在

  • ZNOAUTH:无权访问父节点

  • ZINVALIDSTATE:会话已失效

7. 增强版子节点获取 - zoo_get_children2

在 zoo_get_children 基础上,额外返回父节点的元数据(Stat 结构体)。

int zoo_get_children2(  zhandle_t *zh,               // 会话句柄  const char *path,            // 父节点路径  int watch,                   // 是否注册监视器  struct String_vector *children, // 输出子节点列表  struct Stat *stat            // 输出父节点元数据(可置NULL)  
);  

异步节点操作

zookeeper提供了异步节点操作

所有 zoo_a 前缀函数(如 zoo_acreate)都是异步节点操作

  • 非阻塞模型:调用后立即返回,实际操作由后台线程完成。

  • 回调触发:操作完成时,ZooKeeper 客户端线程会调用注册的回调函数(在 zookeeper_interest() 或 zookeeper_process() 调用线程中执行)。

  • 上下文传递:通过 const void *data 参数传递用户自定义上下文(如结构体指针),实现异步状态管理。

// === 1. 节点创建与删除 ===
int zoo_acreate(zhandle_t *zh,                 // 会话句柄const char *path,              // 节点路径const char *value,             // 节点数据int valuelen,                  // 数据长度const struct ACL_vector *acl,  // ACL权限int flags,                     // 节点标志(如ZOO_EPHEMERAL)string_completion_t completion, // 回调函数const void *data,               // 用户上下文char *path_buffer,             // 输出实际路径(用于顺序节点)int path_buffer_len            // 缓冲区长度
);int zoo_adelete(zhandle_t *zh,                 // 会话句柄const char *path,              // 节点路径int version,                   // 版本号(-1忽略版本检查)void_completion_t completion,  // 回调函数const void *data               // 用户上下文
);// === 2. 节点数据读写 ===
int zoo_aget(zhandle_t *zh,                 // 会话句柄const char *path,              // 节点路径int watch,                     // 是否注册监视器data_completion_t completion,  // 回调函数const void *data               // 用户上下文
);int zoo_aset(zhandle_t *zh,                 // 会话句柄const char *path,              // 节点路径const char *buffer,            // 新数据int buflen,                    // 数据长度int version,                   // 版本号(-1忽略版本检查)stat_completion_t completion,  // 回调函数const void *data               // 用户上下文
);// === 3. 节点元数据与子节点操作 ===
int zoo_aexists(zhandle_t *zh,                 // 会话句柄const char *path,              // 节点路径int watch,                     // 是否注册监视器stat_completion_t completion,  // 回调函数const void *data               // 用户上下文
);int zoo_aget_children(zhandle_t *zh,                 // 会话句柄const char *path,              // 节点路径int watch,                     // 是否注册监视器strings_completion_t completion, // 回调函数const void *data               // 用户上下文
);

回调函数类型

(1) 通用无返回值回调 - void_completion_t

处理无返回数据的操作(如 zoo_adeletezoo_aset

typedef void (*void_completion_t)(int rc, const void *data);  rc		操作结果错误码(同同步 API 错误码,如 ZOK、ZNONODE)
data	用户调用异步 API 时传入的上下文指针(如对象指针、请求 ID 等)
void delete_callback(int rc, const void *data) {  const char *req_id = (const char *)data;  if (rc == ZOK) {  printf("[%s] 节点删除成功\n", req_id);  } else {  fprintf(stderr, "[%s] 删除失败: %s\n", req_id, zerror(rc));  }  
}  zoo_adelete(zh, "/path/to/node", -1, delete_callback, "request_123");  

(2) 返回 Stat 元数据的回调 - stat_completion_t

处理需返回节点元数据的操作(如 zoo_aexists

typedef void (*stat_completion_t)(int rc, const struct Stat *stat, const void *data);  stat   节点元数据指针(内存由 ZooKeeper 管理,回调结束后失效)
void exists_callback(int rc, const Stat *stat, const void *data) {  if (rc == ZOK) {  printf("节点存在,最新版本: %d\n", stat->version);  } else if (rc == ZNONODE) {  printf("节点不存在\n");  }  
}  zoo_aexists(zh, "/path/to/node", 1, exists_callback, NULL);  

(3) 返回节点数据的回调 - data_completion_t

处理需返回节点数据的操作(如 zoo_aget

typedef void (*data_completion_t)(int rc, const char *value, int valuelen,  const Stat *stat, const void *data);  value	    节点数据指针(由 ZooKeeper 管理,回调结束后失效,需立即拷贝数据)
valuelen	数据实际长度(若 value=NULL,则 valuelen=-1)
stat指向节点元数据的结构体包含节点的所有状态信息(如版本号、创建时间等),由 ZooKeeper 服务器返回。
data 用户自定义上下文指针,从异步API调用处原样传递到回调函数,用于跨请求传递状态。
void data_callback(int rc, const char *value, int valuelen, const Stat *stat, const void *data) {const char *node_path = (const char *)data; // 从上下文获取节点路径if (rc == ZOK) {if (valuelen > 0) {// 安全拷贝数据(value可能不含终止符)char *data_copy = (char *)malloc(valuelen + 1);memcpy(data_copy, value, valuelen);data_copy[valuelen] = '\0'; // 手动添加终止符printf("[成功] 节点 %s 数据: %s\n", node_path, data_copy);printf("数据版本: %d\n", stat->version);free(data_copy); // 必须释放拷贝的内存} else {printf("[成功] 节点 %s 数据为空\n", node_path);}} else {fprintf(stderr, "[失败] 读取节点 %s 错误: %s\n", node_path, zerror(rc));}
}int ret = zoo_aget(zh, node_path, 1, data_callback, node_path);

(4) 返回子节点列表的回调 - strings_completion_t

处理返回子节点列表的操作(如 zoo_aget_children

typedef void (*strings_completion_t)(int rc, const String_vector *strings,  const void *data);  strings	const 子节点名称数组(由 ZooKeeper 管理,回调结束后失效,需深拷贝数据)typedef struct String_vector {  int32_t count;      // 子节点数量  char **data;        // 子节点名称数组(每个元素为以 '\0' 结尾的字符串)  
} String_vector; 
void children_callback(int rc, const String_vector *strings,  const void *data) {  if (rc == ZOK) {  for (int i = 0; i < strings->count; i++) {  printf("子节点 %d: %s\n", i, strings->data[i]);  }  }  
}  zoo_aget_children(zh, "/parent", 1, children_callback, NULL);  

Watcher节点操作

通过 zoo_wexistszoo_wget 等带 w 前缀的 API 注册。监听特定节点的特定事件类型。

这类函数在同步执行ZooKeeper节点操作(如检查存在、获取数据、列出子节点)的同时,自动注册Watcher监听器,当指定节点发生特定变更(如数据修改、子节点增减、节点创建/删除)时,ZooKeeper会通过回调函数实时通知客户端,实现事件驱动的响应机制。所有Watcher均为一次性触发,触发后需重新注册,适合用于配置热更新、服务发现等需要实时感知节点变化的场景。

// 检查节点是否存在并注册 Watcher(触发事件:ZOO_CREATED_EVENT/ZOO_DELETED_EVENT)
int zoo_wexists(zhandle_t *zh,                  /* 会话句柄 */const char *path,               /* 节点路径 */watcher_fn watcher,             /* 事件回调函数 */void *watcherCtx,               /* 用户上下文数据 */struct Stat *stat               /* 输出节点元数据(可选) */
);// 获取节点数据并注册 Watcher(触发事件:ZOO_CHANGED_EVENT/ZOO_DELETED_EVENT)
int zoo_wget(zhandle_t *zh,                  /* 会话句柄 */const char *path,               /* 节点路径 */watcher_fn watcher,             /* 事件回调函数 */void *watcherCtx,               /* 用户上下文数据 */char *buffer,                   /* 输出数据缓冲区 */int *buffer_len,                /* 输入缓冲区大小/输出实际数据长度 */struct Stat *stat               /* 输出节点元数据(可选) */
);// 获取子节点列表并注册 Watcher(触发事件:ZOO_CHILD_EVENT)
int zoo_wget_children(zhandle_t *zh,                  /* 会话句柄 */const char *path,               /* 父节点路径 */watcher_fn watcher,             /* 事件回调函数 */void *watcherCtx,               /* 用户上下文数据 */struct String_vector *strings   /* 输出子节点名称数组 */
);// 获取子节点列表及父节点元数据(触发事件:ZOO_CHILD_EVENT)
int zoo_wget_children2(zhandle_t *zh,                  /* 会话句柄 */const char *path,               /* 父节点路径 */watcher_fn watcher,             /* 事件回调函数 */void *watcherCtx,               /* 用户上下文数据 */struct String_vector *strings,  /* 输出子节点名称数组 */struct Stat *stat               /* 输出父节点元数据 */
);// 移除已注册的 Watcher
int zoo_remove_watches(zhandle_t *zh,                  /* 会话句柄 */const char *path,               /* 节点路径 */int watcher_type,               /* 监视器类型(ZOO_WATCHER_EVENT等) */watcher_fn watcher,             /* 要移除的回调函数(NULL表示全部) */void *watcherCtx,               /* 要移除的上下文(需匹配注册值) */int local                       /* 是否仅移除本地未触发的 Watcher */
);

错误处理相关函数

(1) 获取错误码字符串描述

const char *zerror(int err);  // 将错误码(如ZCONNECTIONLOSS)转换为可读字符串

常见错误码

错误码常量触发场景
ZOK0操作成功
ZNONODE-101节点不存在(zoo_delete/zoo_get 等操作路径无效)
ZNOAUTH-102无权限操作节点(ACL 限制)
ZBADVERSION-103版本号不匹配(乐观锁冲突,如 zoo_set 传入错误 version)
ZNOCHILDRENFOREPHEMERALS-108临时节点不允许创建子节点
ZNODEEXISTS-110节点已存在(zoo_create 冲突)
ZNOTEMPTY-111节点有子节点(zoo_delete 非空节点)
ZCONNECTIONLOSS-112连接断开(需检查会话状态并重试)
ZOPERATIONTIMEOUT-115操作超时(服务器未响应)
ZINVALIDSTATE-152会话已失效(如调用 zoo_set 时会话过期)

(2) 获取当前会话状态

int zoo_state(zhandle_t *zh);  // 返回会话状态(如ZOO_EXPIRED_SESSION_STATE)

常见状态码

状态码常量含义
ZOO_EXPIRED_SESSION_STATE-112会话已过期(临时节点会被删除,需重新初始化会话)
ZOO_AUTH_FAILED_STATE-113认证失败(ACL 权限不匹配)
ZOO_CONNECTING_STATE1正在连接服务器
ZOO_ASSOCIATING_STATE2正在协商会话参数(如超时时间)
ZOO_CONNECTED_STATE3已连接(会话正常)
ZOO_READONLY_STATE5只读模式连接(连接到 Observer 节点)

Zookeeper实战

配置中心实现

功能描述

  • 将配置存储在 ZooKeeper 节点中

  • 客户端启动时读取配置

  • 注册 Watcher 实现配置热更新

实现原理:

        这段代码实现了一个基于 ZooKeeper 的配置中心模块,其核心功能是从 ZooKeeper 中读取指定配置节点的内容,并在节点内容变化时自动感知更新。初始化时,ConfigCenter 通过 zookeeper_init 建立与 ZooKeeper 的连接,并通过 zoo_wget 获取配置节点(如 /app/config)的值,同时注册一个 watcher 监听器。一旦该节点被修改(如配置更新)、创建或重建,注册的 configWatcher 会被触发,程序自动调用 reloadConfig 重新读取最新配置,实现配置热更新。整个机制利用 ZooKeeper 的数据节点和 watcher 通知能力,实现了轻量、实时、自动刷新的分布式配置中心,确保系统在不重启的前提下可以动态应用配置变更。

// 配置中心类:负责连接 ZooKeeper,读取并监听某个配置节点
class ConfigCenter {
public:ConfigCenter(const std::string& hosts, const std::string& configPath): hosts_(hosts), configPath_(configPath), zh_(nullptr) {}~ConfigCenter() {if (zh_) zookeeper_close(zh_);}// 初始化函数:建立连接并读取配置bool init() {// 建立连接,注册全局 Watcher(Session 状态监听)zh_ = zookeeper_init(hosts_.c_str(), globalWatcher, 3000, nullptr, this, 0);if (!zh_) return false;// 等待连接真正建立成功(阻塞直到状态变为 CONNECTED)while (zoo_state(zh_) != ZOO_CONNECTED_STATE) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 初次读取配置内容return reloadConfig();}std::string getConfig() const {return config_;}private:// 全局 Watcher:监听连接状态变化(如连接成功、断开等)static void globalWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE) {std::cout << "连接建立成功\n";}}// 配置节点 Watcher:监听配置内容变化或节点创建static void configWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {auto self = static_cast<ConfigCenter*>(context);// 节点发生变化,重新加载配置if (type == ZOO_CHANGED_EVENT || type == ZOO_CREATED_EVENT) {std::cout << "配置变更,重新加载...\n";self->reloadConfig();  // 重新读取配置,并重新注册 watcher}}// 读取配置内容,并注册监听器bool reloadConfig() {char buffer[1024];             // 存储配置数据的缓冲区int len = sizeof(buffer);      // 缓冲区大小// 读取节点内容 + 注册 watcherint rc = zoo_wget(zh_, configPath_.c_str(), configWatcher, this, buffer, &len, nullptr);if (rc == ZOK) {// 设置本地配置config_ = std::string(buffer, len);std::cout << "当前配置: " << config_ << "\n";return true;}// 错误处理std::cerr << "读取配置失败: " << zerror(rc) << "\n";return false;}zhandle_t* zh_;            std::string hosts_;       std::string configPath_;   std::string config_;        
};int main() {ConfigCenter config("localhost:2181", "/app/config");if (!config.init()) return 1;// 模拟主程序持续运行,不断使用配置while (true) {std::cout << "使用配置: " << config.getConfig() << "\n";std::this_thread::sleep_for(std::chrono::seconds(5));}
}

服务注册与发现

功能描述

  • 服务启动时注册临时节点

  • 客户端发现所有可用服务

  • 监听服务列表变化

实现原理:

这段代码实现了一个基于 ZooKeeper 的服务注册与发现机制,其核心思想是利用 ZooKeeper 的 临时节点和 watcher 机制 来动态维护服务列表。服务端调用 registerService 方法,在指定路径(如 /services)下创建一个临时节点(如 /services/serviceA),代表该服务在线;如果服务进程宕机或断线,ZooKeeper 会自动删除该节点。客户端通过 startDiscovery 方法调用 zoo_wget_children 获取子节点列表,并注册watcher 监听服务列表变化。当服务上下线时,watcher 被触发,客户端自动重新拉取并更新服务缓存。这样,服务消费者始终能感知当前可用服务的变化,从而实现分布式系统中的动态服务注册与自动发现。

// ServiceRegistry:封装 ZooKeeper 服务注册与发现逻辑
class ServiceRegistry {
public:ServiceRegistry(const std::string& hosts, const std::string& servicePath): hosts_(hosts), servicePath_(servicePath), zh_(nullptr) {}~ServiceRegistry() {if (zh_) zookeeper_close(zh_);  // 释放连接}// 初始化:建立 ZooKeeper 会话连接bool init() {zh_ = zookeeper_init(hosts_.c_str(), nullptr, 3000, nullptr, nullptr, 0);return zh_ != nullptr;}// 服务端调用:注册服务(创建临时节点)bool registerService(const std::string& serviceName) {char pathBuffer[128];  // 用于接收最终创建的路径int rc = zoo_create(zh_,(servicePath_ + "/" + serviceName).c_str(),  // 例如 /services/serviceAnullptr, -1,&ZOO_OPEN_ACL_UNSAFE,ZOO_EPHEMERAL,  // 临时节点,断线自动删除pathBuffer, sizeof(pathBuffer));return rc == ZOK;}// 客户端调用:获取当前本地缓存的服务列表std::vector<std::string> discoverServices() {std::lock_guard<std::mutex> lock(mutex_);return services_;}// 客户端调用:启动服务发现(并注册 watcher)bool startDiscovery() {String_vector children;int rc = zoo_wget_children(zh_, servicePath_.c_str(),serviceWatcher, this,&children);if (rc != ZOK) return false;updateServiceList(&children);  // 初始拉取服务列表deallocate_String_vector(&children);  // 释放 ZooKeeper 内部分配的内存return true;}private:// 子节点变化的 watcher 回调函数static void serviceWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_CHILD_EVENT) {auto self = static_cast<ServiceRegistry*>(context);// 重新注册 watcher 并更新列表(一次性 watch)String_vector children;zoo_wget_children(zh, self->servicePath_.c_str(),serviceWatcher, self,&children);self->updateServiceList(&children);deallocate_String_vector(&children);}}// 更新服务列表缓存void updateServiceList(String_vector* children) {std::lock_guard<std::mutex> lock(mutex_);services_.clear();for (int i = 0; i < children->count; ++i) {services_.emplace_back(children->data[i]);}std::cout << "服务列表更新: ";for (const auto& s : services_) std::cout << s << " ";std::cout << "\n";}zhandle_t* zh_;                      // ZooKeeper 连接句柄std::string hosts_;                  // 连接地址,如 localhost:2181std::string servicePath_;            // 服务父路径,如 /servicesstd::vector<std::string> services_;  // 当前发现的服务节点std::mutex mutex_;                   // 保护 services_ 的互斥锁
};// 服务端进程示例:注册服务并保持运行
void runService(const std::string& serviceName) {ServiceRegistry registry("localhost:2181", "/services");if (!registry.init()) {std::cerr << "ZooKeeper 连接失败\n";return;}if (registry.registerService(serviceName)) {std::cout << serviceName << " 注册成功\n";std::this_thread::sleep_for(std::chrono::minutes(10));  // 模拟长时间运行} else {std::cerr << "注册服务失败\n";}
}// 客户端进程示例:持续监听服务列表并选择其中之一使用
void runClient() {ServiceRegistry client("localhost:2181", "/services");if (!client.init() || !client.startDiscovery()) {std::cerr << "客户端初始化失败\n";return;}while (true) {auto services = client.discoverServices();if (services.empty()) {std::cout << "无可用服务\n";} else {// 随机选择一个服务节点模拟访问std::cout << "选择服务: " << services[rand() % services.size()] << "\n";}std::this_thread::sleep_for(std::chrono::seconds(3));  // 模拟请求间隔}
}// 主函数:通过命令行参数区分运行模式
int main(int argc, char** argv) {if (argc < 2) {std::cout << "用法: " << argv[0] << " [server 服务名 | client]\n";return 1;}std::string mode = argv[1];if (mode == "server") {if (argc < 3) {std::cerr << "请提供服务名: " << argv[0] << " server 服务名\n";return 1;}std::string serviceName = argv[2];runService(serviceName);  // 服务端注册} else if (mode == "client") {runClient();  // 客户端监听} else {std::cerr << "未知模式: " << mode << "\n";return 1;}return 0;
}# 启动一个服务端实例,注册为 serviceA
./zookeeper_registry server serviceA# 启动客户端监听
./zookeeper_registry client

分布式锁实现

功能描述

  • 使用顺序临时节点实现公平锁

  • 监听前一个节点释放锁

  • 实现阻塞和非阻塞两种获取方式

实现原理:

        这段代码通过 ZooKeeper 实现了一个分布式锁机制,其核心思路是利用临时顺序节点(EPHEMERAL | SEQUENTIAL) 实现公平竞争与互斥访问。每个客户端线程尝试加锁时,会在指定目录下创建一个唯一的临时顺序节点(如 /locks/resource/lock-00000001)。然后获取该目录下所有子节点,并判断自己是否是编号最小的那个节点——如果是,就成功获得锁;否则,就找到比自己编号小的“前驱节点”,并设置监听(watch),当前驱节点被删除(即锁被释放)后,自己再次获得锁的机会。锁释放的方式是删除自身创建的节点,ZooKeeper 会自动触发 watcher 通知下一个等待者。整个机制确保了竞争公平、自动清理(临时节点)、无单点依赖,是 ZooKeeper 在分布式协调中的经典应用之一。

// 分布式锁类:通过 ZooKeeper 实现锁竞争
class DistributedLock {
public:DistributedLock(zhandle_t* zh, const std::string& lockPath): zh_(zh), lockPath_(lockPath), myNode_(""), acquired_(false) {}// 尝试加锁bool tryLock() {char pathBuffer[256];// 1. 创建 EPHEMERAL + SEQUENTIAL 节点(临时顺序节点)int rc = zoo_create(zh_, (lockPath_ + "/lock-").c_str(), nullptr, -1, &ZOO_OPEN_ACL_UNSAFE,ZOO_EPHEMERAL | ZOO_SEQUENCE,pathBuffer, sizeof(pathBuffer));if (rc != ZOK) return false;myNode_ = pathBuffer;  // 记录当前节点路径std::string nodeName = myNode_.substr(myNode_.find_last_of('/') + 1); // 提取 lock-000000xx// 2. 获取所有子节点(所有竞争者)String_vector children;if (zoo_get_children(zh_, lockPath_.c_str(), 0, &children) != ZOK) return false;// 3. 找到当前目录下最小的子节点(最早创建的节点)std::string minNode;for (int i = 0; i < children.count; ++i) {if (minNode.empty() || children.data[i] < minNode) {minNode = children.data[i];}}// 4. 若自己是最小节点,获得锁if (nodeName == minNode) {acquired_ = true;return true;}// 5. 否则,找到比自己小的前一个节点(前驱节点)std::string prevNode;for (int i = 0; i < children.count; ++i) {if (children.data[i] < nodeName && (prevNode.empty() || children.data[i] > prevNode)) {prevNode = children.data[i];}}if (!prevNode.empty()) {// 6. 监听前驱节点是否被删除(代表锁被释放)std::atomic<bool> lockReleased(false);std::string prevPath = lockPath_ + "/" + prevNode;zoo_wexists(zh_, prevPath.c_str(), lockWatcher, &lockReleased, nullptr);// 7. 阻塞等待通知(watcher 回调触发时将 flag 设为 true)while (!lockReleased) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}acquired_ = true;return true;}return false;}// 释放锁void unlock() {if (acquired_) {zoo_delete(zh_, myNode_.c_str(), -1);  // 删除自身临时节点acquired_ = false;}}private:// watch 回调:前驱节点被删除时触发static void lockWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_DELETED_EVENT) {auto* flag = static_cast<std::atomic<bool>*>(context);*flag = true; // 通知等待线程:锁已释放}}zhandle_t* zh_;           // ZooKeeper 连接句柄std::string lockPath_;    // 锁的父路径(如 /locks/resource)std::string myNode_;      // 自己创建的顺序节点路径bool acquired_;           // 是否获得锁标志
};// 模拟临界区操作(各线程争夺执行)
void criticalSection(int id) {static std::mutex coutMutex;{std::lock_guard<std::mutex> lock(coutMutex);std::cout << "进程" << id << " 进入临界区\n";}std::this_thread::sleep_for(std::chrono::seconds(2));{std::lock_guard<std::mutex> lock(coutMutex);std::cout << "进程" << id << " 离开临界区\n";}
}// 每个线程工作函数:不断尝试加锁 → 执行临界区 → 解锁
void worker(zhandle_t* zh, int id) {DistributedLock lock(zh, "/locks/resource");while (true) {if (lock.tryLock()) {criticalSection(id);lock.unlock();}std::this_thread::sleep_for(std::chrono::seconds(1));}
}int main() {// 建立与 ZooKeeper 的连接zhandle_t* zh = zookeeper_init("localhost:2181", nullptr, 3000, nullptr, nullptr, 0);if (!zh) return 1;// 创建锁父节点(仅创建一次,无需判断是否已存在)zoo_create(zh, "/locks", nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);zoo_create(zh, "/locks/resource", nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);// 启动 3 个客户端线程,模拟分布式竞争std::thread t1(worker, zh, 1);std::thread t2(worker, zh, 2);std::thread t3(worker, zh, 3);t1.join();t2.join();t3.join();zookeeper_close(zh);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913680.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913680.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【openp2p】学习3:【专利分析】一种基于混合网络的自适应切换方法、装 置、设备及介质

本专利与开源项目无关,但可能是实际商用的一种专利。专利地址从此专利,可见p2p的重要性。透传服务可能是实时转发服务,提供中继能力 透传服务可以是指一种通过公网服务器将数据从第一客户端传递到另一个设备 或客户端的服务。这种服务通常用于克服网络中的障碍,如防火墙、…

OpenCV中DPM(Deformable Part Model)目标检测类cv::dpm::DPMDetector

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 OpenCV 中用于基于可变形部件模型&#xff08;DPM&#xff09; 的目标检测器&#xff0c;主要用于行人、人脸等目标的检测。它是一种传统的基于特…

macOS 26快捷指令更新,融入AI打造智能操作体验

快捷指令作为Mac系统中提升用户操作效率的得力助手&#xff0c;在macOS 26中迎来了一次具有突破性的重大更新。此次更新融入了先进的AI技术&#xff0c;推出“智能操作”&#xff08;Intelligent Actions&#xff09;功能&#xff0c;让快捷指令从简单的自动化工具升级为真正的…

InstructBLIP:迈向具备指令微调能力的通用视觉语言模型

温馨提示&#xff1a; 本篇文章已同步至"AI专题精讲" InstructBLIP&#xff1a;迈向具备指令微调能力的通用视觉语言模型 摘要 大规模的预训练与instruction tuning在构建通用语言模型方面已取得显著成效。然而&#xff0c;构建通用的视觉-语言模型仍然具有挑战性&…

基于dropbear实现嵌入式系统ssh服务端与客户端完整交互

以下基于 Dropbear 实现 SSH 服务端与客户端交互的完整步骤&#xff0c;涵盖服务端部署、客户端连接、认证配置及消息传输&#xff0c;结合了多篇权威资料的核心实践&#xff1a;环境准备与安装 服务端安装 • Linux 系统&#xff08;以 Ubuntu/CentOS 为例&#xff09; Ubuntu…

深圳安锐科技发布国内首款4G 索力仪!让斜拉桥索力自动化监测更精准高效

近日&#xff0c;深圳安锐科技正式发布国内首款无线自供电、一体化的斜拉索实时监测设备 “4G索力监测仪”&#xff0c;成功攻克了传统桥梁索体监测领域长期存在的实时性差、布设困难和成本高昂的行业难题&#xff0c;为斜拉桥、系杆拱桥提供全无线、自动化、云端实时同步的索力…

Pipeline 引用外部数据源最佳实践

场景解析在企业网络安全日志处理场景中&#xff0c;防火墙、入侵检测系统&#xff08;IDS&#xff09;等设备会持续产生大量日志&#xff0c;记录网络流量、访问请求、异常事件等基础信息&#xff0c;但这些原始日志仅能呈现表面现象&#xff0c;难以全面剖析安全威胁&#xff…

UI + MCP Client + MCP Server(并且链接多个Server)

项目结构前端项目--------->MCP Client----------->MCP Serverserver就不过多赘述了&#xff0c;他只是相当于添加了多个的tools 链接前后端 http.createServer创建一个服务器// ---------------------------------------------------------------- // server.js import …

香港站群服务器与普通香港服务器对比

在选择香港服务器时&#xff0c;用户常常会遇到"站群服务器"和"普通服务器"两种选项&#xff0c;虽然它们都基于香港数据中心的基础设施&#xff0c;但在 IP 地址配置、功能定位和管理复杂度、成本上存在显著差异&#xff0c;理解这些差异有助于用户根据实…

4.B树和B+树的区别?为什么MySQL选择B+树作为索引?

区别&#xff1a;1.数据存储位置B树每个节点都存储了索引和数据B树只有叶子节点存储数据&#xff0c;非叶子节点仅存储索引2.叶子节点的链接B树的所有叶子节点通过指针连接成一个双向链表&#xff0c;可以高效地进行范围查询或者顺序遍历B树则没有这样的连接关系&#xff0c;查…

转换狂魔,Modbus TCP转Profinet网关打通视觉传感线连接之路

在汽车零部件冲压生产线的世界中&#xff0c;液压机的压力稳定性是确保产品质量的秘密武器。然而&#xff0c;旧时代的人工巡检和传统监测方式却好似拖累现代化进程的沉重枷锁&#xff1a;效率低、成本高&#xff0c;还总是赶不上实时反馈的快车。这时&#xff0c;工厂决心大刀…

C++进阶—二叉树进阶

第一章&#xff1a;内容安排说明 map和set特性需要先铺垫二叉搜索树&#xff0c;而二叉搜索树也是一种树形结构二叉搜索树的特性了解&#xff0c;有助于更好的理解map和set的特性二叉树中部分面试题稍微有点难度&#xff0c;在前面讲解大家不容易接受&#xff0c;且时间长容易…

驱动下一代E/E架构的神经脉络进化—10BASE-T1S

汽车电子电气架构的演进正经历一场深刻的变革&#xff0c;“中央计算单元区域控制器”的架构模式已成为当前主流车型平台发展的明确方向。这种从传统的“功能域”&#xff08;Domain&#xff09;架构向“区域”&#xff08;Zonal&#xff09;架构的转型升级&#xff0c;旨在实现…

某学校系统中挖矿病毒应急排查

本篇文章主要记录某学校长期未运营维护的程序&#xff0c;被黑客发现了漏洞&#xff0c;但好在学校有全流量设备&#xff0c;抓取到了过程中的流量包 需要你进行上机以及结合流量分析&#xff0c;排查攻击者利用的漏洞以及上传利用成功的木马 文章目录靶机介绍1.使用工具分析共…

vue 、react前端页面支持缩放,echarts、地图点击左边不准的原因和解决办法

原因 由于以上都是通过canvas画布生成的&#xff0c;一旦初始化&#xff0c;就会按照比例进行缩放&#xff0c;但与此同时&#xff0c;比例尺并没有变化&#xff0c;导致坐标偏移 解决办法 设置一个zoomVal产量&#xff0c;在页面加载时计算缩放比例&#xff0c;然后在canvas容…

(LeetCode 每日一题) 1353. 最多可以参加的会议数目 (优先队列、小顶堆)

题目&#xff1a;1353. 最多可以参加的会议数目 思路&#xff1a;优先队列实现小顶堆&#xff0c;0(mx*logn) 在第i天&#xff0c;优先选endDay最小的那一个活动进行。那么遍历每一天&#xff0c;用小顶堆来维护每个活动的最后一天即可&#xff0c;细节看注释。 C版本&#xf…

Java结构型模式---代理模式

代理模式基础概念代理模式是一种结构型设计模式&#xff0c;其核心思想是通过创建一个代理对象来控制对另一个真实对象的访问。代理对象在客户端和真实对象之间起到中介作用&#xff0c;允许在不改变真实对象的前提下&#xff0c;对其进行增强或控制。代理模式的核心组件主题接…

MySQL流程控制函数全解析

MySQL 中的流程控制函数&#xff08;也称为条件函数&#xff09;允许你在 SQL 语句中进行逻辑判断&#xff0c;根据不同的条件返回不同的值或执行不同的操作。它们极大地增强了 SQL 的灵活性和表达能力&#xff0c;尤其在进行数据转换、结果格式化、条件聚合和复杂业务逻辑实现…

【7】PostgreSQL 事务

【7】PostgreSQL 事务前言使用事务事务内错误处理事务保存点DDL 事务前言 在 PostgreSQL 中&#xff0c;每一个操作都是一个事务。即使一个简单的查询(select)&#xff0c;这也是一个事务。 例如&#xff1a; postgres# select now();now --------------------…

Linux:多线程---深入互斥浅谈同步

文章目录1. 互斥1.1 为什么需要互斥1.2 互斥锁1.3 初谈互斥与同步1.4 锁的原理1.5 可重入VS线程安全1.6 死锁1.7 避免死锁的算法&#xff08;扩展&#xff09;序&#xff1a;在上一章中我们知道了线程控制的三个角度&#xff1a;线程创建、线程等待和线程终止&#xff0c;分别从…