flink如何基于Pekko实现RPC调用

摘要

通过阅读flink源码,了解flink是如何基于Pekko实现远程RPC调用的

Pekko实现远程调用

Flink 的 RPC 框架底层是构建在 Pekko 的 actor 模型之上的,了解Pekko如何使用,对后续源码的阅读有帮助。

Apache Pekko(原为 Akka 的一个分支)是一个强大的工具包,用于构建并发、分布式和可扩展的系统。它基于经典的 Actor 模型,提供了一种事件驱动、非阻塞的编程范式,使开发者能够更轻松地构建容错性强、模块化清晰的分布式应用。

引入依赖

确保你使用的是 Apache Pekko 的 Maven 依赖:

<dependencies><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-actor_2.13</artifactId><version>1.0.2</version></dependency><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-remote_2.13</artifactId><version>1.0.2</version></dependency>
</dependencies>

定义消息类(RPC 通信协议)

public class HelloRequest implements java.io.Serializable {  public final String message;  public HelloRequest(String message) {  this.message = message;  }  
}
public class HelloResponse implements java.io.Serializable {  public final String reply;  public HelloResponse(String reply) {  this.reply = reply;  }  @Override  public String toString() {  return reply;  }  
}

HelloRequestHelloResponse 是在使用 Pekko远程通信 时的消息协议类,也就是你定义的“请求消息”和“响应消息”。它们是通过网络在客户端与服务端之间传输的,所以必须满足可序列化(Serializable)的要求。

服务端代码(远程服务)

HelloActor.java

public class HelloActor extends AbstractActor {  @Override  public Receive createReceive() {  return receiveBuilder()  .match(HelloRequest.class, req -> {  System.out.println("服务端收到消息: " + req.message);  // 回复客户端  getSender().tell(new HelloResponse("你好,客户端,我收到了:" + req.message), getSelf());  })  .build();  }  public static Props props() {  return Props.create(HelloActor.class);  }  
}

在 Pekko 中,HelloActor 相当于传统 RPC 框架中的服务实现类,但其处理逻辑是基于 消息驱动模型 而非方法调用。Pekko 的核心设计理念是:Actor 只对接收到的消息做出反应,并保持自身状态独立和可并发执行

以下是关键点说明:

  • createReceive() 方法 定义了该 Actor 支持的消息类型和对应的处理逻辑。使用 receiveBuilder().match(...).build() 来设置“消息类型 → 响应处理”的映射。
  • getSender().tell(...) 表示将处理结果异步返回给消息发送者,它等价于传统 RPC 中的“返回值”机制,只不过是通过消息的方式返回。
  • Props.create(...) 返回一个 Props 实例,描述了如何构造该 Actor。这类似于构造函数的封装工厂。Props 是 Actor 的构造“配方”,用于 ActorSystem.actorOf(...) 创建真正的 Actor 实例。

ServerApp.java

public class ServerApp {  public static void main(String[] args) {  // 使用硬编码配置启动远程 ActorSystem        Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 25520            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ServerSystem", config);  // 启动 HelloActor,名字是 helloActor,供客户端远程访问  ActorRef actorRef = system.actorOf(HelloActor.props(), "helloActor");  System.out.println("服务端已启动,等待远程调用...");  }  
}

代码说明

  • 用 Java 代码动态构造 Pekko 配置(替代 application.conf 文件)
  • pekko.actor.serialize-messages = on 强制所有 Actor 之间发送的消息都走序列化流程(即使是本地通信)
  • ActorSystem.create(...) 创建了一个名为 ServerSystem 的远程 Actor 系统。
  • 指定 IP 和端口为 127.0.0.1:25520,就像传统 RPC 服务绑定地址。
  • 启动一个名为 helloActor 的 actor,客户端稍后通过这个名字进行访问。

客户端代码

ClientApp.java

public class ClientApp {  public static void main(String[] args) throws Exception {  // 使用硬编码配置启动客户端 ActorSystem,端口 0 表示随机  Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 0            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ClientSystem", config);  // 远程 actor 路径,相当于 RPC 服务地址 + 接口名  String remotePath = "pekko://ServerSystem@127.0.0.1:25520/user/helloActor";  // 选择远程 actor,相当于创建客户端 stub        ActorSelection selection = system.actorSelection(remotePath);  // 使用 ask 模式发送消息,并接收响应(模拟同步 RPC 调用)  CompletionStage<Object> future =  Patterns.ask(selection, new HelloRequest("这是来自客户端的问候"), Duration.ofSeconds(10));  // 等待响应结果(阻塞)  future.thenApply(response -> {  if (response instanceof HelloResponse helloResponse) {  return "客户端收到回复: " + helloResponse.reply;  } else {  return "收到未知回复: " + response;  }  })  .exceptionally(ex -> "调用失败: " + ex.getMessage())  .thenAccept(System.out::println).toCompletableFuture().join();  system.terminate();  }  
}

代码说明:

  • ActorSelection是一种 actor地址定位方式,它类似于 DNS 查询,可以根据路径去“找”一个远程 actor
  • Patterns.ask(...) 就像传统 RPC 的同步调用,它封装了发送、等待响应的过程。Duration.ofSeconds(3) 指定超时时间。.get() 阻塞等待结果,实际底层是异步实现。
    Pekko(或 Akka)中,如果你不需要请求-响应(ask),而只是发送消息给 Actorfire-and-forget),你可以直接使用 ActorRef.tell(...) 方法。
// 从 ActorSystem 中选择一个路径为 "/user/helloActor" 的 Actor(可能还没拿到真实引用)
// 注意:这个路径必须匹配一个已存在的 Actor,否则会 resolve 失败
ActorSelection selection = actorSystem.actorSelection("/user/helloActor");// 异步解析 selection,尝试获取对应 Actor 的真正引用 ActorRef(带超时)
CompletionStage<ActorRef> futureRef = selection.resolveOne(Duration.ofSeconds(3));// 当成功获取 ActorRef 后,使用 tell 发送一条消息,不需要返回(fire-and-forget)
futureRef.thenAccept(ref -> ref.tell("你好", ActorRef.noSender()));

flink的RPC框架如何使用

Flink 基于Pekko实现了自己RPC框架。当需要组件间需要使用RPC服务时,只需要定义接口、编写服务端接口逻辑即可。FlinkRpc框架自己会完成接收远程请求、调度线程、安全并发、处理生命周期等工作,让你像写本地对象一样写分布式服务。

本来想直接使用flinkrpc模块创建一个简单的demo项目来说明的,但是由于Flink使用了自定义的类加载器(如 SubmoduleClassLoader)来隔离不同模块(尤其是用户代码、插件、RPC 的动态 proxy 等)导致类不可见的问题

org.flink.MyServiceGateway referenced from a method is not visible from class loader: org.apache.flink.core.classloading.SubmoduleClassLoader

所以找了flink其中一个rpc服务来进行说明

Dispatcher组件

Dispatcher集群调度的中枢组件,它的作用相当于一个集群控制器,负责接收作业、分配作业、启动作业执行组件、以及监控作业生命周期。虽然Dispatcher只是在JobManager内使用,类似
伪分布式一样,但其创建与使用流程和真正的远程RPC组件是一样的。

DIspatcher在集群启动的时候,通过DispatcherFactory创建,StandaloneSession模式下,工厂实现类为SessionDispatcherFactory

下面以Dispatcher组件为例进行说明如何基于flinkrpc框架实现一个rpc服务。

rpc框架使用流程

使用流程大致如下:

  1. 定义 RpcGateway 接口作为rpc协议
  2. 继承 RpcEndpoint或者FencedRpcEndpoint 并实现RpcGateWay接口
  3. 使用 RpcService 注册服务(启动服务端)
  4. 使用RpcService连接服务端(获取client)

步骤1.定义 RpcGateway 接口

在这里插入图片描述

Dispatcher的RPC接口类是DispatcherGateway, FencedRpcGatewayRpcGateway的子接口。 rpc方法的返回值必须是 CompletableFuture<T> 类型,这是 Flink RPC 框架的设计要求

步骤2. 实现服务端

StandaloneSession模式下,Dispatcher的实现类是StandaloneDispatcher,该类是Dispacher的子类。Dispatcher类继承FencedRpcEndpoint类并实现DispatcherGateway接口
在这里插入图片描述

RpcEndpointFlink自研RPC`框架中用于实现远程服务端逻辑的抽象类,它帮你处理 RPC 生命周期、消息分发、线程安全调度等问题,其子类只需专注于“我要提供什么服务”即可。

步骤3. 启动服务

通过工厂创建了Dispatcher对象后,调用其start()方法启动服务
在这里插入图片描述

步骤4. 远程调用

提交job的时候,会调用dispatchersubmitJob启动并调度该作业。
在这里插入图片描述

gateway是一个DispatcherGateway对象,通过下面的代码获得到的,相当于Client。
在这里插入图片描述

通过该对象调用接口方法即可发起远程调用。由于Dispatcher的客户端代码从创建到使用的代码分的太散了,不方便说明,下面通过一个简单的示例来描述Client的创建流程。

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);MyServiceGateway gateway = gatewayFuture.get();gateway.sayHello("Flink").thenAccept(System.out::println);

MyServiceGateway.class就是定义的RpcGateway接口, gateway是一个远程代理对象了,调用它就等于远程 RPC 调用!

Client是如何发送消息的

已知flink底层是利用Pekko来实现rpc调用的,再次回顾flink rpc示例代码中可以想到

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);
MyServiceGateway gateway = gatewayFuture.get();
gateway.sayHello("Flink").thenAccept(System.out::println);

该gateway对象发起远程调用,本质上应该是使用了类似下面的代码来发送消息的

CompletionStage<Object> future =  Patterns.ask(selection, "Flink", Duration.ofSeconds(3));

这个gateway对象是由rpcService.connect返回的. rpcService是一个RpcService接口对象,其实现就4个,排除掉测试用的就剩一个 PekkoRpcService了。

connect方法的源码
继续看connect方法的源码,首先会先调用resolveActorAddress解析入参的rpc地址"akka://flink@127.0.0.1:6123/user/myService"得到一个ActorRef对象

private CompletableFuture<ActorRef> resolveActorAddress(String address) {  final ActorSelection actorSel = actorSystem.actorSelection(address);  return actorSel.resolveOne(configuration.getTimeout())  .toCompletableFuture()  .exceptionally(  error -> {  throw new CompletionException(  new RpcConnectionException(  String.format(  "Could not connect to rpc endpoint under address %s.",  address),  error));  });  
}

获取到ActorRef后,使用 Java 的 动态代理机制创建一个实现了MyServiceGateway接口的代理对象 proxy
在这里插入图片描述

既然是动态代理,那就得看Handler方法里面的逻辑了,创建Handler的invocationHandlerFactory代码如下:
在这里插入图片描述

查看对应的invoke方法会看到实际发消息的是invokeRpc
在这里插入图片描述

所以最终actor.tell是在这里被调用的

转换成rpc参数的逻辑如下,只是将被调用方法所需的参数与信息封装成MethodInvocation对象
在这里插入图片描述

Server是接收处理消息的

前面的代码已经知道了client通过Pekko的actor发送了消息,现在要看Server这边是怎么处理的了(找到Actor处理RpcInvocation消息)。

服务端需要继承RpcEndpoint类,并在构建的时候传递rpcService对象`

final RpcService rpcService = ...; // 通常通过 PekkoRpcServiceUtils 创建
final String endpointId = "myService";MyServiceEndpoint endpoint = new MyServiceEndpoint(rpcService, endpointId);
endpoint.start();  // 启动 RPC 服务端

查看RpcEndpoint的构造函数,可以看到利用rpcService对象启动了一个rpcServer
在这里插入图片描述

继续往下看前需要了解Actor是如何处理消息的:
​ActorSystem​​ 是Pekko应用的中央控制枢纽,作为单例容器管理所有Actor的层级结构和生命周期。当发送消息给远程Actor时,ActorSystem会自动将消息序列化并通过网络传输到目标节点,在远程节点反序列化后放入目标ActorMailbox队列,最终由目标节点的ActorSystem调度线程执行消息处理,整个过程对开发者完全透明,如同本地调用一般。

可以粗略的认为:一个Actor等同于一个Server端(轻量级),Actor内有一个队列,当有新的消息从客户端发送过来就放到该队列中。然后有一个线程不断从队列中取消息,然后调用该 Actor 的 createReceive() 所定义的行为处理消息。

了解的Actor是如何接收信息后,继续看PekkoRpcServicestartServer方法,其中调用下面的方法,通知另一个Actor来创建本RpcEndpoin对应的Actor
在这里插入图片描述

那么就要找出负责创建Actor的这个supervisor(Actor)在哪里,才能继续往下看了。

很容易就可以看到PekkoRpcService对象它的构造函数中调用下面的函数找到对应的Actor的具体类型
在这里插入图片描述

查看SupervisroActor类的createReceive()就可以看到真正创建actor的逻辑了

@Override  
public Receive createReceive() {  return receiveBuilder()  .match(StartRpcActor.class, this::createStartRpcActorMessage)  .matchAny(this::handleUnknownMessage)  .build();  
}

flink rpc框架中,所有RpcEndpoint对应的Actor的类型都是PekkoRpcActor, 只是名字不一样而已。在PekkoRpcActorCreateReceive()可以看到与Client发送过来的RPC消息相对应的处理逻辑。
在这里插入图片描述

在这里插入图片描述

通过反射调用方法,此处的rpcEndpoint就是我们继承了RcpEndpoint的对象
在这里插入图片描述

到此,我们就知道了服务端的业务方法是如何被调用了。

RpcService的作用

在前面介绍 Flink 中 Client 与 Server 如何工作的过程中,我们可以看到其底层是通过 Pekko实现远程通信的。但在调用流程中,业务代码中并没有直接与 ActorSystemActorRef 等 Pekko 原生类打交道。这是因为 Flink 通过一层抽象 RpcService优雅地屏蔽了底层通信实现的细节

// 1. 创建 RpcService(基于 Pekko 实现)
RpcService rpcService = ...;// 2. 实例化 Dispatcher(继承自 RpcEndpoint)
StandaloneDispatcher dispatcher = new StandaloneDispatcher(rpcService, ...);// 3. 注册服务端
DispatcherGateway dispatcherGateway = rpcService.startServer(dispatcher);// 4. 客户端连接(可在其他进程中执行)
rpcService.connect("pekko://flink@host:6123/user/dispatcher", DispatcherGateway.class);

如果没有 RpcService 这一层抽象,Flink 的组件(如 Dispatcher、JobMaster)之间想要通信,就必须直接操作 Pekko 的底层 API,比如:

  • 使用 ActorSystem 创建 ActorRef
  • 使用 tell()ask() 发送消息;
  • 管理消息序列化和远程地址;
  • 处理超时、线程调度等复杂细节。

这会导致:

  • Actor 概念侵入业务逻辑,开发就需要学习Actor相关的知识;
  • 接口强耦合通信实现,未来若切换通信框架非常困难;
  • 本地调用与远程调用流程不统一,维护复杂。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/85723.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/85723.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka节点注册冲突问题分析与解决

一、核心错误分析 ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner does not match org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode NodeExists问题本质&#xff1a;ZooKeeper中已存在ID为1的broker节…

突破PPO训练效率瓶颈!字节跳动提出T-PPO,推理LLM训练速度提升2.5倍

突破PPO训练效率瓶颈&#xff01;字节跳动提出T-PPO&#xff0c;推理LLM训练速度提升2.5倍 在大语言模型&#xff08;LLM&#xff09;通过长思维链&#xff08;CoT&#xff09;展现出强大推理能力的当下&#xff0c;强化学习&#xff08;RL&#xff09;作为关键技术却面临训练…

【Python】dictionary

1 字典功能 字典是可变容器模型&#xff0c;且可存储任意类型对象&#xff1b; 字典的每个键值对 <key: value> 用冒号 : 分割&#xff0c;每个对之间用逗号(,)分割&#xff0c;整个字典包括在花括号 {} 中 ,格式如下所示&#xff1a; d {key1 : value1, key2 : value…

【python】If 语句

1 使用if 进行条件判断 1.1 检查字符串是否相等 car bmw car BMW # FALSEcar bmw car.upper() BMW # true # 变小写用方法&#xff1a;lower1.2 检查字符串是否不相等 my_car yadeaif my_car ! Audi:print("Buy one! Buy one! Buy one!")1.3 比较数字 answe…

Knife4j 使用详解

一、概述 Knife4j 是一款基于 Swagger 的开源 API 文档工具&#xff0c;旨在为 Java 开发者提供更美观、功能更强大的 API 文档生成、展示和调试体验。它是 Swagger-Bootstrap-UI 的升级版&#xff0c;通过增强 UI 界面和扩展功能&#xff0c;解决了原生 Swagger UI 界面简陋、…

Java excel坐标计算

package com.common.base.util.excel;/*** excel 坐标计算*/ public class UtilExcelPosi {/*** deepseek生成 ExcelProperty(index UtilExcelPosi.pA)*/public final static int pA 0;public final static int pB 1;public final static int pC 2;public final static i…

【JavaWeb】Servlet+JSP 实现分页功能

文章目录 思路数据抽出功能设计 功能模块工具类前端内容用户端数据处理 思路 数据抽出 需要显示的数据&#xff0c;查询的数据抽出&#xff1b;进行分页显示&#xff0c;需要统计抽出的件数&#xff0c;然后根据页面显示尺寸调整显示页面内容&#xff1b; 功能设计 翻页需要…

SpringBoot-准备工作-工程搭建

目录 1.创建空项目 2.检查项目jdk版本 3.检查Maven的全局配置 4.配置项目的字符集 5.创建SpringBoot工程 1.创建空项目 2.检查项目jdk版本 3.检查Maven的全局配置 4.配置项目的字符集 5.创建SpringBoot工程

01、python实现matlab的插值算法,以及验证

import numpy as np from scipy.interpolate import griddata import sys def griddata_wrapper(x, y, v, xq, yq, method): """ 包装scipy的griddata函数,支持单个点或多个点的插值 """ try: # 将输入转换为numpy数组…

React ahooks——useRequest

目录 简介 1. 核心功能 2. 基本用法 3. 高级用法 &#xff08;1&#xff09;轮询请求&#xff08;Polling&#xff09; &#xff08;2&#xff09;防抖&#xff08;Debounce&#xff09; &#xff08;3&#xff09;依赖刷新&#xff08;refreshDeps&#xff09; &#x…

re正则、Xpath、BeautifulSouplxml 区别

目录 1. re 正则表达式2. XPath3. BeautifulSoup + lxml4. 功能特性对比5.对比与建议在网页数据解析中,正则表达式(re)XPath(常结合lxml)BeautifulSoup(常依赖解析器如lxml)是三种主流技术,各有核心差异和适用场景。 1. re 正则表达式 优势:文本匹配效率高,尤其适用于…

教师办工专用 资源包|课件+手抄报+PPT模板+常用表格 PDF格式93GB

如果家里亲戚或朋友有走上教育之路的人&#xff0c;给他这份整合可以减轻不少工作负担&#xff0c;更快地适应教育的节奏。也可以发给孩子的老师让他在平时做个班级活动的参考 《老师教学办工资源包》包括手抄报大全、教学计划、工作总结、培训手册、课程表等教学、办公常用资…

算法第37天| 完全背包\518. 零钱兑换 II\377. 组合总和 Ⅳ\57. 爬楼梯

完全背包 完全背包和01背包的区别 纯完全背包&#xff0c;遍历背包和物品的顺序是可以对调的&#xff0c;只要求得出最大价值&#xff0c;不要求凑成总和的元素的顺序&#xff1b; 01背包&#xff0c;遍历背包和物品的顺序是不可以对调的&#xff08;一维不行&#xff0c;二维…

七彩喜智慧康养平台:重构银发生活的数字守护网

随着社会老龄化程度的不断加深&#xff0c;如何让老年人安享幸福晚年成为社会关注的焦点。 在这一背景下&#xff0c;七彩喜智慧康养平台应运而生&#xff0c;以创新的科技手段和贴心的服务理念&#xff0c;为老年人的生活带来了诸多好处&#xff0c;发挥着重要作用&#xff0…

【设计模式】用观察者模式对比事件订阅(相机举例)

&#x1f4f7; 用观察者模式对比事件订阅(相机举例) 标签&#xff1a;WPF、C#、Halcon、设计模式、观察者模式、事件机制 在日常开发中&#xff0c;我们经常使用 事件机制&#xff08;Event&#xff09; 来订阅图像采集信号。然而当系统日益复杂&#xff0c;多个模块同时需要响…

【数据分析九:Association Rule】关联分析

一、数据挖掘定义 数据挖掘&#xff1a; 从大量的数据中挖掘那些令人感兴趣的、有用的、隐含的、先前未知的 和可能有用的 模式或知识 &#xff0c;并据此更好的服务人们的生活。 二、四类任务 数据分析有哪些任务&#xff1f; 今天我们来讲述其中的关联分析 三、关联分析 典…

AWS Security Hub邮件告警设置

问题 需要给AWS Security Hub设置邮件告警。 前提 已经启用AWS Security Hub。 AWS SNS 创建一个AWS Security Hub告警主题SecurityHub-Topic&#xff0c;如下图&#xff1a; 创建完成后&#xff0c;订阅该主题。 AWS EventBridge 设置规则名SecurityHubFindings-Rules…

(OSGB转3DTiles强大工具)ModelSer--强大的实景三维数据分布式管理平台

1. ModelSer 能帮我们做什么 1.1 最快速的 osgb 发布 3dtiles 服务 测试的速度大于 10G/分钟&#xff0c;且速度基本是线性的&#xff08;100G10分钟&#xff0c;1T100分钟&#xff09;。支持城市级倾斜数据半天内完成服务发布&#xff0c;并支持数据的单块更新。 1.2 支持所见…

《HTTP权威指南》 第5-6章 Web服务器和代理

基本Web服务器请求的步骤 1、建立连接 接受一个客户端连接&#xff0c;或者如果不希望与这个客户端建立连接&#xff0c;就将其关闭。 处理新连接客户端主机名识别&#xff1a;反向DNS查找&#xff0c;将IP地址转换为客户端主机名过ident确定客户端用户&#xff1a;客户端支持…

微信二次开发,对接智能客服逻辑

接口友情链接&#xff0c;点击即可访问。 ## 设备创建与复用机制 首次调用/login/getLoginQrCode需传空appId触发设备创建&#xff0c;响应返回固定设备ID。后续登录必须复用此ID以避免风控&#xff08;同一微信号绑定固定设备&#xff09;。设备类型可选ipad/mac&#xff0c;当…