用go从零构建写一个RPC(3)--异步调用+多路复用实现

在前两个版本中,我们实现了基础的客户端-服务端通信、连接池、序列化等关键模块。为了进一步提升吞吐量和并发性能,本版本新增了 异步发送机制多路复用支持,旨在减少资源消耗、提升连接利用率。
代码地址:https://github.com/karatttt/MyRPC

版本三新增特性

异步发送机制实现

背景:
在同步RPC调用中,客户端每发送一次请求都需阻塞等待响应,这在网络抖动或响应较慢时会严重降低系统吞吐量。因此,本版本引入了 异步任务模型,支持超时重试、指数退避、完成回调等能力,确保在客户端请求失败后可以自动重试、不中断主逻辑。

实现思路:

  • 实际上异步回调的功能很好实现,只需要将回调方法传入内部,当内部状态为成功或者完成的时候调用该callback方法即可。
  • 而异步超时失败重试的机制实际上是让客户端的发送请求交由另一个协程来做,客户端可以先执行其他的逻辑再阻塞等待future的结果,或者设置一个回调方法,或者不关心回复。异步发送实际上就是牺牲了可靠性,而重试是为了尽量提高这个可靠性。超时重试这个可以通过在协程内通过计时器重试,如果超时则在同一个协程中再进行一次发送,直到重试到大于最大重试次数。但是这样会导致等待重试的协程数量太多,对于某一时间段网络出现抖动的情况,出现了大量的重试,就会导致协程数剧增的情况。
  • 借鉴了RocketMQ的异步发送的机制,采用了一个协程统一管理需要重试的任务,并用一个延时队列来排序处理任务

Client的变动
为了区分同步发送,为异步增加了异步的proxy和异步的send方法

// 创建客户端代理
func NewHelloAsyncClientProxy(opts ...client.Option) HelloAsyncClientProxy {return &HelloAsyncClientProxyImpl{client: client.DefaultClient,opts:   opts,}
}
// 实现HelloAsync方法
func (c *HelloAsyncClientProxyImpl) HelloAsync(ctx context.Context, req *HelloRequest, opts ...client.Option) (*internel.Future, *common.RPCError) {msg := internel.NewMsg()msg.WithServiceName("helloworld")msg.WithMethodName("Hello")ctx = context.WithValue(ctx, internel.ContextMsgKey, msg)rsp := &HelloReply{}// 这里需要将opts添加前面newProxy时传入的optsnewOpts := append(c.opts, opts...)return c.client.InvokeAsync(ctx, req, rsp, newOpts...)
}
  • 这里是rpc.go中新增的代理以及实现方法,还没有体现异步发送的逻辑,接下来看InvokeAsync

InvokeAsync

func (c *client) InvokeAsync(ctx context.Context, reqBody interface{}, rspBody interface{}, opt ...Option) (*internel.Future, *common.RPCError) {future := internel.NewFuture()opts := DefaultOptionsfor _, o := range opt {o(opts)}go func() {var task *async.Taskif opts.Timeout > 0 {// 有超时时间的情况下,无论是否进行重试,将任务提交给全局管理器ctx, msg := internel.GetMessage(ctx)task = &async.Task{MethodName:  msg.GetMethodName(),Request:     reqBody,MaxRetries:  opts.RetryTimes,Timeout:     opts.Timeout,ExecuteFunc: c.makeRetryFunc(ctx, reqBody, rspBody, opts),OnComplete: func(err error) {// 最终结果回调到原始Futureif err != nil {future.SetResult(nil, &common.RPCError{Code:    common.ErrCodeRetryFailed,Message: err.Error(),})} else {future.SetResult(rspBody, nil)}},}// 提交任务到全局管理器task.Status = async.TaskStatusPendingfuture.Task = taskasync.GetGlobalTaskManager().AddTask(task)// 执行发送逻辑err := opts.ClientTransport.Send(ctx, reqBody, rspBody, opts.ClientTransportOption)if err == nil {future.SetResult(rspBody, nil)}} else {// 无超时时间的情况下,错误的话直接返回err := opts.ClientTransport.Send(ctx, reqBody, rspBody, opts.ClientTransportOption)if err == nil {future.SetResult(rspBody, nil)} else {future.SetResult(nil, &common.RPCError{Code:    common.ErrCodeClient,Message: err.Error(),})}}}()return future, nil
}
  • 我们先看看Future结构,再去理解上面的代码:
	type Future struct {mu        sync.Mutexdone      chan struct{}result    interface{}err       *common.RPCErrorcallbacks []func(interface{}, *common.RPCError)Task      *async.Task // 关联的异步任务}// SetResult 设置Future的结果func (f *Future) SetResult(result interface{}, err *common.RPCError) {f.mu.Lock()defer f.mu.Unlock()if f.isDone() {return}f.result = resultf.Task.Status = async.TaskStatusCompletedf.err = errclose(f.done)// 执行所有注册的回调for _, callback := range f.callbacks {callback(result, err)}}
  • 这个就是异步发送后返回的Future,result就是回包结果,callbacks就是客户端设置的回调方法,Task是后续添加到全局异步管理器的任务,后续再说
  • 而这个SetResult就是在得到结果后设置future的result,并且调用所有注册的回调方法,并置Task.Status = async.TaskStatusCompleted,这个关于task的我们后面再说
  • 接下来回到invokeAsync,对于没有设置超时时间的发送,我们直接在失败后返回客户端(客户端能够忍受异步的丢失,如果真的发生了长时间的阻塞,也不用担心这个协程不释放,因为我们的连接池会管理这个连接的生命周期),对于设置了超时时间的发送,我们需要在超时时间到达后进行重试,或者达到最大重试次数后进行失败反馈
  • 这里就做了一个全局的管理器,先创建一个Task将其添加到manager中,再进行消息的正常发送。

TaskManager

// Task 表示一个异步任务
type Task struct {MethodName  string             // 方法名Request      interface{}        // 请求参数RetryTimes   int                // 当前已重试次数MaxRetries   int                // 最大重试次数Timeout      time.Duration      // 单次任务超时时间NextRetryAt  time.Time         // 下次重试时间(用于堆排序)ExecuteFunc  func() error       // 重试时任务执行函数Status       TaskStatus        // 状态字段OnComplete   func(error)       // 最终完成回调mu           sync.Mutex // 保证状态变更的线程安全
}// 扫描循环(核心逻辑)
func (tm *TaskManager) scanLoop() {for {select {case <-tm.closeChan:returndefault:tm.processTasks()}}
}// 处理超时任务
func (tm *TaskManager) processTasks() {tm.mu.Lock()if tm.tasks.Len() == 0 {tm.mu.Unlock()// 无任务时休眠,直到被唤醒select {case <-tm.wakeChan:case <-time.After(10 * time.Second): // 防止长期阻塞}return}// 检查堆顶任务是否超时now := time.Now()task := (*tm.tasks)[0]if now.Before(task.NextRetryAt) {// 未超时,休眠到最近任务到期tm.mu.Unlock()time.Sleep(task.NextRetryAt.Sub(now))return}// 弹出超时任务task = heap.Pop(tm.tasks).(*Task)tm.mu.Unlock()// 执行重试逻辑go tm.retryTask(task)
}// 重试任务
func (tm *TaskManager) retryTask(task *Task) {task.mu.Lock()// 检查状态:如果任务已结束,直接返回,不用再次入队列if task.Status != TaskStatusPending {task.mu.Unlock()return}task.Status = TaskStatusRunning // 标记为执行中task.mu.Unlock()err := task.ExecuteFunc()if err == nil {task.OnComplete(nil)return}// 检查是否达到最大重试次数task.RetryTimes++if task.RetryTimes > task.MaxRetries {// 打印fmt.Println("request retry times exceed max retry times")task.OnComplete(err)return}// 计算下次重试时间(如指数退避)delay := time.Duration(math.Pow(2, float64(task.RetryTimes))) * time.Secondtask.NextRetryAt = time.Now().Add(delay) // 重新加入队列// 打印重试次数fmt.Println("request retry time : ", task.RetryTimes)tm.mu.Lock()heap.Push(tm.tasks, task)task.Status = TaskStatusPending // 恢复状态tm.mu.Unlock()tm.notifyScanner()
}
  • 以上是这个manager的关键代码,这个Task就是里面的元素,按照下一次重试时间排序放在manager的一个延时队列里面,优先处理目前需要重试的任务。task的ExecuteFunc我们在前面的方法中可以看到实际上就是retry发送,OnComplete就是将future的setResult使得客户端能得到反馈
  • 循环执行processTasks,对于堆顶任务进行retry
  • retry时先看这个task是不是已经执行成功了,是的话删除这个task,如果不是的话继续入队
  • 这样就可以保证只有一个协程在管理所有的超时任务,避免了每一个超时任务都需要一个协程来等待重试。

多路复用

背景:

  • 默认情况下,每个RPC调用使用一个连接,连接池虽然能缓解资源浪费,对于连接池中的每一个连接,实际上也是串行进行的,也就是说,如果前面的某一个连接处理时间太长,后续的请求只能等待该请求返回后才能复用该连接,也就是http1.1的队头阻塞问题。
  • 为此,引入 多路复用协议 —— 即在一个TCP连接内支持多个“逻辑流”,每个流由 RequestID 唯一标识,从而支持多个请求同时复用一条连接。

实现思路:
我们之前的frame结构如下:

header := FrameHeader{MagicNumber:    MagicNumber,Version:        Version,MessageType:    MessageTypeRequest,SequenceID:     sequenceID, ProtocolLength: uint32(len(protocolDataBytes)),BodyLength:     uint32(len(reqData)),}

实际上已经有了SequenceID这个字段,也就是说,我们可以通过这个SequenceID,来区分同一个连接中的不同的流,也就是说,客户端在同一个连接中,发送了不同的SequenceID的消息,服务端并发处理这些消息,并且保留这个SequenceID返回客户端,客户端的多个流识别这个SequenceID并读取结果

MuxConn(多路复用连接)结构

// 实现net.Conn接口的结构体,保证适配连接池的get和put
// 实际上也是一个连接,只是多了reqID从而可以派生出多个流,区分达到多路复用的目的
type MuxConn struct {conn         net.Conn                   // 原始连接pending      map[uint32]*pendingRequest // 每一个reqID(流)对应的等待通道closeChan    chan struct{}readerDone   chan struct{}writeLock    sync.MutexreqIDCounter uint64 // 分配递增的请求IDmu           sync.RWMutex
}
type pendingRequest struct {ch      chan MuxFrametimeout time.Time
}
func (mc *MuxConn) NextRequestID() uint64 {return atomic.AddUint64(&mc.reqIDCounter, 1)
}
  • 实际上这个MuxConn实现了net.Conn,也是一个连接,只是可以通过NextRequestID派生出多个流,并在这个conn上write特定reqID的请求
  • 可以看到pending这个结构,是一个map,k是reqID,v是一个ch,为什么要设计一个这样的map?因为我们可能同时存在多路并发,不同的客户端的对于同一个conn的请求,我们需要设计一个特有的ch来读取对应的reqID的响应是否到达,如果某一个reqID的响应到达了,发送到对应的ch,从而对应的客户端得到响应。如果多个流直接并发读取tcp的响应,必然会导致reqID乱序现象

connPool的改动
之前的连接池只是正常获取一个连接,当该连接处理完被归还后才置为空闲状态。而对于多路复用显然不是这个规则,对于正在使用的连接,若没有达到最大可以接受的流的量,我们仍然可以接受从池中返回这个连接并使用

对于之前的获取连接的逻辑,我们一次对于多路复用加入以下分支:

        // 1. 优先检查空闲连接if len(p.idleConns) > 0 {// 原逻辑。。。// 多路复用处理if p.isMux {if muxConn, exists := p.muxConns[conn]; exists {if p.streamCount[conn] < p.maxStreams {p.streamCount[conn]++MuxConn2SequenceIDMap[muxConn] = muxConn.NextRequestID()return muxConn, nil}}// 如果不是多路复用连接或已达最大流数,回退到普通连接}p.mu.Unlock()return &pooledConnWrapper{conn, p}, nil}// 2. 检查是否可以创建新连接if int(atomic.LoadInt32(&p.activeCount)) < p.maxActive {// 原逻辑。。。// 多路复用连接初始化if p.isMux {if p.muxConns == nil {p.muxConns = make(map[*PooledConn]*mutilpath.MuxConn)p.streamCount = make(map[*PooledConn]int)}muxConn := mutilpath.NewMuxConn(rawConn, 1000)p.muxConns[pooledConn] = muxConnp.streamCount[pooledConn] = 1 // 新连接默认1个流MuxConn2SequenceIDMap[muxConn] = muxConn.NextRequestID()return muxConn, nil}p.mu.Unlock()return &pooledConnWrapper{pooledConn, p}, nil}// 3. 新增情况:无空闲且活跃连接达到最大数,检查活跃连接的多路复用能力(仅在多路复用模式下)if p.isMux {for pc, muxConn := range p.muxConns {count := p.streamCount[pc]if count < p.maxStreams {p.streamCount[pc]++atomic.AddInt32(&p.activeCount, 1)pc.lastUsed = time.Now()MuxConn2SequenceIDMap[muxConn] = muxConn.NextRequestID()return p.muxConns[pc], nil}}}
  • 对于情况一,若是空闲连接当然直接使用,并增加流数量,并对该连接分配reqID,在MuxConn2SequenceIDMap结构中保存
  • 对于情况二,无空闲连接,但是活跃连接数未满,创建新连接,增加流数量,并对该连接分配reqID,在MuxConn2SequenceIDMap结构中保存
  • 对于情况三,无空闲连接且活跃连接数已经满,检查所有的活跃连接的流数量是否未满,并且返回未满的连接,分配新的流
  • 对于Put逻辑,对应的应是归还流,当某个连接的流为0时,该连接为空闲状态,不再阐述

Send方法改动
之前的方法只需要send中正常序列化和编解码就可以,客户端发送完请求就阻塞(或者异步)等待响应,这里的多路复用模式则是在write前注册一个pendingRequest,监听特定的channel

// mux模式下,通过ch阻塞等待相应的流回包muxConn, _ := conn.(*mutilpath.MuxConn)seqID := msg.GetSequenceID()ch := muxConn.RegisterPending(seqID)defer muxConn.UnregisterPending(seqID)// 写数据err = c.tcpWriteFrame(ctx, conn, framedata)if err != nil {return &common.RPCError{Code:    common.ErrCodeNetwork,Message: fmt.Sprintf("failed to write frame: %v", err),}}// 读响应select {case frame := <-ch:rspDataBuf = frame.Datacase <-ctx.Done():return &common.RPCError{Code:    common.ErrCodeNetwork,Message: fmt.Sprintf("failed to read frame: %v", err),}}
  • 而客户端收到响应,路由到对应reqID的channel的逻辑在这里:

func (mc *MuxConn) readLoop() {defer close(mc.readerDone)for {select {case <-mc.closeChan:returndefault:}frame, err := codec.ReadFrame(mc.conn)if err != nil {// 协议错误处理fmt.Println("读取帧错误:", err)break}mc.dispatchFrame(frame)}
}func (mc *MuxConn) dispatchFrame(frame []byte) {mc.mu.RLock()// 截取流序号sequenceID := binary.BigEndian.Uint32(frame[4:8])pr, exists := mc.pending[uint32(sequenceID)]mc.mu.RUnlock()frameStruct := MuxFrame{Data: frame,}if exists {select {case pr.ch <- frameStruct:// 成功发送到等待通道default:// 通道已满,丢弃帧fmt.Println("丢弃帧 %s:通道已满", frame)}} else {// 直接丢弃或打印日志fmt.Printf("收到未匹配的帧,sequenceID=%d,丢弃\n", sequenceID)}
}

总结

在已有基础通信、连接池与序列化机制之上,通过引入异步发送机制与多路复用技术进一步提升RPC系统的吞吐量与并发性能,使得系统更加健壮。多路复用实际上也是http2.0实现的能力,这里相当于完成了http2.0的任务。以后的版本可以考虑对于性能再进行优化,如网络框架的改进以及更高效的数据结构的使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/84318.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FFmpeg 安装包全攻略:gpl、lgpl、shared、master 区别详解

这些 FFmpeg 安装包有很多版本和变种&#xff0c;主要区别在于以下几个方面&#xff1a; ✅ 一、从名称中看出的关键参数&#xff1a; 1. 版本号 master&#xff1a;开发版&#xff0c;最新功能&#xff0c;但可能不稳定。n6.1 / n7.1&#xff1a;正式版本&#xff0c;更稳定…

深度学习实战:从图像分类到文本生成的完整案例解析

1 图像分类案例 1.1 CIFAR10数据集介绍 cifar数据是torchvision第三方包提供的数据集 训练集5w 测试集1w y标签 10个类别 10分类问题 一张图形状 (32, 32, 3) import torch import torch.nn as nn from torchvision.datasets import CIFAR10 from torchvision.transforms i…

Android 添加系统服务的完整流程

[应用程序] (应用进程)│↓ 调用简单API [SoundManager] │ ├─ 代理模式门面模式&#xff08;应用进程&#xff09;│ ├─ 缓存数据 ←─ 装饰器模式&#xff08;应用进程&#xff09;│ └─ 转换异常 ←─ 适配器模式&#xff08;应用进程&#xff09;│↓ 通过Bind…

wan2.1代码笔记

GPU内存不够&#xff0c;可以先运行umt5&#xff0c;然后再运行wanpipeline&#xff0c;参考FLUX.1代码笔记&#xff0c;或者使用ComfyUI。 下面使用随机数代替umt5 embedding。 import torch from diffusers.utils import export_to_video from diffusers import Autoencoder…

环境搭建与工具配置

3.1 本地环境搭建 3.1.1 WAMP环境搭建漏洞靶场&#xff08;一、二&#xff09; WAMP&#xff08;Windows Apache MySQL PHP&#xff09;是搭建本地Web漏洞靶场的基础环境。 安装步骤&#xff1a; Apache&#xff1a;下载并安装最新版Apache HTTP Server&#xff0c;配置监…

STM32F446主时钟失效时DAC输出异常现象解析与解决方案

—### 现象概述 在STM32F446微控制器应用中&#xff0c;若主时钟&#xff08;HSE&#xff09;的晶体信号对地短路&#xff0c;但DAC&#xff08;数模转换器&#xff09;仍能输出变化信号&#xff0c;这一现象看似矛盾&#xff0c;实则与系统时钟切换机制密切相关。本文将从硬件…

React 如何封装一个可复用的 Ant Design 组件

文章目录 前言一、为什么需要封装组件&#xff1f;二、 仿antd组件的Button按钮三、封装一个可复用的表格组件 (实战)1. 明确需求2. 设计组件 API3. 实现组件代码4. 使用组件 三、封装组件的最佳实践四、进阶优化 总结 前言 作为一名前端开发工程师&#xff0c;在日常项目中&a…

STC89C52RC/LE52RC

STC89C52RC 芯片手册原理图扩展版原理图 功能示例LED灯LED灯的常亮效果LED灯的闪烁LED灯的跑马灯效果&#xff1a;从左到右&#xff0c;从右到左 数码管静态数码管数码管计数mian.cApp.cApp.hCom.cCom.hDir.cDir.hInt.cInt.hMid.cMid.h 模板mian.cApp.cApp.hCom.cCom.hDir.cDir…

踩坑记录:RecyclerView 局部刷新notifyItemChanged多次调用只触发一次 onBindViewHolder 的原因

1. 问题背景 在做项目的时候&#xff0c;RecyclerView需要使用局部刷新&#xff0c;使用 notifyItemChanged(position, payload) 实现局部刷新&#xff0c;但发现调用多次只执行了一次&#xff0c;第二个刷新不生效。 2. 错误示例&#xff08;只处理 payloads.get(0)&#xff…

OpenLayers 加载鹰眼控件

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图控件是一些用来与地图进行简单交互的工具&#xff0c;地图库预先封装好&#xff0c;可以供开发者直接使用。OpenLayers具有大部分常用的控件&#x…

WPF···

设置启动页 默认最后一个窗口关闭,程序退出,可以设置 修改窗体的icon图标 修改项目exe图标 双击项目名会看到代码 其他 在A窗体点击按钮打开B窗体,在B窗体设置WindowStartupLocation=“CenterOwner” 在A窗体的代码设置 B.Owner = this; B.Show(); B窗体生成在A窗体中间…

github公开项目爬取

import requestsdef search_github_repositories(keyword, tokenNone, languageNone, max_results1000):"""通过 GitHub API 搜索仓库&#xff0c;支持分页获取所有结果&#xff08;最多 1000 条&#xff09;:param keyword: 搜索关键词:param token: GitHub To…

防震基座在半导体晶圆制造设备抛光机详细应用案例-江苏泊苏系统集成有限公司

在半导体制造领域&#xff0c;晶圆抛光作为关键工序&#xff0c;对设备稳定性要求近乎苛刻。哪怕极其细微的振动&#xff0c;都可能对晶圆表面质量产生严重影响&#xff0c;进而左右芯片制造的成败。以下为您呈现一个防震基座在半导体晶圆制造设备抛光机上的经典应用案例。 企…

S32K开发环境搭建详细教程(一、S32K IDE安装注册)

一、S32K IDE安装注册 1、进入恩智浦官网https://www.nxp.com.cn/&#xff08;需要在官网注册一个账号&#xff09; 2、直接搜索 “Standard Software”&#xff0c;找到S32K3 Standard Software&#xff0c;点击进入 3、下载 (1)Automotive SW - S32K3 - S32 Design Studio…

Spring Cloud Gateway 微服务网关实战指南

上篇文章简单介绍了SpringCloud系列OpenFeign的基本用法以及Demo搭建&#xff08;Spring Cloud实战&#xff1a;OpenFeign远程调用与服务治理-CSDN博客&#xff09;&#xff0c;今天继续讲解下SpringCloud Gateway实战指南&#xff01;在分享之前继续回顾下本次SpringCloud的专…

MSP430G2553 USCI模块串口通信

1.前言 最近需要利用msp430连接蓝牙模块传递数据&#xff0c;于是死磕了一段时间串口&#xff0c;在这里记录一下 2.msp430串口模块 msp430的串口模块可以有USCI模块提供 在异步模式中&#xff0c; USCI_Ax 模块通过两个外部引脚&#xff0c; UCAxRXD 和 UCAxTXD&#xff0…

【产品经理从0到1】用户端产品设计与用户画像

思考 xx新闻的第一个版本应该做哪些事情呢&#xff1f; 用户端核心功能 用户端通用页面设计 思考 回想一下&#xff0c;大家在第一次使用一个新下载的App的时候会看到一些什么样的页面?这样的页面一般都是展示了一些什么内容? 引导页 概念 第一次安装App或者更新App后第…

多场景游戏AI新突破!Divide-Fuse-Conquer如何激发大模型“顿悟时刻“?

多场景游戏AI新突破&#xff01;Divide-Fuse-Conquer如何激发大模型"顿悟时刻"&#xff1f; 大语言模型在强化学习中偶现的"顿悟时刻"引人关注&#xff0c;但多场景游戏中训练不稳定、泛化能力差等问题亟待解决。Divide-Fuse-Conquer方法&#xff0c;通过…

佰力博科技与您探讨压电材料的原理与压电效应的应用

压电材料的原理基于正压电效应和逆压电效应&#xff0c;即机械能与电能之间的双向转换特性。 压电材料的原理源于其独特的晶体结构和电-机械耦合效应&#xff0c;具体可分为以下核心要点&#xff1a; 1. ‌正压电效应与逆压电效应的定义‌ ‌正压电效应‌&#xff1a;当压电…

算法备案审核周期

&#xff08;一&#xff09;主体备案审核 主体备案审核周期通常为7-10个工作日&#xff0c;监管部门将对企业提交的资质信息进行严格审查&#xff0c;审核重点包括&#xff1a; 营业执照的真实性、有效性及与备案主体的一致性。法人及算法安全责任人身份信息的准确性与有效性…