Java并发28:CompletableFuture异步编程 - 优雅的异步处理

Future的局限性 Future<Integer> future = executor.submit(() -> { return compute(); }); Integer result = future.get(); // 阻塞等待 // 问题: // 1. 无法主动完成 // 2. 无法链式调用 // 3. 无法组合多个Future // 4. 无法异常处理回调 CompletableFuture核心API 1. 创建 // 无返回值 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Running"); }); // 有返回值 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 42; }); // 指定线程池 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 42; }, executorService); 2. 回调 CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + " World") // 转换 .thenAccept(System.out::println) // 消费 .thenRun(() -> System.out.println("Done")); // 运行 3. 异常处理 CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Error"); }) .exceptionally(ex -> { System.err.println("Exception: " + ex.getMessage()); return "default"; // 默认值 }) .thenAccept(result -> System.out.println(result)); 4. 组合 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20); // 两个都完成 future1.thenCombine(future2, (a, b) -> a + b) .thenAccept(System.out::println); // 30 // 任一完成 future1.applyToEither(future2, x -> x * 2) .thenAccept(System.out::println); // 全部完成 CompletableFuture.allOf(future1, future2).join(); // 任一完成 CompletableFuture.anyOf(future1, future2).join(); 实战案例 案例1:异步查询聚合 public CompletableFuture<UserInfo> getUserInfo(Long userId) { CompletableFuture<User> userFuture = CompletableFuture.supplyAsync( () -> userService.getUser(userId)); CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync( () -> orderService.getOrders(userId)); CompletableFuture<Address> addressFuture = CompletableFuture.supplyAsync( () -> addressService.getAddress(userId)); return CompletableFuture.allOf(userFuture, ordersFuture, addressFuture) .thenApply(v -> { User user = userFuture.join(); List<Order> orders = ordersFuture.join(); Address address = addressFuture.join(); return new UserInfo(user, orders, address); }); } 案例2:超时控制 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { sleep(5000); // 模拟耗时操作 return "result"; }); try { String result = future.get(3, TimeUnit.SECONDS); // 3秒超时 } catch (TimeoutException e) { future.cancel(true); // 超时取消 return "default"; } // JDK 9+:orTimeout future.orTimeout(3, TimeUnit.SECONDS) .exceptionally(ex -> "default"); 方法对比 方法 描述 是否阻塞 线程池 thenApply 转换结果 否 同当前 thenApplyAsync 转换结果 否 ForkJoinPool thenAccept 消费结果 否 同当前 thenRun 执行操作 否 同当前 thenCompose 链式异步 否 同当前 thenCombine 组合两个 否 同当前 get() 获取结果 是 - join() 获取结果 是 - 最佳实践 1. 指定线程池 ...

2025-11-20 · maneng

CompletableFuture实战:异步编程最佳实践

一、为什么需要CompletableFuture? 1.1 传统Future的局限 // Future的问题 ExecutorService executor = Executors.newFixedThreadPool(10); Future<String> future = executor.submit(() -> { Thread.sleep(1000); return "result"; }); // ❌ 只能阻塞等待 String result = future.get(); // 阻塞! // ❌ 无法组合多个异步任务 // ❌ 无法处理异常回调 // ❌ 无法设置超时后的默认值 1.2 CompletableFuture的优势 // ✅ 链式编程 CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + " World") .thenAccept(System.out::println); // ✅ 任务组合 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B"); CompletableFuture<String> result = future1.thenCombine(future2, (a, b) -> a + b); // ✅ 异常处理 CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) throw new RuntimeException("Error"); return "OK"; }).exceptionally(ex -> "Default"); 核心改进: ...

2025-11-19 · maneng

线程池与异步编程:从Thread到CompletableFuture的演进

引子:一个Web服务器的性能优化之路 假设你正在开发一个Web服务器,每个HTTP请求需要启动一个新线程来处理。看似简单的设计,却隐藏着严重的性能问题。 场景A:为每个请求创建新线程 /** * 方案1:为每个请求创建新线程(性能差) */ public class ThreadPerRequestServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8080); System.out.println("服务器启动,监听8080端口"); while (true) { Socket socket = serverSocket.accept(); // 为每个请求创建新线程 new Thread(() -> { try { handleRequest(socket); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } private static void handleRequest(Socket socket) throws IOException { // 模拟请求处理(读取请求、业务处理、返回响应) InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); // 简化处理 byte[] buffer = new byte[1024]; in.read(buffer); String response = "HTTP/1.1 200 OK\r\n\r\nHello World"; out.write(response.getBytes()); } } /* 性能测试(使用JMeter压测): 并发数:1000 请求总数:10000 结果: ├─ 吞吐量:500 req/s ├─ 平均响应时间:2000ms ├─ CPU使用率:60% └─ 内存使用:峰值2GB 问题分析: 1. 线程创建开销大 ├─ 每个线程需要1MB栈空间 ├─ 1000个线程 = 1GB内存 └─ 线程创建/销毁耗时(ms级别) 2. 上下文切换频繁 ├─ 1000个线程竞争CPU ├─ 大量时间花在线程切换 └─ CPU利用率低 3. 系统资源耗尽 ├─ 线程数无限制 ├─ 可能导致OOM └─ 系统崩溃 */ 场景B:使用线程池 /** * 方案2:使用线程池(性能好) */ public class ThreadPoolServer { // 创建固定大小的线程池 private static final ExecutorService executor = Executors.newFixedThreadPool(100); public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8080); System.out.println("服务器启动,监听8080端口"); while (true) { Socket socket = serverSocket.accept(); // 提交任务到线程池 executor.submit(() -> { try { handleRequest(socket); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } private static void handleRequest(Socket socket) throws IOException { // 同上 InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); byte[] buffer = new byte[1024]; in.read(buffer); String response = "HTTP/1.1 200 OK\r\n\r\nHello World"; out.write(response.getBytes()); } } /* 性能测试(同样的压测条件): 并发数:1000 请求总数:10000 结果: ├─ 吞吐量:5000 req/s ← 提升10倍! ├─ 平均响应时间:200ms ← 降低10倍! ├─ CPU使用率:85% ← 提升25% └─ 内存使用:峰值200MB ← 降低10倍! 优势分析: 1. 线程复用 ├─ 100个线程处理1000个请求 ├─ 无需频繁创建/销毁线程 └─ 节省大量时间和内存 2. 减少上下文切换 ├─ 线程数固定(100个) ├─ 上下文切换次数大幅减少 └─ CPU利用率提升 3. 资源可控 ├─ 线程数有上限 ├─ 内存使用可预测 └─ 系统稳定 */ 性能对比总结 对比维度 直接创建线程 线程池 提升 吞吐量 500 req/s 5000 req/s 10倍 响应时间 2000ms 200ms 10倍 CPU使用率 60% 85% +25% 内存峰值 2GB 200MB 10倍 线程数 1000+ 100 可控 核心洞察: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...