清晰理解 Amdahl’s Law(阿姆达尔定律),这是一条描述并行计算加速能力的核心定律。
定义公式:
S = 1 ( 1 − P ) + P N S = \frac{1}{(1 - P) + \frac{P}{N}} S=(1−P)+NP1
- S S S:加速比(Speedup)
- P P P:程序中可以并行的比例(0 ≤ P ≤ 1)
- N N N:使用的处理器数量
解读:
假设:
- 程序分为两部分:
- 串行部分(1 - P):不能并行,只能在单核上运行;
- 并行部分(P):可在多核上同时运行。
示例:
假设程序有:
- 90% 的代码可以并行(P = 0.9)
- 使用 4 核处理器(N = 4)
带入公式:
S = 1 ( 1 − 0.9 ) + 0.9 4 = 1 0.1 + 0.225 = 1 0.325 ≈ 3.08 S = \frac{1}{(1 - 0.9) + \frac{0.9}{4}} = \frac{1}{0.1 + 0.225} = \frac{1}{0.325} \approx 3.08 S=(1−0.9)+40.91=0.1+0.2251=0.3251≈3.08
➡ 即使并行部分很多,也无法得到理想的4倍加速,因为还有10%是串行的。
结论与限制:
- 当 N → ∞ N \to \infty N→∞,加速上限变为:
S max = 1 1 − P S_{\text{max}} = \frac{1}{1 - P} Smax=1−P1 - 举例:如果 P = 0.95 P = 0.95 P=0.95,那么最多加速到 20 倍,不管你有多少核!
实际意义:
- **强扩展性(Strong Scaling)**分析的重要工具。
- 提醒我们:优化串行部分同样重要。
- 说明“更多核” ≠ “更高效”,受限于串行瓶颈。
import matplotlib.pyplot as plt
import numpy as np
# Define a range of processor counts
N = np.linspace(1, 64, 200) # from 1 to 64 processors
# Define different values of P (parallelizable portion)
P_values = [0.5, 0.75, 0.9, 0.95, 0.99]
# Create plot
plt.figure(figsize=(10, 6))
for P in P_values:S = 1 / ((1 - P) + P / N)plt.plot(N, S, label=f'P = {P}')
plt.title("Amdahl's Law: Speedup vs Number of Processors")
plt.xlabel("Number of Processors (N)")
plt.ylabel("Speedup (S)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
这张图展示了 Amdahl’s Law 的核心含义:随着处理器数量增加,程序加速比的提升会逐渐趋于上限,而这个上限由程序中串行部分的比例决定。
图中要点说明:
- 每条曲线代表不同的并行比例 P P P(越接近 1 表示程序越适合并行化)
- 横轴:处理器数量 N N N(1 到 64)
- 纵轴:加速比 S S S
观察分析:
并行比例 P P P | 最大加速极限 lim N → ∞ S \lim_{N \to \infty} S limN→∞S |
---|---|
0.5 | 1 1 − 0.5 = 2 \frac{1}{1 - 0.5} = 2 1−0.51=2 |
0.75 | 1 0.25 = 4 \frac{1}{0.25} = 4 0.251=4 |
0.90 | 1 0.10 = 10 \frac{1}{0.10} = 10 0.101=10 |
0.95 | 1 0.05 = 20 \frac{1}{0.05} = 20 0.051=20 |
0.99 | 1 0.01 = 100 \frac{1}{0.01} = 100 0.011=100 |
Rule 1:尽可能地并行化程序(Parallelize as Much as Humanly Possible)
这是最基本的并行编程原则:
“尽你所能,并行化!”
原因在于,并行计算的性能收益很大程度上取决于你能有效地将工作拆分成可以同时执行的子任务(参见 Amdahl’s Law)。
四大灾难(The 4 Horsemen of the Apocalypse)
1. Starvation(饥饿)
并发工作不足,导致资源利用率低。
- 比如你有 8 核 CPU,但只生成了 2 个任务去运行。
- 其余 6 个核处于闲置状态 → 浪费计算资源。
** 解法:** 更细粒度的任务划分、更好的负载均衡。
2. Latency(延迟)
因资源访问距离远导致的响应延迟。
- 比如数据在远程服务器或其他 NUMA 节点上。
- 即使并行了,访问远程内存或 I/O 的延迟也会成为瓶颈。
** 解法:** 预取、局部性优化(locality optimization)、async I/O、future/promise。
3. Overheads(开销)
管理并行任务/资源带来的额外负担。
- 比如线程创建/销毁、任务调度、同步、内存分配等。
- 有时这些管理代码在“关键路径”上,可能比串行还慢。
** 解法:** 线程池、task-stealing、避免过度切分任务。
4. Contention(资源争用)
多个线程/任务竞争同一资源导致的等待。
- 比如多个线程锁住同一个队列、文件、数据库连接等。
- 这会让任务阻塞,降低并行效率。
** 解法:** - 避免共享(no sharing is best sharing)
- 使用 lock-free 数据结构、分片(sharding)、读写锁
总结口诀:
并行是加速的源泉,但四骑士是效率的噩梦。
现实中并行编程常见的一些深层次结构性问题。这些问题不仅仅是代码写得对不对,而是并行模型本身的局限性。我们来一一理解:
现实问题解析(Real-world Problems)
1. 并行性不足(Insufficient parallelism imposed by the programming model)
即使硬件足够强大,编程模型本身可能强制你“等别人”,导致资源闲置:
- OpenMP:
#pragma omp parallel for
在每次并行循环后都有一个隐式 barrier(栅栏)→ 所有线程必须等到最慢的线程完成才能继续。 - MPI:每一步模拟后,通常需要一个**全局同步通信(barrier)**来更新状态。
本质问题:强同步点,阻碍了持续并行化。
2. 过度同步(Over-synchronization)
并不是算法要求同步这么多,而是编程模型(或粗暴实现)同步了太多不该同步的东西。
- 在 MPI 中,不同节点(rank)往往会因为逻辑写法或者惯性思维,被迫走“锁步”逻辑(step lock-step):
Rank 0 send → Rank 1 recv → all wait
结果:并发资源未被充分利用。最快的线程/节点被最慢的拖着走。
3. 缺乏协调(Insufficient coordination between on-node and off-node parallelism)
即使你写了并行代码,也常常只针对一部分层级(on-node / off-node):
- MPI 管远程节点(off-node)
- OpenMP 管本地线程(on-node)
- 加速器用 CUDA / SYCL 管 GPU
如果你这三者之间缺乏统一协调和协同优化,就很难实现真正高效的混合并行(hybrid parallelism)。
4. 并行模型割裂(Distinct programming models for different types of parallelism)
这点尤为痛苦:
- 节点间通信:MPI
- 节点内线程:OpenMP / std::thread
- GPU 并行:CUDA / OpenCL / SYCL
问题在于:你必须使用完全不同的 API、语义和调试工具链,几乎像写三套程序一样,非常容易出错、难以维护、难以调优。
总结建议:
这些问题说明了传统并行模型的局限,也解释了为什么:
- 高性能计算(HPC)程序维护困难;
- 跨平台 / 异构计算开发成本高;
- 新兴的异步并行模型(如 HPX, Kokkos, RAJA, SYCL)正在兴起。
如果你正在学习并行计算或实际工程中需要设计并行架构,可以优先考虑: - 减少强同步点
- 最小化共享状态
- 使用任务并行而非仅仅线程并行
- 学习统一模型如 HPX、SYCL 或 C++ Parallelism TS
并行编程中一个非常重要的限制性模式:Fork/Join 并行模型,以及它在 C++17 中的标准并行算法中的影响。
什么是 Fork/Join 并行模型?
Fork/Join 是一种非常经典、简单的并行执行模式,流程如下:
- Fork(分叉):将一个任务拆分成多个子任务,并行执行;
- Join(汇合):所有子任务执行完之后,主线程等待它们结束,然后继续执行后续逻辑。
示例(伪代码):
parallel_for(0, N, [](int i) {do_work(i);
});
// 等所有 do_work(i) 完成之后,才继续后续代码
为什么 Fork/Join 模型有问题?
1. 强制同步点(Join)影响性能
即使某些子任务已经完成,也不能继续执行下一阶段,必须等所有任务结束。
- 类似 OpenMP
#pragma omp parallel for
中的隐式 barrier。 - C++17
std::execution::par
也是类似模式。
2. 不支持异步流水线(pipelining)
Fork/Join 不支持“一个任务的结果立即传递给下一个”,缺少流式/事件驱动模型的灵活性。
C++17 中的情况:
虽然 C++17 增加了标准并行算法:
std::for_each(std::execution::par, vec.begin(), vec.end(), func);
但是这些都是 Fork/Join 模式!
- 算法会将
vec
拆分并发执行; - 一旦调用结束,意味着所有并发任务都执行完成了。
所以,它虽然并行了,但仍然是“同步完成”的。
与现代异步模型的对比(如 HPX):
- Fork/Join:结构简单,但强同步。
- Asynchronous Task Graph(异步任务图):如 HPX/TBB/async/
co_await
- 支持任务依赖、懒执行、延迟调度
- 没有全局同步点,吞吐量更高,延迟更低
总结
模型 | 优点 | 缺点 |
---|---|---|
Fork/Join | 实现简单,语义清晰 | 有全局同步点,性能瓶颈 |
异步任务图 | 高吞吐,支持流水线 | 实现更复杂,调试更困难 |
如果你希望更深入了解“如何用现代 C++ 编写异步/事件驱动的并行代码”,我可以: |
- 演示
std::async
/std::future
- 讲解
HPX
、unifex
、std::execution
提案 - 或者对比不同并行库(OpenMP vs TBB vs HPX)
Rule 2:使用支持 SLOW 的编程环境
SLOW = Small, Light-weight Overhead Work
→ 也就是说,我们需要一个支持极小、极轻量任务的并行运行时。
思维实验:Overheads vs. Grain Size
你提到的图展示的是:
在不同粒度(每个线程的工作量)下,不同开销(每个线程的开销)对总执行时间的影响
- 小任务粒度 + 低开销环境 → 最优扩展性
- 任务过多时(比如上千万):
- 无法创建那么多系统线程(kernel threads)
- 调度和同步成本极高
- 难以编写和维护
面临的问题:
问题 | 说明 |
---|---|
无法使用那么多 pthread | 系统线程数量有限,内核调度开销大 |
无法管理上千万个任务 | 思维负担大,bug 容易产生 |
需要任务调度抽象机制 | 必须引入运行时调度器,例如 HPX/TBB/task system |
解决方案:使用任务抽象系统
支持 SLOW 的并行环境应该提供:
- 轻量级任务调度(例如用户级线程、fiber、coroutine)
- 任务窃取(work stealing)调度算法
- 惰性执行 / 延迟调度(减少无用调度)
- 自动粒度控制(例如 HPX 的
adaptive_chunk_size
)
推荐工具(支持 SLOW 的环境):
工具/库 | 特性 |
---|---|
HPX | 真正异步、基于 futures、细粒度调度 |
Intel TBB | 自动任务粒度控制 + work stealing |
OpenMP Task | 基本任务调度,但缺乏高级抽象 |
std::async / std::execution | C++ 标准支持,仍偏粗粒度 |
总结:
- 传统的线程 = 粗粒度、高开销,不适合千万级任务
- 现代异步并行编程必须支持大量小任务
- C++ 开发中,应选择具有用户级任务调度器的框架来支持高并发下的高效执行
C++ 中的 future
(未来值) 概念及其在异步编程中的作用。以下是详细解释和理解:
什么是 future?
future 是一个代表“尚未计算出结果”的对象。
换句话说,它是一个“占位符” —— 将来某个时候会有值,但现在你还不能访问它。
#include <future>
#include <iostream>
int universal_answer() { return 42; }
void deep_thought() {std::future<int> promised_answer = std::async(&universal_answer);// ...做其它事情(比如思考750万年)std::cout << promised_answer.get() << std::endl; // 会等待结果并输出 42
}
特性总结
特性 | 说明 |
---|---|
透明地同步 | future.get() 会自动等待结果准备好(不需要显式管理线程)。 |
隐藏线程细节 | 不需要自己启动、管理或 join 线程,std::async 会帮你处理。 |
便于管理异步任务 | 可以把多个异步任务组合在一起。 |
使并发更易转化为并行 | 因为你声明了“我需要这个值”,系统可以选择在另一个线程中并行计算它。 |
图示逻辑(解释幻灯片中图):
Locality 1(主线程/消费者)
- 创建
future
并触发async
- 如果你调用
.get()
但结果未就绪,消费者线程会挂起等待 - 等到有结果时被唤醒继续执行
Locality 2(工作线程/生产者) - 在后台线程计算结果
- 完成后把结果交给
future
总结
std::future
是 C++ 异步编程的基础设施之一,能有效:
- 异步启动任务(通过
std::async
等) - 延迟或等待结果(通过
.get()
) - 降低并发编程难度(无需直接操作线程)
你可以把它当成“一张票” —— 将来凭票可领取某个值,现在可以先做别的事。
递归并行(Recursive Parallelism)的代码示例。
代码示例说明:
T tree_node::traverse() {if (has_children()) {std::array<T, 8> results; // 假设每个节点有8个子节点for (int i = 0; i != 8; ++i)results[i] = children[i].traverse(); // 递归遍历每个子节点return combine_results(results, compute_result());}return compute_result(); // 如果是叶子节点,直接计算结果并返回
}
- 递归遍历树结构:
该函数是对树的递归遍历。每个节点调用自己的traverse()
方法,来处理子节点的计算。 - 判断是否有子节点:
has_children()
判断当前节点是否有子节点。如果有,进入递归过程。 - 对子节点递归调用:
用一个std::array
存放8个子节点的结果,遍历每个子节点并递归调用traverse()
。 - 叶子节点处理:
如果当前节点没有子节点(叶子节点),就直接调用compute_result()
计算并返回该节点的结果。 - 合并结果:
当所有子节点都递归完成后,将子节点的结果数组results
与当前节点自身的计算结果compute_result()
合并,得到当前节点的最终结果。
递归并行(Recursive Parallelism)的核心思想:
- 代码示例里,子节点的递归调用写成了顺序执行,但实际上可以把对8个子节点的递归调用并行执行,也就是说:
- 8个
children[i].traverse()
调用可以同时启动,互不阻塞。 - 等待所有子节点的结果都完成后,再合并计算。
- 8个
- 这种方式充分利用多核或分布式系统的并行计算能力,加速树结构的遍历和计算。
总结:
- 递归遍历树:从根节点开始,对每个节点调用
traverse()
。 - 叶子节点直接计算:没有子节点的节点直接计算结果。
- 中间节点合并:有子节点时,先递归获取所有子节点的结果,然后合并这些结果并加上自己计算的结果。
- 递归并行:可以让子节点递归调用并行执行,提升性能。
这段使用异步 future
实现递归树遍历的代码添加详细注释。
T traverse(tree_node const &t)
{if (t.has_children()) {// 用future数组保存8个子节点的异步任务结果std::array<std::future<T>, 8> results;// 对8个子节点,分别异步调用traverse递归遍历for (int i = 0; i != 8; ++i)results[i] = std::async(traverse, t.children[i]);// 计算当前节点自身的结果T r = t.compute_result();// 等待所有异步任务完成,确保子节点结果已准备好wait_all(results);// 把所有子节点结果和当前节点结果合并后返回return t.combine_results(results, r);}// 叶子节点,直接计算并返回结果return t.compute_result();
}
代码逐步理解
- 判断是否有子节点
if (t.has_children())
判断当前节点是否是内部节点(有子节点)。 - 异步启动子节点递归调用
- 使用
std::future<T>
数组保存异步任务。 std::async(traverse, t.children[i])
异步调用traverse
函数遍历第i
个子节点,立即返回future
。- 这样8个子节点的遍历操作几乎同时开始,形成真正的并行。
- 使用
- 计算当前节点自身结果
在等待子节点完成前,先计算当前节点自身的结果,利用计算资源。 - 等待所有子节点任务完成
wait_all(results);
等待所有future
对象完成,确保子节点的计算结果可用。 - 合并结果返回
把所有子节点的结果results
和当前节点自身的结果r
一起传给combine_results()
,得到当前节点的最终结果。 - 叶子节点直接计算
如果没有子节点,直接计算当前节点结果并返回。
总结:
- 这段代码充分利用
std::async
实现递归并行,每个节点的子节点递归遍历可以并发执行。 - 利用异步任务,避免阻塞,提升树遍历效率。
- 代码清晰表达了递归并行模型:先异步发起子任务,再计算自身,最后等待合并。
这个代码片段是在实现递归树遍历的异步版本,用到了一些更高级的异步组合技术,比如 when_all
和 then
,它比之前的 wait_all
更加符合现代异步编程模型,支持链式调用和更灵活的结果处理。
代码解释与理解
future<T> traverse(tree_node const &t)
{if (t.has_children()) {// 用future数组保存8个子节点的异步遍历任务std::array<future<T>, 8> results;// 异步递归调用遍历每个子节点for (int i = 0; i != 8; ++i)results[i] = async(traverse, t.children[i]);// 这里不是阻塞等待,而是把子节点的异步结果和当前节点计算结果组合起来// 先把所有子节点的future合并成一个future(当所有子节点完成时触发)// 再把当前节点的计算结果 t.compute_result() 也作为参数传入return when_all(results, t.compute_result()).then(// 当所有子节点完成并拿到当前节点的结果后,调用combine_results进行合并[](auto f, auto r) { return combine_results(f, r); });}// 叶子节点,直接返回计算结果(future封装的结果)return t.compute_result();
}
关键点解析:
- 返回值变成
future<T>
表示这个函数是异步的,调用它不会立即得到结果,而是返回一个future
,未来某个时间点可以得到计算结果。 async(traverse, t.children[i])
异步启动子节点的递归遍历,返回对应的future<T>
。when_all(results, t.compute_result())
- 这是一个组合函数,等待所有
results
中的future
完成(即所有子节点的遍历结束),同时也把当前节点的compute_result()
作为参数一并传入。 - 返回一个新的
future
,它会在所有子任务和当前节点计算都完成时被触发。
- 这是一个组合函数,等待所有
.then(...)
- 这是一个回调,
when_all
返回的future
完成后会调用这个函数。 - 参数
(auto f, auto r)
分别代表子节点的结果集合和当前节点的计算结果。 - 调用
combine_results(f, r)
来合并所有结果。
- 这是一个回调,
- 叶子节点
如果没有子节点,直接返回当前节点计算的结果,这里应该返回的是future<T>
,也就是说compute_result()
本身返回的是future<T>
,或者会被自动包装成future<T>
。
这样做的好处:
- 纯异步,非阻塞,调用者可以继续做其他事情,等待最终结果。
- 使用了任务组合(
when_all
)和回调链式操作(then
),代码简洁且符合现代异步编程范式。 - 充分利用多核并行能力,树的每个子树都并行计算。
- 避免了显式的等待(
wait_all
),更灵活。
这段代码用到了 C++20 协程(coroutine) 的 co_await
和 co_return
,实现了递归树的异步遍历
future<T> traverse(tree_node const &t)
{if (t.has_children()) {std::array<future<T>, 8> results; // 8个子节点的future数组for (int i = 0; i != 8; ++i)results[i] = async(traverse, t.children[i]); // 异步递归遍历每个子节点// co_await等待所有子节点结果,co_await等待当前节点计算结果,然后combineco_return t.combine_results(co_await results, co_await t.compute_result());}// 叶子节点,直接co_return当前节点计算结果co_return t.compute_result();
}
逐行解析和理解
- 函数返回类型是
future<T>
,且用到了协程关键字co_await
和co_return
,表明这是一个协程函数,支持异步等待。 if (t.has_children()) {
判断当前节点是否有子节点。std::array<future<T>, 8> results;
创建一个长度为8的future
数组,用于存放8个子节点的异步计算结果。for (int i = 0; i != 8; ++i) results[i] = async(traverse, t.children[i]);
异步启动8个子节点的traverse
调用,每个返回一个future<T>
。co_await results
这里假设可以对std::array<future<T>, 8>
做co_await
,它会等待所有子节点的异步任务完成,并得到所有结果。co_await t.compute_result()
等待当前节点自身的异步计算结果。t.combine_results(...)
把子节点们的结果和当前节点的计算结果合并。co_return
返回最终结果。- 叶子节点(没有子节点)直接调用
co_return t.compute_result();
返回当前节点结果。
核心理解:
- 递归调用是异步的,每个子节点都通过
async
异步启动traverse
,所有子任务同时进行,充分利用并行计算能力。 co_await
用来异步等待这些子任务完成,无需阻塞线程,提升效率。co_return
用来返回协程最终结果,包装为future<T>
。- 代码逻辑清晰,简洁且符合现代 C++ 协程编程风格。
额外说明:
- 注意:标准 C++ 并没有直接支持对
std::array<future<T>, 8>
使用co_await
,通常需要实现一个能等待所有子future
完成的辅助函数,比如类似when_all
的协程适配版本。 t.compute_result()
也必须返回future<T>
,或者能被co_await
,即异步计算。- 这个写法非常适合于异步任务图形化展开,适合树形结构的高效异步遍历。
如果需要,我可以帮你写一个co_await
支持等待std::array<future<T>, 8>
的辅助函数示例,或者一个完整的、可运行的异步递归遍历协程示例。你觉得怎么样?
这段代码加上详细注释,并解释代码的整体流程和协程机制的工作原理:
#include <coroutine>
#include <exception>
#include <iostream>
// task<T> 是一个协程返回类型,封装了协程句柄和协程状态
template <typename T>
struct task {struct promise_type; // promise_type 必须声明,用于协程框架using handle_type = std::coroutine_handle<promise_type>;// promise_type 定义了协程的生命周期和状态管理struct promise_type {T value_; // 用来存储协程返回值std::exception_ptr exception_; // 用来存储异常// 返回协程句柄封装的 task 对象task get_return_object() { return task{handle_type::from_promise(*this)};}// 协程启动时不挂起,立即运行协程体std::suspend_never initial_suspend() { return {}; }// 协程结束时挂起(挂起等待外部显式销毁),保证协程状态有效std::suspend_always final_suspend() noexcept { return {}; }// 如果协程中抛出异常,会捕获并保存到 exception_void unhandled_exception() { exception_ = std::current_exception(); }// 协程通过 co_return 返回值时调用,保存结果void return_value(T v) { value_ = v; }};handle_type coro_; // 协程句柄,指向协程状态// 构造函数:接受协程句柄explicit task(handle_type h) : coro_(h) {}// 析构时销毁协程,释放资源~task() {if (coro_) coro_.destroy();}// awaiter 接口:await_ready 返回 true 表示协程结果已经准备好,不挂起bool await_ready() noexcept { return true; }// await_suspend 不会被调用,因为 await_ready 已经返回 truevoid await_suspend(std::coroutine_handle<>) noexcept {}// await_resume 返回协程结果或重新抛出异常T await_resume() {if (coro_.promise().exception_) std::rethrow_exception(coro_.promise().exception_);return coro_.promise().value_;}
};
// 一个模拟异步计算的协程,直接返回传入的值
task<int> compute_result(int v) { co_return v;
}
// 递归协程函数,计算深度值的和
task<int> traverse(int depth) {if (depth == 0) {// 叶子节点,直接等待计算结果(这里直接返回 depth)int res = co_await compute_result(depth);co_return res;} else {// 非叶子节点,递归计算左右子树int left = co_await traverse(depth - 1);int right = co_await traverse(depth - 1);// 计算当前节点的值int self = co_await compute_result(depth);// 返回所有结果的和co_return left + right + self;}
}
int main() {auto t = traverse(3); // 调用递归协程// 直接同步等待并获取结果,输出std::cout << "Result: " << t.await_resume() << "\n";
}
depth=3/ \depth=2 depth=2/ \ / \depth=1 depth=1 depth=1 depth=1/ \ / \ / \ / \
d=0 d=0 d=0 d=0 d=0 d=0 d=0 d=0
代码执行流程和协程机制解释
- 调用
traverse(3)
:- 生成一个协程状态(栈帧和相关数据结构),返回一个
task<int>
对象,包含协程句柄。 - 因为
initial_suspend()
返回std::suspend_never
,协程立即开始执行。
- 生成一个协程状态(栈帧和相关数据结构),返回一个
- 递归执行:
- 如果
depth != 0
,会递归调用traverse(depth-1)
两次,分别生成两个子协程。 - 每个子协程也是立即执行,直到
depth==0
,叶子节点调用compute_result(0)
。
- 如果
compute_result
协程:- 只是一个简单的协程,直接用
co_return v;
返回输入值。 co_await compute_result(...)
表示等待异步计算完成,因await_ready()
返回true
,不会挂起,立即返回结果。
- 只是一个简单的协程,直接用
- 协程间的等待(
co_await
):- 因为
await_ready()
返回true
,所以每次co_await
都是同步执行。 - 子协程完成后会将结果返回给调用方。
- 因为
- 返回和结果累积:
- 每层
traverse
调用将子树计算结果相加,再加上当前节点的计算结果,co_return
返回总和。
- 每层
main()
里的同步获取结果:- 调用
await_resume()
,直接获得最终结果(深度为3的树的所有节点值和)。 - 打印输出。
- 调用
- 协程销毁:
- 因为
final_suspend()
返回std::suspend_always
,协程结束时会挂起,保持状态有效。 task
对象析构时调用coro_.destroy()
释放协程资源。
- 因为
重点理解点
- 协程句柄 (
std::coroutine_handle
) 是对协程状态的引用,负责启动、挂起、恢复和销毁协程。 - promise_type 定义了协程的生命周期控制、返回值保存、异常处理等。
initial_suspend()
和final_suspend()
控制协程开始和结束时是否挂起。co_await
通过 awaiter 接口决定是否挂起协程,何时恢复,如何获取结果。- 这段代码实现了一个同步递归异步执行的示例,异步用协程机制实现,实际不会并行,因为所有
co_await
立即返回结果。
如果你想实现真正的异步并行递归,还需要在await_ready()
返回false
并实现异步调度逻辑(比如事件循环、线程池调度等)。这段代码主要演示了协程语法和递归调用的配合,逻辑清晰,容易理解。
1. 异步通道(Asynchronous Channels)是什么?
- 高层通信抽象
异步通道封装了通信的操作细节,是通信的一种高级抽象。它让程序员不用关注底层的细节,而专注于数据的发送和接收。 - 特别适合异步边界交换
在并行计算中,不同计算节点之间常常需要交换数据。异步通道特别适合这种跨计算单元、异步的通信场景。 - 仿照 Go 语言的通道(Go-channels)设计
Go 语言的通道是一种很流行的并发编程模型,允许不同协程之间通过通道异步传递数据。这里借鉴了这种设计理念。
2. 跨“locality”通信
- “Locality”是什么?
这里的 locality 指的是计算资源的局部范围,比如一个节点、一个核、或者一个进程。 - 通道可以在一个 locality 创建,在另一个 locality 访问
这意味着可以在不同计算节点间通过通道传递消息,实现分布式通信。 - 类似点对点双向通信(P2P communicators)
这和 MPI(消息传递接口)中的点对点通信概念类似,但这里更抽象,更异步。
3. 异步特性
- channel::get() 和 channel::set() 返回 futures
这两种操作分别是从通道读取和向通道写入数据。它们返回 future,意味着调用者可以在等待数据准备好或发送完成期间继续执行其他任务,真正实现了异步非阻塞操作。
总结
异步通道是一个高级的异步通信机制,灵感来源于 Go 语言通道,允许在分布式或并行环境中以异步方式交换数据。它将通信操作封装成返回 futures 的异步接口,方便并行任务高效协作。
future的二维模板(2D stencil)计算的主循环和单步操作。下面我帮你拆解理解关键点。
1. Futurized 2D Stencil: 主循环 (Main Loop)
hpx::future<void> step_future = make_ready_future();
for (std::size_t t= 0; t != steps; ++t)
{step_future = step_future.then([t](hpx::future<void> &&){return perform_one_time_step(t);});
}
step_future.get(); // 等待所有步骤完成
make_ready_future()
创建一个已经完成的 future,作为初始依赖。- 每次循环中,调用
step_future.then(...)
,将上一次步骤的完成作为下一步执行的前提。 - 这样构造一个异步的执行链,每一步执行完后,才执行下一步。
- 最后用
step_future.get()
阻塞等待所有步骤结束。
2. 单步执行:更新边界(Update Boundaries)
hpx::future<void> partition::perform_one_time_step(int t)
{// 从邻居处异步获取上边界数据hpx::future<void> top_boundary_future= channel_up_from.get(t).then([](hpx::future<std::vector<double>> && up_future){std::vector<double> data = up_future.get(); // 不阻塞,直接获取结果// 处理“幽灵区”(ghost-zone)数据// 向邻居发送更新后的幽灵区数据return channel_up_to.set(data);});// 下面还会处理底部、左右边界以及内部区域(见下一节)
}
- 通过异步通道
channel_up_from.get(t)
从上邻居获取数据,得到一个 future。 - 用
.then()
注册回调,回调中处理数据并异步发送给上邻居。 - “幽灵区”是并行计算中邻居分区边界的冗余数据,用于保证计算正确。
- 该过程全部异步执行,不阻塞主线程。
3. 单步执行:内部计算(Interior)
hpx::future<void> interior_future =hpx::parallel::for_loop(par(task), min+1, max-1, [](int idx){// 对每个内部点应用 stencil 操作});
- 使用 HPX 的并行循环
parallel::for_loop
并行地对内部(非边界)点执行 stencil 计算。 - 该操作本身返回一个 future,表示并行计算的完成。
4. 单步执行:汇合(Wrap-up)
return when_all(top_boundary_future, bottom_boundary_future,left_boundary_future, right_boundary_future,interior_future);
- 将所有边界异步操作和内部计算的 futures 合并成一个 future,只有当所有操作完成时该 future 才完成。
- 这样保证一步计算的所有部分都完成。
5. Futurization 技术简介
- Futurization 是一种自动将同步代码转换成异步“未来式(futurized)”代码的技术。
- 通过延迟实际执行,避免不必要的同步阻塞。
- 它将原本顺序执行的代码转成一个代表算法依赖关系的执行树。
- 执行该执行树会获得与原始代码相同的结果,但执行速度会更快(因为最大限度并行)。
6. Futurization 的示意对比
直写代码 (Straight Code) | Futurized 代码 (Futurized Code) |
---|---|
T func() { ... } | future<T> func() { ... } |
T n = func(); | future<T> n = func(); |
future<T> n = async(&func, ...); | |
future<future<T>> n = async(&func, ...); * |
- Futurized 代码中,返回类型变为
future<T>
,表明结果是异步获得的。 - 直接执行代码转变为构建异步任务和依赖的过程。
- 嵌套的
future<future<T>>
会自动展开成future<T>
,方便使用。
总结
- 主循环中,利用 future.then() 构建依赖链,串行执行多个时间步的异步计算。
- 每个时间步,分成异步边界通信和并行内部计算,并用 when_all() 合并完成状态。
- Futurization 技术帮助从直写同步代码无缝转到高效异步执行,利用并行计算和通信最大化性能。