文章目录
- 核心思想:组队出游,人到齐了才出发 🚌
- 最简单易懂的代码示例
- 代码解析
- 运行效果分析
- `CyclicBarrier` vs `CountDownLatch` 的关键区别
- CyclicBarrier在业务系统里面通常有什么常用的应用场景
- 核心应用模式
- 1. 数据并行处理与ETL(最经典)
- 2. 模拟高并发测试
- 3. 多线程数据计算与合并
- 4. 游戏服务器中的多玩家同步
- 总结
核心思想:组队出游,人到齐了才出发 🚌
想象一下,您和几个朋友约好一起去旅游。大家从各自的家里出发,约定在火车站门口集合。CyclicBarrier
就好比是这个“集合点”。
- 规则:必须所有人都到达火车站门口后,大家才能一起进站上车。先到的人必须在门口等着,直到最后一个人到达。
CyclicBarrier barrier = new CyclicBarrier(3);
- 这等于约定了这次出游的小队有 3个人。这个
3
就是需要到达“栅栏”(Barrier)的线程数量。
- 这等于约定了这次出游的小队有 3个人。这个
barrier.await();
(等待)- 每个朋友(线程)到达火车站门口时,就调用一次这个方法。
- 这个方法的意思是:“我到了,我开始等待其他人。” 然后这个线程就会被阻塞。
- 触发开栅:
- 当第3个朋友(最后一个线程)也到达并调用
await()
时,“集结”条件达成! - 栅栏会“打开”,所有之前在
await()
处等待的线程会被同时唤醒,然后大家一起继续执行后面的任务(比如进站)。
- 当第3个朋友(最后一个线程)也到达并调用
- 循环使用 (Cyclic):
- 最关键的是,这个栅栏是可以重复使用的。比如大家进站后,又可以约定在“检票口”作为下一个集合点,再次使用同一个
CyclicBarrier
等待所有人检票后一起上车。
- 最关键的是,这个栅栏是可以重复使用的。比如大家进站后,又可以约定在“检票口”作为下一个集合点,再次使用同一个
最简单易懂的代码示例
下面我们就用代码来模拟 3个朋友约定集合 的场景。我们还会用到 CyclicBarrier
的一个高级功能:当最后一个人到达时,可以指定一个额外的任务(比如由最后一个人高喊“人齐了,出发!”)。
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleCyclicBarrierDemo {public static void main(String[] args) {// 1. 创建一个 CyclicBarrier// 参数1: 参与的线程数量(小队人数为3)// 参数2: 当最后一个线程到达栅栏时,要执行的任务(可选)final int teamSize = 3;final CyclicBarrier barrier = new CyclicBarrier(teamSize, () -> {// 这个任务会由最后一个到达栅栏的线程来执行System.out.println("\n*** 所有人都到齐了!由我(最后到达者)来宣布:出发! ***\n");});// 创建一个线程池来管理我们的朋友线程ExecutorService executor = Executors.newFixedThreadPool(teamSize);System.out.println("--- 3个朋友各自从家里出发 ---");// 2. 模拟3个朋友出发for (int i = 0; i < teamSize; i++) {final String friendName = "朋友-" + (i + 1);executor.submit(() -> {try {// 模拟从家里到集合点的耗时int travelTime = new Random().nextInt(5000) + 1000; // 随机1-6秒System.out.println("[" + friendName + "] 出发了,大概需要 " + travelTime / 1000 + " 秒...");Thread.sleep(travelTime);System.out.println(">>> [" + friendName + "] 到达集合点,开始等待其他人...");// 3. 关键!调用 await() 表示自己已到达,并开始等待// 线程会在这里被阻塞,直到所有3个线程都调用了 await()barrier.await();// --- 当所有人都到达后,线程会从 await() 返回,继续执行 ---System.out.println("<<< [" + friendName + "] 跟随大部队一起出发!");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}// 关闭线程池executor.shutdown();}
}
代码解析
new CyclicBarrier(3, ...)
: 创建了一个需要3个线程“报到”的栅栏。同时还指定了一个“栅栏任务”,这个任务会在第3个线程报到时,由那个线程顺便完成。barrier.await()
: 这是每个线程的“报到点”。先到的线程会在这里停下(阻塞),耐心等待后到的同伴。barrierAction
(第二个参数): 当最后一个(第3个)线程调用await()
时,它不会立刻阻塞,而是先去执行这个我们预设好的任务(打印“所有人都到齐了…”),执行完之后,栅栏才会打开,所有线程(包括它自己)才能继续前进。
运行效果分析
当你运行程序时,你会看到:
- 3个朋友会不按顺序地、在不同时间点打印“到达集合点…”。
- 先到的1、2个朋友在打印完这句话后就不会有任何输出了,因为它们被
await()
阻塞了。 - 直到最后一个朋友也打印了“到达集合点…”,并调用了
await()
后,你会立刻看到:
a. 首先打印出*** 所有人都到齐了!... ***
这条消息(由最后到达的线程执行)。
b. 紧接着,3个线程会几乎同时打印出<<< [朋友-X] 跟随大部队一起出发!
的信息。
CyclicBarrier
vs CountDownLatch
的关键区别
特性 | CyclicBarrier (循环栅栏) | CountDownLatch (倒数门闩) |
---|---|---|
作用对象 | 一组线程相互等待 | 一个或多个线程等待其他线程 |
关系 | 朋友之间,地位平等 | 老板和员工,一个等多个 |
可重用性 | 可循环使用。所有线程通过后,计数器重置,可用于下一轮集合。 | 一次性。计数减到0后就失效了,无法重置。 |
功能 | 功能更强大,可以附加一个barrierAction 任务。 | 功能相对简单。 |
简单总结:CountDownLatch
是一个线程等其他所有线程完事,而 CyclicBarrier
是所有线程互相等,等齐了再一起干下一件事。
CyclicBarrier在业务系统里面通常有什么常用的应用场景
好的,这个问题非常好!从学习一个工具的API到理解它在真实业务系统中的价值,是成为一个优秀工程师的关键一步。
CyclicBarrier
的核心是“让一组线程相互等待,直到所有线程都到达一个公共的屏障点,然后再一起继续执行”。基于这个特性,它在业务系统中有几个非常经典和常用的应用场景。
核心应用模式
所有场景都可以归结为一个模式:将一个大任务拆分成多个独立的子任务,让多个线程并行处理,但在某些关键的“同步点”(屏障),这些线程必须等待彼此,直到所有线程都到达这个点,然后才能一起进入下一个阶段。
下面是几个具体的业务场景:
1. 数据并行处理与ETL(最经典)
这是 CyclicBarrier
最常见的用途,尤其是在大数据和数据仓库领域。想象一下,你需要处理一个巨大的数据文件(例如,几千万条用户数据),整个过程分为三个阶段:
- 数据读取(Read): 从文件中读取数据到内存。
- 数据处理(Process): 对内存中的数据进行清洗、转换、计算。
- 数据写入(Write): 将处理好的数据写入到数据库。
为了提高效率,你可以启动多个线程,每个线程负责处理文件的一部分。但这里的关键是,必须等所有线程都完成了上一个阶段,才能一起开始下一个阶段。
- 场景: 必须等所有线程都读取完毕,数据才算完整,才能开始处理。
- 场景: 必须等所有线程都处理完毕,才能开始写入,以保证数据的一致性。
CyclicBarrier
在这里的作用:
就像一个阶段性的流水线。
- 创建一个
CyclicBarrier
,参与方数量就是你的线程数。 - 每个线程完成自己的数据读取后,调用
barrier.await()
。 - 当最后一个线程也完成读取并调用
await()
后,所有线程被唤醒,一起进入数据处理阶段。 - 处理完后,再次调用
barrier.await()
,等待所有同伴,然后一起进入数据写入阶段。 CyclicBarrier
的“可循环使用”特性在这里得到了完美体现。
// 伪代码
void processData() {// 阶段1:读取数据readMyChunk();barrier.await(); // 等待所有线程读完// 阶段2:处理数据processMyChunk();barrier.await(); // 等待所有线程处理完// 阶段3:写入数据writeMyChunk();
}
2. 模拟高并发测试
在做性能压测时,我们常常需要模拟“在同一瞬间,有大量用户请求同时到达服务器”的场景,以测试系统的瞬时承载能力。
如果只是简单地用一个 for 循环启动几百个线程,那么这些线程的启动时间会有先后,无法做到真正的“同时并发”。
CyclicBarrier
在这里的作用:
就像一个赛跑的起跑线。
- 创建一个
CyclicBarrier
,参与方数量就是并发用户数(比如500)。 - 启动500个线程,每个线程模拟一个用户。
- 每个线程在真正发起HTTP请求之前,先执行各自的准备工作(比如准备参数、建立连接等)。
- 准备工作完成后,每个线程都调用
barrier.await()
。 - 此时,所有500个线程都会在“起跑线”上被阻塞,等待发令枪。
- 当最后一个(第500个)线程也准备好并调用
await()
时,栅栏打开,所有500个线程会几乎在同一时刻被唤醒,然后同时向服务器发起请求,从而达到了模拟瞬时高并发的目的。
3. 多线程数据计算与合并
在科学计算或金融分析等领域,一个复杂的计算任务可以被分解。例如,计算一个大矩阵,可以把矩阵切成好几块,分给不同线程计算。
- 场景:每个线程计算完自己的那一小块后,需要用同伴们计算出的结果来计算下一轮的迭代值。
CyclicBarrier
的作用:确保所有线程都完成了当前轮次的计算,并将结果保存在一个共享位置后,大家才能一起进入下一轮迭代。这保证了每一轮迭代的初始数据都是完整和同步的。
4. 游戏服务器中的多玩家同步
在网络游戏中,CyclicBarrier
也非常有用。
- 场景1:游戏开始前。一个游戏房间需要凑齐比如5个玩家才能开始。服务器可以为这个房间创建一个5个参与方的
CyclicBarrier
。每个玩家客户端加载完地图资源后,就向服务器报到(相当于调用await()
)。当第5个玩家也准备好后,服务器的栅栏打开,向所有5个客户端同时发送“游戏开始”的指令。 - 场景2:回合制游戏。在一回合结束后,需要等待所有玩家都确认“回合结束”,才能一起进入下一回合的准备阶段。
总结
应用场景 | 核心问题 | CyclicBarrier的作用 |
---|---|---|
数据并行处理 | 需要分阶段、按步骤地处理数据,且每一阶段都依赖上一阶段的全部结果。 | 作为阶段同步点,确保所有线程同步进入下一阶段。 |
高并发测试 | 需要模拟大量线程在同一时刻触发某个动作。 | 作为起跑线,将所有线程“压”在同一点,然后同时释放。 |
并行算法 | 算法需要多轮迭代,且每一轮的开始都依赖上一轮所有线程的计算结果。 | 作为迭代同步点,确保所有线程同步进入下一轮迭代。 |
游戏同步 | 需要等待所有参与者都达到某个状态(如加载完成、回合结束)后才能继续。 | 作为玩家状态同步点,确保游戏逻辑同步进行。 |
总而言之,当你遇到一个需要多个对等的线程相互协作,步调一致地完成一个分阶段任务时,CyclicBarrier
就是一个非常理想的选择。