- 哨兵(centennial)负责接待客人,直接与调用方对接。
- 哨兵的核心组件包括service HUB和connection pool。
- service HUB用于与服务中心通信,获取可提供服务的节点信息。
- connection pool用于缓存与index worker的连接,避免每次搜索时重新建立连接。
- 连接池初始化为空map。
- 提供函数获取指定endpoint的GRPC连接。
- 函数首先检查本地缓存中是否有可用连接,若无则创建新连接。
- 创建连接时默认立即返回,可选阻塞模式直到连接可用。
- 连接建立后放入缓存并返回。
- 哨兵提供添加、删除和搜索三个核心功能。
- 添加功能:随机选择一台index worker添加新文档。
- 删除功能:遍历所有endpoint,并行删除指定文档。
- 搜索功能:将搜索请求发送到所有endpoint,合并搜索结果。
- 使用channel进行并发搜索结果的收集。
- 上游并发写入channel,下游读取channel数据到切片。
- 使用wait group等待所有搜索任务完成。
- 关闭channel后仍可读取,确保读取到所有数据。
package index_serviceimport ("context""fmt""github.com/jmh000527/criker-search/index_service/service_hub""github.com/jmh000527/criker-search/types""github.com/jmh000527/criker-search/utils""google.golang.org/grpc""google.golang.org/grpc/connectivity""google.golang.org/grpc/credentials/insecure""sync""sync/atomic""time"
)
type Sentinel struct {hub service_hub.ServiceHub connPool sync.Map
}
func NewSentinel(etcdServers []string) *Sentinel {return &Sentinel{hub: service_hub.GetServiceHubProxy(etcdServers, 3, 100), connPool: sync.Map{}, }
}
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {v, exists := sentinel.connPool.Load(endpoint)if exists {conn := v.(*grpc.ClientConn)if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Shutdown {utils.Log.Printf("连接到 endpoint %s 的状态为 %s", endpoint, conn.GetState().String())conn.Close()sentinel.connPool.Delete(endpoint)} else {return conn}}ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)defer cancel()grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())if err != nil {utils.Log.Printf("连接到 %s 的 gRPC 失败,错误: %s", endpoint, err.Error())return nil}utils.Log.Printf("连接到 %s 的 gRPC 成功", endpoint)sentinel.connPool.Store(endpoint, grpcConn)return grpcConn
}
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {endpoint := sentinel.hub.GetServiceEndpoint(IndexService)if len(endpoint) == 0 {return 0, fmt.Errorf("未找到服务 %s 的有效节点", IndexService)}grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {return 0, fmt.Errorf("连接到 %s 的 gRPC 失败", endpoint)}client := NewIndexServiceClient(grpcConn)affected, err := client.AddDoc(context.Background(), &doc)if err != nil {return 0, err}utils.Log.Printf("成功向 worker %s 添加 %d 个文档", endpoint, affected.Count)return int(affected.Count), nil
}
func (sentinel *Sentinel) DeleteDoc(docId string) int {endpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var n int32wg := sync.WaitGroup{}wg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("连接到 %s 的 gRPC 失败", endpoint)return}client := NewIndexServiceClient(grpcConn)affected, err := client.DeleteDoc(context.Background(), &DocId{docId})if err != nil {utils.Log.Printf("从 worker %s 删除文档 %s 失败,错误: %s", endpoint, docId, err)return}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("从 worker %s 删除文档 %s 成功", endpoint, docId)}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {endpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return nil}docs := make([]*types.Document, 0, 1000)resultChan := make(chan *types.Document, 1000)var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("连接到 %s 的 gRPC 连接失败", endpoint)return}client := NewIndexServiceClient(grpcConn)searchResult, err := client.Search(context.Background(), &SearchRequest{Query: query,OnFlag: onFlag,OffFlag: offFlag,OrFlags: orFlags,})if err != nil {utils.Log.Printf("向 worker %s 执行查询 %s 失败,错误: %s", endpoint, query, err)return}if len(searchResult.Results) > 0 {utils.Log.Printf("向 worker %s 执行查询 %s 成功,获取到 %v 个文档", endpoint, query, len(searchResult.Results))for _, result := range searchResult.Results {resultChan <- result}}}(endpoint)}signalChan := make(chan struct{})go func() {for doc := range resultChan {docs = append(docs, doc)}signalChan <- struct{}{}}()wg.Wait()close(resultChan)<-signalChanreturn docs
}
func (sentinel *Sentinel) Count() int {var n int32endpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn != nil {client := NewIndexServiceClient(grpcConn)affected, err := client.Count(context.Background(), new(CountRequest))if err != nil {utils.Log.Printf("从 worker %s 获取文档数量失败: %s", endpoint, err)}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("worker %s 共有 %d 个文档", endpoint, affected.Count)}}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}
func (sentinel *Sentinel) Close() (err error) {sentinel.connPool.Range(func(key, value any) bool {conn := value.(*grpc.ClientConn)err = conn.Close()return true})sentinel.hub.Close()return
}