索引优化实战案例

引言 本文通过5个真实案例,综合运用索引优化知识,展示完整的优化过程。 案例1:订单列表查询优化 1.1 问题 -- 慢SQL SELECT * FROM orders WHERE user_id = 123 ORDER BY created_at DESC LIMIT 20; -- 执行时间:5秒 1.2 分析 EXPLAIN SELECT * FROM orders WHERE user_id = 123 ORDER BY created_at DESC LIMIT 20; 结果: type: ref key: idx_user_id rows: 50000 Extra: Using filesort ← 文件排序 问题: 扫描5万行 需要额外排序(filesort) 1.3 优化 -- 创建联合索引 CREATE INDEX idx_user_created ON orders(user_id, created_at); -- 验证 EXPLAIN SELECT * FROM orders WHERE user_id = 123 ORDER BY created_at DESC LIMIT 20; 结果: type: ref key: idx_user_created rows: 20 Extra: Using index condition ← 无filesort 1.4 效果 执行时间:5秒 → 0.01秒 性能提升:500倍 扫描行数:50000 → 20 案例2:分页查询深分页优化 2.1 问题 -- 慢SQL(翻到第5000页) SELECT * FROM products ORDER BY id LIMIT 100000, 20; -- 执行时间:8秒 2.2 分析 EXPLAIN SELECT * FROM products ORDER BY id LIMIT 100000, 20; 问题: ...

2025-11-20 · maneng

实战:防止微服务雪崩传播

引言:一场真实的雪崩事故 2018年某电商平台的双11大促,凌晨2点,一场噩梦开始了: 2:00 - 商品详情服务的MySQL数据库出现慢查询,响应时间从10ms飙升到5秒 2:02 - 商品服务的Tomcat线程池被耗尽(200个线程全部阻塞在数据库查询上) 2:03 - 订单服务调用商品服务超时,订单服务的线程池也被耗尽 2:04 - 用户服务调用订单服务超时,用户服务也挂了 2:05 - 整个系统瘫痪,首页无法访问,交易全部失败 损失:10分钟内损失订单超过5000万元 这就是微服务雪崩效应的真实案例。 一个数据库的慢查询,像多米诺骨牌一样,导致整个系统崩溃。 今天我们就来学习如何用Sentinel防止这种雪崩的发生。 场景描述:一个典型的调用链路 系统架构 我们有一个典型的微服务架构: 用户请求 ↓ [用户服务 User-Service] ↓ 调用 [订单服务 Order-Service] ↓ 调用 [商品服务 Product-Service] ↓ 查询 [MySQL数据库] 调用关系: 用户服务 → 订单服务:查询用户的订单列表 订单服务 → 商品服务:查询订单中的商品详情 商品服务 → MySQL:查询商品信息 正常流程 用户访问"我的订单"页面 用户服务调用订单服务的/order/list接口 订单服务返回订单ID列表 订单服务调用商品服务的/product/detail接口,批量查询商品信息 商品服务查询MySQL数据库 层层返回,用户看到订单列表 正常情况下的响应时间: 商品服务查询数据库:10ms 订单服务调用商品服务:50ms 用户服务调用订单服务:100ms 用户看到页面:200ms 问题分析:雪崩是如何发生的? 故障起点:商品服务的数据库慢查询 某天凌晨,商品服务的MySQL数据库出现了慢查询: -- 慢查询:全表扫描,耗时5秒 SELECT * FROM product WHERE category = 'phone' AND price > 5000 ORDER BY sales DESC; 直接影响:商品服务的响应时间从10ms飙升到5秒。 ...

2025-11-20 · maneng

第一个完整案例:用户管理系统

引言 回顾与展望 前9篇文章中,我们学习了: ✅ 数据库基础概念 ✅ MySQL安装配置 ✅ DDL(创建表) ✅ 数据类型选择 ✅ DML(增删改查) ✅ 约束与完整性 ✅ 字符集处理 ✅ 数据导入导出 现在,让我们综合运用这些知识,搭建一个完整的用户管理系统! 系统需求 实现一个用户管理系统,支持: 用户注册(唯一用户名、邮箱验证) 用户登录(密码加密存储) 个人信息管理(修改资料、上传头像) 角色权限管理(管理员、普通用户) 登录日志记录 数据统计(用户数、登录次数) 需求分析 功能模块 用户管理系统 ├── 用户模块 │ ├── 注册 │ ├── 登录 │ ├── 修改资料 │ └── 注销账户 ├── 角色权限模块 │ ├── 角色管理 │ ├── 权限分配 │ └── 权限验证 └── 日志模块 ├── 登录日志 └── 操作日志 数据库设计 需要以下数据表: users:用户表(核心表) roles:角色表 permissions:权限表 role_permissions:角色权限关联表 user_roles:用户角色关联表 login_logs:登录日志表 数据库设计 创建数据库 -- 创建数据库 CREATE DATABASE IF NOT EXISTS user_management CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; -- 使用数据库 USE user_management; 1. 用户表(users) CREATE TABLE users ( id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '用户ID', username VARCHAR(50) NOT NULL UNIQUE COMMENT '用户名(唯一)', password VARCHAR(255) NOT NULL COMMENT '密码(SHA256加密)', email VARCHAR(100) NOT NULL UNIQUE COMMENT '邮箱(唯一)', phone VARCHAR(20) COMMENT '手机号', real_name VARCHAR(50) COMMENT '真实姓名', nickname VARCHAR(50) COMMENT '昵称', avatar VARCHAR(255) COMMENT '头像URL', gender ENUM('male', 'female', 'other') DEFAULT 'other' COMMENT '性别', birthday DATE COMMENT '生日', bio TEXT COMMENT '个人简介', status TINYINT DEFAULT 1 COMMENT '状态:1正常 0禁用 2待激活', login_count INT UNSIGNED DEFAULT 0 COMMENT '登录次数', last_login_at TIMESTAMP NULL COMMENT '最后登录时间', last_login_ip VARCHAR(45) COMMENT '最后登录IP', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', deleted_at TIMESTAMP NULL COMMENT '删除时间(软删除)', INDEX idx_username (username), INDEX idx_email (email), INDEX idx_phone (phone), INDEX idx_status (status), INDEX idx_created_at (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表'; 2. 角色表(roles) CREATE TABLE roles ( id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT, name VARCHAR(50) NOT NULL UNIQUE COMMENT '角色名称', display_name VARCHAR(100) COMMENT '显示名称', description VARCHAR(255) COMMENT '角色描述', level INT DEFAULT 0 COMMENT '角色级别(数字越大权限越高)', status TINYINT DEFAULT 1 COMMENT '状态:1启用 0禁用', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_name (name), INDEX idx_level (level) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='角色表'; 3. 权限表(permissions) CREATE TABLE permissions ( id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT, name VARCHAR(100) NOT NULL UNIQUE COMMENT '权限名称(如user.create)', display_name VARCHAR(100) COMMENT '显示名称', description VARCHAR(255) COMMENT '权限描述', module VARCHAR(50) COMMENT '所属模块(user、order、product)', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_name (name), INDEX idx_module (module) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='权限表'; 4. 角色权限关联表(role_permissions) CREATE TABLE role_permissions ( id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT, role_id INT UNSIGNED NOT NULL COMMENT '角色ID', permission_id INT UNSIGNED NOT NULL COMMENT '权限ID', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_role_permission (role_id, permission_id), INDEX idx_role_id (role_id), INDEX idx_permission_id (permission_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='角色权限关联表'; 5. 用户角色关联表(user_roles) CREATE TABLE user_roles ( id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT, user_id INT UNSIGNED NOT NULL COMMENT '用户ID', role_id INT UNSIGNED NOT NULL COMMENT '角色ID', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_user_role (user_id, role_id), INDEX idx_user_id (user_id), INDEX idx_role_id (role_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户角色关联表'; 6. 登录日志表(login_logs) CREATE TABLE login_logs ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT, user_id INT UNSIGNED NOT NULL COMMENT '用户ID', login_ip VARCHAR(45) NOT NULL COMMENT '登录IP', user_agent TEXT COMMENT '浏览器UA', status TINYINT DEFAULT 1 COMMENT '状态:1成功 0失败', failure_reason VARCHAR(255) COMMENT '失败原因', login_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '登录时间', INDEX idx_user_id (user_id), INDEX idx_login_at (login_at), INDEX idx_status (status) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='登录日志表'; 初始化数据 插入默认角色 -- 插入角色 INSERT INTO roles (name, display_name, description, level) VALUES ('admin', '超级管理员', '拥有所有权限', 100), ('manager', '管理员', '拥有大部分管理权限', 50), ('user', '普通用户', '基础用户权限', 1); 插入权限 -- 插入权限 INSERT INTO permissions (name, display_name, description, module) VALUES -- 用户模块 ('user.create', '创建用户', '可以创建新用户', 'user'), ('user.read', '查看用户', '可以查看用户信息', 'user'), ('user.update', '修改用户', '可以修改用户信息', 'user'), ('user.delete', '删除用户', '可以删除用户', 'user'), -- 角色模块 ('role.create', '创建角色', '可以创建新角色', 'role'), ('role.read', '查看角色', '可以查看角色', 'role'), ('role.update', '修改角色', '可以修改角色', 'role'), ('role.delete', '删除角色', '可以删除角色', 'role'), -- 系统模块 ('system.setting', '系统设置', '可以修改系统设置', 'system'), ('system.log', '查看日志', '可以查看系统日志', 'system'); 分配权限给角色 -- 超级管理员拥有所有权限 INSERT INTO role_permissions (role_id, permission_id) SELECT 1, id FROM permissions; -- 管理员拥有用户模块权限 INSERT INTO role_permissions (role_id, permission_id) SELECT 2, id FROM permissions WHERE module = 'user'; -- 普通用户只有查看权限 INSERT INTO role_permissions (role_id, permission_id) SELECT 3, id FROM permissions WHERE name IN ('user.read'); 创建管理员账户 -- 创建超级管理员 INSERT INTO users (username, password, email, real_name, status) VALUES ( 'admin', SHA2('Admin@123456', 256), -- 密码加密 'admin@example.com', '系统管理员', 1 ); -- 分配超级管理员角色 INSERT INTO user_roles (user_id, role_id) VALUES (1, 1); -- user_id=1, role_id=1(超级管理员) 核心功能实现 功能1:用户注册 -- 注册新用户 INSERT INTO users (username, password, email, phone, real_name) VALUES ( 'zhangsan', SHA2('Password@123', 256), -- 密码加密 'zhangsan@example.com', '13800138000', '张三' ); -- 获取刚插入的用户ID SET @new_user_id = LAST_INSERT_ID(); -- 分配默认角色(普通用户) INSERT INTO user_roles (user_id, role_id) VALUES (@new_user_id, 3); -- role_id=3(普通用户) 功能2:用户登录 -- 验证用户名和密码 SELECT u.id, u.username, u.email, u.real_name, u.nickname, u.avatar, u.status, GROUP_CONCAT(r.name) AS roles FROM users u LEFT JOIN user_roles ur ON u.id = ur.user_id LEFT JOIN roles r ON ur.role_id = r.id WHERE u.username = 'zhangsan' AND u.password = SHA2('Password@123', 256) AND u.status = 1 GROUP BY u.id; -- 如果查到记录,登录成功 -- 更新登录信息 UPDATE users SET login_count = login_count + 1, last_login_at = CURRENT_TIMESTAMP, last_login_ip = '192.168.1.100' WHERE id = 2; -- 记录登录日志 INSERT INTO login_logs (user_id, login_ip, user_agent, status) VALUES (2, '192.168.1.100', 'Mozilla/5.0...', 1); 功能3:权限验证 -- 检查用户是否拥有指定权限 SELECT COUNT(*) AS has_permission FROM users u JOIN user_roles ur ON u.id = ur.user_id JOIN role_permissions rp ON ur.role_id = rp.role_id JOIN permissions p ON rp.permission_id = p.id WHERE u.id = 2 AND p.name = 'user.create'; -- 如果结果>0,说明有权限 功能4:修改个人信息 -- 修改昵称和头像 UPDATE users SET nickname = '张小三', avatar = 'https://example.com/avatars/zhangsan.jpg', updated_at = CURRENT_TIMESTAMP WHERE id = 2; 功能5:修改密码 -- 验证旧密码,修改新密码 UPDATE users SET password = SHA2('NewPassword@456', 256), updated_at = CURRENT_TIMESTAMP WHERE id = 2 AND password = SHA2('Password@123', 256); -- 验证旧密码 -- 检查affected rows -- 如果=1,说明旧密码正确,修改成功 -- 如果=0,说明旧密码错误 功能6:软删除用户 -- 软删除(推荐) UPDATE users SET status = 0, deleted_at = CURRENT_TIMESTAMP WHERE id = 2; -- 查询时排除已删除用户 SELECT * FROM users WHERE deleted_at IS NULL -- 未删除 AND status = 1; -- 正常状态 功能7:数据统计 -- 统计总用户数 SELECT COUNT(*) AS total_users FROM users WHERE deleted_at IS NULL; -- 统计各角色用户数 SELECT r.display_name AS role_name, COUNT(ur.user_id) AS user_count FROM roles r LEFT JOIN user_roles ur ON r.id = ur.role_id LEFT JOIN users u ON ur.user_id = u.id AND u.deleted_at IS NULL GROUP BY r.id, r.display_name ORDER BY user_count DESC; -- 统计最近7天注册用户 SELECT DATE(created_at) AS date, COUNT(*) AS register_count FROM users WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL 7 DAY) GROUP BY DATE(created_at) ORDER BY date; -- 统计最近7天登录日志 SELECT DATE(login_at) AS date, COUNT(*) AS login_count, COUNT(DISTINCT user_id) AS unique_users FROM login_logs WHERE login_at >= DATE_SUB(CURDATE(), INTERVAL 7 DAY) AND status = 1 GROUP BY DATE(login_at) ORDER BY date; 数据完整性验证 测试1:唯一约束 -- 尝试注册重复用户名 INSERT INTO users (username, password, email) VALUES ('zhangsan', SHA2('test', 256), 'zhangsan2@example.com'); -- ERROR 1062: Duplicate entry 'zhangsan' for key 'username' -- 尝试注册重复邮箱 INSERT INTO users (username, password, email) VALUES ('zhangsan2', SHA2('test', 256), 'zhangsan@example.com'); -- ERROR 1062: Duplicate entry 'zhangsan@example.com' for key 'email' 测试2:非空约束 -- 尝试注册时不提供密码 INSERT INTO users (username, email) VALUES ('test', 'test@example.com'); -- ERROR 1364: Field 'password' doesn't have a default value 测试3:数据一致性 -- 查询用户的角色和权限 SELECT u.username, r.display_name AS role, GROUP_CONCAT(p.display_name) AS permissions FROM users u JOIN user_roles ur ON u.id = ur.user_id JOIN roles r ON ur.role_id = r.id LEFT JOIN role_permissions rp ON r.id = rp.role_id LEFT JOIN permissions p ON rp.permission_id = p.id WHERE u.id = 2 GROUP BY u.id, u.username, r.display_name; 性能优化 索引优化 -- 查看表的索引 SHOW INDEX FROM users; -- 为常用查询字段添加索引 CREATE INDEX idx_last_login_at ON users(last_login_at); CREATE INDEX idx_login_count ON users(login_count); -- 复合索引(用户名+状态) CREATE INDEX idx_username_status ON users(username, status); 查询优化 -- ❌ 不推荐:SELECT * SELECT * FROM users WHERE username = 'zhangsan'; -- ✅ 推荐:只查询需要的列 SELECT id, username, email, real_name FROM users WHERE username = 'zhangsan'; -- ✅ 推荐:使用LIMIT限制结果 SELECT * FROM users ORDER BY created_at DESC LIMIT 10; 安全加固 1. 密码安全 -- ✅ 使用SHA256加密(基础) SHA2('password', 256) -- ✅ 更安全:加盐(Salt) CONCAT(SHA2(CONCAT('password', 'random_salt'), 256)) -- ✅ 最佳实践:使用bcrypt(应用层实现) 2. SQL注入防护 -- ❌ 危险:字符串拼接 SELECT * FROM users WHERE username = '{$username}'; -- 如果username = "admin' OR '1'='1",会查出所有用户 -- ✅ 安全:使用预处理语句(Prepared Statement) -- 在应用层使用PDO或MySQLi的预处理 3. 权限最小化 -- 创建只读用户(用于查询) CREATE USER 'readonly'@'localhost' IDENTIFIED BY 'password'; GRANT SELECT ON user_management.* TO 'readonly'@'localhost'; -- 创建普通用户(不能删除) CREATE USER 'webapp'@'localhost' IDENTIFIED BY 'password'; GRANT SELECT, INSERT, UPDATE ON user_management.* TO 'webapp'@'localhost'; 完整测试流程 1. 创建测试用户 -- 注册3个测试用户 INSERT INTO users (username, password, email, real_name) VALUES ('alice', SHA2('Alice@123', 256), 'alice@example.com', 'Alice'), ('bob', SHA2('Bob@123', 256), 'bob@example.com', 'Bob'), ('charlie', SHA2('Charlie@123', 256), 'charlie@example.com', 'Charlie'); -- 分配角色 INSERT INTO user_roles (user_id, role_id) VALUES (LAST_INSERT_ID()-2, 2), -- alice: manager (LAST_INSERT_ID()-1, 3), -- bob: user (LAST_INSERT_ID(), 3); -- charlie: user 2. 测试登录 -- Alice登录 INSERT INTO login_logs (user_id, login_ip, status) VALUES (2, '192.168.1.101', 1); -- Bob登录 INSERT INTO login_logs (user_id, login_ip, status) VALUES (3, '192.168.1.102', 1); -- 登录失败(密码错误) INSERT INTO login_logs (user_id, login_ip, status, failure_reason) VALUES (4, '192.168.1.103', 0, '密码错误'); 3. 测试权限 -- Alice(管理员)创建用户 SELECT '权限验证:Alice创建用户' AS action; SELECT COUNT(*) AS has_permission FROM users u JOIN user_roles ur ON u.id = ur.user_id JOIN role_permissions rp ON ur.role_id = rp.role_id JOIN permissions p ON rp.permission_id = p.id WHERE u.username = 'alice' AND p.name = 'user.create'; -- 结果:1(有权限) -- Bob(普通用户)创建用户 SELECT '权限验证:Bob创建用户' AS action; SELECT COUNT(*) AS has_permission FROM users u JOIN user_roles ur ON u.id = ur.user_id JOIN role_permissions rp ON ur.role_id = rp.role_id JOIN permissions p ON rp.permission_id = p.id WHERE u.username = 'bob' AND p.name = 'user.create'; -- 结果:0(无权限) 总结与展望 本案例涵盖的知识点 ✅ 数据库设计: ...

2025-11-20 · maneng

RocketMQ进阶02:事务消息实战 - 订单支付场景完整实现

引言:真实业务场景 业务需求:用户支付成功后,需要完成以下操作: 支付服务:记录支付流水 订单服务:更新订单状态 积分服务:增加用户积分 通知服务:发送支付成功短信 要求:保证数据一致性,支付成功后其他操作必须成功。 一、系统架构设计 1.1 整体架构 ┌──────────────────────────────────────────────────────┐ │ 订单支付系统架构 │ ├──────────────────────────────────────────────────────┤ │ │ │ 用户端 │ │ │ │ │ │ 1. 发起支付请求 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 支付服务(Producer) │ │ │ │ - 调用第三方支付 │ │ │ │ - 记录支付流水(本地事务) │ │ │ │ - 发送事务消息 │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ 事务消息 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ RocketMQ Broker │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ 订单服务 │ │ 积分服务 │ │ 通知服务 │ │ │ │ Consumer │ │ Consumer │ │ Consumer │ │ │ │ │ │ │ │ │ │ │ │ 更新订单 │ │ 增加积分 │ │ 发送短信 │ │ │ │ 状态 │ │ │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────────┘ 1.2 消息流转 支付成功 → 发送Half消息 → 记录支付流水(本地事务) ↓ 事务提交(Commit) ↓ 消息变为可消费状态 ↓ ┌─────────────┼─────────────┐ ▼ ▼ ▼ 订单服务 积分服务 通知服务 更新状态 增加积分 发送短信 二、代码实现 2.1 数据库设计 -- 支付流水表 CREATE TABLE payment_record ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, pay_time DATETIME NOT NULL, status VARCHAR(20) NOT NULL, -- SUCCESS, FAILED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_order_id (order_id) ); -- 订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, status VARCHAR(20) NOT NULL, -- CREATED, PAID, CANCELLED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 积分表 CREATE TABLE user_points ( user_id BIGINT PRIMARY KEY, points INT NOT NULL DEFAULT 0, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); 2.2 支付服务实现 @Service public class PaymentService { @Autowired private TransactionMQProducer transactionProducer; @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 处理支付请求 */ public PaymentResult processPay(PaymentRequest request) { String orderId = request.getOrderId(); BigDecimal amount = request.getAmount(); // 1. 调用第三方支付接口 ThirdPartyPayResult payResult = callThirdPartyPay(orderId, amount); if (!payResult.isSuccess()) { return PaymentResult.failed("支付失败:" + payResult.getMessage()); } // 2. 构建事务消息 PaymentEvent event = new PaymentEvent(); event.setOrderId(orderId); event.setUserId(request.getUserId()); event.setAmount(amount); event.setPayTime(new Date()); Message message = new Message( "payment_success_topic", "pay", orderId, // Key JSON.toJSONString(event).getBytes(StandardCharsets.UTF_8) ); // 3. 发送事务消息 try { TransactionSendResult sendResult = transactionProducer.sendMessageInTransaction( message, event // 传递给executeLocalTransaction ); if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) { return PaymentResult.success("支付成功"); } else { return PaymentResult.failed("支付记录失败"); } } catch (Exception e) { log.error("发送事务消息失败", e); return PaymentResult.failed("系统异常"); } } /** * 调用第三方支付(模拟) */ private ThirdPartyPayResult callThirdPartyPay(String orderId, BigDecimal amount) { // 实际调用支付宝、微信等支付接口 log.info("调用第三方支付:订单={}, 金额={}", orderId, amount); return ThirdPartyPayResult.success(); } } /** * 事务消息监听器 */ @Component public class PaymentTransactionListener implements TransactionListener { @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 执行本地事务:记录支付流水 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { PaymentEvent event = (PaymentEvent) arg; String orderId = event.getOrderId(); try { // 1. 插入支付记录 PaymentRecord record = new PaymentRecord(); record.setOrderId(orderId); record.setUserId(event.getUserId()); record.setAmount(event.getAmount()); record.setPayTime(event.getPayTime()); record.setStatus("SUCCESS"); int rows = paymentRecordMapper.insert(record); if (rows > 0) { log.info("支付记录成功:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.error("支付记录失败:{}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (DuplicateKeyException e) { // 重复记录,说明已经成功 log.warn("支付记录重复:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("支付记录异常:{}", orderId, e); // 返回UNKNOW,等待回查 return LocalTransactionState.UNKNOW; } } /** * 回查事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getKeys(); try { // 查询支付记录是否存在 PaymentRecord record = paymentRecordMapper.selectByOrderId(orderId); if (record != null && "SUCCESS".equals(record.getStatus())) { log.info("回查成功:支付记录存在,订单={}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.warn("回查失败:支付记录不存在,订单={}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { log.error("回查异常:{}", orderId, e); return LocalTransactionState.UNKNOW; } } } 2.3 订单服务实现 @Service public class OrderConsumerService { @Autowired private OrderMapper orderMapper; /** * 消费支付成功消息 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "order_consumer_group" ) public class OrderMessageListener implements RocketMQListener<String> { @Override public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); log.info("收到支付成功消息:{}", orderId); try { // 1. 幂等性检查 Order order = orderMapper.selectByOrderId(orderId); if (order == null) { log.error("订单不存在:{}", orderId); return; } if ("PAID".equals(order.getStatus())) { log.warn("订单已支付,跳过处理:{}", orderId); return; // 幂等性保证 } // 2. 更新订单状态 order.setStatus("PAID"); order.setUpdatedAt(new Date()); int rows = orderMapper.updateById(order); if (rows > 0) { log.info("订单状态更新成功:{}", orderId); } else { log.error("订单状态更新失败:{}", orderId); throw new RuntimeException("更新失败"); } } catch (Exception e) { log.error("处理支付消息异常:{}", orderId, e); throw new RuntimeException(e); // 触发重试 } } } } 2.4 积分服务实现 @Service public class PointsConsumerService { @Autowired private UserPointsMapper userPointsMapper; @Autowired private PointsRecordMapper pointsRecordMapper; /** * 消费支付成功消息,增加积分 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "points_consumer_group" ) public class PointsMessageListener implements RocketMQListener<String> { @Override @Transactional public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); Long userId = event.getUserId(); log.info("收到支付成功消息,增加积分:用户={}, 订单={}", userId, orderId); try { // 1. 幂等性检查:是否已经增加过积分 PointsRecord existing = pointsRecordMapper.selectByOrderId(orderId); if (existing != null) { log.warn("积分已增加,跳过处理:{}", orderId); return; } // 2. 计算积分(1元=1积分) int points = event.getAmount().intValue(); // 3. 增加积分 int rows = userPointsMapper.increasePoints(userId, points); if (rows == 0) { // 用户不存在,创建记录 UserPoints userPoints = new UserPoints(); userPoints.setUserId(userId); userPoints.setPoints(points); userPointsMapper.insert(userPoints); } // 4. 记录积分变更 PointsRecord record = new PointsRecord(); record.setUserId(userId); record.setOrderId(orderId); record.setPoints(points); record.setReason("支付获得"); pointsRecordMapper.insert(record); log.info("积分增加成功:用户={}, 积分={}", userId, points); } catch (DuplicateKeyException e) { // 重复处理,直接返回 log.warn("积分记录重复:{}", orderId); } catch (Exception e) { log.error("积分增加异常:{}", orderId, e); throw new RuntimeException(e); } } } } 三、测试验证 3.1 正常流程测试 @SpringBootTest public class PaymentTransactionTest { @Autowired private PaymentService paymentService; @Test public void testNormalPayment() throws InterruptedException { // 1. 发起支付 PaymentRequest request = new PaymentRequest(); request.setOrderId("ORDER_001"); request.setUserId(10001L); request.setAmount(new BigDecimal("99.00")); PaymentResult result = paymentService.processPay(request); // 2. 验证支付结果 assertTrue(result.isSuccess()); // 3. 等待消息消费(实际生产环境通过监控确认) Thread.sleep(5000); // 4. 验证订单状态 Order order = orderMapper.selectByOrderId("ORDER_001"); assertEquals("PAID", order.getStatus()); // 5. 验证积分 UserPoints points = userPointsMapper.selectByUserId(10001L); assertTrue(points.getPoints() >= 99); } } 3.2 异常场景测试 @Test public void testPaymentFailure() { // 场景1:支付失败 // 预期:不记录支付流水,不发送消息 // 场景2:记录支付流水失败 // 预期:回滚消息,不通知下游服务 // 场景3:Producer宕机 // 预期:回查机制保证消息最终发送 // 场景4:Consumer消费失败 // 预期:自动重试,直到成功 } 四、最佳实践总结 4.1 关键要点 1. 幂等性设计 - 使用唯一键约束 - 消费前检查是否已处理 - 数据库层面保证 2. 本地事务设计 - 尽量简单 - 避免远程调用 - 快速返回 3. 回查逻辑设计 - 通过数据库查询事务状态 - 不依赖缓存 - 保证幂等 4. 异常处理 - 明确的Commit/Rollback - 未知异常返回UNKNOW - 记录详细日志 4.2 注意事项 ❌ 避免: - 本地事务中调用RPC - 本地事务执行时间过长 - 回查逻辑依赖不可靠数据源 ✅ 推荐: - 本地事务只操作数据库 - 控制本地事务在1秒内 - 回查直接查询数据库 五、总结 本文通过订单支付场景,完整展示了事务消息的实战应用: ...

2025-11-14 · maneng

实战:电商秒杀系统的多层限流方案

引言:秒杀系统的终极挑战 2024年双11,某电商平台iPhone秒杀: 库存:100台 参与人数:10万人 秒杀时间:10:00:00开始 预期流量:瞬间10万QPS 不做限流的后果: 10:00:00.000 - 10万请求同时到达 ↓ 数据库:100个连接池瞬间耗尽 ↓ 应用服务器:200个线程全部阻塞 ↓ 响应时间:从50ms飙升到30秒 ↓ 系统崩溃:所有接口无法响应 ↓ 用户体验:网站打不开,投诉无数 今天,我们将设计一个完整的秒杀系统限流方案,综合运用前面学到的所有知识。 一、场景分析 1.1 业务需求 秒杀活动: 商品: iPhone 15 Pro Max 库存: 100台 价格: 6999元(原价9999元) 开始时间: 2024-11-11 10:00:00 预计参与人数: 10万人 技术指标: 系统容量: QPS 5000 数据库连接: 100个 应用线程池: 200个 Redis: 10000 QPS 1.2 系统架构 用户(10万) ↓ CDN(静态资源) ↓ ┌─────────────────────────────────┐ │ 第1层:网关限流 │ │ - 总入口QPS:5000 │ │ - 限制单用户频率 │ └────────────┬────────────────────┘ ↓ ┌─────────────────────────────────┐ │ 第2层:接口限流 │ │ - 秒杀接口:QPS 1000 │ │ - Warm Up预热 │ └────────────┬────────────────────┘ ↓ ┌─────────────────────────────────┐ │ 第3层:热点限流 │ │ - 商品维度:每个商品100 QPS │ └────────────┬────────────────────┘ ↓ ┌─────────────────────────────────┐ │ 第4层:库存扣减限流 │ │ - 线程数限流:10个 │ │ - 匀速排队 │ └────────────┬────────────────────┘ ↓ 数据库(100个连接) 二、第1层:网关限流 2.1 网关层的职责 目标: ...

2025-01-21 · maneng

事务实战:转账案例与并发控制

实战案例概览 案例 核心问题 解决方案 难点 转账业务 数据一致性、死锁 固定加锁顺序、悲观锁 多账户并发转账 秒杀抢购 超卖、高并发 乐观锁 + 限流 10000人抢100件商品 订单支付 重复支付、幂等性 悲观锁 + 唯一约束 防止重复扣款 红包发放 余额不足、公平性 悲观锁 + 事务隔离 1个红包被多人抢 积分扣减 负数积分 乐观锁 + 余额检查 并发扣减积分 案例1:转账业务 需求 用户A向用户B转账100元,要求: 余额不能为负数 转账过程中不能被打断 防止死锁 方案1:基础实现(有死锁风险) -- ❌ 可能死锁 -- 事务A:A向B转100 START TRANSACTION; UPDATE account SET balance = balance - 100 WHERE user_id = 'A'; -- 锁A UPDATE account SET balance = balance + 100 WHERE user_id = 'B'; -- 等待锁B COMMIT; -- 事务B:B向A转50(并发执行) START TRANSACTION; UPDATE account SET balance = balance - 50 WHERE user_id = 'B'; -- 锁B UPDATE account SET balance = balance + 50 WHERE user_id = 'A'; -- 等待锁A(死锁!) COMMIT; 方案2:固定加锁顺序(推荐) -- ✅ 避免死锁:按user_id升序加锁 -- 转账函数(伪代码) FUNCTION transfer(from_user, to_user, amount): -- 1. 固定加锁顺序(按user_id升序) first_user = MIN(from_user, to_user) second_user = MAX(from_user, to_user) START TRANSACTION; -- 2. 按顺序锁定账户 SELECT balance FROM account WHERE user_id = first_user FOR UPDATE; SELECT balance FROM account WHERE user_id = second_user FOR UPDATE; -- 3. 检查余额 IF from_user.balance < amount THEN ROLLBACK; RETURN "余额不足"; END IF; -- 4. 扣款和到账 UPDATE account SET balance = balance - amount WHERE user_id = from_user; UPDATE account SET balance = balance + amount WHERE user_id = to_user; COMMIT; RETURN "转账成功"; END FUNCTION; 方案3:Java实现(完整代码) @Service public class TransferService { @Autowired private AccountMapper accountMapper; @Transactional(rollbackFor = Exception.class) public void transfer(String fromUser, String toUser, BigDecimal amount) { // 1. 固定加锁顺序(避免死锁) String firstUser = fromUser.compareTo(toUser) < 0 ? fromUser : toUser; String secondUser = fromUser.compareTo(toUser) < 0 ? toUser : fromUser; // 2. 按顺序锁定账户(悲观锁) Account first = accountMapper.selectForUpdate(firstUser); Account second = accountMapper.selectForUpdate(secondUser); // 3. 检查余额 Account fromAccount = fromUser.equals(firstUser) ? first : second; if (fromAccount.getBalance().compareTo(amount) < 0) { throw new BusinessException("余额不足"); } // 4. 扣款和到账 accountMapper.updateBalance(fromUser, amount.negate()); // 扣款 accountMapper.updateBalance(toUser, amount); // 到账 // 5. 记录流水(可选) recordTransferLog(fromUser, toUser, amount); } } <!-- MyBatis Mapper --> <select id="selectForUpdate" resultType="Account"> SELECT * FROM account WHERE user_id = #{userId} FOR UPDATE </select> <update id="updateBalance"> UPDATE account SET balance = balance + #{amount} WHERE user_id = #{userId} </update> 案例2:秒杀抢购 需求 10000个用户抢购100件商品,要求: ...

2025-01-14 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...