Golang | 搜索哨兵-对接分布式gRPC服务

  • 哨兵(centennial)负责接待客人,直接与调用方对接。
  • 哨兵的核心组件包括service HUB和connection pool。
  • service HUB用于与服务中心通信,获取可提供服务的节点信息。
  • connection pool用于缓存与index worker的连接,避免每次搜索时重新建立连接。
  • 连接池初始化为空map。
  • 提供函数获取指定endpoint的GRPC连接。
  • 函数首先检查本地缓存中是否有可用连接,若无则创建新连接。
  • 创建连接时默认立即返回,可选阻塞模式直到连接可用。
  • 连接建立后放入缓存并返回。
  • 哨兵提供添加、删除和搜索三个核心功能。
  • 添加功能:随机选择一台index worker添加新文档。
  • 删除功能:遍历所有endpoint,并行删除指定文档。
  • 搜索功能:将搜索请求发送到所有endpoint,合并搜索结果。
  • 使用channel进行并发搜索结果的收集。
  • 上游并发写入channel,下游读取channel数据到切片。
  • 使用wait group等待所有搜索任务完成。
  • 关闭channel后仍可读取,确保读取到所有数据。
package index_serviceimport ("context""fmt""github.com/jmh000527/criker-search/index_service/service_hub""github.com/jmh000527/criker-search/types""github.com/jmh000527/criker-search/utils""google.golang.org/grpc""google.golang.org/grpc/connectivity""google.golang.org/grpc/credentials/insecure""sync""sync/atomic""time"
)// Sentinel 哨兵前台,与外部系统对接的接口。
type Sentinel struct {hub      service_hub.ServiceHub // 从 Hub 中获取 IndexServiceWorker 的集合。可以直接访问 ServiceHub,也可能通过代理模式进行访问。connPool sync.Map               // 与各个 IndexServiceWorker 建立的 gRPC 连接池。缓存连接以避免每次请求都重新建立连接,提升效率。
}// NewSentinel 创建并返回一个 Sentinel 实例。
//
// 参数:
//   - etcdServers: 一个字符串数组,包含了 etcd 服务器的地址。
//
// 返回值:
//   - *Sentinel: 一个新的 Sentinel 实例。
func NewSentinel(etcdServers []string) *Sentinel {return &Sentinel{// hub: GetServiceHub(etcdServers, 10), // 直接访问 ServiceHubhub:      service_hub.GetServiceHubProxy(etcdServers, 3, 100), // 使用代理模式访问 ServiceHubconnPool: sync.Map{},                                          // 初始化 gRPC 连接池}
}// GetGrpcConn 向指定的 endpoint 建立 gRPC 连接。
// 如果连接已经存在于缓存中且状态可用,则直接返回缓存的连接。
// 如果连接状态不可用或不存在,则重新建立连接并存储到缓存中。
//
// 参数:
//   - endpoint: 要连接的 gRPC 服务的地址。
//
// 返回值:
//   - *grpc.ClientConn: 返回与 endpoint 建立的 gRPC 连接,如果连接失败则返回 nil。
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {v, exists := sentinel.connPool.Load(endpoint)// 连接缓存中存在if exists {conn := v.(*grpc.ClientConn)// 如果连接状态不可用,则从连接缓存中删除if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Shutdown {utils.Log.Printf("连接到 endpoint %s 的状态为 %s", endpoint, conn.GetState().String())conn.Close()sentinel.connPool.Delete(endpoint)} else {return conn}}// 连接到服务,控制连接超时ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)defer cancel()// 获取 gRPC 连接// grpc.Dial 是异步连接,连接状态为正在连接。// 如果设置了 grpc.WithBlock 选项,则会阻塞等待(等待握手成功)。// 需要注意的是,当未设置 grpc.WithBlock 时,ctx 超时控制对其无任何效果。grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())if err != nil {utils.Log.Printf("连接到 %s 的 gRPC 失败,错误: %s", endpoint, err.Error())return nil}utils.Log.Printf("连接到 %s 的 gRPC 成功", endpoint)// 将 gRPC 连接缓存到连接池中sentinel.connPool.Store(endpoint, grpcConn)return grpcConn
}// AddDoc 向集群中的 IndexService 添加文档。如果文档已存在,会先删除旧文档再添加新文档。
//
// 参数:
//   - doc: 要添加的文档,类型为 types.Document。
//
// 返回值:
//   - int: 成功添加的文档数量。
//   - error: 如果在添加文档时出现错误,返回相应的错误信息。
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {// 根据负载均衡策略,选择一个 IndexService 节点,将文档添加到该节点endpoint := sentinel.hub.GetServiceEndpoint(IndexService)if len(endpoint) == 0 {return 0, fmt.Errorf("未找到服务 %s 的有效节点", IndexService)}// 创建到该节点的 gRPC 连接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {return 0, fmt.Errorf("连接到 %s 的 gRPC 失败", endpoint)}// 创建 gRPC 客户端并进行调用client := NewIndexServiceClient(grpcConn)affected, err := client.AddDoc(context.Background(), &doc)if err != nil {return 0, err}utils.Log.Printf("成功向 worker %s 添加 %d 个文档", endpoint, affected.Count)return int(affected.Count), nil
}// DeleteDoc 从集群中删除与 docId 对应的文档,返回成功删除的文档数量(通常不会超过 1)。
//
// 参数:
//   - docId: 要删除的文档的唯一标识符。
//
// 返回值:
//   - int: 成功删除的文档数量。
func (sentinel *Sentinel) DeleteDoc(docId string) int {// 获取该服务的所有 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var n int32wg := sync.WaitGroup{}wg.Add(len(endpoints))for _, endpoint := range endpoints {// 并行地向各个 IndexServiceWorker 删除对应的 docId 的文档。// 正常情况下,只有一个 worker 上有该文档。go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("连接到 %s 的 gRPC 失败", endpoint)return}client := NewIndexServiceClient(grpcConn)affected, err := client.DeleteDoc(context.Background(), &DocId{docId})if err != nil {utils.Log.Printf("从 worker %s 删除文档 %s 失败,错误: %s", endpoint, docId, err)return}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("从 worker %s 删除文档 %s 成功", endpoint, docId)}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}// Search 执行检索操作,并返回文档列表。
//
// 参数:
//   - query: 指定的检索查询条件,类型为 *types.TermQuery。
//   - onFlag: 开启的标志位,类型为 uint64。
//   - offFlag: 关闭的标志位,类型为 uint64。
//   - orFlags: OR 标志位的切片,类型为 []uint64。
//
// 返回值:
//   - []*types.Document: 经过检索的文档列表,可能为空。
//
// 详细描述:
//  1. 从服务中心获取所有的 endpoints。
//  2. 使用 goroutines 并行地对每个 endpoint 执行检索操作。
//  3. 将每个检索结果发送到 resultChan 通道中。
//  4. 在另一个 goroutine 中,从 resultChan 通道中读取结果,并将其存储在 docs 切片中。
//  5. 等待所有的检索操作完成后,关闭 resultChan,并等待从 resultChan 中读取完所有结果。
//  6. 返回存储的文档列表。
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {// 获取该服务所有的 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return nil}// 用于存储检索结果的切片和通道docs := make([]*types.Document, 0, 1000)resultChan := make(chan *types.Document, 1000)// 使用 WaitGroup 并行开启协程去每个 endpoint 执行检索操作var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()// 获取 gRPC 连接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("连接到 %s 的 gRPC 连接失败", endpoint)return}client := NewIndexServiceClient(grpcConn)// 执行检索请求searchResult, err := client.Search(context.Background(), &SearchRequest{Query:   query,OnFlag:  onFlag,OffFlag: offFlag,OrFlags: orFlags,})if err != nil {utils.Log.Printf("向 worker %s 执行查询 %s 失败,错误: %s", endpoint, query, err)return}if len(searchResult.Results) > 0 {utils.Log.Printf("向 worker %s 执行查询 %s 成功,获取到 %v 个文档", endpoint, query, len(searchResult.Results))for _, result := range searchResult.Results {resultChan <- result}}}(endpoint)}// 启动另一个 goroutine 从 resultChan 中获取结果signalChan := make(chan struct{})go func() {for doc := range resultChan {docs = append(docs, doc)}// 读取完成,通知主 goroutinesignalChan <- struct{}{}}()// 等待所有检索操作完成wg.Wait()// 关闭 resultChan 通道close(resultChan)// 等待结果读取完毕<-signalChanreturn docs
}// Count 获取所有服务中的搜索条目数量。
//
// 参数:
//   - 无参数。
//
// 返回值:
//   - int: 所有服务中的文档总数量。
//
// 详细描述:
//  1. 从服务中心获取所有的 endpoints。
//  2. 使用 goroutines 并行地对每个 endpoint 执行计数操作。
//  3. 将每个 worker 中的文档数量累加到总计数中。
//  4. 等待所有计数操作完成后,返回文档总数量。
func (sentinel *Sentinel) Count() int {var n int32// 获取所有服务的 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()// 获取 gRPC 连接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn != nil {client := NewIndexServiceClient(grpcConn)// 执行计数请求affected, err := client.Count(context.Background(), new(CountRequest))if err != nil {utils.Log.Printf("从 worker %s 获取文档数量失败: %s", endpoint, err)}if affected.Count > 0 {// 累加计数atomic.AddInt32(&n, affected.Count)utils.Log.Printf("worker %s 共有 %d 个文档", endpoint, affected.Count)}}}(endpoint)}// 等待所有计数操作完成wg.Wait()return int(atomic.LoadInt32(&n))
}// Close 关闭各个grpc client连接,关闭etcd client连接
func (sentinel *Sentinel) Close() (err error) {sentinel.connPool.Range(func(key, value any) bool {conn := value.(*grpc.ClientConn)err = conn.Close()return true})sentinel.hub.Close()return
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/85370.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS3实现的账号密码输入框提示效果

以下是通过CSS3实现输入框提示效果的常用方法&#xff0c;包含浮动标签和动态提示两种经典实现方案&#xff1a; 一、浮动标签效果 <div class"input-group"><input type"text" required><label>用户名</label> </div><…

maven编译时跳过test过程

如果代码里有无法在打包环境中测试的部分&#xff0c;则直接运行mvn clean package&#xff0c;因为测试失败&#xff0c;会导致打包失败。目前有两种方式可以跳过测试&#xff1a; 1. mvn clean package -DskipTests&#xff0c;这会跳过执行阶须&#xff0c;但仍会生成测试所…

美业+智能体,解锁行业转化新密码(2/6)

摘要&#xff1a;中国美业市场近年蓬勃发展&#xff0c;规模持续扩大&#xff0c;预计不久将突破万亿级别&#xff0c;但同时也面临着诸多挑战&#xff0c;如获客成本攀升、服务质量不稳定、难以满足消费者多元化个性化需求等。智能体技术的出现为美业带来了新的发展机遇&#…

设计模式——责任链设计模式(行为型)

摘要 责任链设计模式是一种行为型设计模式&#xff0c;旨在将请求的发送者与接收者解耦&#xff0c;通过多个处理器对象按链式结构依次处理请求&#xff0c;直到某个处理器处理为止。它包含抽象处理者、具体处理者和客户端等核心角色。该模式适用于多个对象可能处理请求的场景…

react/vue移动端项目,刷新页面404的原因以及解决办法

一 、 项目 移动端 二、背景 1、问题描述&#xff1a;react/vue移动端项目&#xff0c;正常的页面操作跳转&#xff0c;不会出现404的问题&#xff0c;但是一旦刷新&#xff0c;就会出现404报错 2、产生原因&#xff1a; React Router是客户端的路由&#xff0c;当再次刷新时…

数据结构-算法学习C++(入门)

目录 03二进制和位运算04 选择、冒泡、插入排序05 对数器06 二分搜索07 时间复杂度和空间复杂度08 算法和数据结构09 单双链表09.1单双链表及反转09.2合并链表09.2两数相加09.2分隔链表 013队列、栈、环形队列013.1队列013.2栈013.3循环队列 014栈-队列的相互转换014.1用栈实现…

用JS实现植物大战僵尸(前端作业)

1. 先搭架子 整体效果&#xff1a; 点击开始后进入主场景 左侧是植物卡片 右上角是游戏的开始和暂停键 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevic…

深入理解设计模式之代理模式

深入理解设计模式之&#xff1a;代理模式 一、什么是代理模式&#xff1f; 代理模式&#xff08;Proxy Pattern&#xff09;是一种结构型设计模式。它为其他对象提供一种代理以控制对这个对象的访问。代理对象在客户端和目标对象之间起到中介作用&#xff0c;可以在不改变目标…

Ubuntu设置之初始化

安装SSH服务 # 安装 OpenSSH Server sudo apt update sudo apt install -y openssh-server# 检查 SSH 服务状态 sudo systemctl status ssh # Active: active (running) since Sat 2025-05-31 17:13:07 CST; 6s ago# 重启服务 sudo systemctl restart ssh自定义分辨率 新…

【仿生机器人】极具前瞻性的架构——认知-情感-记忆“三位一体的仿生机器人系统架构

基于您的深度需求分析&#xff0c;我将为您设计一个全新的"认知-情感-记忆"三位一体的仿生机器人系统架构。以下是经过深度优化的解决方案&#xff1a; 一、核心架构升级&#xff08;三体认知架构&#xff09; 采用量子纠缠式架构设计&#xff1a; 认知三角&#xf…

Python量化交易12——Tushare全面获取各种经济金融数据

两年前写过Tushare的简单使用&#xff1a; Python量化交易08——利用Tushare获取日K数据_skshare- 现在更新一下吧&#xff0c;这两年用过不少的金融数据库&#xff0c;akshare&#xff0c;baostock&#xff0c;雅虎的&#xff0c;pd自带的......发现还是Tushare最稳定最好用&…

python打卡day39@浙大疏锦行

知识点回顾 图像数据的格式&#xff1a;灰度和彩色数据模型的定义显存占用的4种地方 模型参数梯度参数优化器参数数据批量所占显存神经元输出中间状态 batchisize和训练的关系 1. 图像数据格式 - 灰度图像 &#xff1a;单通道&#xff0c;像素值范围通常0-255&#xff0c;形状为…

源码解析(二):nnUNet

原文 &#x1f600; nnU-Net 是一个用于生物医学图像分割的自配置深度学习框架&#xff0c;可自动适应不同的数据集。可用于处理和训练可能规模庞大的二维和三维医学图像。该系统分析数据集属性并配置优化的基于 U-Net 的分割流程&#xff0c;无需手动参数调整或深度学习专业知…

clickhouse如何查看操作记录,从日志来查看写入是否成功

背景 插入表数据后&#xff0c;因为原本表中就有数据&#xff0c;一时间没想到怎么查看插入是否成功&#xff0c;因为对数据源没有很多的了解&#xff0c;这时候就想怎么查看下插入是否成功呢&#xff0c;于是就有了以下方法 具体方法 根据操作类型查找&#xff0c;比如inse…

udp 传输实时性测量

UDP&#xff08;用户数据报协议&#xff09;是一种无连接的传输协议&#xff0c;适用于实时性要求较高的应用&#xff0c;如视频流、音频传输和游戏等。测量UDP传输的实时性可以通过多种工具和方法实现&#xff0c;以下是一些常见的方法和工具&#xff1a; 1. 使用 iperf 测试…

pikachu通关教程- over permission

如果使用A用户的权限去操作B用户的数据&#xff0c;A的权限小于B的权限&#xff0c;如果能够成功操作&#xff0c;则称之为越权操作。 越权漏洞形成的原因是后台使用了 不合理的权限校验规则导致的。 水平越权 当我们以Lucy账号登录&#xff0c;查询个人信息时&#xff0c;会有…

nc 命令示例

nc -zv 实用示例 示例 1&#xff1a;测试单个 TCP 端口&#xff08;最常见&#xff09; 目标&#xff1a; 检查主机 webserver.example.com 上的 80 端口 (HTTP) 是否开放。 nc -zv webserver.example.com 80成功输出&#xff1a; Connection to webserver.example.com (19…

Redis是什么

注&#xff1a;本人不懂Redis是什么&#xff0c;问的大模型&#xff0c;让它用生动浅显的语言向我解释。为了防止忘记&#xff0c;我把它说的记录下来。接下来的解释都是大模型生成的&#xff0c;如果有错误的地方欢迎指正 。 Redis 是什么&#xff1f;&#xff08;一句话解释&…

CVE-2021-28164源码分析与漏洞复现

漏洞概述 漏洞名称&#xff1a;Jetty 路径解析逻辑漏洞导致 WEB-INF 敏感信息泄露 漏洞编号&#xff1a;CVE-2021-28164 CVSS 评分&#xff1a;7.5 影响版本&#xff1a;Jetty 9.4.37 - 9.4.38 修复版本&#xff1a;Jetty ≥ 9.4.39 漏洞类型&#xff1a;路径遍历/信息泄露 C…

颠覆传统!单样本熵最小化如何重塑大语言模型训练范式?

颠覆传统&#xff01;单样本熵最小化如何重塑大语言模型训练范式&#xff1f; 大语言模型&#xff08;LLM&#xff09;的训练往往依赖大量标注数据与复杂奖励设计&#xff0c;但最新研究发现&#xff0c;仅用1条无标注数据和10步优化的熵最小化&#xff08;EM&#xff09;方法…