Java并发26:Phaser高级同步器 - 多阶段并发控制
Phaser简介 Phaser = CountDownLatch + CyclicBarrier的增强版 核心特性: ✅ 可重用(类似CyclicBarrier) ✅ 动态注册/注销线程(优于CyclicBarrier) ✅ 多阶段(支持多个屏障点) ✅ 层次结构(树形Phaser) Phaser phaser = new Phaser(3); // 3个参与者 // 工作线程 new Thread(() -> { doWork1(); phaser.arriveAndAwaitAdvance(); // 等待第1阶段 doWork2(); phaser.arriveAndAwaitAdvance(); // 等待第2阶段 }).start(); 核心方法 Phaser phaser = new Phaser(3); // 到达并等待 int phase = phaser.arriveAndAwaitAdvance(); // 阻塞 // 到达但不等待 int phase = phaser.arrive(); // 不阻塞 // 到达并注销 int phase = phaser.arriveAndDeregister(); // 注销自己 // 动态注册 phaser.register(); // 增加参与者 phaser.bulkRegister(5); // 一次注册5个 // 查询 int phase = phaser.getPhase(); // 当前阶段 int parties = phaser.getRegisteredParties(); // 参与者数 int arrived = phaser.getArrivedParties(); // 已到达数 int unarrived = phaser.getUnarrivedParties(); // 未到达数 boolean terminated = phaser.isTerminated(); // 是否终止 应用场景 场景1:多阶段任务 public class MultiPhaseTask { public void execute() { int parties = 3; Phaser phaser = new Phaser(parties); for (int i = 0; i < parties; i++) { int id = i; new Thread(() -> { System.out.println("Thread-" + id + ": Phase 1"); phaser.arriveAndAwaitAdvance(); // 阶段1结束 System.out.println("Thread-" + id + ": Phase 2"); phaser.arriveAndAwaitAdvance(); // 阶段2结束 System.out.println("Thread-" + id + ": Phase 3"); phaser.arriveAndAwaitAdvance(); // 阶段3结束 }).start(); } } } 场景2:动态参与者 public class DynamicParties { public void run() { Phaser phaser = new Phaser(1); // 初始1个(主线程) for (int i = 0; i < 5; i++) { phaser.register(); // 动态注册 new Thread(() -> { doWork(); phaser.arriveAndDeregister(); // 完成后注销 }).start(); } phaser.arriveAndAwaitAdvance(); // 主线程等待 System.out.println("所有任务完成"); } } 场景3:批处理 public class BatchProcessor { public void process(List<Data> dataList, int batchSize) { Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Batch " + phase + " completed"); return registeredParties == 0; // 无参与者时终止 } }; for (int i = 0; i < dataList.size(); i += batchSize) { List<Data> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size())); phaser.register(); executor.submit(() -> { processBatch(batch); phaser.arriveAndDeregister(); }); } phaser.register(); phaser.arriveAndAwaitAdvance(); // 等待所有批次完成 phaser.arriveAndDeregister(); } } onAdvance回调 Phaser phaser = new Phaser(3) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Phase " + phase + " completed, parties: " + registeredParties); return phase >= 2 || registeredParties == 0; // 3个阶段后终止 } }; 返回值: ...