库存预占与释放机制:高并发下的库存一致性保障

引言 想象这样一个场景:用户在购物车中看到商品显示"有货",提交订单时却提示"库存不足"。或者更糟糕的情况:订单已支付,仓库却发现没有库存可发货。这些问题的根源都指向一个核心能力:库存预占。 库存预占是OMS系统的生命线。它要解决的核心问题是:在下单到发货的整个周期内,如何保证已承诺给用户的商品库存不被其他订单抢走?本文将从第一性原理出发,系统性地探讨库存预占与释放机制的设计与实现。 库存预占的本质 什么是库存预占? 库存预占(Inventory Reservation)是指在订单创建时,为订单中的商品临时锁定库存,防止被其他订单占用。 库存状态转换: 可用库存 → 预占库存 → 已出库 ↓ 释放回可用(订单取消) 为什么需要库存预占? 场景1:下单到支付的时间窗口 用户操作流程: 1. 加入购物车(不预占) 2. 提交订单(预占库存)← 关键点 3. 支付(15分钟内) 4. 发货 问题:如果不预占,步骤2到步骤3之间,库存可能被其他用户购买 场景2:支付到发货的时间窗口 订单处理流程: 1. 用户支付成功 2. 订单进入待发货队列 3. 仓库拣货打包(可能需要几小时) 4. 发货 问题:如果不预占,步骤1到步骤4之间,库存可能被超卖 场景3:高并发秒杀 秒杀场景: - 库存:100件 - 请求:10000个并发请求 - 目标:保证只有前100个请求成功,且不超卖 核心挑战:如何在毫秒级时间内准确预占库存? 库存预占的核心目标 防止超卖:已售数量 ≤ 实际库存 保证时效:预占操作要快(毫秒级) 支持释放:订单取消/超时后自动释放 高并发:支持万级QPS的库存预占 数据一致性:分布式环境下保证强一致性 库存预占流程设计 标准预占流程 class InventoryReservationService: """库存预占服务""" def reserve(self, order): """ 预占库存的标准流程 1. 校验库存可用性 2. 锁定库存 3. 扣减可用库存 4. 增加预占库存 5. 记录预占日志 """ try: # 1. 校验库存 self._validate_inventory(order) # 2. 获取分布式锁 lock = self._acquire_lock(order.items) # 3. 执行预占 reservations = [] for item in order.items: reservation = self._reserve_single_item( sku_id=item.sku_id, warehouse_id=item.warehouse_id, quantity=item.quantity, order_id=order.order_id ) reservations.append(reservation) # 4. 释放锁 self._release_lock(lock) return ReservationResult( success=True, reservations=reservations ) except InsufficientStockException as e: return ReservationResult( success=False, error=str(e) ) def _reserve_single_item(self, sku_id, warehouse_id, quantity, order_id): """预占单个SKU的库存""" # 原子操作:扣减可用库存,增加预占库存 with self.db.transaction(): # 1. 查询当前库存(加行锁) inventory = self.db.query( """ SELECT available, reserved FROM inventory WHERE sku_id = %s AND warehouse_id = %s FOR UPDATE """, (sku_id, warehouse_id) ) # 2. 检查库存充足性 if inventory.available < quantity: raise InsufficientStockException( f"SKU {sku_id} 库存不足" ) # 3. 更新库存 self.db.execute( """ UPDATE inventory SET available = available - %s, reserved = reserved + %s WHERE sku_id = %s AND warehouse_id = %s """, (quantity, quantity, sku_id, warehouse_id) ) # 4. 记录预占记录 reservation = InventoryReservation( reservation_id=generate_id(), order_id=order_id, sku_id=sku_id, warehouse_id=warehouse_id, quantity=quantity, status='RESERVED', expire_at=datetime.now() + timedelta(minutes=30) ) self.db.insert('inventory_reservations', reservation) return reservation 数据库设计 -- 库存表 CREATE TABLE inventory ( id BIGINT PRIMARY KEY AUTO_INCREMENT, sku_id VARCHAR(64) NOT NULL, warehouse_id VARCHAR(64) NOT NULL, available INT NOT NULL DEFAULT 0, -- 可用库存 reserved INT NOT NULL DEFAULT 0, -- 预占库存 sold INT NOT NULL DEFAULT 0, -- 已售库存 total INT NOT NULL DEFAULT 0, -- 总库存 version INT NOT NULL DEFAULT 0, -- 乐观锁版本号 created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, UNIQUE KEY uk_sku_warehouse (sku_id, warehouse_id), INDEX idx_sku (sku_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -- 库存预占记录表 CREATE TABLE inventory_reservations ( id BIGINT PRIMARY KEY AUTO_INCREMENT, reservation_id VARCHAR(64) NOT NULL UNIQUE, order_id VARCHAR(64) NOT NULL, sku_id VARCHAR(64) NOT NULL, warehouse_id VARCHAR(64) NOT NULL, quantity INT NOT NULL, status VARCHAR(32) NOT NULL, -- RESERVED, CONSUMED, RELEASED expire_at DATETIME NOT NULL, -- 预占过期时间 created_at DATETIME NOT NULL, released_at DATETIME, INDEX idx_order (order_id), INDEX idx_expire (expire_at, status) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -- 库存变更日志表 CREATE TABLE inventory_change_logs ( id BIGINT PRIMARY KEY AUTO_INCREMENT, log_id VARCHAR(64) NOT NULL UNIQUE, sku_id VARCHAR(64) NOT NULL, warehouse_id VARCHAR(64) NOT NULL, change_type VARCHAR(32) NOT NULL, -- RESERVE, RELEASE, CONSUME quantity INT NOT NULL, before_available INT NOT NULL, after_available INT NOT NULL, order_id VARCHAR(64), created_at DATETIME NOT NULL, INDEX idx_sku (sku_id), INDEX idx_order (order_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; 库存预占失败处理 失败场景分类 class ReservationFailureHandler: """预占失败处理器""" def handle_failure(self, order, failure_reason): """处理预占失败""" if isinstance(failure_reason, InsufficientStockException): return self._handle_insufficient_stock(order) elif isinstance(failure_reason, DistributedLockException): return self._handle_lock_timeout(order) elif isinstance(failure_reason, DatabaseException): return self._handle_db_error(order) else: return self._handle_unknown_error(order) def _handle_insufficient_stock(self, order): """处理库存不足""" # 1. 尝试部分预占 partial_result = self._try_partial_reservation(order) if partial_result.has_available_items(): return self._create_partial_order( order, partial_result.available_items ) # 2. 尝试跨仓调拨 transfer_result = self._try_warehouse_transfer(order) if transfer_result.success: return self._retry_reservation(order) # 3. 推荐替代商品 alternatives = self._find_alternatives(order) return ReservationResult( success=False, error="库存不足", alternatives=alternatives ) def _handle_lock_timeout(self, order): """处理锁超时""" # 重试策略:指数退避 for retry in range(3): time.sleep(0.1 * (2 ** retry)) # 100ms, 200ms, 400ms try: return self.reservation_service.reserve(order) except DistributedLockException: continue # 降级策略:异步处理 return self._async_reservation(order) 部分预占策略 class PartialReservationStrategy: """部分预占策略""" def try_partial_reservation(self, order): """尝试部分预占""" available_items = [] unavailable_items = [] for item in order.items: try: # 尝试预占单个商品 reservation = self.reservation_service.reserve_single_item( item ) available_items.append(item) except InsufficientStockException: unavailable_items.append(item) return PartialReservationResult( available_items=available_items, unavailable_items=unavailable_items, fulfillment_rate=len(available_items) / len(order.items) ) def should_accept_partial(self, result, order): """判断是否接受部分预占""" # 策略1:订单金额超过阈值,接受部分预占 if order.total_amount > 500 and result.fulfillment_rate > 0.8: return True # 策略2:重要商品都有货,接受部分预占 critical_items = [i for i in order.items if i.is_critical] if all(i in result.available_items for i in critical_items): return True # 策略3:用户设置允许部分发货 if order.allow_partial_shipment: return True return False 库存释放触发条件 自动释放场景 class InventoryReleaseService: """库存释放服务""" def __init__(self): # 定义释放规则 self.release_rules = [ # 规则1:订单取消 ReleaseRule( name="order_cancelled", condition=lambda order: order.status == 'CANCELLED', action=self._release_all ), # 规则2:支付超时 ReleaseRule( name="payment_timeout", condition=lambda order: ( order.status == 'PENDING_PAYMENT' and datetime.now() > order.payment_deadline ), action=self._release_all ), # 规则3:部分退货 ReleaseRule( name="partial_refund", condition=lambda order: order.has_partial_refund(), action=self._release_partial ), # 规则4:预占过期 ReleaseRule( name="reservation_expired", condition=lambda reservation: ( datetime.now() > reservation.expire_at ), action=self._release_expired ) ] def release_for_order(self, order): """为订单释放库存""" # 1. 查询订单的所有预占记录 reservations = self.reservation_repo.find_by_order(order.order_id) # 2. 执行释放 released_count = 0 for reservation in reservations: if reservation.status != 'RESERVED': continue try: self._release_reservation(reservation) released_count += 1 except Exception as e: self.logger.error( f"释放预占失败: {reservation.reservation_id}", exc_info=e ) return ReleaseResult( success=True, released_count=released_count ) def _release_reservation(self, reservation): """释放单个预占""" with self.db.transaction(): # 1. 更新库存(原子操作) affected_rows = self.db.execute( """ UPDATE inventory SET available = available + %s, reserved = reserved - %s WHERE sku_id = %s AND warehouse_id = %s AND reserved >= %s """, ( reservation.quantity, reservation.quantity, reservation.sku_id, reservation.warehouse_id, reservation.quantity ) ) if affected_rows == 0: raise InventoryMismatchException( f"预占库存不足: {reservation.reservation_id}" ) # 2. 更新预占记录状态 self.db.execute( """ UPDATE inventory_reservations SET status = 'RELEASED', released_at = NOW() WHERE reservation_id = %s """, (reservation.reservation_id,) ) # 3. 记录变更日志 self._log_inventory_change( sku_id=reservation.sku_id, warehouse_id=reservation.warehouse_id, change_type='RELEASE', quantity=reservation.quantity, order_id=reservation.order_id ) 定时释放任务 class ScheduledReleaseJob: """定时释放任务""" def run(self): """ 每分钟执行一次,释放过期预占 批量处理策略: 1. 查询过期预占(分批查询) 2. 批量释放库存 3. 批量更新预占状态 """ batch_size = 1000 offset = 0 while True: # 1. 查询过期预占 expired_reservations = self.reservation_repo.find_expired( limit=batch_size, offset=offset ) if not expired_reservations: break # 2. 按仓库和SKU分组 grouped = defaultdict(list) for res in expired_reservations: key = (res.sku_id, res.warehouse_id) grouped[key].append(res) # 3. 批量释放 for (sku_id, warehouse_id), reservations in grouped.items(): total_quantity = sum(r.quantity for r in reservations) # 批量更新库存 self.db.execute( """ UPDATE inventory SET available = available + %s, reserved = reserved - %s WHERE sku_id = %s AND warehouse_id = %s """, (total_quantity, total_quantity, sku_id, warehouse_id) ) # 批量更新预占状态 reservation_ids = [r.reservation_id for r in reservations] self.db.execute( """ UPDATE inventory_reservations SET status = 'RELEASED', released_at = NOW() WHERE reservation_id IN %s """, (tuple(reservation_ids),) ) offset += batch_size # 限流保护 time.sleep(0.1) 库存预占的并发控制 数据库层面的并发控制 方案1:悲观锁(SELECT FOR UPDATE) ...

2025-11-22 · maneng

Redis实战:分布式锁、消息队列、缓存设计

一、分布式锁:从SETNX到Redlock 1.1 为什么需要分布式锁? 场景:秒杀系统的超卖问题 // ❌ 错误:单机锁无法解决分布式超卖 @Service public class SeckillService { @Autowired private ProductRepository productRepository; /** * 秒杀下单(单机锁) */ public synchronized Order seckill(Long productId, Long userId) { // 1. 检查库存 Integer stock = productRepository.getStock(productId); if (stock <= 0) { throw new SoldOutException("商品已售罄"); } // 2. 扣减库存 productRepository.decrementStock(productId); // 3. 创建订单 return orderService.createOrder(productId, userId); } } // 问题: // 假设有3台服务器(Server A、B、C) // T1: Server A:检查库存=1(通过) // T2: Server B:检查库存=1(通过) // T3: Server A:扣减库存=0,创建订单1 // T4: Server B:扣减库存=-1(超卖!),创建订单2 // synchronized只能锁住单机JVM内的线程 // 无法锁住分布式环境的多个进程 核心问题:分布式环境下,单机锁无效,需要分布式锁。 ...

2025-11-03 · maneng

Redis实战:分布式锁、消息队列、缓存设计

一、分布式锁:从SETNX到Redlock 1.1 为什么需要分布式锁? 场景:秒杀系统的超卖问题 // ❌ 错误:单机锁无法解决分布式超卖 @Service public class SeckillService { @Autowired private ProductRepository productRepository; /** * 秒杀下单(单机锁) */ public synchronized Order seckill(Long productId, Long userId) { // 1. 检查库存 Integer stock = productRepository.getStock(productId); if (stock <= 0) { throw new SoldOutException("商品已售罄"); } // 2. 扣减库存 productRepository.decrementStock(productId); // 3. 创建订单 return orderService.createOrder(productId, userId); } } // 问题: // 假设有3台服务器(Server A、B、C) // T1: Server A:检查库存=1(通过) // T2: Server B:检查库存=1(通过) // T3: Server A:扣减库存=0,创建订单1 // T4: Server B:扣减库存=-1(超卖!),创建订单2 // synchronized只能锁住单机JVM内的线程 // 无法锁住分布式环境的多个进程 核心问题:分布式环境下,单机锁无效,需要分布式锁。 ...

2025-11-03 · maneng

渠道共享库存中心:Redis分布式锁的生产实践

引子:一次价值百万的库存超卖事故 2024年双十一,凌晨00:05分,我的手机突然响起刺耳的告警声。 打开监控大盘,一行红色数字让我瞬间清醒:某爆款SKU超卖217件。这意味着我们已经卖出了比实际库存多217件商品,客户投诉、赔偿成本、品牌信任危机接踵而至。 这不是我第一次遇到超卖问题,但这次的损失格外惨重——该SKU是限量联名款,成本价就超过500元,217件的赔偿成本加上品牌损失,直接损失超过百万。 事后复盘,我们发现问题的根源:在分布式环境下,单机锁完全失效了。 10个服务实例同时处理来自Amazon、Shopify、天猫国际等多个渠道的订单,每个实例内部的synchronized锁只能保证单个JVM内的线程安全,但对于跨JVM的并发请求,库存扣减的原子性荡然无存。 这次事故后,我们用了整整一周时间,重构了整个库存中心的并发控制机制,将Redis分布式锁引入生产环境。三个月后,超卖率从5%降至0.1%,系统TPS从200提升到2000。 这篇文章,就是那次重构的完整技术总结。 问题本质:分布式环境下的并发控制 业务场景 我们的渠道共享库存中心需要服务10+电商平台,这些平台的订单会同时扣减同一个商品的库存: 时间 渠道 SKU-1001 操作 当前库存 00:01 Amazon -1 扣减 100 → 99 00:01 Shopify -2 扣减 99 → 97 00:01 天猫国际 -1 扣减 97 → 96 00:01 独立站 -3 扣减 96 → 93 在分布式部署下(假设5个服务实例),这些请求可能被不同的实例处理。如果没有正确的并发控制机制,就会出现经典的竞态条件(Race Condition)。 错误方案的代价 我们尝试过的失败方案: ❌ 方案1:数据库行锁 SELECT * FROM inventory WHERE sku = 'SKU-1001' FOR UPDATE; UPDATE inventory SET quantity = quantity - 1 WHERE sku = 'SKU-1001'; 问题: 性能极差:TPS<50,远低于业务需求 锁等待严重:高并发时大量请求超时 数据库压力大:成为系统瓶颈 ❌ 方案2:乐观锁(版本号) // 基于版本号的乐观锁 UPDATE inventory SET quantity = quantity - 1, version = version + 1 WHERE sku = 'SKU-1001' AND version = 10; 问题: ...

2025-10-15 · maneng

渠道共享库存中心:Redis分布式锁的生产实践

引子:一次价值百万的库存超卖事故 2024年双十一,凌晨00:05分,我的手机突然响起刺耳的告警声。 打开监控大盘,一行红色数字让我瞬间清醒:某爆款SKU超卖217件。这意味着我们已经卖出了比实际库存多217件商品,客户投诉、赔偿成本、品牌信任危机接踵而至。 这不是我第一次遇到超卖问题,但这次的损失格外惨重——该SKU是限量联名款,成本价就超过500元,217件的赔偿成本加上品牌损失,直接损失超过百万。 事后复盘,我们发现问题的根源:在分布式环境下,单机锁完全失效了。 10个服务实例同时处理来自Amazon、Shopify、天猫国际等多个渠道的订单,每个实例内部的synchronized锁只能保证单个JVM内的线程安全,但对于跨JVM的并发请求,库存扣减的原子性荡然无存。 这次事故后,我们用了整整一周时间,重构了整个库存中心的并发控制机制,将Redis分布式锁引入生产环境。三个月后,超卖率从5%降至0.1%,系统TPS从200提升到2000。 这篇文章,就是那次重构的完整技术总结。 问题本质:分布式环境下的并发控制 业务场景 我们的渠道共享库存中心需要服务10+电商平台,这些平台的订单会同时扣减同一个商品的库存: 时间 渠道 SKU-1001 操作 当前库存 00:01 Amazon -1 扣减 100 → 99 00:01 Shopify -2 扣减 99 → 97 00:01 天猫国际 -1 扣减 97 → 96 00:01 独立站 -3 扣减 96 → 93 在分布式部署下(假设5个服务实例),这些请求可能被不同的实例处理。如果没有正确的并发控制机制,就会出现经典的竞态条件(Race Condition)。 错误方案的代价 我们尝试过的失败方案: ❌ 方案1:数据库行锁 SELECT * FROM inventory WHERE sku = 'SKU-1001' FOR UPDATE; UPDATE inventory SET quantity = quantity - 1 WHERE sku = 'SKU-1001'; 问题: 性能极差:TPS<50,远低于业务需求 锁等待严重:高并发时大量请求超时 数据库压力大:成为系统瓶颈 ❌ 方案2:乐观锁(版本号) // 基于版本号的乐观锁 UPDATE inventory SET quantity = quantity - 1, version = version + 1 WHERE sku = 'SKU-1001' AND version = 10; 问题: ...

2025-10-15 · maneng

渠道共享库存中心:Redis分布式锁的生产实践

引子:一次价值百万的库存超卖事故 2024年双十一,凌晨00:05分,我的手机突然响起刺耳的告警声。 打开监控大盘,一行红色数字让我瞬间清醒:某爆款SKU超卖217件。这意味着我们已经卖出了比实际库存多217件商品,客户投诉、赔偿成本、品牌信任危机接踵而至。 这不是我第一次遇到超卖问题,但这次的损失格外惨重——该SKU是限量联名款,成本价就超过500元,217件的赔偿成本加上品牌损失,直接损失超过百万。 事后复盘,我们发现问题的根源:在分布式环境下,单机锁完全失效了。 10个服务实例同时处理来自Amazon、Shopify、天猫国际等多个渠道的订单,每个实例内部的synchronized锁只能保证单个JVM内的线程安全,但对于跨JVM的并发请求,库存扣减的原子性荡然无存。 这次事故后,我们用了整整一周时间,重构了整个库存中心的并发控制机制,将Redis分布式锁引入生产环境。三个月后,超卖率从5%降至0.1%,系统TPS从200提升到2000。 这篇文章,就是那次重构的完整技术总结。 问题本质:分布式环境下的并发控制 业务场景 我们的渠道共享库存中心需要服务10+电商平台,这些平台的订单会同时扣减同一个商品的库存: 时间 渠道 SKU-1001 操作 当前库存 00:01 Amazon -1 扣减 100 → 99 00:01 Shopify -2 扣减 99 → 97 00:01 天猫国际 -1 扣减 97 → 96 00:01 独立站 -3 扣减 96 → 93 在分布式部署下(假设5个服务实例),这些请求可能被不同的实例处理。如果没有正确的并发控制机制,就会出现经典的竞态条件(Race Condition)。 错误方案的代价 我们尝试过的失败方案: ❌ 方案1:数据库行锁 SELECT * FROM inventory WHERE sku = 'SKU-1001' FOR UPDATE; UPDATE inventory SET quantity = quantity - 1 WHERE sku = 'SKU-1001'; 问题: 性能极差:TPS<50,远低于业务需求 锁等待严重:高并发时大量请求超时 数据库压力大:成为系统瓶颈 ❌ 方案2:乐观锁(版本号) // 基于版本号的乐观锁 UPDATE inventory SET quantity = quantity - 1, version = version + 1 WHERE sku = 'SKU-1001' AND version = 10; 问题: ...

2025-10-15 · maneng

分布式锁实战:从SETNX到Redlock

为什么需要分布式锁? 单机锁的局限: // ❌ 单机环境有效 synchronized (this) { // 扣减库存 } // ❌ 分布式环境失效 // 服务器1和服务器2的锁互不影响 分布式锁特性: 互斥性:同一时刻只有一个客户端持有锁 安全性:只有持锁者能释放锁 可用性:高可用,避免死锁 性能:加锁解锁快速 方案演进 V1:基础版(SETNX + EXPIRE) public boolean lock(String key, String value, long expireSeconds) { Boolean success = redis.opsForValue().setIfAbsent(key, value); if (Boolean.TRUE.equals(success)) { redis.expire(key, expireSeconds, TimeUnit.SECONDS); return true; } return false; } // ❌ 问题:SETNX和EXPIRE不是原子操作 // 如果SETNX成功后宕机,锁永不过期 V2:原子操作版(SET NX EX) public boolean lock(String key, String value, long expireSeconds) { Boolean success = redis.opsForValue() .setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } public void unlock(String key) { redis.delete(key); } // ❌ 问题:可能释放别人的锁 // 线程A持锁超时,锁自动释放 // 线程B获得锁 // 线程A执行完,删除了线程B的锁 V3:安全释放版(Lua脚本) public class DistributedLock { @Autowired private RedisTemplate<String, String> redis; private static final String UNLOCK_SCRIPT = "if redis.call('GET', KEYS[1]) == ARGV[1] then " + " return redis.call('DEL', KEYS[1]) " + "else " + " return 0 " + "end"; // 加锁 public boolean lock(String key, String requestId, long expireSeconds) { Boolean success = redis.opsForValue() .setIfAbsent(key, requestId, expireSeconds, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } // 安全解锁 public boolean unlock(String key, String requestId) { Long result = redis.execute( RedisScript.of(UNLOCK_SCRIPT, Long.class), Collections.singletonList(key), requestId ); return result != null && result == 1; } } // ✅ 优势:原子性校验和删除 // ❌ 问题:业务执行时间超过锁过期时间 V4:自动续期版(看门狗) @Component public class RedisLockWithWatchdog { @Autowired private RedisTemplate<String, String> redis; private static final long DEFAULT_EXPIRE_SECONDS = 30; private static final long RENEW_INTERVAL_SECONDS = 10; private final Map<String, ScheduledFuture<?>> renewTasks = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public boolean lock(String key, String requestId) { Boolean success = redis.opsForValue() .setIfAbsent(key, requestId, DEFAULT_EXPIRE_SECONDS, TimeUnit.SECONDS); if (Boolean.TRUE.equals(success)) { // 启动续期任务 startRenewTask(key, requestId); return true; } return false; } private void startRenewTask(String key, String requestId) { ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> { try { String value = redis.opsForValue().get(key); if (requestId.equals(value)) { redis.expire(key, DEFAULT_EXPIRE_SECONDS, TimeUnit.SECONDS); log.debug("锁续期成功: {}", key); } else { // 锁已被他人持有,停止续期 stopRenewTask(key); } } catch (Exception e) { log.error("锁续期失败", e); } }, RENEW_INTERVAL_SECONDS, RENEW_INTERVAL_SECONDS, TimeUnit.SECONDS); renewTasks.put(key, task); } public boolean unlock(String key, String requestId) { stopRenewTask(key); String script = "if redis.call('GET', KEYS[1]) == ARGV[1] then " + " return redis.call('DEL', KEYS[1]) " + "else " + " return 0 " + "end"; Long result = redis.execute( RedisScript.of(script, Long.class), Collections.singletonList(key), requestId ); return result != null && result == 1; } private void stopRenewTask(String key) { ScheduledFuture<?> task = renewTasks.remove(key); if (task != null) { task.cancel(false); } } } 实战案例 案例1:库存扣减 @Service public class StockService { @Autowired private RedisLockWithWatchdog redisLock; @Autowired private RedisTemplate<String, Object> redis; public boolean deductStock(String productId, int quantity) { String lockKey = "lock:stock:" + productId; String requestId = UUID.randomUUID().toString(); try { // 尝试获取锁(超时3秒) if (!tryLock(lockKey, requestId, 3000)) { return false; // 获取锁失败 } // 检查库存 Integer stock = (Integer) redis.opsForValue().get("stock:" + productId); if (stock == null || stock < quantity) { return false; // 库存不足 } // 扣减库存 redis.opsForValue().decrement("stock:" + productId, quantity); return true; } finally { // 释放锁 redisLock.unlock(lockKey, requestId); } } private boolean tryLock(String key, String requestId, long timeoutMs) { long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < timeoutMs) { if (redisLock.lock(key, requestId)) { return true; } try { Thread.sleep(50); // 等待50ms后重试 } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } return false; } } 案例2:订单防重 @Service public class OrderService { @Autowired private RedisLockWithWatchdog redisLock; public String createOrder(OrderRequest request) { String lockKey = "lock:order:" + request.getUserId() + ":" + request.getOrderNo(); String requestId = UUID.randomUUID().toString(); try { if (!redisLock.lock(lockKey, requestId)) { throw new BusinessException("订单正在处理中,请勿重复提交"); } // 创建订单逻辑 Order order = new Order(); order.setOrderNo(request.getOrderNo()); // ... 保存订单 return order.getOrderNo(); } finally { redisLock.unlock(lockKey, requestId); } } } 案例3:定时任务防重 @Component public class ScheduledTaskWithLock { @Autowired private RedisLockWithWatchdog redisLock; @Scheduled(cron = "0 */5 * * * ?") // 每5分钟执行 public void syncData() { String lockKey = "lock:task:sync_data"; String requestId = UUID.randomUUID().toString(); if (!redisLock.lock(lockKey, requestId)) { log.info("其他节点正在执行同步任务,跳过"); return; } try { log.info("开始执行同步任务"); // 执行同步逻辑 doSync(); } finally { redisLock.unlock(lockKey, requestId); } } private void doSync() { // 同步逻辑 } } Redlock算法(多Redis实例) 问题:单Redis实例有单点故障风险 ...

2025-01-21 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...