Java并发21:线程池最佳实践 - Executors陷阱与生产配置

Executors的陷阱 阿里巴巴Java开发手册明确规定:禁止使用Executors创建线程池。 三大陷阱 1. FixedThreadPool和SingleThreadExecutor // 源码 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 无界队列 } 问题:队列无界,任务堆积导致OOM 2. CachedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 无限线程 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 问题:线程数无限,可能创建大量线程导致OOM 3. ScheduledThreadPool public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); // 内部使用DelayedWorkQueue,无界 } 问题:队列无界,任务堆积 生产级配置模板 模板1:Web服务 @Configuration public class ThreadPoolConfig { @Bean("webExecutor") public ThreadPoolExecutor webExecutor() { int coreSize = Runtime.getRuntime().availableProcessors() * 2; return new ThreadPoolExecutor( coreSize, // 核心线程数 coreSize * 4, // 最大线程数 60L, TimeUnit.SECONDS, // 空闲存活时间 new ArrayBlockingQueue<>(1000), // 有界队列 new NamedThreadFactory("web-pool"), new ThreadPoolExecutor.CallerRunsPolicy() // 降级策略 ); } } // 自定义线程工厂 class NamedThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public NamedThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement()); t.setDaemon(false); return t; } } 模板2:异步任务 @Bean("asyncExecutor") public ThreadPoolExecutor asyncExecutor() { return new ThreadPoolExecutor( 10, // 核心线程数 20, // 最大线程数 300L, TimeUnit.SECONDS, // 较长的空闲时间 new LinkedBlockingQueue<>(500), // 较大队列 new NamedThreadFactory("async-pool"), (r, executor) -> { // 自定义拒绝策略:记录日志 log.error("Task rejected: {}", r); // 可以保存到数据库或MQ } ); } 异常处理 问题:线程池中的异常不会抛出 ...

2025-11-20 · maneng

创建索引的最佳实践

一、何时创建索引 1.1 适合创建索引的场景 -- 1. WHERE条件列 SELECT * FROM orders WHERE user_id = 123; CREATE INDEX idx_user_id ON orders(user_id); -- 2. ORDER BY排序列 SELECT * FROM products ORDER BY price DESC; CREATE INDEX idx_price ON products(price); -- 3. GROUP BY分组列 SELECT category, COUNT(*) FROM products GROUP BY category; CREATE INDEX idx_category ON products(category); -- 4. JOIN连接列 SELECT * FROM orders o JOIN users u ON o.user_id = u.id; CREATE INDEX idx_user_id ON orders(user_id); -- 5. DISTINCT去重列 SELECT DISTINCT category FROM products; CREATE INDEX idx_category ON products(category); 1.2 不适合创建索引的场景 -- 1. 小表(< 1000行) -- 全表扫描更快 -- 2. 频繁更新的列 -- 维护索引代价高 -- 3. 区分度低的列(选择性 < 0.01) SELECT * FROM users WHERE gender = '男'; -- 50%数据 -- 不如全表扫描 -- 4. 不在WHERE/ORDER BY/JOIN中使用的列 -- 纯粹浪费空间 二、选择索引列的原则 2.1 选择性高的列 -- 计算列的选择性 SELECT COUNT(DISTINCT column) / COUNT(*) AS selectivity FROM table_name; -- 选择性 > 0.1:适合索引 -- 选择性 < 0.01:不适合索引 示例: ...

2025-11-20 · maneng

生产级最佳实践:Java并发编程完整指南

一、核心原则 1.1 安全第一 // 1. 线程安全的优先级 // 正确性 > 性能 > 可读性 // ❌ 错误:追求性能,忽略安全 public class UnsafeCounter { private int count = 0; public void increment() { count++; // 非原子操作,线程不安全 } } // ✅ 正确:优先保证线程安全 public class SafeCounter { private final AtomicInteger count = new AtomicInteger(0); public void increment() { count.incrementAndGet(); // 原子操作,线程安全 } } 1.2 最小化同步范围 // ❌ 错误:同步范围过大 public synchronized void process() { prepareData(); // 无需同步 accessSharedData(); // 需要同步 cleanup(); // 无需同步 } // ✅ 正确:最小化同步范围 public void process() { prepareData(); synchronized (lock) { accessSharedData(); // 只同步必要代码 } cleanup(); } 1.3 不变性优于锁 // ❌ 复杂:使用锁保护可变对象 public class MutablePoint { private int x, y; public synchronized void setX(int x) { this.x = x; } public synchronized int getX() { return x; } } // ✅ 简单:使用不变对象 public final class ImmutablePoint { private final int x, y; public ImmutablePoint(int x, int y) { this.x = x; this.y = y; } public int getX() { return x; // 无需同步 } } 二、线程与线程池最佳实践 2.1 务必使用线程池 // ❌ 错误:直接创建线程 for (int i = 0; i < 10000; i++) { new Thread(() -> task()).start(); // 线程数爆炸 } // ✅ 正确:使用线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, // 核心线程数、最大线程数 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder() .setNameFormat("business-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 10000; i++) { executor.submit(() -> task()); } 2.2 线程池参数计算 // CPU密集型任务 int cpuCount = Runtime.getRuntime().availableProcessors(); int corePoolSize = cpuCount + 1; // I/O密集型任务 int corePoolSize = cpuCount * 2; // 混合型任务(推荐公式) // corePoolSize = N * (1 + WT/ST) // N = CPU核心数 // WT = 等待时间 // ST = 计算时间 int corePoolSize = cpuCount * (1 + waitTime / computeTime); // 队列容量 int queueCapacity = peakQPS * avgExecutionTime; // 最大线程数 int maximumPoolSize = corePoolSize * 2; 2.3 线程命名 // ✅ 使用ThreadFactory自定义线程名 ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("business-pool-%d") .setDaemon(false) .setPriority(Thread.NORM_PRIORITY) .setUncaughtExceptionHandler((t, e) -> { log.error("线程异常:" + t.getName(), e); }) .build(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy() ); 三、锁的最佳实践 3.1 锁的选择 // 1. 读多写少:StampedLock StampedLock sl = new StampedLock(); long stamp = sl.tryOptimisticRead(); // 读取数据 if (!sl.validate(stamp)) { stamp = sl.readLock(); try { // 重新读取 } finally { sl.unlockRead(stamp); } } // 2. 公平性要求:ReentrantLock(true) Lock lock = new ReentrantLock(true); // 3. 简单场景:synchronized synchronized (lock) { // 业务逻辑 } // 4. 高并发计数:LongAdder LongAdder counter = new LongAdder(); counter.increment(); 3.2 锁的粒度 // ❌ 粗粒度锁:性能差 public synchronized void process(String key, String value) { map.put(key, value); // 所有key共用一个锁 } // ✅ 细粒度锁:性能好 private final ConcurrentHashMap<String, Lock> lockMap = new ConcurrentHashMap<>(); public void process(String key, String value) { Lock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); lock.lock(); try { map.put(key, value); // 每个key独立锁 } finally { lock.unlock(); } } 3.3 避免死锁 // ✅ 固定加锁顺序 public void transfer(Account from, Account to, int amount) { Account first, second; if (from.getId() < to.getId()) { first = from; second = to; } else { first = to; second = from; } synchronized (first) { synchronized (second) { // 转账逻辑 } } } // ✅ 使用tryLock超时 if (lock1.tryLock(1, TimeUnit.SECONDS)) { try { if (lock2.tryLock(1, TimeUnit.SECONDS)) { try { // 业务逻辑 } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } 四、并发集合最佳实践 4.1 集合选择 场景 推荐集合 理由 读多写少 CopyOnWriteArrayList 读无锁,性能高 高并发Map ConcurrentHashMap 分段锁,性能好 生产者-消费者 BlockingQueue 自带阻塞,简化代码 优先级队列 PriorityBlockingQueue 支持优先级 延迟队列 DelayQueue 支持延迟 4.2 ConcurrentHashMap正确用法 ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); // ❌ 错误:组合操作不原子 if (!map.containsKey(key)) { map.put(key, value); // 线程不安全 } // ✅ 正确:使用原子方法 map.putIfAbsent(key, value); // ✅ 原子更新 map.compute(key, (k, v) -> (v == null ? 0 : v) + 1); // ✅ 原子替换 map.replace(key, oldValue, newValue); 五、异步编程最佳实践 5.1 CompletableFuture // ✅ 务必指定线程池 ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> { return queryData(); }, executor) // 指定线程池 .thenApplyAsync(data -> { return processData(data); }, executor) .exceptionally(ex -> { log.error("异步任务失败", ex); return defaultValue; }) .thenAccept(result -> { log.info("结果:{}", result); }); // ✅ 并行查询 CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> queryUser(), executor); CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> queryOrders(), executor); CompletableFuture.allOf(userFuture, orderFuture) .thenApply(v -> { UserInfo user = userFuture.join(); List<Order> orders = orderFuture.join(); return new UserDetailDTO(user, orders); }); 5.2 超时控制 // ✅ 设置超时 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return slowQuery(); }, executor); try { String result = future.get(1, TimeUnit.SECONDS); } catch (TimeoutException e) { log.error("查询超时"); return defaultValue; } // ✅ Java 9+:orTimeout future.orTimeout(1, TimeUnit.SECONDS) .exceptionally(ex -> defaultValue); 六、生产环境配置 6.1 JVM参数 # 通用配置 -Xms4g -Xmx4g # 堆内存 -Xss1m # 线程栈 -XX:+UseG1GC # 使用G1垃圾回收器 -XX:MaxGCPauseMillis=200 # GC暂停时间目标 -XX:ParallelGCThreads=8 # 并行GC线程数 -XX:ConcGCThreads=2 # 并发GC线程数 # 偏向锁(低竞争场景) -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 # 容器环境 -XX:ActiveProcessorCount=2 # 显式指定CPU核心数 -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 # 监控与调试 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/heapdump.hprof -Xloggc:/var/log/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps 6.2 Spring Boot配置 # application.yml server: tomcat: threads: max: 200 # 最大线程数 min-spare: 10 # 最小空闲线程数 accept-count: 100 # 等待队列长度 max-connections: 10000 # 最大连接数 spring: task: execution: pool: core-size: 10 max-size: 20 queue-capacity: 1000 thread-name-prefix: async-pool- 七、监控与告警 7.1 核心监控指标 // 1. 线程池监控 public class ThreadPoolMonitor { @Scheduled(fixedRate = 5000) public void monitor() { // 活跃线程数 int activeCount = executor.getActiveCount(); // 队列大小 int queueSize = executor.getQueue().size(); // 线程利用率 double threadUtilization = (double) activeCount / executor.getPoolSize() * 100; // 队列使用率 double queueUtilization = (double) queueSize / queueCapacity * 100; // 告警 if (threadUtilization > 90) { alert("线程池利用率过高:" + threadUtilization + "%"); } if (queueUtilization > 80) { alert("队列积压严重:" + queueUtilization + "%"); } } } 7.2 Prometheus + Grafana // 使用Micrometer导出指标 MeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); // 监控线程池 ExecutorServiceMetrics.monitor(registry, executor, "business-pool", "app", "my-app"); // 访问指标 // http://localhost:8080/actuator/prometheus 八、问题排查清单 8.1 CPU 100% 1. top -Hp <pid> # 找到CPU高的线程ID 2. printf "%x\n" <tid> # 转换为16进制 3. jstack <pid> | grep "0x<hex-id>" # 查看堆栈 4. 定位代码 → 修复问题 8.2 死锁 1. jstack <pid> | grep "Found one Java-level deadlock" 2. 分析等待链 3. 修复锁顺序 8.3 线程泄漏 1. jstack <pid> | wc -l # 查看线程数 2. jmap -dump:file=heap.hprof <pid> 3. MAT分析Thread对象 4. 定位泄漏点 九、代码审查检查项 9.1 必查项 是否使用线程池? 线程池是否指定线程名? 线程池是否配置拒绝策略? 是否有死锁风险? 是否正确处理中断? 是否有资源泄漏? 异常是否被正确捕获? 是否使用了正确的锁? 锁的粒度是否合理? 是否使用了线程安全的集合? 9.2 性能优化项 读多写少场景是否使用读写锁? 高并发计数是否使用LongAdder? 是否使用了不变对象? 是否最小化同步范围? 是否避免了锁竞争? 十、核心知识图谱 10.1 并发基础 线程基础 ├── 线程创建(Thread、Runnable、Callable) ├── 线程状态(NEW、RUNNABLE、BLOCKED、WAITING、TERMINATED) ├── 线程中断(interrupt、isInterrupted、interrupted) └── 线程通信(wait、notify、notifyAll) 10.2 内存模型 JMM(Java Memory Model) ├── 可见性(volatile、synchronized) ├── 有序性(happens-before) ├── 原子性(AtomicInteger、synchronized) └── CPU缓存(缓存一致性、伪共享) 10.3 同步工具 锁 ├── synchronized(偏向锁、轻量级锁、重量级锁) ├── ReentrantLock(公平锁、非公平锁、可中断) ├── ReadWriteLock(读锁、写锁) └── StampedLock(乐观读、悲观读、写锁) 原子类 ├── AtomicInteger(CAS) ├── LongAdder(分段累加) └── AtomicReference(对象原子操作) 并发集合 ├── ConcurrentHashMap(分段锁、CAS) ├── CopyOnWriteArrayList(读写分离) └── BlockingQueue(阻塞队列) 同步器 ├── CountDownLatch(倒计时) ├── CyclicBarrier(循环栅栏) ├── Semaphore(信号量) └── Phaser(多阶段同步) 10.4 线程池 ThreadPoolExecutor ├── 核心参数(corePoolSize、maximumPoolSize、keepAliveTime) ├── 工作队列(ArrayBlockingQueue、LinkedBlockingQueue) ├── 拒绝策略(AbortPolicy、CallerRunsPolicy) └── 线程工厂(ThreadFactory) 特殊线程池 ├── ScheduledThreadPoolExecutor(定时任务) ├── ForkJoinPool(工作窃取) └── CompletableFuture(异步编程) 十一、学习路径总结 11.1 基础篇(1-10篇) 为什么需要并发 进程与线程 线程生命周期 线程创建方式 线程中断机制 线程通信 线程安全问题 可见性、有序性、原子性 CPU缓存与多核 JMM与happens-before 11.2 原理篇(11-18篇) happens-before规则 volatile原理 synchronized原理 synchronized优化 原子类AtomicInteger CAS与ABA Lock与ReentrantLock ReadWriteLock读写锁 11.3 工具篇(19-28篇) 线程池原理 ThreadPoolExecutor详解 线程池最佳实践 BlockingQueue阻塞队列 ConcurrentHashMap CountDownLatch与CyclicBarrier Semaphore与Exchanger Phaser多阶段同步 CopyOnWriteArrayList CompletableFuture异步编程 11.4 实战篇(29-33篇) CompletableFuture实战 ForkJoinPool工作窃取 无锁编程与LongAdder StampedLock性能优化 并发设计模式 11.5 排查篇(34-39篇) 死锁的产生与排查 线程池监控与调优 JVM线程相关参数 并发问题排查工具 JMH性能测试 生产级最佳实践(本篇) 总结 Java并发编程是一个复杂但重要的技术领域,掌握并发编程需要: ...

2025-11-20 · maneng

生产环境最佳实践清单

部署架构 ✅ 推荐架构 ┌────────────────────────────────────┐ │ Nginx (反向代理 + 健康检查) │ └──────────────┬─────────────────────┘ ↓ ┌──────────────────────────────────┐ │ Spring Cloud Gateway │ │ (网关层限流) │ └──────────┬───────────────────────┘ ↓ ┌──────────────────────────────────┐ │ 微服务集群 │ │ (服务层限流 + 熔断) │ │ ├─ Order Service (3实例) │ │ ├─ Product Service (5实例) │ │ └─ Payment Service (2实例) │ └──────────┬───────────────────────┘ ↓ ┌──────────────────────────────────┐ │ Sentinel Dashboard (高可用) │ │ ├─ Dashboard 1 │ │ └─ Dashboard 2 │ └──────────────────────────────────┘ ↓ ┌──────────────────────────────────┐ │ Nacos (配置中心 + 注册中心) │ │ (规则持久化) │ └──────────────────────────────────┘ 规则配置 1. 限流规则 // ✅ 推荐:分层限流 // 第一层:网关限流(总入口) GatewayFlowRule gatewayRule = new GatewayFlowRule("order-service") .setCount(10000); // 第二层:服务限流(服务级) FlowRule serviceRule = new FlowRule(); serviceRule.setResource("orderService"); serviceRule.setCount(5000); // 第三层:接口限流(接口级) FlowRule apiRule = new FlowRule(); apiRule.setResource("orderCreate"); apiRule.setCount(1000); 2. 熔断规则 // ✅ 推荐:慢调用比例 DegradeRule rule = new DegradeRule(); rule.setResource("callProductService"); rule.setGrade(RuleConstant.DEGRADE_GRADE_RT); rule.setCount(1000); // RT > 1秒 rule.setSlowRatioThreshold(0.5); // 慢调用比例50% rule.setMinRequestAmount(10); // 最少10个请求 rule.setStatIntervalMs(10000); // 统计10秒 rule.setTimeWindow(10); // 熔断10秒 3. 系统保护 // ✅ 推荐:多指标保护 SystemRule rule = new SystemRule(); rule.setHighestCpuUsage(0.8); // CPU 80% rule.setAvgRt(500); // RT 500ms rule.setMaxThread(50); // 线程数50 配置清单 application.yml spring: application: name: order-service cloud: # Nacos配置 nacos: discovery: server-addr: nacos.example.com:8848 namespace: production config: server-addr: nacos.example.com:8848 namespace: production # Sentinel配置 sentinel: transport: dashboard: sentinel.example.com:8080 port: 8719 eager: true # 规则持久化 datasource: flow: nacos: server-addr: nacos.example.com:8848 dataId: ${spring.application.name}-flow-rules groupId: SENTINEL_GROUP rule-type: flow namespace: production degrade: nacos: server-addr: nacos.example.com:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP rule-type: degrade namespace: production 监控告警 Prometheus指标 management: endpoints: web: exposure: include: '*' metrics: export: prometheus: enabled: true tags: application: ${spring.application.name} env: production 告警规则 groups: - name: sentinel_critical rules: # 限流比例过高 - alert: HighBlockRate expr: | rate(sentinel_block_qps[1m]) / (rate(sentinel_pass_qps[1m]) + rate(sentinel_block_qps[1m])) > 0.1 for: 5m annotations: summary: "限流比例超过10%" # 熔断器开启 - alert: CircuitBreakerOpen expr: sentinel_circuit_breaker_state == 1 for: 1m annotations: summary: "熔断器已开启" # RT过高 - alert: HighRT expr: sentinel_avg_rt > 1000 for: 5m annotations: summary: "平均RT超过1秒" 代码规范 1. 资源命名 // ✅ 推荐:模块_操作 @SentinelResource("order_create") @SentinelResource("product_query") @SentinelResource("payment_pay") // ❌ 避免:动态资源名 @SentinelResource(value = "/api/" + userId) // 会产生大量资源 2. 降级处理 // ✅ 推荐:友好降级 @SentinelResource( value = "getProduct", blockHandler = "handleBlock", fallback = "handleFallback" ) public Product getProduct(Long id) { return productService.getById(id); } public Product handleBlock(Long id, BlockException ex) { // 限流降级:返回缓存 return productCache.get(id); } public Product handleFallback(Long id, Throwable ex) { // 异常降级:返回默认值 return Product.builder().id(id).name("服务暂时不可用").build(); } 3. 异常处理 // ✅ 推荐:全局异常处理 @RestControllerAdvice public class GlobalExceptionHandler { @ExceptionHandler(FlowException.class) public Result handleFlowException(FlowException e) { return Result.error(429, "请求过于频繁,请稍后重试"); } @ExceptionHandler(DegradeException.class) public Result handleDegradeException(DegradeException e) { return Result.error(503, "服务暂时不可用,请稍后重试"); } } 运维流程 1. 变更流程 提出变更需求 ↓ 编写变更方案 ↓ 技术Review ↓ 灰度验证 ↓ 全量发布 ↓ 监控观察 ↓ 变更总结 2. 灰度发布 # 1. 先更新1个实例 kubectl scale deployment order-service --replicas=1 # 更新规则 # 观察5分钟 # 2. 逐步扩容 kubectl scale deployment order-service --replicas=3 # 观察10分钟 # 3. 全量发布 kubectl scale deployment order-service --replicas=10 3. 回滚方案 # 1. 规则回滚 # 在Nacos中恢复旧规则 # 2. 应用回滚 kubectl rollout undo deployment order-service # 3. 验证 curl http://localhost:8080/health 性能优化 1. JVM参数 -Xms2g -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs/heapdump.hprof 2. 线程池 server: tomcat: threads: max: 200 min-spare: 50 accept-count: 100 3. 规则优化 // 控制规则数量 < 100条 // 合并相似规则 // 定期清理无用规则 安全加固 1. Dashboard认证 # Dashboard启动参数 -Dsentinel.dashboard.auth.username=admin -Dsentinel.dashboard.auth.password=your_strong_password # 启用HTTPS server.port=8443 server.ssl.enabled=true server.ssl.key-store=classpath:keystore.p12 server.ssl.key-store-password=password 2. 规则权限 // 配置规则审批流程 // 只允许管理员修改规则 @PreAuthorize("hasRole('ADMIN')") public Result updateRule(FlowRule rule) { // ... } 测试验证 1. 限流测试 # 压测工具 wrk -t10 -c100 -d60s http://localhost:8080/order/create # 验证限流生效 # 查看Dashboard监控 # 查看应用日志 2. 熔断测试 # 故障注入 curl -X POST http://localhost:8080/fault/inject?resource=productService&delay=5000 # 观察熔断生效 # 查看熔断日志 # 验证降级逻辑 3. 压测报告 测试场景:订单创建接口 限流阈值:1000 QPS 压测并发:100 持续时间:60秒 结果: - 通过QPS:1000 - 限流QPS:100 - 成功率:90.9% - 平均RT:50ms - 限流生效:✅ 应急预案 1. 限流过度 # 立即调整规则 # 在Nacos中增大阈值 # 或临时关闭限流 2. 熔断误判 # 调整熔断阈值 # 增大慢调用时间 # 增大慢调用比例 3. 全局降级 # 配置降级开关 sentinel: enabled: false # 紧急关闭 检查清单 上线前检查 Dashboard部署完成 配置Nacos持久化 配置限流规则 配置熔断规则 配置监控告警 限流压测验证 熔断故障注入测试 降级逻辑验证 应急预案准备 文档编写完成 运行中检查 每日查看Dashboard监控 每周review规则配置 每月review告警记录 每季度压测验证 定期演练应急预案 总结 生产环境最佳实践: ...

2025-11-20 · maneng

并发设计模式:经典模式与最佳实践

一、生产者-消费者模式 1.1 核心思想 生产者线程 → [队列] → 消费者线程 - 生产者:生产数据,放入队列 - 消费者:从队列取出数据,处理 - 队列:解耦生产者和消费者 优势: ✅ 解耦:生产者和消费者独立 ✅ 削峰填谷:队列缓冲,应对突发流量 ✅ 异步处理:提高响应速度 1.2 BlockingQueue实现 public class ProducerConsumerDemo { private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100); // 生产者 class Producer implements Runnable { @Override public void run() { while (true) { try { Task task = produceTask(); queue.put(task); // 队列满时阻塞 System.out.println("生产:" + task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } private Task produceTask() { // 生产任务 return new Task(); } } // 消费者 class Consumer implements Runnable { @Override public void run() { while (true) { try { Task task = queue.take(); // 队列空时阻塞 processTask(task); System.out.println("消费:" + task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } private void processTask(Task task) { // 处理任务 } } public static void main(String[] args) { ProducerConsumerDemo demo = new ProducerConsumerDemo(); // 启动3个生产者 for (int i = 0; i < 3; i++) { new Thread(demo.new Producer()).start(); } // 启动5个消费者 for (int i = 0; i < 5; i++) { new Thread(demo.new Consumer()).start(); } } } 二、线程池模式 2.1 核心思想 任务提交 → [线程池] → 线程执行 - 预创建线程,复用线程资源 - 避免频繁创建/销毁线程的开销 - 控制并发数量,避免资源耗尽 2.2 自定义线程池 public class CustomThreadPool { private final BlockingQueue<Runnable> taskQueue; private final List<WorkerThread> workers; private volatile boolean isShutdown = false; public CustomThreadPool(int poolSize, int queueSize) { taskQueue = new LinkedBlockingQueue<>(queueSize); workers = new ArrayList<>(poolSize); // 创建工作线程 for (int i = 0; i < poolSize; i++) { WorkerThread worker = new WorkerThread(); workers.add(worker); worker.start(); } } // 提交任务 public void submit(Runnable task) { if (isShutdown) { throw new IllegalStateException("线程池已关闭"); } try { taskQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 关闭线程池 public void shutdown() { isShutdown = true; for (WorkerThread worker : workers) { worker.interrupt(); } } // 工作线程 private class WorkerThread extends Thread { @Override public void run() { while (!isShutdown) { try { Runnable task = taskQueue.take(); task.run(); } catch (InterruptedException e) { break; } } } } } 三、Future模式 3.1 核心思想 主线程提交任务 → 异步执行 → 主线程继续工作 → 需要时获取结果 - 异步获取结果 - 主线程不阻塞 - 提高并发性能 3.2 简化实现 public class FutureDemo { // 自定义Future class FutureResult<T> { private volatile T result; private volatile boolean isDone = false; public void set(T result) { this.result = result; this.isDone = true; synchronized (this) { notifyAll(); // 唤醒等待线程 } } public T get() throws InterruptedException { if (!isDone) { synchronized (this) { while (!isDone) { wait(); // 等待结果 } } } return result; } } // 异步任务 public FutureResult<String> asyncTask() { FutureResult<String> future = new FutureResult<>(); new Thread(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 设置结果 future.set("异步任务完成"); }).start(); return future; // 立即返回Future } public static void main(String[] args) throws InterruptedException { FutureDemo demo = new FutureDemo(); // 提交异步任务 FutureResult<String> future = demo.asyncTask(); // 主线程继续工作 System.out.println("主线程继续工作..."); // 需要时获取结果 String result = future.get(); System.out.println("结果:" + result); } } 四、不变模式(Immutable Pattern) 4.1 核心思想 对象创建后,状态不可改变 - 无需同步:线程安全 - 简化并发:无竞态条件 - 高性能:无锁开销 4.2 实现方式 // 不变对象 public final class ImmutablePoint { private final int x; private final int y; public ImmutablePoint(int x, int y) { this.x = x; this.y = y; } public int getX() { return x; } public int getY() { return y; } // 修改操作返回新对象 public ImmutablePoint move(int deltaX, int deltaY) { return new ImmutablePoint(x + deltaX, y + deltaY); } } // 使用 ImmutablePoint p1 = new ImmutablePoint(0, 0); ImmutablePoint p2 = p1.move(10, 10); // 新对象 // 多线程安全:p1和p2都不会被修改 关键要素: ...

2025-11-20 · maneng

RocketMQ入门10:SpringBoot集成实战 - 从配置到生产级应用

引言:将知识转化为实践 前面9篇,我们深入学习了 RocketMQ 的各个方面。现在,让我们通过一个完整的 SpringBoot 项目,把这些知识串联起来,构建一个真正的生产级应用。 这个项目将包含: 完整的项目结构 各种消息类型的实现 错误处理和重试机制 监控和告警 性能优化 部署方案 一、项目搭建 1.1 项目结构 rocketmq-springboot-demo/ ├── pom.xml ├── src/main/java/com/example/rocketmq/ │ ├── RocketMQApplication.java # 启动类 │ ├── config/ │ │ ├── RocketMQConfig.java # RocketMQ配置 │ │ ├── ProducerConfig.java # 生产者配置 │ │ └── ConsumerConfig.java # 消费者配置 │ ├── producer/ │ │ ├── OrderProducer.java # 订单消息生产者 │ │ ├── TransactionProducer.java # 事务消息生产者 │ │ └── DelayProducer.java # 延迟消息生产者 │ ├── consumer/ │ │ ├── OrderConsumer.java # 订单消息消费者 │ │ ├── TransactionConsumer.java # 事务消息消费者 │ │ └── DelayConsumer.java # 延迟消息消费者 │ ├── message/ │ │ ├── OrderMessage.java # 订单消息实体 │ │ └── MessageWrapper.java # 消息包装器 │ ├── service/ │ │ ├── OrderService.java # 订单服务 │ │ └── MessageService.java # 消息服务 │ ├── utils/ │ │ ├── MessageBuilder.java # 消息构建工具 │ │ └── TraceUtils.java # 链路追踪工具 │ └── listener/ │ ├── TransactionListenerImpl.java # 事务监听器 │ └── MessageListenerImpl.java # 消息监听器 ├── src/main/resources/ │ ├── application.yml # 配置文件 │ ├── application-dev.yml # 开发环境配置 │ ├── application-prod.yml # 生产环境配置 │ └── logback-spring.xml # 日志配置 └── src/test/java/ # 测试类 1.2 Maven 依赖 <!-- pom.xml --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.14</version> </parent> <groupId>com.example</groupId> <artifactId>rocketmq-springboot-demo</artifactId> <version>1.0.0</version> <properties> <java.version>8</java.version> <rocketmq-spring.version>2.2.3</rocketmq-spring.version> </properties> <dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ Spring Boot Starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-spring.version}</version> </dependency> <!-- 数据库相关 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- Redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 监控 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> <!-- 工具类 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.34</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.20</version> </dependency> <!-- 测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project> 1.3 配置文件 # application.yml spring: application: name: rocketmq-springboot-demo # RocketMQ 配置 rocketmq: name-server: ${ROCKETMQ_NAMESRV:localhost:9876} producer: group: ${spring.application.name}_producer_group send-message-timeout: 3000 compress-message-body-threshold: 4096 max-message-size: 4194304 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2 retry-next-server: true # 自定义配置 consumer: groups: order-consumer-group: topics: ORDER_TOPIC consumer-group: ${spring.application.name}_order_consumer consume-thread-max: 64 consume-thread-min: 20 transaction-consumer-group: topics: TRANSACTION_TOPIC consumer-group: ${spring.application.name}_transaction_consumer # 应用配置 app: rocketmq: # 是否启用事务消息 transaction-enabled: true # 是否启用消息轨迹 trace-enabled: true # 消息轨迹Topic trace-topic: RMQ_SYS_TRACE_TOPIC # ACL配置 acl-enabled: false access-key: ${ROCKETMQ_ACCESS_KEY:} secret-key: ${ROCKETMQ_SECRET_KEY:} # 监控配置 management: endpoints: web: exposure: include: health,info,metrics,prometheus metrics: export: prometheus: enabled: true 二、核心配置类 2.1 RocketMQ 配置 // RocketMQConfig.java @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @Slf4j public class RocketMQConfig { @Autowired private RocketMQProperties rocketMQProperties; /** * 配置消息轨迹 */ @Bean @ConditionalOnProperty(prefix = "app.rocketmq", name = "trace-enabled", havingValue = "true") public DefaultMQProducer tracedProducer() { DefaultMQProducer producer = new DefaultMQProducer( rocketMQProperties.getProducer().getGroup(), true, // 启用消息轨迹 rocketMQProperties.getTraceTopic() ); configureProducer(producer); return producer; } /** * 配置事务消息生产者 */ @Bean @ConditionalOnProperty(prefix = "app.rocketmq", name = "transaction-enabled", havingValue = "true") public TransactionMQProducer transactionProducer( @Autowired TransactionListener transactionListener) { TransactionMQProducer producer = new TransactionMQProducer( rocketMQProperties.getProducer().getGroup() + "_TX" ); configureProducer(producer); // 设置事务监听器 producer.setTransactionListener(transactionListener); // 设置事务回查线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("transaction-check-thread"); return thread; } ); producer.setExecutorService(executor); return producer; } /** * 通用生产者配置 */ private void configureProducer(DefaultMQProducer producer) { producer.setNamesrvAddr(rocketMQProperties.getNameServer()); producer.setSendMsgTimeout(rocketMQProperties.getProducer().getSendMessageTimeout()); producer.setCompressMsgBodyOverHowmuch( rocketMQProperties.getProducer().getCompressMessageBodyThreshold() ); producer.setMaxMessageSize(rocketMQProperties.getProducer().getMaxMessageSize()); producer.setRetryTimesWhenSendFailed( rocketMQProperties.getProducer().getRetryTimesWhenSendFailed() ); // 配置ACL if (rocketMQProperties.isAclEnabled()) { producer.setAccessKey(rocketMQProperties.getAccessKey()); producer.setSecretKey(rocketMQProperties.getSecretKey()); } } /** * 消息发送模板 */ @Bean public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter converter) { RocketMQTemplate template = new RocketMQTemplate(); template.setMessageConverter(converter); return template; } /** * 自定义消息转换器 */ @Bean public RocketMQMessageConverter rocketMQMessageConverter() { return new CustomMessageConverter(); } } 2.2 自定义消息转换器 // CustomMessageConverter.java @Component public class CustomMessageConverter extends CompositeMessageConverter { private static final String ENCODING = "UTF-8"; @Override public org.springframework.messaging.Message<?> toMessage( Object payload, MessageHeaders headers) { // 添加追踪信息 Map<String, Object> enrichedHeaders = new HashMap<>(headers); enrichedHeaders.put("traceId", TraceUtils.getTraceId()); enrichedHeaders.put("timestamp", System.currentTimeMillis()); return super.toMessage(payload, new MessageHeaders(enrichedHeaders)); } @Override public Object fromMessage(org.springframework.messaging.Message<?> message, Class<?> targetClass) { // 自定义反序列化逻辑 if (targetClass == OrderMessage.class) { return JSON.parseObject( new String((byte[]) message.getPayload(), StandardCharsets.UTF_8), OrderMessage.class ); } return super.fromMessage(message, targetClass); } } 三、消息生产者实现 3.1 通用消息服务 // MessageService.java @Service @Slf4j public class MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 */ public SendResult sendMessage(String topic, String tag, Object message) { String destination = buildDestination(topic, tag); try { Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .setHeader("businessType", message.getClass().getSimpleName()) .build(); SendResult result = rocketMQTemplate.syncSend(destination, springMessage); log.info("消息发送成功: destination={}, msgId={}, sendStatus={}", destination, result.getMsgId(), result.getSendStatus()); // 记录监控指标 recordMetrics(topic, tag, true, 0); return result; } catch (Exception e) { log.error("消息发送失败: destination={}, error={}", destination, e.getMessage()); recordMetrics(topic, tag, false, 1); throw new RuntimeException("消息发送失败", e); } } /** * 发送延迟消息 */ public SendResult sendDelayMessage(String topic, String tag, Object message, int delayLevel) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); // delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h SendResult result = rocketMQTemplate.syncSend(destination, springMessage, rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel); log.info("延迟消息发送成功: destination={}, delayLevel={}, msgId={}", destination, delayLevel, result.getMsgId()); return result; } /** * 发送顺序消息 */ public SendResult sendOrderlyMessage(String topic, String tag, Object message, String hashKey) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); SendResult result = rocketMQTemplate.syncSendOrderly( destination, springMessage, hashKey); log.info("顺序消息发送成功: destination={}, hashKey={}, msgId={}", destination, hashKey, result.getMsgId()); return result; } /** * 发送事务消息 */ public TransactionSendResult sendTransactionMessage( String topic, String tag, Object message, Object arg) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .setHeader("transactionId", UUID.randomUUID().toString()) .build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( destination, springMessage, arg); log.info("事务消息发送成功: destination={}, transactionId={}, sendStatus={}", destination, result.getTransactionId(), result.getSendStatus()); return result; } /** * 批量发送消息 */ public SendResult sendBatchMessage(String topic, String tag, List<?> messages) { String destination = buildDestination(topic, tag); List<Message> rocketMQMessages = messages.stream() .map(msg -> { Message message = new Message(); message.setTopic(topic); message.setTags(tag); message.setKeys(generateKey(msg)); message.setBody(JSON.toJSONBytes(msg)); return message; }) .collect(Collectors.toList()); SendResult result = rocketMQTemplate.syncSend(destination, rocketMQMessages); log.info("批量消息发送成功: destination={}, count={}, msgId={}", destination, messages.size(), result.getMsgId()); return result; } /** * 异步发送消息 */ public void sendAsyncMessage(String topic, String tag, Object message, SendCallback sendCallback) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); rocketMQTemplate.asyncSend(destination, springMessage, sendCallback); log.info("异步消息已提交: destination={}", destination); } private String buildDestination(String topic, String tag) { return StringUtils.isBlank(tag) ? topic : topic + ":" + tag; } private String generateKey(Object message) { if (message instanceof BaseMessage) { return ((BaseMessage) message).getBusinessId(); } return UUID.randomUUID().toString(); } private void recordMetrics(String topic, String tag, boolean success, int failCount) { // 记录Prometheus指标 if (success) { Metrics.counter("rocketmq.message.sent", "topic", topic, "tag", tag, "status", "success" ).increment(); } else { Metrics.counter("rocketmq.message.sent", "topic", topic, "tag", tag, "status", "fail" ).increment(failCount); } } } 3.2 订单消息生产者 // OrderProducer.java @Component @Slf4j public class OrderProducer { @Autowired private MessageService messageService; /** * 发送订单创建消息 */ public void sendOrderCreateMessage(Order order) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .amount(order.getAmount()) .status(OrderStatus.CREATED) .createTime(new Date()) .build(); messageService.sendMessage("ORDER_TOPIC", "ORDER_CREATE", message); } /** * 发送订单支付消息(事务消息) */ public void sendOrderPaymentMessage(Order order, PaymentInfo paymentInfo) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .amount(order.getAmount()) .status(OrderStatus.PAID) .paymentInfo(paymentInfo) .build(); // 发送事务消息,确保订单更新和消息发送的原子性 messageService.sendTransactionMessage( "ORDER_TOPIC", "ORDER_PAYMENT", message, order // 传递订单对象作为事务回查参数 ); } /** * 发送订单状态变更消息(顺序消息) */ public void sendOrderStatusMessage(Order order, OrderStatus newStatus) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .status(newStatus) .updateTime(new Date()) .build(); // 使用订单ID作为顺序key,确保同一订单的消息顺序 messageService.sendOrderlyMessage( "ORDER_TOPIC", "ORDER_STATUS_" + newStatus.name(), message, order.getOrderId() ); } /** * 发送订单超时取消消息(延迟消息) */ public void sendOrderTimeoutMessage(Order order) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .status(OrderStatus.TIMEOUT) .build(); // 30分钟后发送超时消息(延迟级别16 = 30m) messageService.sendDelayMessage( "ORDER_TOPIC", "ORDER_TIMEOUT", message, 16 ); } } 四、消息消费者实现 4.1 订单消息消费者 // OrderConsumer.java @Component @Slf4j @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "order_consumer_group", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 64, consumeThreadMin = 20 ) public class OrderConsumer implements RocketMQListener<OrderMessage>, RocketMQPushConsumerLifecycleListener { @Autowired private OrderService orderService; @Autowired private RedisTemplate<String, Object> redisTemplate; private final AtomicLong consumeCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); @Override public void onMessage(OrderMessage message) { long count = consumeCount.incrementAndGet(); log.info("消费订单消息: orderId={}, status={}, count={}", message.getOrderId(), message.getStatus(), count); try { // 幂等性检查 if (isDuplicate(message)) { log.warn("重复消息,跳过处理: orderId={}", message.getOrderId()); return; } // 处理消息 processMessage(message); // 记录已处理 markAsProcessed(message); // 记录监控指标 recordSuccessMetrics(message); } catch (Exception e) { errorCount.incrementAndGet(); log.error("消息处理失败: orderId={}, error={}", message.getOrderId(), e.getMessage(), e); // 记录错误指标 recordErrorMetrics(message); // 重新抛出异常,触发重试 throw new RuntimeException(e); } } /** * 幂等性检查 */ private boolean isDuplicate(OrderMessage message) { String key = "msg:processed:" + message.getOrderId(); Boolean result = redisTemplate.opsForValue().setIfAbsent( key, "1", 24, TimeUnit.HOURS); return !Boolean.TRUE.equals(result); } /** * 处理消息 */ private void processMessage(OrderMessage message) { switch (message.getStatus()) { case CREATED: orderService.handleOrderCreated(message); break; case PAID: orderService.handleOrderPaid(message); break; case SHIPPED: orderService.handleOrderShipped(message); break; case COMPLETED: orderService.handleOrderCompleted(message); break; case CANCELLED: orderService.handleOrderCancelled(message); break; case TIMEOUT: orderService.handleOrderTimeout(message); break; default: log.warn("未知订单状态: {}", message.getStatus()); } } /** * 标记消息已处理 */ private void markAsProcessed(OrderMessage message) { String key = "msg:processed:" + message.getOrderId(); redisTemplate.opsForValue().set(key, JSON.toJSONString(message), 7, TimeUnit.DAYS); } /** * 记录成功指标 */ private void recordSuccessMetrics(OrderMessage message) { Metrics.counter("order.message.consumed", "status", message.getStatus().name(), "result", "success" ).increment(); } /** * 记录错误指标 */ private void recordErrorMetrics(OrderMessage message) { Metrics.counter("order.message.consumed", "status", message.getStatus().name(), "result", "error" ).increment(); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 消费者启动前的准备工作 log.info("订单消费者准备启动: group={}", consumer.getConsumerGroup()); // 设置消费位点 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置消费超时(默认15分钟) consumer.setConsumeTimeout(15); // 设置最大重试次数 consumer.setMaxReconsumeTimes(3); // 设置消息过滤 try { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("status in ('PAID', 'SHIPPED')")); } catch (Exception e) { log.error("设置消息过滤失败", e); } } } 4.2 事务消息监听器 // TransactionListenerImpl.java @Component @Slf4j public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Autowired private TransactionLogService transactionLogService; /** * 执行本地事务 */ @Override public RocketMQLocalTransactionState executeLocalTransaction( Message msg, Object arg) { String transactionId = msg.getHeaders().get("transactionId", String.class); log.info("执行本地事务: transactionId={}", transactionId); try { // 解析消息 OrderMessage orderMessage = JSON.parseObject( new String((byte[]) msg.getPayload()), OrderMessage.class ); // 执行本地事务 Order order = (Order) arg; orderService.updateOrderStatus(order, OrderStatus.PAID); // 记录事务日志 transactionLogService.save( transactionId, orderMessage.getOrderId(), TransactionStatus.COMMIT ); log.info("本地事务执行成功: orderId={}", orderMessage.getOrderId()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务执行失败: transactionId={}", transactionId, e); // 记录失败 transactionLogService.save( transactionId, null, TransactionStatus.ROLLBACK ); return RocketMQLocalTransactionState.ROLLBACK; } } /** * 事务回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transactionId = msg.getHeaders().get("transactionId", String.class); log.info("事务回查: transactionId={}", transactionId); // 查询事务日志 TransactionLog log = transactionLogService.findByTransactionId(transactionId); if (log == null) { log.warn("事务日志不存在: transactionId={}", transactionId); return RocketMQLocalTransactionState.UNKNOWN; } // 根据事务状态返回 switch (log.getStatus()) { case COMMIT: return RocketMQLocalTransactionState.COMMIT; case ROLLBACK: return RocketMQLocalTransactionState.ROLLBACK; default: return RocketMQLocalTransactionState.UNKNOWN; } } } 五、错误处理和重试 5.1 消费失败处理 // RetryMessageHandler.java @Component @Slf4j public class RetryMessageHandler { @Autowired private MessageService messageService; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 处理消费失败的消息 */ public void handleConsumeError(Message message, Exception error) { String messageId = message.getHeaders().get(RocketMQHeaders.MESSAGE_ID, String.class); String topic = message.getHeaders().get(RocketMQHeaders.TOPIC, String.class); // 记录重试次数 String retryKey = "retry:count:" + messageId; Long retryCount = redisTemplate.opsForValue().increment(retryKey); redisTemplate.expire(retryKey, 1, TimeUnit.DAYS); log.error("消息消费失败: messageId={}, topic={}, retryCount={}, error={}", messageId, topic, retryCount, error.getMessage()); // 根据重试次数决定处理策略 if (retryCount <= 3) { // 延迟重试 retryWithDelay(message, retryCount); } else if (retryCount <= 5) { // 发送到重试队列 sendToRetryQueue(message); } else { // 发送到死信队列 sendToDeadLetterQueue(message, error); } } /** * 延迟重试 */ private void retryWithDelay(Message message, Long retryCount) { // 计算延迟级别:第1次10s,第2次30s,第3次1m int delayLevel = retryCount.intValue() * 2 + 1; messageService.sendDelayMessage( message.getHeaders().get(RocketMQHeaders.TOPIC, String.class), "RETRY", message.getPayload(), delayLevel ); log.info("消息已发送到延迟重试队列: messageId={}, delayLevel={}", message.getHeaders().get(RocketMQHeaders.MESSAGE_ID), delayLevel); } /** * 发送到重试队列 */ private void sendToRetryQueue(Message message) { String retryTopic = "%RETRY%" + message.getHeaders().get(RocketMQHeaders.CONSUMER_GROUP); messageService.sendMessage(retryTopic, "RETRY", message.getPayload()); log.info("消息已发送到重试队列: topic={}", retryTopic); } /** * 发送到死信队列 */ private void sendToDeadLetterQueue(Message message, Exception error) { String dlqTopic = "%DLQ%" + message.getHeaders().get(RocketMQHeaders.CONSUMER_GROUP); // 添加错误信息 Map<String, Object> headers = new HashMap<>(message.getHeaders()); headers.put("errorMessage", error.getMessage()); headers.put("errorTime", new Date()); Message<Object> dlqMessage = MessageBuilder .withPayload(message.getPayload()) .copyHeaders(headers) .build(); messageService.sendMessage(dlqTopic, "DLQ", dlqMessage); log.error("消息已发送到死信队列: topic={}", dlqTopic); // 发送告警 sendAlert(message, error); } /** * 发送告警 */ private void sendAlert(Message message, Exception error) { // 发送告警通知(邮件、短信、钉钉等) String alertMessage = String.format( "消息处理失败告警\n" + "Topic: %s\n" + "MessageId: %s\n" + "Error: %s\n" + "Time: %s", message.getHeaders().get(RocketMQHeaders.TOPIC), message.getHeaders().get(RocketMQHeaders.MESSAGE_ID), error.getMessage(), new Date() ); // 这里可以集成告警系统 log.error("告警: {}", alertMessage); } } 六、监控和运维 6.1 健康检查 // RocketMQHealthIndicator.java @Component public class RocketMQHealthIndicator implements HealthIndicator { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public Health health() { try { // 检查 NameServer 连接 DefaultMQProducer producer = rocketMQTemplate.getProducer(); producer.getDefaultMQProducerImpl().getmQClientFactory() .getMQClientAPIImpl().getNameServerAddressList(); // 检查 Broker 连接 Collection<String> topics = producer.getDefaultMQProducerImpl() .getmQClientFactory().getTopicRouteTable().keySet(); return Health.up() .withDetail("nameServer", producer.getNamesrvAddr()) .withDetail("producerGroup", producer.getProducerGroup()) .withDetail("topics", topics) .build(); } catch (Exception e) { return Health.down() .withDetail("error", e.getMessage()) .build(); } } } 6.2 监控指标收集 // RocketMQMetrics.java @Component @Slf4j public class RocketMQMetrics { private final MeterRegistry meterRegistry; private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>(); public RocketMQMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; initMetrics(); } private void initMetrics() { // 发送成功计数 Gauge.builder("rocketmq.producer.send.success", counters, map -> map.getOrDefault("send.success", new AtomicLong()).get()) .register(meterRegistry); // 发送失败计数 Gauge.builder("rocketmq.producer.send.failure", counters, map -> map.getOrDefault("send.failure", new AtomicLong()).get()) .register(meterRegistry); // 消费成功计数 Gauge.builder("rocketmq.consumer.consume.success", counters, map -> map.getOrDefault("consume.success", new AtomicLong()).get()) .register(meterRegistry); // 消费失败计数 Gauge.builder("rocketmq.consumer.consume.failure", counters, map -> map.getOrDefault("consume.failure", new AtomicLong()).get()) .register(meterRegistry); // 消息堆积数 Gauge.builder("rocketmq.consumer.lag", this, RocketMQMetrics::getConsumerLag) .register(meterRegistry); } public void recordSendSuccess() { counters.computeIfAbsent("send.success", k -> new AtomicLong()).incrementAndGet(); } public void recordSendFailure() { counters.computeIfAbsent("send.failure", k -> new AtomicLong()).incrementAndGet(); } public void recordConsumeSuccess() { counters.computeIfAbsent("consume.success", k -> new AtomicLong()).incrementAndGet(); } public void recordConsumeFailure() { counters.computeIfAbsent("consume.failure", k -> new AtomicLong()).incrementAndGet(); } private double getConsumerLag() { // 实现获取消费延迟的逻辑 return 0.0; } } 七、部署和运维 7.1 Docker 部署 # docker-compose.yml version: '3.8' services: app: build: . container_name: rocketmq-springboot-demo environment: - SPRING_PROFILES_ACTIVE=prod - ROCKETMQ_NAMESRV=rocketmq-namesrv:9876 - JAVA_OPTS=-Xms1G -Xmx1G -XX:+UseG1GC ports: - "8080:8080" - "9090:9090" # Prometheus metrics depends_on: - rocketmq-namesrv - rocketmq-broker - redis - mysql networks: - rocketmq-network rocketmq-namesrv: image: apache/rocketmq:4.9.4 container_name: rocketmq-namesrv command: sh mqnamesrv ports: - "9876:9876" networks: - rocketmq-network rocketmq-broker: image: apache/rocketmq:4.9.4 container_name: rocketmq-broker command: sh mqbroker -n rocketmq-namesrv:9876 ports: - "10909:10909" - "10911:10911" networks: - rocketmq-network redis: image: redis:6.2 container_name: redis ports: - "6379:6379" networks: - rocketmq-network mysql: image: mysql:8.0 container_name: mysql environment: - MYSQL_ROOT_PASSWORD=root123 - MYSQL_DATABASE=rocketmq_demo ports: - "3306:3306" networks: - rocketmq-network networks: rocketmq-network: driver: bridge 7.2 Kubernetes 部署 # deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: rocketmq-springboot-demo namespace: default spec: replicas: 3 selector: matchLabels: app: rocketmq-springboot-demo template: metadata: labels: app: rocketmq-springboot-demo spec: containers: - name: app image: rocketmq-springboot-demo:latest ports: - containerPort: 8080 - containerPort: 9090 env: - name: SPRING_PROFILES_ACTIVE value: "prod" - name: ROCKETMQ_NAMESRV value: "rocketmq-namesrv-service:9876" - name: JAVA_OPTS value: "-Xms1G -Xmx1G -XX:+UseG1GC" resources: requests: memory: "1Gi" cpu: "500m" limits: memory: "2Gi" cpu: "1000m" livenessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 60 periodSeconds: 10 readinessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 30 periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: rocketmq-springboot-demo-service spec: selector: app: rocketmq-springboot-demo ports: - name: http port: 8080 targetPort: 8080 - name: metrics port: 9090 targetPort: 9090 type: LoadBalancer 八、总结 通过这个完整的 SpringBoot 集成示例,我们实现了: ...

2025-11-13 · maneng

并发设计模式与最佳实践

一、不可变对象模式 1.1 不可变对象的设计 /** * 标准的不可变对象设计 */ public final class ImmutableUser { private final String name; private final int age; private final List<String> hobbies; public ImmutableUser(String name, int age, List<String> hobbies) { this.name = name; this.age = age; // 防御性复制 this.hobbies = new ArrayList<>(hobbies); } public String getName() { return name; } public int getAge() { return age; } public List<String> getHobbies() { // 返回不可变视图 return Collections.unmodifiableList(hobbies); } } /* 不可变对象的五大要求: 1. ✅ 类声明为final 2. ✅ 所有字段都是final 3. ✅ 所有字段都是private 4. ✅ 不提供setter方法 5. ✅ 可变字段防御性复制 优势: ├─ 天然线程安全 ├─ 可以安全共享 └─ 适合做缓存key 适用场景:String、Integer、LocalDate等 */ 二、ThreadLocal模式 2.1 基本使用 /** * ThreadLocal:线程本地存储 */ public class ThreadLocalExample { // 每个线程独立的SimpleDateFormat private static ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); public static String formatDate(Date date) { return dateFormat.get().format(date); // 线程安全 } public static void main(String[] args) { // 10个线程并发格式化日期 for (int i = 0; i < 10; i++) { new Thread(() -> { System.out.println(formatDate(new Date())); }).start(); } } } /* 使用场景: 1. SimpleDateFormat(线程不安全) 2. 数据库连接(每个线程独立连接) 3. 用户上下文(Spring Security) */ 2.2 内存泄漏问题 /** * ThreadLocal内存泄漏问题 */ public class ThreadLocalMemoryLeak { private static ThreadLocal<byte[]> threadLocal = new ThreadLocal<>(); public static void main(String[] args) throws InterruptedException { // 线程池场景 ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 100; i++) { executor.submit(() -> { // 存储大对象 threadLocal.set(new byte[1024 * 1024]); // 1MB // 业务逻辑 // ... // ❌ 忘记清理,导致内存泄漏 // threadLocal.remove(); // 正确做法 }); } executor.shutdown(); } /* 内存泄漏原因: 1. ThreadLocal存储在ThreadLocalMap中 2. key是弱引用(ThreadLocal) 3. value是强引用(实际对象) 4. 线程池线程复用,ThreadLocalMap一直存在 5. value无法被GC,导致内存泄漏 解决方案: try { threadLocal.set(value); // 业务逻辑 } finally { threadLocal.remove(); // 必须清理 } */ } 三、两阶段终止模式 3.1 优雅停止线程 /** * 正确的线程停止方式 */ public class GracefulShutdown { private volatile boolean running = true; public void start() { Thread thread = new Thread(() -> { while (running) { // 检查标志位 try { // 业务逻辑 Thread.sleep(1000); } catch (InterruptedException e) { // 响应中断 Thread.currentThread().interrupt(); break; } } // 清理资源 cleanup(); }); thread.start(); } public void stop() { running = false; // 第一阶段:设置标志位 } private void cleanup() { // 第二阶段:清理资源 System.out.println("清理资源"); } /* 错误做法: thread.stop(); // ❌ 已废弃,不安全 正确做法: 1. volatile标志位 2. interrupt()中断 3. 两阶段:设置标志 + 清理资源 */ } 四、读写锁模式 4.1 ReentrantReadWriteLock /** * 读写锁:读多写少场景 */ public class ReadWriteLockExample { private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Map<String, String> cache = new HashMap<>(); // 读操作:共享锁 public String get(String key) { rwLock.readLock().lock(); try { return cache.get(key); } finally { rwLock.readLock().unlock(); } } // 写操作:排他锁 public void put(String key, String value) { rwLock.writeLock().lock(); try { cache.put(key, value); } finally { rwLock.writeLock().unlock(); } } /* 读写锁特性: ├─ 读-读:可并发 ├─ 读-写:互斥 └─ 写-写:互斥 性能提升: 读多写少场景,性能比synchronized高3-5倍 */ } 4.2 StampedLock(JDK 8) /** * StampedLock:性能更好的读写锁 */ public class StampedLockExample { private final StampedLock lock = new StampedLock(); private double x, y; // 乐观读 public double distanceFromOrigin() { long stamp = lock.tryOptimisticRead(); // 乐观读 double currentX = x; double currentY = y; if (!lock.validate(stamp)) { // 验证是否被修改 stamp = lock.readLock(); // 升级为悲观读锁 try { currentX = x; currentY = y; } finally { lock.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } // 写锁 public void move(double deltaX, double deltaY) { long stamp = lock.writeLock(); try { x += deltaX; y += deltaY; } finally { lock.unlockWrite(stamp); } } /* StampedLock优势: 1. 乐观读不加锁(性能极高) 2. 读写性能比ReentrantReadWriteLock高 3. 适合读多写少场景 注意: ❌ 不支持重入 ❌ 不支持Condition */ } 五、并发编程最佳实践 5.1 核心原则 1. 优先使用不可变对象 └─ String、Integer、LocalDate 2. 减少锁的范围 └─ 只锁必要的代码 3. 避免锁嵌套 └─ 容易死锁 4. 使用并发工具类 ├─ AtomicInteger替代synchronized计数 ├─ ConcurrentHashMap替代Hashtable ├─ CountDownLatch/CyclicBarrier协调线程 └─ BlockingQueue实现生产者-消费者 5. 线程池管理 ├─ 自定义ThreadPoolExecutor(禁用Executors) ├─ 优雅关闭(shutdown + awaitTermination) └─ 线程命名(便于排查) 6. 异常处理 ├─ 捕获InterruptedException ├─ 设置UncaughtExceptionHandler └─ 避免异常导致线程死亡 5.2 常见陷阱 /** * 常见并发陷阱 */ public class ConcurrencyTraps { // ❌ 陷阱1:双重检查锁定错误 private static Singleton instance; public static Singleton getInstance() { if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance = new Singleton(); // 需要volatile } } } return instance; } // ✅ 正确:private static volatile Singleton instance; // ❌ 陷阱2:SimpleDateFormat共享 private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // ✅ 正确:使用ThreadLocal或DateTimeFormatter // ❌ 陷阱3:复合操作不原子 if (!map.containsKey(key)) { map.put(key, value); } // ✅ 正确:map.putIfAbsent(key, value); // ❌ 陷阱4:忘记释放锁 lock.lock(); // 业务逻辑 lock.unlock(); // 异常时不会执行 // ✅ 正确:try-finally // ❌ 陷阱5:使用Executors工厂方法 Executors.newFixedThreadPool(10); // 无界队列 // ✅ 正确:自定义ThreadPoolExecutor static class Singleton { private static volatile Singleton instance; public static Singleton getInstance() { return instance; } } } 5.3 性能调优 1. 减少锁竞争 ├─ 缩小锁范围 ├─ 降低锁粒度(分段锁) └─ 使用无锁算法(CAS) 2. 减少上下文切换 ├─ 控制线程数(CPU核心数 × 2) ├─ 使用协程/虚拟线程(JDK 19+) └─ 避免频繁阻塞 3. 内存优化 ├─ 对象池(避免频繁创建) ├─ ThreadLocal注意清理 └─ 合理设置JVM参数 4. 监控与调优 ├─ JProfiler监控线程状态 ├─ Arthas诊断死锁 └─ 压测验证性能 六、总结:并发编程知识体系 6.1 核心知识点回顾 第1篇:并发三大问题 ├─ 可见性:CPU缓存导致 ├─ 原子性:指令交错执行 ├─ 有序性:指令重排序 └─ JMM:happens-before规则 第2篇:同步机制 ├─ synchronized:锁升级(偏向锁→轻量级锁→重量级锁) ├─ Lock:ReentrantLock、公平锁/非公平锁 ├─ AQS:CLH队列、独占/共享模式 └─ 死锁:四个必要条件、预防策略 第3篇:无锁编程 ├─ CAS:CPU的CMPXCHG指令 ├─ Atomic:AtomicInteger、LongAdder ├─ 并发工具:CountDownLatch、Semaphore └─ 无锁数据结构:无锁栈、无锁队列 第4篇:线程池与异步 ├─ ThreadPoolExecutor:七参数、四拒绝策略 ├─ ForkJoinPool:工作窃取算法 ├─ CompletableFuture:组合式异步编程 └─ 最佳实践:配置、监控、优雅关闭 第5篇:并发集合 ├─ ConcurrentHashMap:分段锁→CAS+synchronized ├─ BlockingQueue:生产者-消费者模式 ├─ CopyOnWriteArrayList:写时复制 └─ 选择指南:性能对比、适用场景 第6篇:设计模式与实践 ├─ 不可变对象:天然线程安全 ├─ ThreadLocal:线程本地存储 ├─ 读写锁:读多写少优化 └─ 最佳实践:避坑指南、性能调优 6.2 学习路径建议 阶段1:理解原理(1-2周) ├─ JMM、happens-before ├─ 三大问题 └─ volatile、final 阶段2:掌握同步(2-3周) ├─ synchronized原理 ├─ Lock接口 └─ AQS源码 阶段3:无锁编程(2-3周) ├─ CAS原理 ├─ Atomic类 └─ 并发工具类 阶段4:工程实践(4-6周) ├─ 线程池配置 ├─ 并发集合使用 ├─ 实际项目应用 └─ 性能调优 总计:约3个月系统学习 6.3 推荐资源 经典书籍: 1. 《Java并发编程实战》 ⭐⭐⭐⭐⭐ 2. 《Java并发编程的艺术》 ⭐⭐⭐⭐ 3. 《深入理解Java虚拟机》 ⭐⭐⭐⭐⭐ 官方文档: 1. JDK源码(java.util.concurrent包) 2. Java Language Specification - Chapter 17 3. JSR-133: Java Memory Model 开源项目: 1. Netty(高性能网络框架) 2. Disruptor(高性能队列) 3. Spring(DI容器、事务管理) 结语 Java并发编程系列完结 ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...