## 引言

想象这样一个场景:用户在购物车中看到商品显示"有货",提交订单时却提示"库存不足"。更糟糕的情况是:订单已支付,仓库却发现没有库存可发货。这些问题的根源都指向一个核心能力:库存预占。
库存预占是OMS(订单管理系统)的生命线。它要解决的核心问题是:在下单到发货的整个周期内,如何保证已承诺给用户的商品库存不被其他订单抢走?本文将从第一性原理出发,系统性地探讨库存预占与释放机制的设计与实现。
## 库存预占的本质

### 什么是库存预占?

库存预占(Inventory Reservation)是指在订单创建时,为订单中的商品临时锁定库存,防止被其他订单占用。
库存状态转换:

```
可用库存 → 预占库存 → 已出库
              ↓
   释放回可用(订单取消/超时)
```

### 为什么需要库存预占?

#### 场景1:下单到支付的时间窗口
用户操作流程:

1. 加入购物车(不预占)
2. 提交订单(预占库存)← 关键点
3. 支付(15分钟内)
4. 发货

问题:如果不预占,步骤2到步骤3之间,库存可能被其他用户购买。

#### 场景2:支付到发货的时间窗口
订单处理流程:

1. 用户支付成功
2. 订单进入待发货队列
3. 仓库拣货打包(可能需要几小时)
4. 发货

问题:如果不预占,步骤1到步骤4之间,库存可能被超卖。

#### 场景3:高并发秒杀
# 秒杀场景:
# - 库存:100件
# - 请求:10000个并发请求
# - 目标:保证只有前100个请求成功,且不超卖
#
# 核心挑战:如何在毫秒级时间内准确预占库存?
#
# ## 库存预占的核心目标
#
# - 防止超卖:已售数量 ≤ 实际库存
# - 保证时效:预占操作要快(毫秒级)
# - 支持释放:订单取消/超时后自动释放
# - 高并发:支持万级QPS的库存预占
# - 数据一致性:分布式环境下保证强一致性
#
# ## 库存预占流程设计
#
# ### 标准预占流程

import logging
import time
from collections import defaultdict
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class InventoryReservationService:
    """Reserve inventory for orders.

    Collaborators used here but not defined in this section (presumably
    provided elsewhere -- verify against the real project): ``self.db`` with
    ``transaction()`` / ``query()`` / ``execute()`` / ``insert()``,
    ``self._validate_inventory``, ``self._acquire_lock`` /
    ``self._release_lock``, ``InsufficientStockException``,
    ``ReservationResult``, ``InventoryReservation`` and ``generate_id``.
    """

    # How long a reservation stays valid before the expiry job may release it.
    # NOTE(review): the expiry job releases every RESERVED row past expire_at,
    # so paid-but-unshipped orders must have their reservation consumed or
    # extended on payment -- that step is not shown here.
    RESERVATION_TTL = timedelta(minutes=30)

    def reserve(self, order):
        """Reserve stock for every item of ``order`` as a single unit.

        Steps:
        1. Fast-fail availability check (``_validate_inventory``).
        2. Acquire the distributed lock for the order's items.
        3. In ONE database transaction, for each item: lock its inventory
           row, move ``quantity`` from ``available`` to ``reserved`` and
           insert a reservation record.
        4. Release the distributed lock -- always, even if step 3 fails.

        Returns:
            ``ReservationResult(success=True, reservations=[...])`` with one
            reservation per item (in ``order.items`` order), or
            ``ReservationResult(success=False, error=...)`` when stock is
            insufficient. All items share one transaction, so (assuming
            ``db.transaction()`` rolls back on error) a shortage on any item
            leaves no reservation behind for the others.

        Raises:
            Anything other than ``InsufficientStockException`` (lock errors,
            database errors, ``ValueError`` for a non-positive quantity)
            propagates to the caller.
        """
        try:
            self._validate_inventory(order)
            lock = self._acquire_lock(order.items)
            try:
                reservations = self._reserve_items(order)
            finally:
                # Previously the lock was released only on the happy path, so
                # any failure left it held until it expired on its own.
                self._release_lock(lock)
        except InsufficientStockException as e:
            return ReservationResult(success=False, error=str(e))
        return ReservationResult(success=True, reservations=reservations)

    def _reserve_items(self, order):
        """Reserve all items of ``order`` in one transaction, in input order."""
        items = list(order.items)
        # Take row locks in a fixed (sku_id, warehouse_id) order so two orders
        # sharing SKUs cannot deadlock on each other's FOR UPDATE locks.
        lock_order = sorted(
            range(len(items)),
            key=lambda idx: (items[idx].sku_id, items[idx].warehouse_id),
        )
        reservations = [None] * len(items)
        with self.db.transaction():
            for idx in lock_order:
                item = items[idx]
                reservations[idx] = self._reserve_in_transaction(
                    sku_id=item.sku_id,
                    warehouse_id=item.warehouse_id,
                    quantity=item.quantity,
                    order_id=order.order_id,
                )
        return reservations

    def _reserve_single_item(self, sku_id, warehouse_id, quantity, order_id):
        """Reserve one SKU in its own transaction and return the reservation."""
        with self.db.transaction():
            return self._reserve_in_transaction(
                sku_id, warehouse_id, quantity, order_id
            )

    def reserve_single_item(self, item, order_id):
        """Public single-item entry point (used by PartialReservationStrategy).

        Args:
            item: an order item exposing ``sku_id``, ``warehouse_id`` and
                ``quantity``.
            order_id: the owning order, stored on the reservation so that
                ``release_for_order`` can find it later.
        """
        return self._reserve_single_item(
            sku_id=item.sku_id,
            warehouse_id=item.warehouse_id,
            quantity=item.quantity,
            order_id=order_id,
        )

    def _reserve_in_transaction(self, sku_id, warehouse_id, quantity, order_id):
        """Move ``quantity`` of one SKU from available to reserved.

        Must run inside an open ``self.db.transaction()``; the ``FOR UPDATE``
        row lock is held until that transaction ends.

        Raises:
            ValueError: ``quantity`` is not positive.
            InsufficientStockException: no inventory row, or not enough stock.
        """
        if quantity <= 0:
            # A negative quantity would silently *increase* available stock.
            raise ValueError(f"quantity must be positive, got {quantity!r}")

        # 1. Read the current stock and lock the row.
        inventory = self.db.query(
            """
            SELECT available, reserved
            FROM inventory
            WHERE sku_id = %s AND warehouse_id = %s
            FOR UPDATE
            """,
            (sku_id, warehouse_id),
        )

        # 2. Check sufficiency. Assumes db.query returns None when no row
        #    matches -- TODO confirm against the real DB helper.
        if inventory is None or inventory.available < quantity:
            raise InsufficientStockException(f"SKU {sku_id} 库存不足")

        # 3. Move the quantity from available to reserved.
        self.db.execute(
            """
            UPDATE inventory
            SET available = available - %s,
                reserved = reserved + %s
            WHERE sku_id = %s AND warehouse_id = %s
            """,
            (quantity, quantity, sku_id, warehouse_id),
        )

        # 4. Persist the reservation record (this row is the reservation log).
        # NOTE(review): expire_at uses the app server's local clock while the
        # release paths use the database's NOW(); keep both on one time zone.
        reservation = InventoryReservation(
            reservation_id=generate_id(),
            order_id=order_id,
            sku_id=sku_id,
            warehouse_id=warehouse_id,
            quantity=quantity,
            status='RESERVED',
            expire_at=datetime.now() + self.RESERVATION_TTL,
        )
        self.db.insert('inventory_reservations', reservation)
        return reservation


# ## 数据库设计
#
# -- 库存表
# CREATE TABLE inventory (
#     id BIGINT PRIMARY KEY AUTO_INCREMENT,
#     sku_id VARCHAR(64) NOT NULL,
#     warehouse_id VARCHAR(64) NOT NULL,
#     available INT NOT NULL DEFAULT 0,  -- 可用库存
#     reserved INT NOT NULL DEFAULT 0,   -- 预占库存
#     sold INT NOT NULL DEFAULT 0,       -- 已售库存
#     total INT NOT NULL DEFAULT 0,      -- 总库存
#     version INT NOT NULL DEFAULT 0,    -- 乐观锁版本号
#     created_at DATETIME NOT NULL,
#     updated_at DATETIME NOT NULL,
#     UNIQUE KEY uk_sku_warehouse (sku_id, warehouse_id),
#     INDEX idx_sku (sku_id)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
#
# NOTE(review): idx_sku duplicates the leftmost column of uk_sku_warehouse.
# The code in this section never reads or writes sold, total or version.
#
# -- 库存预占记录表
# CREATE TABLE inventory_reservations (
#     id BIGINT PRIMARY KEY AUTO_INCREMENT,
#     reservation_id VARCHAR(64) NOT NULL UNIQUE,
#     order_id VARCHAR(64) NOT NULL,
#     sku_id VARCHAR(64) NOT NULL,
#     warehouse_id VARCHAR(64) NOT NULL,
#     quantity INT NOT NULL,
#     status VARCHAR(32) NOT NULL,       -- RESERVED, CONSUMED, RELEASED
#     expire_at DATETIME NOT NULL,       -- 预占过期时间
#     created_at DATETIME NOT NULL,
#     released_at DATETIME,
#     INDEX idx_order (order_id),
#     INDEX idx_expire (expire_at, status)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
#
# NOTE(review): an expiry scan of the form
# "status = 'RESERVED' AND expire_at < ?" is usually served better by an
# index ordered (status, expire_at) -- equality column first.
#
# -- 库存变更日志表
# CREATE TABLE inventory_change_logs (
#     id BIGINT PRIMARY KEY AUTO_INCREMENT,
#     log_id VARCHAR(64) NOT NULL UNIQUE,
#     sku_id VARCHAR(64) NOT NULL,
#     warehouse_id VARCHAR(64) NOT NULL,
#     change_type VARCHAR(32) NOT NULL,  -- RESERVE, RELEASE, CONSUME
#     quantity INT NOT NULL,
#     before_available INT NOT NULL,
#     after_available INT NOT NULL,
#     order_id VARCHAR(64),
#     created_at DATETIME NOT NULL,
#     INDEX idx_sku (sku_id),
#     INDEX idx_order (order_id)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
#
# ## 库存预占失败处理
#
# ### 失败场景分类


class ReservationFailureHandler:
    """Decide what to do after a reservation attempt failed.

    ``self.reservation_service`` and the ``_try_*``, ``_create_partial_order``,
    ``_retry_reservation``, ``_find_alternatives``, ``_async_reservation``,
    ``_handle_db_error`` and ``_handle_unknown_error`` helpers are not defined
    in this section.
    """

    # Lock-timeout retry policy: sleeps of 100ms, 200ms, 400ms.
    LOCK_RETRY_ATTEMPTS = 3
    LOCK_RETRY_BASE_DELAY = 0.1  # seconds

    def handle_failure(self, order, failure_reason):
        """Dispatch on the exception type that caused the failure."""
        if isinstance(failure_reason, InsufficientStockException):
            return self._handle_insufficient_stock(order)
        if isinstance(failure_reason, DistributedLockException):
            return self._handle_lock_timeout(order)
        if isinstance(failure_reason, DatabaseException):
            return self._handle_db_error(order)
        return self._handle_unknown_error(order)

    def _handle_insufficient_stock(self, order):
        """Try partial reservation, then a warehouse transfer, then alternatives."""
        # 1. Partial reservation.
        # NOTE(review): any partial result is accepted here;
        # PartialReservationStrategy.should_accept_partial is not called in
        # this method -- confirm whether _try_partial_reservation applies it.
        partial_result = self._try_partial_reservation(order)
        if partial_result.has_available_items():
            return self._create_partial_order(
                order, partial_result.available_items
            )

        # 2. Cross-warehouse transfer, then retry the whole reservation.
        transfer_result = self._try_warehouse_transfer(order)
        if transfer_result.success:
            return self._retry_reservation(order)

        # 3. Give up and suggest substitutes.
        alternatives = self._find_alternatives(order)
        return ReservationResult(
            success=False,
            error="库存不足",
            alternatives=alternatives,
        )

    def _handle_lock_timeout(self, order):
        """Retry with exponential backoff, then fall back to async reservation."""
        for attempt in range(self.LOCK_RETRY_ATTEMPTS):
            time.sleep(self.LOCK_RETRY_BASE_DELAY * (2 ** attempt))
            try:
                return self.reservation_service.reserve(order)
            except DistributedLockException:
                continue
        # Degrade: hand the order to an asynchronous reservation path.
        return self._async_reservation(order)


# ### 部分预占策略


class PartialReservationStrategy:
    """Reserve whatever is in stock and decide if a partial order is acceptable.

    ``self.reservation_service`` and ``PartialReservationResult`` are
    provided elsewhere.
    """

    # Policy 1: high-value orders accept a partial fill above this rate.
    HIGH_VALUE_ORDER_AMOUNT = 500
    HIGH_VALUE_MIN_FULFILLMENT = 0.8

    def try_partial_reservation(self, order):
        """Reserve each item independently; report which ones succeeded.

        Returns:
            ``PartialReservationResult`` with ``available_items``,
            ``unavailable_items`` and ``fulfillment_rate`` (the fraction of
            items reserved; 1.0 for an order with no items, since nothing
            requested is missing).
        """
        available_items = []
        unavailable_items = []
        for item in order.items:
            try:
                # Pass the order id so the reservation can later be found and
                # released by order.
                self.reservation_service.reserve_single_item(
                    item, order_id=order.order_id
                )
            except InsufficientStockException:
                unavailable_items.append(item)
            else:
                available_items.append(item)

        total = len(order.items)
        fulfillment_rate = len(available_items) / total if total else 1.0
        return PartialReservationResult(
            available_items=available_items,
            unavailable_items=unavailable_items,
            fulfillment_rate=fulfillment_rate,
        )

    def should_accept_partial(self, result, order):
        """Return True if ``order`` may proceed with only ``result``'s items."""
        # Policy 1: high-value order with a high enough fill rate.
        if (
            order.total_amount > self.HIGH_VALUE_ORDER_AMOUNT
            and result.fulfillment_rate > self.HIGH_VALUE_MIN_FULFILLMENT
        ):
            return True

        # Policy 2: every critical item is in stock. Requires at least one
        # critical item -- all() over an empty list is True, which used to
        # accept every partial result for orders without critical items.
        critical_items = [i for i in order.items if i.is_critical]
        if critical_items and all(
            i in result.available_items for i in critical_items
        ):
            return True

        # Policy 3: the customer allows split shipments.
        if order.allow_partial_shipment:
            return True

        return False


# ## 库存释放触发条件
#
# ### 自动释放场景


class InventoryReleaseService:
    """Return reserved stock to the available pool.

    ``self.db``, ``self.reservation_repo``, ``self.logger``,
    ``self._log_inventory_change``, the rule actions (``_release_all``,
    ``_release_partial``, ``_release_expired``), ``ReleaseRule``,
    ``ReleaseResult`` and ``InventoryMismatchException`` are not defined in
    this section -- presumably injected or defined elsewhere.
    """

    def __init__(self):
        # Release rules, evaluated elsewhere.
        self.release_rules = [
            # Rule 1: order cancelled.
            ReleaseRule(
                name="order_cancelled",
                condition=lambda order: order.status == 'CANCELLED',
                action=self._release_all,
            ),
            # Rule 2: payment deadline passed.
            ReleaseRule(
                name="payment_timeout",
                condition=lambda order: (
                    order.status == 'PENDING_PAYMENT'
                    and datetime.now() > order.payment_deadline
                ),
                action=self._release_all,
            ),
            # Rule 3: partial refund.
            ReleaseRule(
                name="partial_refund",
                condition=lambda order: order.has_partial_refund(),
                action=self._release_partial,
            ),
            # Rule 4: reservation expired.
            # NOTE(review): unlike rules 1-3 this condition receives a
            # reservation, not an order; the rule evaluator must pass the
            # right object.
            ReleaseRule(
                name="reservation_expired",
                condition=lambda reservation: (
                    datetime.now() > reservation.expire_at
                ),
                action=self._release_expired,
            ),
        ]

    def release_for_order(self, order):
        """Release every still-RESERVED reservation of ``order``.

        Failures are logged per reservation and do not stop the loop.

        Returns:
            ``ReleaseResult`` whose ``released_count`` counts reservations
            actually released by this call and whose ``success`` is False if
            any release raised (previously it was always True).
        """
        reservations = self.reservation_repo.find_by_order(order.order_id)

        released_count = 0
        failed_count = 0
        for reservation in reservations:
            if reservation.status != 'RESERVED':
                continue
            try:
                if self._release_reservation(reservation):
                    released_count += 1
            except Exception as e:
                failed_count += 1
                self.logger.error(
                    f"释放预占失败: {reservation.reservation_id}",
                    exc_info=e,
                )

        return ReleaseResult(
            success=failed_count == 0,
            released_count=released_count,
        )

    def _release_reservation(self, reservation):
        """Release one reservation; return True if this call released it.

        The status flip is guarded by ``status = 'RESERVED'`` and runs first.
        The database serialises concurrent UPDATEs of the same row, so if two
        releasers race (e.g. a cancel and the expiry job), the loser updates
        0 rows and returns False without touching ``inventory``.

        Raises:
            InventoryMismatchException: ``inventory.reserved`` is smaller than
                the reservation quantity (counters out of sync). Assuming
                ``db.transaction()`` rolls back on error, the status flip is
                undone as well.
        """
        with self.db.transaction():
            # 1. Claim the reservation.
            claimed = self.db.execute(
                """
                UPDATE inventory_reservations
                SET status = 'RELEASED', released_at = NOW()
                WHERE reservation_id = %s AND status = 'RESERVED'
                """,
                (reservation.reservation_id,),
            )
            if claimed == 0:
                return False

            # 2. Return the stock; the guard keeps reserved from going negative.
            affected_rows = self.db.execute(
                """
                UPDATE inventory
                SET available = available + %s,
                    reserved = reserved - %s
                WHERE sku_id = %s AND warehouse_id = %s
                  AND reserved >= %s
                """,
                (
                    reservation.quantity,
                    reservation.quantity,
                    reservation.sku_id,
                    reservation.warehouse_id,
                    reservation.quantity,
                ),
            )
            if affected_rows == 0:
                raise InventoryMismatchException(
                    f"预占库存不足: {reservation.reservation_id}"
                )

            # 3. Record the change.
            self._log_inventory_change(
                sku_id=reservation.sku_id,
                warehouse_id=reservation.warehouse_id,
                change_type='RELEASE',
                quantity=reservation.quantity,
                order_id=reservation.order_id,
            )
        return True


# ### 定时释放任务


class ScheduledReleaseJob:
    """Periodic job that releases expired reservations.

    Intended to run once a minute (the scheduler is not shown).
    ``self.reservation_repo`` and ``self.db`` are provided elsewhere;
    ``find_expired`` presumably returns only rows still in status RESERVED --
    verify, since ``run`` relies on released rows leaving that result set.
    """

    def run(self, batch_size=1000, throttle_seconds=0.1):
        """Release expired reservations in batches.

        Every pass re-reads the FIRST page (offset 0). Advancing the offset,
        as before, skipped rows: released reservations drop out of the
        result set, so later still-expired rows shift into earlier pages.

        The loop stops when no expired rows remain, when a pass returned a
        short (last) page, or when a pass released nothing. The last case
        covers rows that keep failing or were taken by a concurrent
        releaser; the next run retries them.

        Args:
            batch_size: maximum reservations fetched per pass.
            throttle_seconds: pause between passes to limit database load.
        """
        while True:
            expired = self.reservation_repo.find_expired(
                limit=batch_size, offset=0
            )
            if not expired:
                break

            # Group by (sku_id, warehouse_id): one inventory update per group.
            grouped = defaultdict(list)
            for res in expired:
                grouped[(res.sku_id, res.warehouse_id)].append(res)

            released = 0
            for (sku_id, warehouse_id), reservations in grouped.items():
                try:
                    released += self._release_group(
                        sku_id, warehouse_id, reservations
                    )
                except Exception:
                    # One bad group must not block the rest of the batch.
                    logger.exception(
                        "批量释放过期预占失败: sku=%s, warehouse=%s",
                        sku_id,
                        warehouse_id,
                    )

            if released == 0 or len(expired) < batch_size:
                break
            # Rate-limit between passes.
            time.sleep(throttle_seconds)

    def _release_group(self, sku_id, warehouse_id, reservations):
        """Release one SKU/warehouse group atomically; return rows released.

        Each reservation is flipped with a ``status = 'RESERVED'`` guard so a
        reservation consumed or released concurrently is skipped and its
        quantity is not returned twice. The inventory update uses the sum of
        the rows actually flipped. Both happen in one transaction.

        Raises:
            InventoryMismatchException: ``inventory.reserved`` is smaller than
                the quantity being released.
        """
        released_rows = 0
        released_quantity = 0
        with self.db.transaction():
            for res in reservations:
                claimed = self.db.execute(
                    """
                    UPDATE inventory_reservations
                    SET status = 'RELEASED', released_at = NOW()
                    WHERE reservation_id = %s AND status = 'RESERVED'
                    """,
                    (res.reservation_id,),
                )
                if claimed:
                    released_rows += 1
                    released_quantity += res.quantity

            if released_quantity:
                affected = self.db.execute(
                    """
                    UPDATE inventory
                    SET available = available + %s,
                        reserved = reserved - %s
                    WHERE sku_id = %s AND warehouse_id = %s
                      AND reserved >= %s
                    """,
                    (
                        released_quantity,
                        released_quantity,
                        sku_id,
                        warehouse_id,
                        released_quantity,
                    ),
                )
                if affected == 0:
                    raise InventoryMismatchException(
                        f"预占库存不足: sku={sku_id}, warehouse={warehouse_id}"
                    )
        return released_rows


# ## 库存预占的并发控制
#
# ### 数据库层面的并发控制
#
# #### 方案1:悲观锁(SELECT FOR UPDATE)
...