18.TaskExecutor获取ResourceManagerGateway

TaskExecutor获取ResourceManagerGateway

  • TaskExecutorResourceManager 之间的交互机制较为复杂,核心可以拆分为三个阶段:

    • 首次发现与注册
    • 连接建立
    • 心跳维持

    本文聚焦连接建立阶段,详细分析底层 RPC 连接的实现原理。

回顾:startRegistration方法

在注册过程中,TaskExecutor 会调用如下逻辑,通过 rpcService.connectResourceManager 建立远程通信连接:

//其中,targetType 是 ResourceManagerGateway.class。重点关注 rpcService.connect 方法。
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture = (CompletableFuture<G>)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));
}

PekkoRpcService

  • Flink 内部的 RPC 框架由 Pekko(Akka) 支撑,PekkoRpcService 就是基于 Pekko 实现的通信组件,负责不同组件之间的远程通信。
@Override
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {return connectInternal(address,clazz,(ActorRef actorRef) -> {Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);return new FencedPekkoInvocationHandler<>(   // 关键:创建 InvocationHandleraddressHostname.f0,addressHostname.f1,actorRef,configuration.getTimeout(),configuration.getMaximumFramesize(),configuration.isForceRpcInvocationSerialization(),null,() -> fencingToken,  // 支持 fencingToken 防止旧节点通信captureAskCallstacks,flinkClassLoader);});
}

核心方法:connectInternal

connectInternal 方法的任务是:

  • 通过目标组件的 RPC 地址,获取 ActorRef(类似 NIO 中的 selector-channel);
  • 与目标 Actor(如 ResourceManager)完成一次握手校验;
  • 基于 InvocationHandler 生成远程组件的代理对象(Gateway),用于后续透明 RPC 调用。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(String address,Class<C> clazz,Function<ActorRef, InvocationHandler> invocationHandlerFactory) {final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =actorRefFuture.thenCompose((ActorRef actorRef) -> Patterns.ask(actorRef,new RemoteHandshakeMessage(clazz, getVersion()),   // 发送握手请求configuration.getTimeout().toMillis()).mapTo(ClassTag$.MODULE$.apply(HandshakeSuccessMessage.class)));final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);@SuppressWarnings("unchecked")C proxy = (C) Proxy.newProxyInstance(getClass().getClassLoader(),new Class<?>[] {clazz},invocationHandler);return proxy;   // 返回 ResourceManagerGateway 动态代理},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}

获取 ActorRef:resolveActorAddress

private CompletableFuture<ActorRef> resolveActorAddress(String address) {final ActorSelection actorSel = actorSystem.actorSelection(address);return actorSel.resolveOne(configuration.getTimeout()).toCompletableFuture().exceptionally(error -> {throw new CompletionException(new RpcConnectionException(String.format("Could not connect to rpc endpoint under address %s.", address),error));});
}
  • 根据 address 定位到目标组件的 Actor(类似根据地址寻找远程服务端点)。
  • 异步获取目标组件的 ActorRef,这是后续所有远程消息传递的核心通信对象。
  • 如果解析失败,立即包装为 RpcConnectionException 抛出,阻断注册链路。
  • 特别注意
    该方法返回的 CompletableFuture<ActorRef> 是由 Akka 内部线程异步完成的(即依赖 ActorSystem 自身的调度机制)。
    因此,无需显式调用 executor 来管理异步逻辑,整个链式流程天然是异步的,并由 Akka 自身的事件机制驱动完成。
    这也是 AkkaPekko)模型的设计优势:
    组件间通信与任务调度完全解耦,基于 ActorRef 的消息传递自动异步非阻塞。

RemoteHandshakeMessage:初次握手阶段

  • 作用:
    在建立正式通信前,TaskExecutor 必须先与 ResourceManager 进行协议层握手,确保:

    • 版本一致;
    • 所请求的网关类型(如 ResourceManagerGateway)是对方支持的。
  • 源码

 private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {if (!isCompatibleVersion(handshakeMessage.getVersion())) {sendErrorIfSender(new HandshakeException(String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",handshakeMessage.getVersion(), getVersion())));} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {sendErrorIfSender(new HandshakeException(String.format("The rpc endpoint does not support the gateway %s.",handshakeMessage.getRpcGateway().getSimpleName())));} else {//告诉taskExecutor,可以连接getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());}}
握手处理逻辑:PekkoRpcActor.handleHandshakeMessage()
  1. 版本校验

    • 如果通信双方的 Flink 版本不一致(可能是跨版本集群或配置错误),直接拒绝握手并返回错误信息。
  2. 接口类型校验

    • 检查请求方希望通信的 Gateway 接口(即 ResourceManagerGateway)是否被当前端点支持。
    • 不支持的网关说明连接请求本质错误,拒绝握手。
  3. 握手成功

    • 前两步校验都通过,表明可以安全建立通信。

    • 此时向对方返回:

      java复制编辑
      getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
      

      即告诉发起方(如 TaskExecutor):可以正式建立连接。

actorRefFuture.thenCombineAsync:构建 ResourceManagerGateWay代理

核心目的:
  • 根据 ResourceManager 的 ActorRef 构建其 RPC 代理(即 ResourceManagerGateway 的动态代理对象)
源码解析
final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler =invocationHandlerFactory.apply(actorRef);ClassLoader classLoader = getClass().getClassLoader();@SuppressWarnings("unchecked")C proxy = (C)Proxy.newProxyInstance(classLoader,new Class<?>[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());
整体过程
  1. 等待 actorRefFuture 和 handshakeFuture 都完成:
    • actorRefFuture:已经获取了 ResourceManager 的 ActorRef。
    • handshakeFuture:握手成功,确认可以通信。
  2. 生成 InvocationHandler:
    • 实际是封装了 actorRef 和通信参数的一个代理调用处理器。
    • 后续所有发往 ResourceManager 的方法调用,都会被转化成消息,通过这个 handler 发送到 actorRef 对应的远程组件。
  3. 构建代理对象:
    • 使用 JDK 动态代理(Proxy.newProxyInstance)创建了一个ResourceManagerGateway 的动态代理
    • 对用户代码来说,这个代理就是一个普通的 ResourceManagerGateway,只是内部通过 actorRef 做远程消息发送。
  4. 返回代理对象(proxy):
    • proxy 就是一个“可直接远程调用 ResourceManager”的接口对象。

总结

  • TaskExecutor 已获取 ResourceManager 的代理网关(即 ResourceManagerGateway 代理对象);
  • 该代理对象封装了与 ResourceManager 通信所需的 actorRef 和 RPC 协议细节;
  • TaskExecutor 接下来只需要通过该网关对象,正式发起注册请求

这一阶段的核心工作是:

  • 建立连接(即通过 rpcService.connect 拿到 ResourceManager 的 actorRef 并创建代理网关);
  • 完成握手(确保版本兼容和接口匹配);
  • 生成代理(通过动态代理对外提供 ResourceManagerGateway 接口)。

下一步就是:

  • TaskExecutor 通过该网关对象向 ResourceManager 发起注册;
  • ResourceManager 受理注册请求;
  • 建立心跳与 slot 汇报等稳定的会话机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/90286.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/90286.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kafka查看消息的具体内容 kafka-dump-log.sh

目录kafka 消息查看1. 直接查看日志文件内容步骤&#xff1a;2. 使用 Kafka 工具查看日志主要参数说明常用命令&#xff1a;输出说明&#xff1a;3. 注意事项kafka 消息日志文件详解我们有时候遇到这样的需求&#xff0c;需要查看下kafka消息的内容。 kafka 消息查看 查看 Ka…

Spring Cloud OpenFeign 常用注解_笔记

Spring Cloud OpenFeign 提供了一种声明式、模板化的HTTP客户端&#xff0c;可以通过简单的接口描述远程调用&#xff0c;而不必手动编写低级的 HTTP 客户端代码。FeignClient用法参考&#xff1a;FeignClient用法-笔记-CSDN博客。这里梳理Spring Cloud OpenFeign 常用注解。 1…

移动端自动化Appium框架

文章目录环境搭建JAVAAndroid SDKGenymotion模拟器环境搭建 JAVA 1、安装JDK 从官网下载所需安装包&#xff0c;默认安装即可。 https://www.oracle.com/cn/java/technologies/downloads/ 2、配置环境变量 设置 - 编辑系统环境变量 - 环境变量。 系统变量下新建JAVA_HOME&a…

算法第26天|贪心算法:用最少数量的箭引爆气球、无重叠区间、划分字母区间

今日总结 用最少数量的箭引爆气球 题目链接&#xff1a;452. 用最少数量的箭引爆气球 - 力扣&#xff08;LeetCode&#xff09; 代码随想录 整体思路&#xff1a; 1、统一度量 &#xff1a; 将所有区间按照左端点进行排序&#xff1a; 用到了二维的sort&#xff0c;在类中需…

最新版的electron通信规则

介绍: 以前electron require(electron/remote).fs 就能调用node中的各种api,最新版可能为了安全考虑,除了主main.js入口文件以外,其他的地方都不能调用node中的api,比如里面的各种函数,如fs,path等。这节课来教大家最新版本的electron如何进行通信。 结构: 了解通信之前…

Python爬虫实战:研究PyPLN库相关技术

1. 引言 随着全球化的发展,葡萄牙语作为世界第六大语言,其在互联网上的文本数据量不断增长。如何从海量的葡萄牙语文本中提取有价值的信息,成为自然语言处理领域的重要研究方向。 PyPLN (Python Natural Language Processing Toolkit) 是一个专门针对葡萄牙语设计的自然语言…

层次分析法代码笔记

层次分析法 一、核心 在层次分析法中&#xff0c;通过 算术平均法、几何平均法、特征值法 计算指标权重&#xff0c;再通过 一致性检验 确保判断矩阵逻辑合理&#xff0c;为多准则决策提供量化依据。 二、代码 &#xff08;一&#xff09;一致性检验&#xff08;判断矩阵合理性…

[精选] 2025最新生成 SSH 密钥和 SSL 证书的标准流程(Linux/macOS/Windows系统服务器通用方案)

[精选] 2025最新生成 SSH 密钥和 SSL 证书的标准流程&#xff08;Linux/macOS/Windows系统服务器通用方案&#xff09; 在现代网络中&#xff0c;SSH&#xff08;安全外壳协议&#xff09;和 SSL&#xff08;安全套接层协议&#xff09;是保证数据传输安全和身份验证的重要技术…

开发框架安全ThinkPHPLaravelSpringBootStruts2SpringCloud复现

PHP-ThinkphpLaravelThinkPHP是一套开源的、基于PHP的轻量级Web应用开发框架综合工具&#xff1a;武器库-Thinkphp专检&#xff08;3-6版本&#xff09;如何判断是TP6框架开发的web程序&#xff0c;基于源码、路径、图标、基于报错可发现dex.php?xxx 在其6.0.13版本及以前/?c…

uniapp+vue3小程序点击保存图片、保存二维码

介绍 步骤1:引入必要的API 在script部分,确保引入了uni的相关API,如uni.downloadFile和uni.saveImageToPhotosAlbum。 步骤2:下载图片到本地 在toInvite函数中,使用uni.downloadFile将图片下载到本地,并获取本地路径。 步骤3:处理权限和保存逻辑 在saveToAlbum函数…

Golang中GROM多表关联跟原生SQL多表关联区别

文章目录前言一、GROM多表关联二、原生Sql多表关联前言 对比GROM多表关联和原生Sql多表关联 一、GROM多表关联 适用于返回全部数据需要逻辑外键&#xff08;不会在数据库创建任何约束&#xff09;适合三个表以下的关联有几张表就会查询几次 type Product struct {gorm.Model …

设计模式六:工厂模式(Factory Pattern)

概念定义一个创建对象的接口&#xff0c;但让子类决定实例化哪个类。实现示例#include <iostream> #include <memory>// 产品基类 class Product { public:virtual void use() 0;virtual ~Product() default; };// 具体产品A class ConcreteProductA : public Pr…

应用层自定义协议【序列化+反序列化】

文章目录再谈 “协议”重新理解read、write、recv、send和tcp为什么支持全双工Server.cc网络版计算机实现Socket封装&#xff08;模板方法类&#xff09;socket.hpp定制协议JsonJson安装定义一个期望的报文格式Protocol.hppParser.hppCalculator.hpp完整的处理过程Client.cc三层…

dify创建OCR工作流

实现ocr识别文件内容&#xff0c;引用dify的一个插件&#xff0c;插件名称&#xff1a;mineru 引用在线版本mineru 具体操作说明&#xff0c;参见视频&#xff1a; 第六篇&#xff1a;DifyOCR&#xff0c;扫描件最优解_哔哩哔哩_bilibili 引用本地部署mineru 上面的这种使用…

备受关注的“Facebook Email Scraper”如何操作?

Facebook Email Scraper&#xff08;脸书邮箱提取工具&#xff09;是一类用于从Facebook平台提取公开邮箱信息的工具&#xff0c;其核心功能是通过解析用户主页、群组、页面等公开内容&#xff0c;识别并提取其中包含的邮箱地址&#xff0c;为用户提供结构化的联系方式数据。这…

【网络原理】万字长文解密UDP/TCP——手把手教你理解网络通信

目录 1.前言 2.正文 2.1UDP协议 2.1.1UDP协议端格式 2.1.2UDP的特点 2.1.3理解UDP的“不可靠” 2.1.4面向数据报 2.1.5基于UDP的应用层协议 2.2TCP协议 2.2.1TCP协议端格式 2.2.2TCP十个核心机制 2.2.2.1确认应答 2.2.2.2超时重传 确认应答超时重传 vs 三次握手 …

MATLAB软件使用频繁,企业如何做到“少买多用”?

在制造企业的工程计算、算法研发、系统建模等场景中&#xff0c;MATLAB 已成为不可或缺的核心工具。 无论是动力学建模、控制算法开发&#xff0c;还是信号处理和数据可视化&#xff0c;MATLAB 的高频使用场景覆盖了从研发部门到测试部门的多个岗位。然而&#xff0c;企业 IT 负…

数据结构自学Day13 -- 快速排序--“分而治之”

&#x1f536; 一、快速排序&#xff08;Quick Sort&#xff09;&#x1f4cc; 基本思想&#xff1a;分而治之&#xff1a;每次从数组中选一个“基准”&#xff08;pivot&#xff09;&#xff0c;把比它小的放左边&#xff0c;大的放右边。对左右子数组递归排序。&#x1f9e0;…

Linux 进程与服务管理~进程基础、进程查看、进程控制、服务管理、开机启动​​

在 Linux 系统中,进程与服务管理是运维和开发的核心技能之一。进程是程序运行的实例,服务是长期运行的后台进程(守护进程)。掌握进程与服务的管理方法,能有效排查系统问题、优化资源使用。以下从 ​​进程基础、进程查看、进程控制、服务管理、开机启动​​ 五大模块详细讲…

论文笔记 | Beyond Pick-and-Place: Tackling Robotic Stacking of Diverse Shapes

论文地址&#xff1a;Beyond Pick-and-Place: Tackling Robotic Stacking of Diverse Shapes 概述&#xff1a;本文提出 RGB-Stacking 基准测试&#xff0c;研究如何仅凭 RGB 摄像头视觉和本体感知&#xff0c;实现机器人对 复杂几何物体的高效堆叠。通过结合仿真专家训练、交互…