Flink 并行度的设置

在 Apache Flink 中,并行度(Parallelism) 是控制任务并发执行的核心参数之一。Flink 提供了 多个层级设置并行度的方式,优先级从高到低如下:


🧩 一、Flink 并行度的四个设置层级

层级描述设置方式
Operator Level为某个具体的算子设置并行度operator.setParallelism(n)
Execution Environment Level为整个流处理环境设置默认并行度env.setParallelism(n)
Client Level(提交作业时)通过命令行指定全局并行度flink run -p n
System Level(系统配置)flink-conf.yaml 中定义全局默认值parallelism.default: n

✅ 二、各层级设置详解与示例

1. Operator Level(算子级别)

  • 优先级最高
  • 可以为特定算子设置不同并行度,适用于数据倾斜或资源敏感操作
🔧 示例:
DataStream<String> stream = env.fromElements("a", "b", "c");// 单独为 map 算子设置并行度为4
stream.map(new MyMapFunction()).setParallelism(4).print();
✅ 适用场景:
  • 某个算子计算密集,需要更多资源
  • 数据源分区数较少,但后续算子可并行化处理

2. Execution Environment Level(执行环境级别)

  • 设置整个 Job 的默认并行度
  • 如果未对某些算子单独设置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 所有算子默认并行度为4DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).print(); // 默认并行度为4
✅ 适用场景:
  • 多数算子使用相同并行度
  • 统一配置便于管理和维护

3. Client Level(客户端提交作业时)

  • 使用命令行参数动态设置并行度
  • 不修改代码即可适配不同运行环境(如测试/生产)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
✅ 适用场景:
  • 快速调整不同集群资源配置
  • 测试阶段快速验证性能

4. System Level(系统级别)

  • flink-conf.yaml 中设置全局默认并行度
  • 对所有提交的作业生效(除非被更高级别覆盖)
🔧 示例(flink-conf.yaml):
parallelism.default: 4
✅ 适用场景:
  • 所有作业共享相同的默认资源配置
  • 避免手动重复设置

📊 三、并行度优先级对比表

设置方式是否推荐场景覆盖关系
Operator Level✅✅✅特定算子优化最高优先级
Execution Environment Level✅✅整体统一配置被 Operator 覆盖
Client Level (-p)动态部署被前两者覆盖
System Level (flink-conf.yaml)⚠️兜底默认值最低优先级

💡 四、并行度设置建议

✅ 推荐做法:

  • 开发/测试环境:使用 .setParallelism()-p 命令行设置较小值(如1~4)
  • 生产环境
    • 使用 flink-conf.yaml 设置基础并行度
    • 使用 env.setParallelism() 明确控制默认值
    • 为关键算子单独设置更高并行度(如窗口聚合、复杂逻辑)

⚙️ 示例组合:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 默认并行度env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(8) // Kafka Source 并行度设为8(等于topic分区数).map(new MyMapFunction()) // 使用默认并行度4.keyBy(keySelector).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new MyProcessWindowFunction()) // 可选 setParallelism().print();

🧠 五、并行度与资源的关系

并行度TaskManager 数量Slot 数量资源要求
≤ TM × slot✅ 正常运行✅ 正常运行资源充足
> TM × slot❌ 无法启动❌ 无法启动资源不足

✅ 建议:确保总并行度 ≤ 总 slot 数量


📈 六、实际调优建议

场景建议设置
Kafka Source并行度 = Kafka Topic 分区数
Map / FlatMap根据 CPU 利用率设置
Keyed Window Aggregation可适当提高并行度提升吞吐
Join / CoGroup视数据分布决定是否提高并行度
Sink若写入慢可适当增加并行度

✅ 七、完整示例(Java + Shell)

Java 设置(Env + Operator):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);env.fromElements("a", "b", "c").map(x -> x).setParallelism(2) // 覆盖默认值.print();env.execute("Parallelism Example");

Shell 设置(Client Level):

flink run -p 8 -c com.example.MyJob ./myjob.jar

✅ 八、总结

层级用途是否推荐使用
Operator Level控制单个算子并行度✅✅✅ 强烈推荐用于关键路径优化
Execution Environment Level设置默认并行度✅✅ 推荐作为基础配置
Client Level动态设置并行度✅ 适合多环境部署
System Level全局兜底配置⚠️ 推荐配合其他方式使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/81553.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV 笔记(39):频域中的拉普拉斯算子

1. 拉普拉斯算子 在该系列的第八篇文章中&#xff0c;我们曾经介绍过在二维空间拉普拉斯算子的定义为&#xff1a; 这是对函数 的二阶偏导数之和。 2. 拉普拉斯算子的傅里叶变换及其推导 在该系列的第三十二篇文章中&#xff0c;我们曾给介绍过下面的公式 二维连续傅里叶变换&…

入职软件开发与实施工程师了后........

时隔几个月没有创作的我又回来了&#xff0c;这几个月很忙&#xff0c;我一直在找工作&#xff0c;在自考&#xff08;顺便还处理了一下分手的事&#xff09;&#xff0c;到处奔波&#xff0c;心力交瘁。可能我骨子里比较傲吧。我不愿意着急谋生&#xff0c;做我不愿意做的普通…

多卡跑ollama run deepseek-r1

# 设置环境变量并启动模型 export CUDA_VISIBLE_DEVICES0,1,2,3 export OLLAMA_SCHED_SPREAD1 # 启用多卡负载均衡 ollama run deepseek-r1:32b 若 deepseek-r1:32b 的显存需求未超过单卡容量&#xff08;如单卡 24GB&#xff09;&#xff0c;Ollama 不会自动启用多卡 在run…

09、底层注解-@Import导入组件

09、底层注解-Import导入组件 Import是Spring框架中的一个注解&#xff0c;用于将组件导入到Spring的应用上下文中。以下是Import注解的详细介绍&#xff1a; #### 基本用法 - **导入配置类** java Configuration public class MainConfig { // 配置内容 } Configuration Impo…

题解:P12207 [蓝桥杯 2023 国 Python B] 划分

链接 题目描述 给定 40 个数&#xff0c;请将其任意划分成两组&#xff0c;每组至少一个元素。每组的权值为组内所有元素的和。划分的权值为两组权值的乘积。请问对于以下 40 个数&#xff0c;划分的权值最大为多少。 5160 9191 6410 4657 7492 1531 8854 1253 4520 9231126…

配置ssh服务-ubuntu到Windows拷贝文件方法

背景&#xff1a; 在工作中&#xff0c;需要频繁从ubuntu到Windows拷贝文件&#xff0c;但有时间总是无法拷出&#xff0c;每次重启虚拟机又比较麻烦并且效率较低。可以使用scp服务进行拷贝&#xff0c;不仅稳定而且高效&#xff0c;现将配置过程进行梳理&#xff0c;以供大家参…

线程池模式与C#中用法

一、线程池模式解析 1. 核心概念 线程池是一种 管理线程生命周期的技术&#xff0c;主要解决以下问题&#xff1a; 减少线程创建/销毁开销&#xff1a;复用已存在的线程 控制并发度&#xff1a;避免无限制创建线程导致资源耗尽 任务队列&#xff1a;有序处理异步请求 2. …

设置IDEA打开新项目使用JDK17

由于最近在学习Spring-AI&#xff0c;所以JDK8已经不适用了&#xff0c;但是每次创建新项目都还是JDK8&#xff0c;每次调来调去很麻烦 把Projects和SDKs都调整为JDK17即可 同时&#xff0c;Maven也要做些更改&#xff0c;主要是添加build标签 <build><plugins>&…

初识MySQL · 索引

目录 前言&#xff1a; 重温磁盘 认识索引 为什么这么做&#xff0c;怎么做 重谈page 聚簇索引VS非聚簇索引 回表查询 索引分类 前言&#xff1a; 前文我们主要是介绍了MySQL的一些基本操作&#xff0c;增删查改一类的操作都介绍了&#xff0c;并且因为大多数情况下&am…

MySQL——7、复合查询和表的内外连接

复合查询和表的内外连接 1、基本查询回顾2、多表查询3、自连接4、子查询4.1、单行子查询4.2、多行子查询4.3、多列子查询4.4、在from子句中使用子查询4.5、合并查询 5、表的内连和外连5.1、内连接5.2、外连接5.2.1、左外连接5.2.2、右外连接 1、基本查询回顾 1.1、查询工资高于…

MYSQL故障排查和环境优化

一、MySQL故障排查 1. 单实例常见故障 &#xff08;1&#xff09;连接失败类问题 ERROR 2002 (HY000): Cant connect to MySQL server 原因&#xff1a;MySQL未启动或端口被防火墙拦截。 解决&#xff1a;启动MySQL服务&#xff08;systemctl start mysqld&#xff09;或开放…

7GB显存如何部署bf16精度的DeepSeek-R1 70B大模型?

构建RAG混合开发---PythonAIJavaEEVue.js前端的实践-CSDN博客 服务容错治理框架resilience4j&sentinel基础应用---微服务的限流/熔断/降级解决方案-CSDN博客 conda管理python环境-CSDN博客 快速搭建对象存储服务 - Minio&#xff0c;并解决临时地址暴露ip、短链接请求改…

数字图像处理——图像压缩

背景 图像压缩是一种减少图像文件大小的技术&#xff0c;旨在在保持视觉质量的同时降低存储和传输成本。随着数字图像的广泛应用&#xff0c;图像压缩在多个领域如互联网、移动通信、医学影像和卫星图像处理中变得至关重要。 技术总览 当下图像压缩JPEG几乎一统天下&#xff…

抖音视频怎么去掉抖音号水印

你是不是经常遇到这样的烦恼&#xff1f;看到喜欢的抖音视频&#xff0c;想保存下来分享给朋友或二次创作&#xff0c;却被抖音号水印挡住了画面&#xff1f;别着急&#xff0c;今天教你几种超简单的方法&#xff0c;轻松去除水印&#xff0c;高清无水印视频一键保存&#xff0…

RISC-V 开发板 MUSE Pi Pro PCIE 测试以及 fio 崩溃问题解决

视频讲解&#xff1a; RISC-V 开发板 MUSE Pi Pro PCIE 测试以及 fio 崩溃问题解决 板子上有一个m.2的pcie插槽&#xff0c;k1有三个pcie控制器&#xff0c;pcie0和usb3复用一个phy&#xff0c;所以实际开发板就两个&#xff0c;测试的话&#xff0c;上一个nvme硬盘&#xff0c…

超级管理员租户资源初始化与授权管理设计方案

背景说明 在多租户系统中&#xff0c;资源&#xff08;如功能模块、系统菜单、服务能力等&#xff09;需按租户维度进行授权管理。超级管理员在创建新租户时&#xff0c;需要初始化该租户的资源授权信息。 两种可选方案 方案描述方案 A&#xff1a;前端传入选中的资源列表创…

stm32week16

stm32学习 十一.中断 4.使用中断 EXTI的配置步骤&#xff1a; 使能GPIO时钟设置GPIO输入模式使能AFIO/SYSCFG时钟设置EXTI和IO对应关系设置EXTI屏蔽&#xff0c;上/下沿设置NVIC设计中断服务函数 HAL库的使用&#xff1a; 使能GPIO时钟&#xff1a;__HAL_RCC_GPIOx_CLK_EN…

什么是RDMA?

什么是RDMA&#xff1f; RDMA(RemoteDirect Memory Access)技术全称远程直接内存访问&#xff0c;就是为了解决网络传输中服务器端数据处理的延迟而产生的。它将数据直接从一台计算机的内存传输到另一台计算机&#xff0c;无需双方操作系统的介入。这允许高吞吐、低延迟的网络…

golang 安装gin包、创建路由基本总结

文章目录 一、安装gin包和热加载包二、路由简单场景总结 一、安装gin包和热加载包 首先终端新建一个main.go然后go mod init ‘项目名称’执行以下命令 安装gin包 go get -u github.com/gin-gonic/gin终端安装热加载包 go get github.com/pilu/fresh终端输入fresh 运行 &…

【数据结构篇】链式结构二叉树

目录&#xff1a; 一 二叉链的概念与结构&#xff1a; 1.1 概念&#xff1a; 1.2 结构&#xff1a; 二 二叉链的实现&#xff1a; 2.1 二叉树的构建&#xff1a; 2.2 二叉树的遍历&#xff1a; 2.2.1 前序遍历&#xff1a; 2.2.2 中序遍历&#xff1a; 2.2.3 后序遍历…