Java并发28:CompletableFuture异步编程 - 优雅的异步处理

Future的局限性 Future<Integer> future = executor.submit(() -> { return compute(); }); Integer result = future.get(); // 阻塞等待 // 问题: // 1. 无法主动完成 // 2. 无法链式调用 // 3. 无法组合多个Future // 4. 无法异常处理回调 CompletableFuture核心API 1. 创建 // 无返回值 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Running"); }); // 有返回值 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 42; }); // 指定线程池 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 42; }, executorService); 2. 回调 CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + " World") // 转换 .thenAccept(System.out::println) // 消费 .thenRun(() -> System.out.println("Done")); // 运行 3. 异常处理 CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Error"); }) .exceptionally(ex -> { System.err.println("Exception: " + ex.getMessage()); return "default"; // 默认值 }) .thenAccept(result -> System.out.println(result)); 4. 组合 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20); // 两个都完成 future1.thenCombine(future2, (a, b) -> a + b) .thenAccept(System.out::println); // 30 // 任一完成 future1.applyToEither(future2, x -> x * 2) .thenAccept(System.out::println); // 全部完成 CompletableFuture.allOf(future1, future2).join(); // 任一完成 CompletableFuture.anyOf(future1, future2).join(); 实战案例 案例1:异步查询聚合 public CompletableFuture<UserInfo> getUserInfo(Long userId) { CompletableFuture<User> userFuture = CompletableFuture.supplyAsync( () -> userService.getUser(userId)); CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync( () -> orderService.getOrders(userId)); CompletableFuture<Address> addressFuture = CompletableFuture.supplyAsync( () -> addressService.getAddress(userId)); return CompletableFuture.allOf(userFuture, ordersFuture, addressFuture) .thenApply(v -> { User user = userFuture.join(); List<Order> orders = ordersFuture.join(); Address address = addressFuture.join(); return new UserInfo(user, orders, address); }); } 案例2:超时控制 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { sleep(5000); // 模拟耗时操作 return "result"; }); try { String result = future.get(3, TimeUnit.SECONDS); // 3秒超时 } catch (TimeoutException e) { future.cancel(true); // 超时取消 return "default"; } // JDK 9+:orTimeout future.orTimeout(3, TimeUnit.SECONDS) .exceptionally(ex -> "default"); 方法对比 方法 描述 是否阻塞 线程池 thenApply 转换结果 否 同当前 thenApplyAsync 转换结果 否 ForkJoinPool thenAccept 消费结果 否 同当前 thenRun 执行操作 否 同当前 thenCompose 链式异步 否 同当前 thenCombine 组合两个 否 同当前 get() 获取结果 是 - join() 获取结果 是 - 最佳实践 1. 指定线程池 ...

2025-11-20 · maneng

Java并发27:CopyOnWriteArrayList - 写时复制容器

核心原理:写时复制(Copy-On-Write) public class CopyOnWriteArrayList<E> { private volatile Object[] array; // 底层数组 public boolean add(E e) { synchronized (lock) { Object[] oldArray = getArray(); int len = oldArray.length; Object[] newArray = Arrays.copyOf(oldArray, len + 1); // 复制 newArray[len] = e; setArray(newArray); // 切换 return true; } } public E get(int index) { return get(getArray(), index); // 无锁读取 } } 核心思想: 读:直接读,无锁 写:复制整个数组,修改副本,切换引用 适用场景 场景1:黑白名单 public class BlackList { private final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); public void add(String ip) { list.add(ip); // 很少写 } public boolean contains(String ip) { return list.contains(ip); // 频繁读 } } 场景2:监听器列表 public class EventPublisher { private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>(); public void addListener(EventListener listener) { listeners.add(listener); // 很少添加 } public void publishEvent(Event event) { for (EventListener listener : listeners) { // 频繁遍历 listener.onEvent(event); } } } 场景3:配置管理 public class ConfigManager { private final CopyOnWriteArrayList<Config> configs = new CopyOnWriteArrayList<>(); public void updateConfig(Config config) { configs.add(config); // 偶尔更新 } public Config getConfig(String key) { for (Config config : configs) { // 频繁查询 if (config.getKey().equals(key)) { return config; } } return null; } } 核心特性 1. 读操作无锁 // 读操作:直接访问volatile数组 public E get(int index) { return get(getArray(), index); // 无synchronized } public Iterator<E> iterator() { return new COWIterator<E>(getArray(), 0); // 快照迭代器 } 2. 写操作加锁 // 写操作:加锁 + 复制 public boolean add(E e) { synchronized (lock) { // 1. 复制原数组 Object[] oldArray = getArray(); int len = oldArray.length; Object[] newArray = Arrays.copyOf(oldArray, len + 1); // 2. 修改副本 newArray[len] = e; // 3. 切换引用(原子操作) setArray(newArray); return true; } } 3. 快照迭代器 // 迭代器基于创建时的快照 Iterator<String> it = list.iterator(); list.add("new"); // 不影响迭代器 while (it.hasNext()) { System.out.println(it.next()); // 不包含"new" } 特点: ...

2025-11-20 · maneng

Java并发26:Phaser高级同步器 - 多阶段并发控制

Phaser简介 Phaser = CountDownLatch + CyclicBarrier的增强版 核心特性: ✅ 可重用(类似CyclicBarrier) ✅ 动态注册/注销线程(优于CyclicBarrier) ✅ 多阶段(支持多个屏障点) ✅ 层次结构(树形Phaser) Phaser phaser = new Phaser(3); // 3个参与者 // 工作线程 new Thread(() -> { doWork1(); phaser.arriveAndAwaitAdvance(); // 等待第1阶段 doWork2(); phaser.arriveAndAwaitAdvance(); // 等待第2阶段 }).start(); 核心方法 Phaser phaser = new Phaser(3); // 到达并等待 int phase = phaser.arriveAndAwaitAdvance(); // 阻塞 // 到达但不等待 int phase = phaser.arrive(); // 不阻塞 // 到达并注销 int phase = phaser.arriveAndDeregister(); // 注销自己 // 动态注册 phaser.register(); // 增加参与者 phaser.bulkRegister(5); // 一次注册5个 // 查询 int phase = phaser.getPhase(); // 当前阶段 int parties = phaser.getRegisteredParties(); // 参与者数 int arrived = phaser.getArrivedParties(); // 已到达数 int unarrived = phaser.getUnarrivedParties(); // 未到达数 boolean terminated = phaser.isTerminated(); // 是否终止 应用场景 场景1:多阶段任务 public class MultiPhaseTask { public void execute() { int parties = 3; Phaser phaser = new Phaser(parties); for (int i = 0; i < parties; i++) { int id = i; new Thread(() -> { System.out.println("Thread-" + id + ": Phase 1"); phaser.arriveAndAwaitAdvance(); // 阶段1结束 System.out.println("Thread-" + id + ": Phase 2"); phaser.arriveAndAwaitAdvance(); // 阶段2结束 System.out.println("Thread-" + id + ": Phase 3"); phaser.arriveAndAwaitAdvance(); // 阶段3结束 }).start(); } } } 场景2:动态参与者 public class DynamicParties { public void run() { Phaser phaser = new Phaser(1); // 初始1个(主线程) for (int i = 0; i < 5; i++) { phaser.register(); // 动态注册 new Thread(() -> { doWork(); phaser.arriveAndDeregister(); // 完成后注销 }).start(); } phaser.arriveAndAwaitAdvance(); // 主线程等待 System.out.println("所有任务完成"); } } 场景3:批处理 public class BatchProcessor { public void process(List<Data> dataList, int batchSize) { Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Batch " + phase + " completed"); return registeredParties == 0; // 无参与者时终止 } }; for (int i = 0; i < dataList.size(); i += batchSize) { List<Data> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size())); phaser.register(); executor.submit(() -> { processBatch(batch); phaser.arriveAndDeregister(); }); } phaser.register(); phaser.arriveAndAwaitAdvance(); // 等待所有批次完成 phaser.arriveAndDeregister(); } } onAdvance回调 Phaser phaser = new Phaser(3) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Phase " + phase + " completed, parties: " + registeredParties); return phase >= 2 || registeredParties == 0; // 3个阶段后终止 } }; 返回值: ...

2025-11-20 · maneng

Java并发25:Semaphore与Exchanger - 控制并发与数据交换

Semaphore:信号量 核心概念:控制同时访问资源的线程数 Semaphore semaphore = new Semaphore(3); // 3个许可证 semaphore.acquire(); // 获取许可证(阻塞) try { // 访问资源 } finally { semaphore.release(); // 释放许可证 } 应用场景 场景1:限流(数据库连接池) public class ConnectionPool { private final Semaphore semaphore; private final List<Connection> connections; public ConnectionPool(int size) { this.semaphore = new Semaphore(size); this.connections = new ArrayList<>(size); for (int i = 0; i < size; i++) { connections.add(createConnection()); } } public Connection getConnection() throws InterruptedException { semaphore.acquire(); // 获取许可证 return connections.remove(0); } public void releaseConnection(Connection conn) { connections.add(conn); semaphore.release(); // 释放许可证 } } 场景2:限制并发访问 ...

2025-11-20 · maneng

Java并发24:CountDownLatch与CyclicBarrier - 线程协作工具

CountDownLatch:倒计时门闩 核心概念:等待N个事件完成后,才能继续 CountDownLatch latch = new CountDownLatch(3); // 初始化计数为3 // 工作线程 new Thread(() -> { doWork(); latch.countDown(); // 计数-1 }).start(); // 主线程等待 latch.await(); // 阻塞,直到计数为0 System.out.println("所有任务完成"); 应用场景 场景1:等待多个服务启动 public class ApplicationStarter { private final CountDownLatch latch = new CountDownLatch(3); public void start() throws InterruptedException { // 启动数据库 new Thread(() -> { initDatabase(); latch.countDown(); }).start(); // 启动缓存 new Thread(() -> { initCache(); latch.countDown(); }).start(); // 启动MQ new Thread(() -> { initMQ(); latch.countDown(); }).start(); // 等待所有服务启动完成 latch.await(); System.out.println("应用启动完成"); } } 场景2:并行任务聚合 ...

2025-11-20 · maneng

Java并发23:ConcurrentHashMap原理 - 线程安全的HashMap

HashMap的线程安全问题 Map<String, String> map = new HashMap<>(); // 多线程并发put map.put("key1", "value1"); // 线程1 map.put("key2", "value2"); // 线程2 // 问题1:数据丢失 // 问题2:死循环(JDK 7) // 问题3:数据覆盖 三种解决方案: // 方案1:Hashtable(废弃) Map<String, String> map = new Hashtable<>(); // synchronized,性能差 // 方案2:Collections.synchronizedMap Map<String, String> map = Collections.synchronizedMap(new HashMap<>()); // 性能差 // 方案3:ConcurrentHashMap(推荐) Map<String, String> map = new ConcurrentHashMap<>(); // 高性能 JDK 7实现:分段锁 核心思想:将Map分成多个Segment,每个Segment独立加锁 ConcurrentHashMap ├─ Segment[0] (ReentrantLock) │ ├─ HashEntry[0] │ ├─ HashEntry[1] │ └─ ... ├─ Segment[1] (ReentrantLock) │ ├─ HashEntry[0] │ └─ ... └─ ... 优点:降低锁粒度,提高并发度 缺点: ...

2025-11-20 · maneng

Java并发22:BlockingQueue详解 - 线程安全的队列

BlockingQueue核心方法 操作 抛异常 返回值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 删除 remove() poll() take() poll(time, unit) 检查 element() peek() - - 核心区别: put/take:阻塞,生产者-消费者模式常用 offer/poll:不阻塞,适合轮询场景 常用实现类 1. ArrayBlockingQueue // 有界队列,数组实现,FIFO BlockingQueue<String> queue = new ArrayBlockingQueue<>(10); // 特点 ✅ 有界,防止OOM ✅ 性能稳定 ❌ 容量固定,无法扩展 2. LinkedBlockingQueue // 可选有界/无界,链表实现 BlockingQueue<String> bounded = new LinkedBlockingQueue<>(10); // 有界 BlockingQueue<String> unbounded = new LinkedBlockingQueue<>(); // 无界 // 特点 ✅ 容量可选 ✅ 吞吐量高 ❌ 无界模式可能OOM 3. PriorityBlockingQueue // 优先级队列,无界 BlockingQueue<Task> queue = new PriorityBlockingQueue<>(); class Task implements Comparable<Task> { int priority; @Override public int compareTo(Task other) { return Integer.compare(other.priority, this.priority); // 高优先级优先 } } // 特点 ✅ 按优先级出队 ❌ 无界,可能OOM 4. SynchronousQueue // 不存储元素,直接交付 BlockingQueue<String> queue = new SynchronousQueue<>(); // 特点 ✅ 零容量,直接交付 ✅ 适合传递场景 ❌ 生产者必须等待消费者 5. DelayQueue // 延迟队列 BlockingQueue<DelayedTask> queue = new DelayQueue<>(); class DelayedTask implements Delayed { long executeTime; @Override public long getDelay(TimeUnit unit) { return unit.convert(executeTime - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed other) { return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS)); } } // 特点 ✅ 延迟执行 ✅ 适合定时任务 生产者-消费者模式 public class ProducerConsumer { private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10); // 生产者 class Producer implements Runnable { @Override public void run() { try { for (int i = 0; i < 100; i++) { String product = "Product-" + i; queue.put(product); // 阻塞 System.out.println("Produced: " + product); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } // 消费者 class Consumer implements Runnable { @Override public void run() { try { while (!Thread.interrupted()) { String product = queue.take(); // 阻塞 System.out.println("Consumed: " + product); // 处理产品 Thread.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } public void start() { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } } 性能对比 队列 吞吐量 是否有界 内存占用 适用场景 ArrayBlockingQueue 中 是 低 通用 LinkedBlockingQueue 高 可选 高 高吞吐 PriorityBlockingQueue 低 否 高 优先级 SynchronousQueue 最高 是 最低 直接交付 DelayQueue 低 否 中 延迟任务 实战案例:任务调度 public class TaskScheduler { private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100); private final ExecutorService executor = Executors.newFixedThreadPool(10); public void submitTask(Task task) { try { taskQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void start() { for (int i = 0; i < 10; i++) { executor.submit(() -> { while (!Thread.interrupted()) { try { Task task = taskQueue.take(); task.execute(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } }); } } } 总结 选择指南: ...

2025-11-20 · maneng

Java并发21:线程池最佳实践 - Executors陷阱与生产配置

Executors的陷阱 阿里巴巴Java开发手册明确规定:禁止使用Executors创建线程池。 三大陷阱 1. FixedThreadPool和SingleThreadExecutor // 源码 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 无界队列 } 问题:队列无界,任务堆积导致OOM 2. CachedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 无限线程 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 问题:线程数无限,可能创建大量线程导致OOM 3. ScheduledThreadPool public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); // 内部使用DelayedWorkQueue,无界 } 问题:队列无界,任务堆积 生产级配置模板 模板1:Web服务 @Configuration public class ThreadPoolConfig { @Bean("webExecutor") public ThreadPoolExecutor webExecutor() { int coreSize = Runtime.getRuntime().availableProcessors() * 2; return new ThreadPoolExecutor( coreSize, // 核心线程数 coreSize * 4, // 最大线程数 60L, TimeUnit.SECONDS, // 空闲存活时间 new ArrayBlockingQueue<>(1000), // 有界队列 new NamedThreadFactory("web-pool"), new ThreadPoolExecutor.CallerRunsPolicy() // 降级策略 ); } } // 自定义线程工厂 class NamedThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public NamedThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement()); t.setDaemon(false); return t; } } 模板2:异步任务 @Bean("asyncExecutor") public ThreadPoolExecutor asyncExecutor() { return new ThreadPoolExecutor( 10, // 核心线程数 20, // 最大线程数 300L, TimeUnit.SECONDS, // 较长的空闲时间 new LinkedBlockingQueue<>(500), // 较大队列 new NamedThreadFactory("async-pool"), (r, executor) -> { // 自定义拒绝策略:记录日志 log.error("Task rejected: {}", r); // 可以保存到数据库或MQ } ); } 异常处理 问题:线程池中的异常不会抛出 ...

2025-11-20 · maneng

Java并发20:ThreadPoolExecutor详解 - 深入线程池源码

核心参数详解 public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲线程存活时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 任务队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 拒绝策略 ) { ... } 参数说明 corePoolSize(核心线程数): 即使空闲也不会回收 除非设置allowCoreThreadTimeOut(true) maximumPoolSize(最大线程数): 队列满后创建的临时线程 空闲超过keepAliveTime后回收 workQueue(任务队列): // 1. 有界队列(推荐) new ArrayBlockingQueue<>(100) // 2. 无界队列(慎用) new LinkedBlockingQueue<>() // 可能OOM // 3. 同步队列(不存储) new SynchronousQueue<>() // 直接交付给线程 // 4. 优先级队列 new PriorityBlockingQueue<>() // 按优先级执行 任务提交流程 execute() vs submit(): ...

2025-11-20 · maneng

Java并发19:线程池核心原理 - 为什么需要线程池

为什么需要线程池? 直接创建线程的问题: // 每个请求创建一个线程 new Thread(() -> handleRequest()).start(); 三大问题: 创建/销毁开销大:线程创建需要约1MB栈空间,耗时1ms 资源无限制:可能创建上千个线程,耗尽内存 管理困难:无法统一管理和监控 线程池的优势: ✅ 线程复用,降低开销 ✅ 控制并发数,避免资源耗尽 ✅ 统一管理,便于监控 核心组件 public class ThreadPoolExecutor { // 核心参数 int corePoolSize; // 核心线程数 int maximumPoolSize; // 最大线程数 long keepAliveTime; // 空闲线程存活时间 BlockingQueue<Runnable> workQueue; // 任务队列 RejectedExecutionHandler handler; // 拒绝策略 } 工作流程 任务提交 ↓ 线程数 < corePoolSize? ├─ 是 → 创建核心线程执行 └─ 否 → 队列未满? ├─ 是 → 加入队列等待 └─ 否 → 线程数 < maximumPoolSize? ├─ 是 → 创建临时线程执行 └─ 否 → 执行拒绝策略 示例: ...

2025-11-20 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...