flink 伪代码

 

import java.util.*;
import java.util.concurrent.*;// 核心接口定义
interface StreamOperator {void open();void processElement(Object element);void close();
}interface SourceFunction extends StreamOperator {void run(SourceContext ctx);
}interface SinkFunction extends StreamOperator {void invoke(Object value);
}// 运行时组件
class JobGraph {private List<StreamOperator> operators = new ArrayList<>();public void addOperator(StreamOperator operator) {operators.add(operator);}public List<StreamOperator> getOperators() {return operators;}
}class ExecutionGraph {private List<ExecutionVertex> vertices = new ArrayList<>();public void addVertex(ExecutionVertex vertex) {vertices.add(vertex);}public List<ExecutionVertex> getVertices() {return vertices;}
}class ExecutionVertex {private StreamOperator operator;private int parallelism;public ExecutionVertex(StreamOperator operator, int parallelism) {this.operator = operator;this.parallelism = parallelism;}public StreamOperator getOperator() {return operator;}
}// 主控节点
class JobManager {private ResourceManager resourceManager = new ResourceManager();private Map<String, JobMaster> runningJobs = new ConcurrentHashMap<>();public String submitJob(JobGraph jobGraph) {String jobId = UUID.randomUUID().toString();JobMaster jobMaster = new JobMaster(jobId, jobGraph);runningJobs.put(jobId, jobMaster);jobMaster.start(resourceManager);return jobId;}
}class JobMaster {private String jobId;private JobGraph jobGraph;private CheckpointCoordinator checkpointCoordinator;public JobMaster(String jobId, JobGraph jobGraph) {this.jobId = jobId;this.jobGraph = jobGraph;this.checkpointCoordinator = new CheckpointCoordinator();}public void start(ResourceManager resourceManager) {// 构建执行图ExecutionGraph executionGraph = buildExecutionGraph(jobGraph);// 申请资源List<TaskSlot> slots = resourceManager.allocateResources(executionGraph);// 部署任务deployTasks(executionGraph, slots);// 启动检查点协调器checkpointCoordinator.start(jobId, executionGraph);}private ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {ExecutionGraph executionGraph = new ExecutionGraph();for (StreamOperator operator : jobGraph.getOperators()) {executionGraph.addVertex(new ExecutionVertex(operator, 2)); // 默认并行度2}return executionGraph;}private void deployTasks(ExecutionGraph executionGraph, List<TaskSlot> slots) {int slotIndex = 0;for (ExecutionVertex vertex : executionGraph.getVertices()) {for (int i = 0; i < vertex.getParallelism(); i++) {Task task = new Task(vertex.getOperator());slots.get(slotIndex++ % slots.size()).deployTask(task);}}}
}// 资源管理
class ResourceManager {private List<TaskManager> taskManagers = new ArrayList<>();public ResourceManager() {// 初始化3个TaskManagerfor (int i = 0; i < 3; i++) {taskManagers.add(new TaskManager(i));}}public List<TaskSlot> allocateResources(ExecutionGraph executionGraph) {List<TaskSlot> slots = new ArrayList<>();for (TaskManager tm : taskManagers) {slots.addAll(tm.getAvailableSlots());}return slots.subList(0, Math.min(slots.size(), executionGraph.getVertices().size()));}
}// 工作节点
class TaskManager {private int id;private List<TaskSlot> slots = new ArrayList<>();public TaskManager(int id) {this.id = id;// 每个TaskManager有2个slotslots.add(new TaskSlot(id + "-1"));slots.add(new TaskSlot(id + "-2"));}public List<TaskSlot> getAvailableSlots() {return new ArrayList<>(slots);}
}class TaskSlot {private String id;private Task runningTask;public TaskSlot(String id) {this.id = id;}public void deployTask(Task task) {this.runningTask = task;task.start();}
}// 任务执行
class Task implements Runnable {private StreamOperator operator;private Thread executionThread;public Task(StreamOperator operator) {this.operator = operator;}public void start() {executionThread = new Thread(this);executionThread.start();}@Overridepublic void run() {operator.open();// 模拟数据处理循环while (true) {Object element = fetchNextElement(); // 从上游获取数据if (element != null) {operator.processElement(element);}}}private Object fetchNextElement() {// 实际从网络或本地队列获取数据return Math.random() > 0.5 ? new Object() : null;}
}// 容错机制
class CheckpointCoordinator {public void start(String jobId, ExecutionGraph executionGraph) {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {triggerCheckpoint(jobId, executionGraph);}, 0, 10, TimeUnit.SECONDS); // 每10秒触发检查点}private void triggerCheckpoint(String jobId, ExecutionGraph executionGraph) {System.out.println("Triggering checkpoint for job: " + jobId);// 1. 通知所有任务开始检查点for (ExecutionVertex vertex : executionGraph.getVertices()) {// 实际实现中会通过RPC通知TaskManager}// 2. 等待所有任务确认// 3. 持久化检查点元数据}
}// 示例应用
public class SimpleFlinkDemo {public static void main(String[] args) {// 1. 创建作业图JobGraph jobGraph = new JobGraph();// 创建数据源SourceFunction source = new SourceFunction() {@Override public void open() {}@Override public void close() {}@Overridepublic void run(SourceContext ctx) {// 实际产生数据流}@Overridepublic void processElement(Object element) {// 源操作符不需要处理元素}};// 创建处理算子StreamOperator mapper = new StreamOperator() {@Override public void open() {}@Override public void close() {}@Overridepublic void processElement(Object element) {System.out.println("Processing: " + element);// 实际处理逻辑}};// 创建输出算子SinkFunction sink = new SinkFunction() {@Override public void open() {}@Override public void close() {}@Overridepublic void invoke(Object value) {System.out.println("Output: " + value);}@Overridepublic void processElement(Object element) {invoke(element);}};// 构建作业图jobGraph.addOperator(source);jobGraph.addOperator(mapper);jobGraph.addOperator(sink);// 2. 提交作业JobManager jobManager = new JobManager();String jobId = jobManager.submitJob(jobGraph);System.out.println("Job submitted with ID: " + jobId);// 保持主线程运行try {Thread.sleep(60000);} catch (InterruptedException e) {e.printStackTrace();}}
}

1.创建作业图list:source数据源,mapper处理算子,sink输出算子提交

2.加入jobmanager

3.jobmaster 添加一个作业 id:job,里main含有job图

4.生成执行图,里面装的是执行ExecutionVertex

5.给执行图分配slot

6.部署task

执行检查

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/921323.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/921323.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一招快速识别你的电脑是机械硬盘还是固态硬盘

你是否经常觉得电脑开机慢、软件打开卡顿&#xff1f;其中一个关键原因&#xff0c;可能就在于你使用的是机械硬盘&#xff08;HDD&#xff09;还是固态硬盘&#xff08;SSD&#xff09;。固态硬盘读写速度快&#xff0c;能显著提升系统响应速度&#xff1b;而机械硬盘虽然容量…

52核心52线程,Intel下一代CPU憋了个大的

被逼急了的 Intel&#xff0c;可能正在憋大招&#xff01;如大伙儿所见&#xff0c;Intel 这两年日子已经不能用「惨」来形容。其过去引以为傲的 PC 处理器&#xff0c;特别是高性能桌面处理器领域&#xff0c;如今算是彻底被 AMD 打懵了。无他&#xff0c;己方产品是连年摆烂&…

【LeetCode 热题 100】1. 两数之和——(解法二)哈希表

Problem: 1. 两数之和 文章目录整体思路完整代码时空复杂度时间复杂度&#xff1a;O(N)空间复杂度&#xff1a;O(N)整体思路 这段代码旨在高效地解决 “两数之和” 问题。与 O(N^2) 的暴力枚举法相比&#xff0c;此版本采用了一种经典的 “空间换时间” 策略&#xff0c;利用 …

MySQL主从同步--主从复制进阶

MySQL支持一台主库同时向多台从库进行复制&#xff0c;从库同时也可以作为其他从服务器的主库&#xff0c;实现链状复制。1、MySQL支持的binlog二进制日志复制类型- 基于语句&#xff08;statement&#xff09;的复制在主服务器上执行SQL语句&#xff0c;在从服务器上执行同样的…

WPF外部打开html文件

注意&#xff1a;这是一份提供WPF外部浏览器打开html的方法&#xff0c;而不是WPF内部嵌入html 需要通过浏览器打开&#xff0c;否则无法使用地址栏拼接参数的形式操作html 下面是打开html的方法↓string localHtmlPath "C:\Users\pangb\Downloads\Help\帮助文档 - 副本.…

Go初级之十:错误处理与程序健壮性

Go初级之十&#xff1a;错误处理与程序健壮性为什么选这个主题&#xff1f; 错误处理是 Go 语言中一个非常独特且重要的设计哲学。它体现了 Go 的“显式错误处理”思想&#xff0c;与其它语言&#xff08;如 Java/Python&#xff09;的异常机制不同。在实际开发中&#xff0c;几…

Xsens解码人形机器人训练的语言

随着人形机器人在现实世界的应用中变得越来越普遍&#xff0c;了解实现其类似人类运动的技术至关重要。在Xsens我们满怀热情地探索这一领域&#xff0c;致力于为人形机器人训练开发最佳的动作捕捉解决方案。为了帮助您更好地理解所遇到的术语&#xff0c;我们创建了一份概述&am…

25年下载chromedriver.140

前提&#xff1a; 因为我需要用seleium模拟浏览器获取数据&#xff0c;需要用到这个chromedriver 驱动。 1.chrome浏览器版本号 先检查你的chrome 的版本号是多少&#xff0c;就下载对应的 chromedriver 【三个点】--->【帮助】------>【关于 Google chrome 】 我的版本…

深度学习玩游戏, 模型玩游戏,大模型+游戏 llm+game, 机器学习玩游戏,人工智能游戏陪伴,模型陪玩游戏

1. 论文地址 Think in Games: Learning to Reason in Games via Reinforcement Learning with Large Language Models 2. 中文&#xff1a; Think in Games&#xff1a;做一个在王者荣耀中会玩和思考的Agent 3. 我记得几年前&#xff0c;相关文章还是使用dqn算法。玩雅利达小…

并查集|栈

lc1668不能直接跳class Solution { public:int maxRepeating(string sequence, string word) {int k 0, n sequence.size(), wn word.size(), t 0;for (int i 0; i < n - wn; i) {if (sequence.substr(i, wn) word) {t 1;int j i wn;while (j wn < n &&…

问题三ai思路

好的&#xff0c;我把“路线A&#xff1a;分类建模择时”的代码按功能分段给出&#xff0c;并为每段配上简明解释。你可以将这些段落依次粘贴到已完成清洗后的 df 变量之后直接运行。 0. 依赖导入&#xff08;一次即可&#xff09; 作用&#xff1a;导入所需库&#xff1b;后续…

Java第十四幕集合啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦

集合1 Collection接口1.1 集合概述集合是一个装对象的容器。集合中只能存放引用数据类型的对象。集合中有一些大小是固定的&#xff0c;有一些是不固定的。有一些是有序的&#xff0c;有些是无序的。有些可以有重复元素&#xff0c;有一些不可以有重复元素1.2 集合常用方法publ…

硬件基础:串口通信

数据传输方式&#xff08;按位传输方式&#xff09;并行通信通过多条数据线同时传输多个数据位&#xff0c;速度较快但成本高&#xff0c;抗干扰能力弱&#xff0c;适用于短距离通信&#xff0c;如早期的打印机接口。串行通信通过单条或少数数据线逐位传输数据&#xff0c;线路…

从Java全栈到云原生:一场技术深度对话

从Java全栈到云原生&#xff1a;一场技术深度对话 面试官与应聘者互动记录 面试官&#xff1a;你好&#xff0c;欢迎来到我们的面试。先简单介绍一下你自己吧。 应聘者&#xff1a;您好&#xff0c;我叫李明&#xff0c;28岁&#xff0c;硕士学历&#xff0c;有5年Java全栈开发…

158-EEMD-HHT算法

158-EEMD-HHT#EMD #希尔伯特变换-&#xff08;Hilbert- Huang Transform&#xff0c;HHT&#xff09;#集合经验模态分解 EEMD #时频分析 #边际谱代码描述1、利用 集合经验模态分解&#xff08;EEMD&#xff09;方法对信号进行分解&#xff0c;得到模态分量 IMF&#xff1b;2、计…

C#开发中的 token

C# 开发中的 Token 详解 C# 开发中的 Token 详解与示例 1. CancellationToken - 异步取消令牌 示例 1:基础取消机制 示例 2:Web API 中的请求取消 2. JWT Token - 身份验证令牌 示例 1:JWT Token 生成与验证 示例 2:ASP.NET Core JWT 认证配置 3. Access Token - API 访问令…

旅游安全急救实训室助力应急处置技能实战化

随着旅游行业的快速发展&#xff0c;游客安全需求日益突出&#xff0c;应急处置能力已成为旅游服务人才的核心素养之一。在中职教育旅游服务与管理专业中&#xff0c;旅游安全急救实训室作为关键教学场所&#xff0c;正发挥着不可替代的作用。一、旅游安全急救实训室的建设背景…

分布式微服务--ZooKeeper的客户端常用命令 Java API 操作

一、ZooKeeper 客户端常用命令 1. 启动与退出 bin/zkCli.sh -server 127.0.0.1:2181 # 连接客户端 quit # 退出客户端2. 节点操作 # 查看子节点 ls / ls -s / ls /app# 查看节点详细信息 ls2 /app stat /app# 创建节点 create /node1 "…

PID控制技术深度剖析:从基础原理到高级应用(六)

PID 控制技术深度剖析&#xff1a;从基础原理到高级应用 最近在项目中有要开始进行PID的控制了&#xff0c;隔了很久没有做PID控制的东西了&#xff0c;所以想正好借这个机会&#xff0c;温习一下和PID有关的内容。 系列文章目录 PID控制技术深度剖析&#xff1a;从基础原理到…

PCL关键点提取

1. 核心概念:什么是关键点?为什么需要关键点? 关键词:信息冗余、计算效率、突出特征 “想象一下,我们有一片密集的点云,包含几十万个点。如果我们直接在每个点上都计算像FPFH这样的局部特征,计算量会非常大,极其耗时,而且很多点所处的区域(比如平坦的墙面)特征非常…