[spring-cloud: 负载均衡]-源码分析

获取服务列表

ServiceInstanceListSupplier

ServiceInstanceListSupplier 接口是一个提供 ServiceInstance 列表的供应者,返回一个响应式流 Flux<List<ServiceInstance>>,用于服务发现。

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {String getServiceId();default Flux<List<ServiceInstance>> get(Request request) {return get();}static ServiceInstanceListSupplierBuilder builder() {return new ServiceInstanceListSupplierBuilder();}}

DelegatingServiceInstanceListSupplier

DelegatingServiceInstanceListSupplier 是一个抽象类,继承自 ServiceInstanceListSupplier,它通过委托给另一个 ServiceInstanceListSupplier 实例来实现其功能,同时支持选定服务实例的回调、初始化和销毁操作。

public abstract class DelegatingServiceInstanceListSupplier implements ServiceInstanceListSupplier, SelectedInstanceCallback, InitializingBean, DisposableBean {protected final ServiceInstanceListSupplier delegate;public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {Assert.notNull(delegate, "delegate may not be null");this.delegate = delegate;}public ServiceInstanceListSupplier getDelegate() {return delegate;}@Overridepublic String getServiceId() {return delegate.getServiceId();}@Overridepublic void selectedServiceInstance(ServiceInstance serviceInstance) {if (delegate instanceof SelectedInstanceCallback selectedInstanceCallbackDelegate) {selectedInstanceCallbackDelegate.selectedServiceInstance(serviceInstance);}}@Overridepublic void afterPropertiesSet() throws Exception {if (delegate instanceof InitializingBean) {((InitializingBean) delegate).afterPropertiesSet();}}@Overridepublic void destroy() throws Exception {if (delegate instanceof DisposableBean) {((DisposableBean) delegate).destroy();}}}

负载均衡实现

ReactorLoadBalancer

ReactorLoadBalancer 是基于 Reactor 实现的响应式负载均衡器,通过 Mono<Response<T>> 异步选择服务实例。

public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {@SuppressWarnings("rawtypes")Mono<Response<T>> choose(Request request);default Mono<Response<T>> choose() {return choose(REQUEST);}}

ReactorServiceInstanceLoadBalancer

ReactorServiceInstanceLoadBalancer 是一个标记接口,继承自 ReactorLoadBalancer,专门用于选择 ServiceInstance 对象的负载均衡器。

// RandomLoadBalancer, RoundRobinLoadBalancer
public interface ReactorServiceInstanceLoadBalancer extends ReactorLoadBalancer<ServiceInstance> {}

核心代码逻辑

推荐阅读:[spring-cloud: @LoadBalanced & @LoadBalancerClient]-源码分析

1. BlockingLoadBalancerInterceptor

// LoadBalancerInterceptor, RetryLoadBalancerInterceptor 
public interface BlockingLoadBalancerInterceptor extends ClientHttpRequestInterceptor {}

LoadBalancerInterceptor

public class LoadBalancerInterceptor implements BlockingLoadBalancerInterceptor {private final LoadBalancerClient loadBalancer;private final LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {// for backwards compatibilitythis(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}// 重点!@Overridepublic ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)throws IOException {URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);return loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}}

2. BlockingLoadBalancerClient

ServiceInstanceChooser

ServiceInstanceChooser 接口用于通过负载均衡器选择与指定服务ID对应的服务实例,支持带请求上下文的选择。

public interface ServiceInstanceChooser {ServiceInstance choose(String serviceId);<T> ServiceInstance choose(String serviceId, Request<T> request);}

LoadBalancerClient

LoadBalancerClient 接口用于客户端负载均衡,选择服务实例并执行请求,同时提供将逻辑服务名重构为实际服务实例的 URI 的功能。

public interface LoadBalancerClient extends ServiceInstanceChooser {<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;URI reconstructURI(ServiceInstance instance, URI original);}
// BlockingLoadBalancerClientAutoConfiguration
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration// LoadBalancerClientFactoryprivate final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {this.loadBalancerClientFactory = loadBalancerClientFactory;}// 重点!@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {String hint = getHint(serviceId);LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request, buildRequestContext(request, hint));Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));// 通过 choose 方法来选择一个合适的 ServiceInstanceServiceInstance serviceInstance = choose(serviceId, lbRequest);if (serviceInstance == null) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));throw new IllegalStateException("No instances available for " + serviceId);}return execute(serviceId, serviceInstance, lbRequest);}private <T> TimedRequestContext buildRequestContext(LoadBalancerRequest<T> delegate, String hint) {if (delegate instanceof HttpRequestLoadBalancerRequest) {HttpRequest request = ((HttpRequestLoadBalancerRequest) delegate).getHttpRequest();if (request != null) {RequestData requestData = new RequestData(request);return new RequestDataContext(requestData, hint);}}return new DefaultRequestContext(delegate, hint);}// 通过生命周期钩子函数来管理负载均衡请求的开始与结束,并处理可能的异常,确保负载均衡的执行过程有序@Overridepublic <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {if (serviceInstance == null) {throw new IllegalArgumentException("Service Instance cannot be null, serviceId: " + serviceId);}DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));try {T response = request.apply(serviceInstance);Object clientResponse = getClientResponse(response);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));return response;}catch (IOException iOException) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));throw iOException;}catch (Exception exception) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));ReflectionUtils.rethrowRuntimeException(exception);}return null;}private <T> Object getClientResponse(T response) {ClientHttpResponse clientHttpResponse = null;if (response instanceof ClientHttpResponse) {clientHttpResponse = (ClientHttpResponse) response;}if (clientHttpResponse != null) {try {return new ResponseData(clientHttpResponse, null);}catch (IOException ignored) {}}return response;}private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),DefaultRequestContext.class, Object.class, ServiceInstance.class);}@Overridepublic URI reconstructURI(ServiceInstance serviceInstance, URI original) {return LoadBalancerUriTools.reconstructURI(serviceInstance, original);}@Overridepublic ServiceInstance choose(String serviceId) {return choose(serviceId, REQUEST);}// 重点!通过负载均衡器同步选择一个服务实例并返回@Overridepublic <T> ServiceInstance choose(String serviceId, Request<T> request) {ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);if (loadBalancer == null) {return null;}Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();if (loadBalancerResponse == null) {return null;}return loadBalancerResponse.getServer();}private String getHint(String serviceId) {LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);String defaultHint = properties.getHint().getOrDefault("default", "default");String hintPropertyValue = properties.getHint().get(serviceId);return hintPropertyValue != null ? hintPropertyValue : defaultHint;}}

3. LoadBalancerRequestFactory

LoadBalancerRequestFactory 类用于创建封装负载均衡请求的 LoadBalancerRequest 实例,支持请求转换器和负载均衡客户端的配置。

public class LoadBalancerRequestFactory {private final LoadBalancerClient loadBalancer;private final List<LoadBalancerRequestTransformer> transformers;public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,List<LoadBalancerRequestTransformer> transformers) {this.loadBalancer = loadBalancer;this.transformers = transformers;}public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {this.loadBalancer = loadBalancer;transformers = new ArrayList<>();}public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) {return new BlockingLoadBalancerRequest(loadBalancer, transformers,new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));}}
LoadBalancerRequestTransformer

LoadBalancerRequestTransformer 接口允许在负载均衡过程中根据不同的服务实例自定义转换 HttpRequest,如修改请求头、URL 等,同时通过 @Order 注解控制其执行顺序。

@Order(LoadBalancerRequestTransformer.DEFAULT_ORDER)
public interface LoadBalancerRequestTransformer {int DEFAULT_ORDER = 0;HttpRequest transformRequest(HttpRequest request, ServiceInstance instance);
}

4. BlockingLoadBalancerRequest

BlockingLoadBalancerRequest 类实现了负载均衡请求接口,负责将原始 HTTP 请求封装为负载均衡请求,并支持应用请求转换器和执行负载均衡操作。

class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {private final LoadBalancerClient loadBalancer;private final List<LoadBalancerRequestTransformer> transformers;private final ClientHttpRequestData clientHttpRequestData;BlockingLoadBalancerRequest(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers,ClientHttpRequestData clientHttpRequestData) {this.loadBalancer = loadBalancer;this.transformers = transformers;this.clientHttpRequestData = clientHttpRequestData;}@Overridepublic ClientHttpResponse apply(ServiceInstance instance) throws Exception {HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);if (this.transformers != null) {for (LoadBalancerRequestTransformer transformer : this.transformers) {serviceRequest = transformer.transformRequest(serviceRequest, instance);}}return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);}@Overridepublic HttpRequest getHttpRequest() {return clientHttpRequestData.request;}static class ClientHttpRequestData {private final HttpRequest request;private final byte[] body;private final ClientHttpRequestExecution execution;ClientHttpRequestData(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {this.request = request;this.body = body;this.execution = execution;}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92061.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92061.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Oracle 在线重定义

Oracle 在线重定义&#xff08;Online Redefinition&#xff09; 是一种功能&#xff0c;通过DBMS_REDEFINITION 包提供&#xff0c;允许DBA在不需要停止或显著影响数据库正常操作的情况下&#xff0c;对数据库表进行结构化修改。可以实现的功能将表移动到其它表空间增加、修改…

Web 开发 12

1 网址里的 “搜索请求” 结构 这张图是在教你怎么看懂 网址里的 “搜索请求” 结构&#xff0c;特别基础但超重要&#xff0c;对你学前端帮别人做搜索功能超有用&#xff0c;用大白话拆成 3 步讲&#xff1a; 1. 先看「协议&#xff08;Protocol&#xff09;」 HTTPS 就是浏…

网络安全 | 如何构建一个有效的企业安全响应团队

网络安全 | 如何构建一个有效的企业安全响应团队 一、前言 二、团队组建的基础要素 2.1 人员选拔 2.2 角色定位 三、团队应具备的核心能力 3.1 技术专长 3.2 应急处置能力 3.3 沟通协作能力 四、团队的运作机制 4.1 威胁监测与预警流程 4.2 事件响应流程 4.3 事后复盘与改进机制…

HTTP、WebSocket、TCP、Kafka等通讯渠道对比详解

在当今互联的数字世界中&#xff0c;通信渠道是系统、应用程序和设备之间数据交换的支柱。从传统的HTTP和TCP协议到专为特定场景设计的Kafka和MQTT等平台&#xff0c;这些通信方式满足了从实时消息传递到大规模数据流处理的多样化需求。本文将深入探讨主要的通信协议和平台。一…

臭氧、颗粒物和雾霾天气过程的大气污染物计算 CAMx模型

随着我国经济快速发展&#xff0c;我国面临着日益严重的大气污染问题。大气污染是工农业生产、生活、交通、城市化等方面人为活动的综合结果&#xff0c;同时气象因素是控制大气污染的关键自然因素。大气污染问题既是局部、当地的&#xff0c;也是区域的&#xff0c;甚至是全球…

数据结构(13)堆

目录 1、堆的概念与结构 2、堆的实现 2.1 向上调整算法&#xff08;堆的插入&#xff09; 2.2 向下调整算法&#xff08;堆的删除&#xff09; 2.3 完整代码 3、堆的应用 3.1 堆排序 3.2 Top-K问题 1、堆的概念与结构 堆是一种特殊的二叉树&#xff0c;根结点最大的堆称…

C++模板知识点3『std::initializer_list初始化时逗号表达式的执行顺序』

std::initializer_list初始化时逗号表达式的执行顺序 在使用Qt Creator4.12.2&#xff0c;Qt5.12.9 MinGW开发的过程中发现了一个奇怪的现象&#xff0c;std::initializer_list<int>在初始化构造时的执行顺序反了&#xff0c;经过一番测试发现&#xff0c;其执行顺序可正…

【Unity3D】Shader圆形弧度裁剪

片元着色器&#xff1a; float3 _Center float3(0, 0, 0); float3 modelPos i.modelPos;// float angle atan2(modelPos.y - _Center.y, modelPos.x - _Center.x); // 计算角度&#xff0c;范围-π到π float angle atan2(modelPos.y - _Center.y, modelPos.z - _Center.z)…

curl发送文件bodyParser无法获取请求体的问题分析

问题及现象 开发过程使用curlPUT方式发送少量数据, 后端使用NodeJSexpress框架bodyParser,但测试发现无法获取到请求体内容,现象表现为req.body 为空对象 {} 代码如下: const bodyParser require(body-parser); router.use(/api/1, bodyParser.raw({limit: 10mb, type: */*}))…

Vue3 学习教程,从入门到精通,Vue 3 内置属性语法知识点及案例代码(25)

Vue 3 内置属性语法知识点及案例代码 Vue 3 提供了丰富的内置属性&#xff0c;帮助开发者高效地构建用户界面。以下将详细介绍 Vue 3 的主要内置属性&#xff0c;并结合详细的案例代码进行说明。每个案例代码都包含详细的注释&#xff0c;帮助初学者更好地理解其用法。1. data …

机器学习基石:深入解析线性回归

线性回归是机器学习中最基础、最核心的算法之一&#xff0c;它为我们理解更复杂的模型奠定了基础。本文将带你全面解析线性回归的方方面面。1. 什么是回归&#xff1f; 回归分析用于预测连续型数值。它研究自变量&#xff08;特征&#xff09;与因变量&#xff08;目标&#xf…

OneCodeServer 架构深度解析:从组件设计到运行时机制

一、架构概览与设计哲学1.1 系统定位与核心价值OneCodeServer 作为 OneCode 平台的核心服务端组件&#xff0c;是连接前端设计器与后端业务逻辑的桥梁&#xff0c;提供了从元数据定义到应用程序执行的完整解决方案。它不仅是一个代码生成引擎&#xff0c;更是一个全生命周期管理…

Jwts用于创建和验证 ​​JSON Web Token(JWT)​​ 的开源库详解

Jwts用于创建和验证 ​​JSON Web Token&#xff08;JWT&#xff09;​​ 的开源库详解在 Java 开发中&#xff0c;提到 Jwts 通常指的是 ​​JJWT&#xff08;Java JWT&#xff09;库​​中的核心工具类 io.jsonwebtoken.Jwts。JJWT 是一个专门用于创建和验证 ​​JSON Web To…

如果发送的数据和接受的数据不一致时,怎么办?

那ART4222这个板卡举例&#xff0c;我之间输入一个原始数据“6C532A14”&#xff0c;但是在选择偶校验时&#xff0c;接收的是“6C532B14”&#xff0c;我发送的码率&#xff08;运行速度&#xff09;是100000&#xff0c;但接受的不稳定&#xff0c;比如&#xff1b;“100100.…

ISCC认证:可持续生产的新标杆。ISCC如何更快认证

在全球可持续发展浪潮中&#xff0c;ISCC&#xff08;国际可持续与碳认证&#xff09;体系已成为企业绿色转型的重要工具。这一国际公认的认证系统覆盖农业、林业、废弃物处理等多个领域&#xff0c;通过严格的可持续性标准、供应链可追溯性要求和碳排放计算规范&#xff0c;建…

想对学习自动化测试的一些建议

Python接口自动化测试零基础入门到精通&#xff08;2025最新版&#xff09;接触了不少同行&#xff0c;由于他们之前一直做手工测试&#xff0c;现在很迫切希望做自动化测试&#xff0c;其中不乏工作5年以上的人。 本人从事软件自动化测试已经近5年&#xff0c;从server端到web…

电子电气架构 ---智能电动汽车嵌入式软件开发过程中的block点

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…

createAsyncThunk

下面&#xff0c;我们来系统的梳理关于 Redux Toolkit 异步操作&#xff1a;createAsyncThunk 的基本知识点&#xff1a;一、createAsyncThunk 概述 1.1 为什么需要 createAsyncThunk 在 Redux 中处理异步操作&#xff08;如 API 调用&#xff09;时&#xff0c;传统方法需要手…

STM32F103C8T6 BC20模块NBIOT GPS北斗模块采集温湿度和经纬度发送到EMQX

云平台配置 访问下载页面&#xff1a;免费试用 EMQX Cloud 或 EMQX Enterprise | 下载 EMQX&#xff0c;根据需求选择对应版本下载。将下载的压缩包上传至服务器&#xff08;推荐存放于C盘根目录&#xff0c;便于后续操作&#xff09;&#xff0c;并解压至指定路径&#xff08…

YOLO11涨点优化:自研检测头, 新创新点(SC_C_11Detect)检测头结构创新,实现有效涨点

目标检测领域迎来重大突破!本文揭秘原创SC_C_11Detect检测头,通过空间-通道协同优化与11层深度结构,在YOLO系列上实现mAP最高提升5.7%,小目标检测精度暴涨9.3%!创新性结构设计+即插即用特性,为工业检测、自动驾驶等场景带来革命性提升! 一、传统检测头的三大痛点 在目…