✨✨ 欢迎大家来到景天科技苑✨✨
🎈🎈 养成好习惯,先赞后看哦~🎈🎈
🏆 作者简介:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。
🏆《博客》:Rust开发,Python全栈,Golang开发,云原生开发,PyQt5和Tkinter桌面开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi,flask等框架,云原生K8S,linux,shell脚本等实操经验,网站搭建,数据库等分享。所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑
文章目录
- Rust线程池
- rayon 线程池
- 一、Rayon核心API详解
- 1.1 常用方法
- 1.2 常见并行组合子(Combinators)
- 1.3 自定义并行任务
- 二、Rayon线程池ThreadPoolBuilder
- 1. new() 方法
- 2. num_threads() 方法
- 3. thread_name() 方法
- 4. build() 方法
- 5. build_global 方法
- 6. 其他方法
- 7. 它还提供了一些回调函数的设置
- 9. 注意事项
Rust线程池
线程池是一种并发编程的设计模式,它由一组预先创建的线程组成,用于执行多个任务。
线程池的主要作用是在任务到达时,重用已创建的线程,避免频繁地创建和销毁线程,从而提高系统的性能和资源利用率。
线程池通常用于需要处理大量短期任务或并发请求的应用程序。
线程池的优势包括:
• 减少线程创建和销毁的开销:线程的创建和销毁是一项昂贵的操作,线程池通过重用线程减少了这些开销,提高了系统的响应速度和效率。
• 控制并发度:线程池可以限制同时执行的线程数量,从而有效控制系统的并发度,避免资源耗尽和过度竞争。
• 任务调度和负载均衡:线程池使用任务队列和调度算法来管理和分配任务,确保任务按照合理的方式分配给可用的线程,实现负载均衡和最优的资源利用。
rayon 线程池
Rayon 是 Rust 中的一个并行计算库,它可以让你更容易地编写并行代码,以充分利用多核处理器。
Rayon 提供了一种简单的 API,允许你将迭代操作并行化,从而加速处理大规模数据集的能力。
除了这些核心功能外,它还提供构建线程池的能力。
rayon::ThreadPoolBuilder 是 Rayon 库中的一个结构体,用于自定义和配置 Rayon线程池的行为。
线程池是 Rayon 的核心部分,它管理并行任务的执行。
通过使用ThreadPoolBuilder,你可以根据你的需求定制 Rayon 线程池的行为,以便更好地适应你的并行计算任务。
在创建线程池之后,你可以使用 Rayon 提供的方法来并行执行任务,利用多核处理器的性能优势。
一、Rayon核心API详解
Rayon最核心的API是并行迭代器(ParallelIterator),其中包含丰富的方法集。
1.1 常用方法
par_iter():创建并行迭代器(只读)。
par_iter_mut():创建可变并行迭代器。
into_par_iter():消耗数据源,创建并行迭代器。
示例:
use rayon::prelude::*;fn main() {let mut nums = vec![10, 20, 30, 40];// 并行修改元素值nums.par_iter_mut().for_each(|x| *x += 1);println!("{:?}", nums);
}
不能直接在 par_iter 中嵌套 par_iter,否则会阻塞或 panic。使用独立线程池可以避免嵌套并行死锁。
1.2 常见并行组合子(Combinators)
map():并行映射。
filter():并行过滤。
reduce():并行归约,合并结果。
fold():类似reduce,但支持初始状态和结果合并。
find_any() / find_first():并行查找元素。
示例(并行筛选与转换):
use rayon::prelude::*;fn main() {let nums = (0..1_000_000).collect::<Vec<_>>();// 并行筛选出偶数并求平方let squares: Vec<_> = nums.par_iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).collect();println!("筛选后元素个数:{}", squares.len());
}
这段代码,报错就是要计算的数据超过i32类型的最大值导致的
我们在创建squares的时候,类型Vec<_>,编译器会默认为i32,计算的数据很大,迭代0到1000000,然后计算偶数的平方,超过i32最大值,导致报错
🚩 解决方案:
创建squares时,指定更大的数据类型:
use rayon::prelude::*;fn main() {let nums = (0..1_000_000).collect::<Vec<_>>();// 并行筛选出偶数并求平方//将squares指定更大的数据类型let squares: Vec<u128> = nums.par_iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).collect();println!("筛选后元素个数:{}", squares.len());
}
1.3 自定义并行任务
Rayon提供了更底层的接口,让你可以手动并行执行任务。
1.3.1 join方法
执行两个并行任务,等待任务完成后继续执行。
use rayon::join;fn fib(n: usize) -> usize {if n < 2 {return n;}let (a, b) = join(|| fib(n - 1),|| fib(n - 2));a + b
}fn main() {let result = fib(20);println!("斐波那契数:{}", result);
}
1.3.2 scope方法
并行执行多个互相独立的任务,生命周期灵活控制。
use rayon::scope;fn main() {let mut a = 0;let mut b = 0;scope(|s| {s.spawn(|_| {a = expensive_compute_a();});s.spawn(|_| {b = expensive_compute_b();});});println!("结果:{}, {}", a, b);
}fn expensive_compute_a() -> i32 {100
}
fn expensive_compute_b() -> i32 {200
}
二、Rayon线程池ThreadPoolBuilder
rayon::ThreadPoolBuilder 是 Rayon 库中用于 自定义线程池配置 的结构体,适用于对并发行为有更精细控制需求的场景。
Rayon 默认使用一个全局线程池(rayon::spawn、par_iter 默认使用它),但在某些情况下,我们希望:
控制线程池线程数量;
设置线程名、线程栈大小;
使用多个独立的线程池隔离并发任务;
嵌套 Rayon 调用时避免死锁(多线程池互不干扰);
这时,就需要用到 ThreadPoolBuilder。
ThreadPoolBuilder 是以设计模式中的构建者模式设计的, 以下是一些ThreadPoolBuilder 的主要方法:
1. new() 方法
创建一个新的 ThreadPoolBuilder 实例
use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new();
}
2. num_threads() 方法
设置线程池的线程数量。
你可以通过这个方法指定线程池中的线程数,以控制并行度。
默认情况下,Rayon 会根据 CPU 内核数量自动设置线程数。
use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new().num_threads(4); //设置线程池中的线程数量为4
}
3. thread_name() 方法
为线程池中的线程设置一个名称,这可以帮助你在调试时更容易识别线程。
use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new().thread_name(|i| format!("worker-{}", i));
}
查看每次执行的线程名
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::thread;fn main() {let pool = ThreadPoolBuilder::new().num_threads(4).thread_name(|i| format!("my-pool-{}", i)).build().unwrap();pool.install(|| {let v: Vec<_> = (0..1000).into_par_iter().map(|x| {// 获取当前线程名let thread_thread = thread::current();let thread_name = thread_thread.name().unwrap_or("unknown");println!("元素 {} 由线程 {} 处理", x, thread_name);x * x}).collect();println!("结果: {:?}", &v[..10]);});
}
4. build() 方法
通过 build 方法来创建线程池。
这个方法会将之前的配置应用于线程池并返回一个 rayon::ThreadPool 实例。
use rayon::ThreadPoolBuilder;
fn main() {let pool = ThreadPoolBuilder::new().num_threads(4).thread_name(|i| format!("worker-{}", i)).build().unwrap(); // 使用unwrap()来处理潜在的错误
}
5. build_global 方法
通过 build_global 方法创建一个全局的线程池
不推荐你主动调用这个方法初始化全局的线程池,使用默认的配置就好,记得全局的线程池只会初始化一次。多次调用会 panic
fn main() {rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
}
6. 其他方法
ThreadPoolBuilder 还提供了其他一些方法,用于配置线程池的行为,
如 stack_size() 用于设置线程栈的大小。
设置每个线程的栈大小(单位:字节):
let builder = rayon::ThreadPoolBuilder::new().stack_size(8 * 1024 * 1024); // 8MB
适用于递归深度大、调用栈复杂的程序。
7. 它还提供了一些回调函数的设置
start_handler() 用于设置线程启动时的回调函数等。
线程启动时调用的回调函数,可以用于初始化日志、TLS 等:
let builder = rayon::ThreadPoolBuilder::new().start_handler(|idx| {println!("线程 {} 启动", idx);});
spawn_handler 实现定制化的函数来产生线程。
panic_handler 提供对panic 处理的回调函数。
exit_handler 提供线程退出时的回调。
let builder = rayon::ThreadPoolBuilder::new().exit_handler(|idx| {println!("线程 {} 退出", idx);});
下面这个例子演示了使用 rayon 线程池计算斐波那契数列:
//使用ranyon线程池计算斐波那契数列
fn fib(n: u128) -> u128 {if n == 0 {return 0;}if n == 1 {return 1;}let (a, b) = rayon::join(|| fib(n - 1),|| fib(n - 2));a + b
}fn rayon_threadpool() {let pool = rayon::ThreadPoolBuilder::new().num_threads(10).build().unwrap();pool.install(|| {let result = fib(20);println!("result = {}", result);});
}fn main() {rayon_threadpool();
}
• rayon::ThreadPoolBuilder 用来创建一个线程池。设置使用 10 个线程
• pool.install() 在线程池中运行 fib
• rayon::join 用于并行执行两个函数并等待它们的结果。它使得你可以同时执行两个独立的任务,然后等待它们都完成,以便将它们的结果合并到一起。
通过在 join 中传入 fib 递归任务, 实现并行计算 fib 数列
与直接 spawn thread 相比, 使用 rayon 的线程池有以下优点:
• 线程可重用, 避免频繁创建/销毁线程的开销
• 线程数可配置, 一般根据 CPU 核心数设置
• 避免大量线程造成资源竞争问题
9. 注意事项
build_global() 只能调用一次;多次调用会 panic;
自定义线程池的 .install() 不能跨线程池嵌套调用 .par_iter();
不建议将阻塞操作(如IO)放入线程池中执行,Rayon主要用于CPU密集型任务;
一旦线程池创建完成,线程数不可更改,需重新构建。