引言
想象这样一个场景:用户在购物车中看到商品显示"有货",提交订单时却提示"库存不足"。或者更糟糕的情况:订单已支付,仓库却发现没有库存可发货。这些问题的根源都指向一个核心能力:库存预占。
库存预占是OMS系统的生命线。它要解决的核心问题是:在下单到发货的整个周期内,如何保证已承诺给用户的商品库存不被其他订单抢走?本文将从第一性原理出发,系统性地探讨库存预占与释放机制的设计与实现。
库存预占的本质
什么是库存预占?
库存预占(Inventory Reservation)是指在订单创建时,为订单中的商品临时锁定库存,防止被其他订单占用。
库存状态转换:
可用库存 → 预占库存 → 已出库
↓
释放回可用(订单取消)
为什么需要库存预占?
场景1:下单到支付的时间窗口
用户操作流程:
1. 加入购物车(不预占)
2. 提交订单(预占库存)← 关键点
3. 支付(15分钟内)
4. 发货
问题:如果不预占,步骤2到步骤3之间,库存可能被其他用户购买
场景2:支付到发货的时间窗口
订单处理流程:
1. 用户支付成功
2. 订单进入待发货队列
3. 仓库拣货打包(可能需要几小时)
4. 发货
问题:如果不预占,步骤1到步骤4之间,库存可能被超卖
场景3:高并发秒杀
秒杀场景:
- 库存:100件
- 请求:10000个并发请求
- 目标:保证只有前100个请求成功,且不超卖
核心挑战:如何在毫秒级时间内准确预占库存?
库存预占的核心目标
- 防止超卖:已售数量 ≤ 实际库存
- 保证时效:预占操作要快(毫秒级)
- 支持释放:订单取消/超时后自动释放
- 高并发:支持万级QPS的库存预占
- 数据一致性:分布式环境下保证强一致性
库存预占流程设计
标准预占流程
class InventoryReservationService:
"""库存预占服务"""
def reserve(self, order):
"""
预占库存的标准流程
1. 校验库存可用性
2. 锁定库存
3. 扣减可用库存
4. 增加预占库存
5. 记录预占日志
"""
try:
# 1. 校验库存
self._validate_inventory(order)
# 2. 获取分布式锁
lock = self._acquire_lock(order.items)
# 3. 执行预占
reservations = []
for item in order.items:
reservation = self._reserve_single_item(
sku_id=item.sku_id,
warehouse_id=item.warehouse_id,
quantity=item.quantity,
order_id=order.order_id
)
reservations.append(reservation)
# 4. 释放锁
self._release_lock(lock)
return ReservationResult(
success=True,
reservations=reservations
)
except InsufficientStockException as e:
return ReservationResult(
success=False,
error=str(e)
)
def _reserve_single_item(self, sku_id, warehouse_id,
quantity, order_id):
"""预占单个SKU的库存"""
# 原子操作:扣减可用库存,增加预占库存
with self.db.transaction():
# 1. 查询当前库存(加行锁)
inventory = self.db.query(
"""
SELECT available, reserved
FROM inventory
WHERE sku_id = %s AND warehouse_id = %s
FOR UPDATE
""",
(sku_id, warehouse_id)
)
# 2. 检查库存充足性
if inventory.available < quantity:
raise InsufficientStockException(
f"SKU {sku_id} 库存不足"
)
# 3. 更新库存
self.db.execute(
"""
UPDATE inventory
SET available = available - %s,
reserved = reserved + %s
WHERE sku_id = %s AND warehouse_id = %s
""",
(quantity, quantity, sku_id, warehouse_id)
)
# 4. 记录预占记录
reservation = InventoryReservation(
reservation_id=generate_id(),
order_id=order_id,
sku_id=sku_id,
warehouse_id=warehouse_id,
quantity=quantity,
status='RESERVED',
expire_at=datetime.now() + timedelta(minutes=30)
)
self.db.insert('inventory_reservations', reservation)
return reservation
数据库设计
-- 库存表
CREATE TABLE inventory (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
sku_id VARCHAR(64) NOT NULL,
warehouse_id VARCHAR(64) NOT NULL,
available INT NOT NULL DEFAULT 0, -- 可用库存
reserved INT NOT NULL DEFAULT 0, -- 预占库存
sold INT NOT NULL DEFAULT 0, -- 已售库存
total INT NOT NULL DEFAULT 0, -- 总库存
version INT NOT NULL DEFAULT 0, -- 乐观锁版本号
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
UNIQUE KEY uk_sku_warehouse (sku_id, warehouse_id),
INDEX idx_sku (sku_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 库存预占记录表
CREATE TABLE inventory_reservations (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
reservation_id VARCHAR(64) NOT NULL UNIQUE,
order_id VARCHAR(64) NOT NULL,
sku_id VARCHAR(64) NOT NULL,
warehouse_id VARCHAR(64) NOT NULL,
quantity INT NOT NULL,
status VARCHAR(32) NOT NULL, -- RESERVED, CONSUMED, RELEASED
expire_at DATETIME NOT NULL, -- 预占过期时间
created_at DATETIME NOT NULL,
released_at DATETIME,
INDEX idx_order (order_id),
INDEX idx_expire (expire_at, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 库存变更日志表
CREATE TABLE inventory_change_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
log_id VARCHAR(64) NOT NULL UNIQUE,
sku_id VARCHAR(64) NOT NULL,
warehouse_id VARCHAR(64) NOT NULL,
change_type VARCHAR(32) NOT NULL, -- RESERVE, RELEASE, CONSUME
quantity INT NOT NULL,
before_available INT NOT NULL,
after_available INT NOT NULL,
order_id VARCHAR(64),
created_at DATETIME NOT NULL,
INDEX idx_sku (sku_id),
INDEX idx_order (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
库存预占失败处理
失败场景分类
class ReservationFailureHandler:
"""预占失败处理器"""
def handle_failure(self, order, failure_reason):
"""处理预占失败"""
if isinstance(failure_reason, InsufficientStockException):
return self._handle_insufficient_stock(order)
elif isinstance(failure_reason, DistributedLockException):
return self._handle_lock_timeout(order)
elif isinstance(failure_reason, DatabaseException):
return self._handle_db_error(order)
else:
return self._handle_unknown_error(order)
def _handle_insufficient_stock(self, order):
"""处理库存不足"""
# 1. 尝试部分预占
partial_result = self._try_partial_reservation(order)
if partial_result.has_available_items():
return self._create_partial_order(
order,
partial_result.available_items
)
# 2. 尝试跨仓调拨
transfer_result = self._try_warehouse_transfer(order)
if transfer_result.success:
return self._retry_reservation(order)
# 3. 推荐替代商品
alternatives = self._find_alternatives(order)
return ReservationResult(
success=False,
error="库存不足",
alternatives=alternatives
)
def _handle_lock_timeout(self, order):
"""处理锁超时"""
# 重试策略:指数退避
for retry in range(3):
time.sleep(0.1 * (2 ** retry)) # 100ms, 200ms, 400ms
try:
return self.reservation_service.reserve(order)
except DistributedLockException:
continue
# 降级策略:异步处理
return self._async_reservation(order)
部分预占策略
class PartialReservationStrategy:
"""部分预占策略"""
def try_partial_reservation(self, order):
"""尝试部分预占"""
available_items = []
unavailable_items = []
for item in order.items:
try:
# 尝试预占单个商品
reservation = self.reservation_service.reserve_single_item(
item
)
available_items.append(item)
except InsufficientStockException:
unavailable_items.append(item)
return PartialReservationResult(
available_items=available_items,
unavailable_items=unavailable_items,
fulfillment_rate=len(available_items) / len(order.items)
)
def should_accept_partial(self, result, order):
"""判断是否接受部分预占"""
# 策略1:订单金额超过阈值,接受部分预占
if order.total_amount > 500 and result.fulfillment_rate > 0.8:
return True
# 策略2:重要商品都有货,接受部分预占
critical_items = [i for i in order.items if i.is_critical]
if all(i in result.available_items for i in critical_items):
return True
# 策略3:用户设置允许部分发货
if order.allow_partial_shipment:
return True
return False
库存释放触发条件
自动释放场景
class InventoryReleaseService:
"""库存释放服务"""
def __init__(self):
# 定义释放规则
self.release_rules = [
# 规则1:订单取消
ReleaseRule(
name="order_cancelled",
condition=lambda order: order.status == 'CANCELLED',
action=self._release_all
),
# 规则2:支付超时
ReleaseRule(
name="payment_timeout",
condition=lambda order: (
order.status == 'PENDING_PAYMENT' and
datetime.now() > order.payment_deadline
),
action=self._release_all
),
# 规则3:部分退货
ReleaseRule(
name="partial_refund",
condition=lambda order: order.has_partial_refund(),
action=self._release_partial
),
# 规则4:预占过期
ReleaseRule(
name="reservation_expired",
condition=lambda reservation: (
datetime.now() > reservation.expire_at
),
action=self._release_expired
)
]
def release_for_order(self, order):
"""为订单释放库存"""
# 1. 查询订单的所有预占记录
reservations = self.reservation_repo.find_by_order(order.order_id)
# 2. 执行释放
released_count = 0
for reservation in reservations:
if reservation.status != 'RESERVED':
continue
try:
self._release_reservation(reservation)
released_count += 1
except Exception as e:
self.logger.error(
f"释放预占失败: {reservation.reservation_id}",
exc_info=e
)
return ReleaseResult(
success=True,
released_count=released_count
)
def _release_reservation(self, reservation):
"""释放单个预占"""
with self.db.transaction():
# 1. 更新库存(原子操作)
affected_rows = self.db.execute(
"""
UPDATE inventory
SET available = available + %s,
reserved = reserved - %s
WHERE sku_id = %s
AND warehouse_id = %s
AND reserved >= %s
""",
(
reservation.quantity,
reservation.quantity,
reservation.sku_id,
reservation.warehouse_id,
reservation.quantity
)
)
if affected_rows == 0:
raise InventoryMismatchException(
f"预占库存不足: {reservation.reservation_id}"
)
# 2. 更新预占记录状态
self.db.execute(
"""
UPDATE inventory_reservations
SET status = 'RELEASED',
released_at = NOW()
WHERE reservation_id = %s
""",
(reservation.reservation_id,)
)
# 3. 记录变更日志
self._log_inventory_change(
sku_id=reservation.sku_id,
warehouse_id=reservation.warehouse_id,
change_type='RELEASE',
quantity=reservation.quantity,
order_id=reservation.order_id
)
定时释放任务
class ScheduledReleaseJob:
"""定时释放任务"""
def run(self):
"""
每分钟执行一次,释放过期预占
批量处理策略:
1. 查询过期预占(分批查询)
2. 批量释放库存
3. 批量更新预占状态
"""
batch_size = 1000
offset = 0
while True:
# 1. 查询过期预占
expired_reservations = self.reservation_repo.find_expired(
limit=batch_size,
offset=offset
)
if not expired_reservations:
break
# 2. 按仓库和SKU分组
grouped = defaultdict(list)
for res in expired_reservations:
key = (res.sku_id, res.warehouse_id)
grouped[key].append(res)
# 3. 批量释放
for (sku_id, warehouse_id), reservations in grouped.items():
total_quantity = sum(r.quantity for r in reservations)
# 批量更新库存
self.db.execute(
"""
UPDATE inventory
SET available = available + %s,
reserved = reserved - %s
WHERE sku_id = %s AND warehouse_id = %s
""",
(total_quantity, total_quantity, sku_id, warehouse_id)
)
# 批量更新预占状态
reservation_ids = [r.reservation_id for r in reservations]
self.db.execute(
"""
UPDATE inventory_reservations
SET status = 'RELEASED',
released_at = NOW()
WHERE reservation_id IN %s
""",
(tuple(reservation_ids),)
)
offset += batch_size
# 限流保护
time.sleep(0.1)
库存预占的并发控制
数据库层面的并发控制
方案1:悲观锁(SELECT FOR UPDATE)
def reserve_with_pessimistic_lock(sku_id, quantity):
"""使用悲观锁预占库存"""
with db.transaction():
# 加行锁,阻塞其他事务
inventory = db.query(
"""
SELECT available, reserved
FROM inventory
WHERE sku_id = %s
FOR UPDATE
""",
(sku_id,)
)
if inventory.available < quantity:
raise InsufficientStockException()
# 更新库存
db.execute(
"""
UPDATE inventory
SET available = available - %s,
reserved = reserved + %s
WHERE sku_id = %s
""",
(quantity, quantity, sku_id)
)
优点:实现简单,数据一致性强 缺点:高并发下性能差,容易死锁
方案2:乐观锁(Version版本号)
def reserve_with_optimistic_lock(sku_id, quantity, max_retries=3):
"""使用乐观锁预占库存"""
for retry in range(max_retries):
# 1. 查询当前库存和版本号
inventory = db.query(
"SELECT available, version FROM inventory WHERE sku_id = %s",
(sku_id,)
)
if inventory.available < quantity:
raise InsufficientStockException()
# 2. 更新库存(CAS操作)
affected_rows = db.execute(
"""
UPDATE inventory
SET available = available - %s,
reserved = reserved + %s,
version = version + 1
WHERE sku_id = %s
AND version = %s
AND available >= %s
""",
(quantity, quantity, sku_id, inventory.version, quantity)
)
# 3. 检查是否更新成功
if affected_rows > 0:
return True
# 4. 版本冲突,重试
time.sleep(0.01 * (retry + 1))
raise ConcurrentUpdateException("乐观锁重试次数超限")
优点:高并发下性能好 缺点:冲突率高时重试次数多
分布式锁方案
基于Redis的分布式锁
class RedisDistributedLock:
"""Redis分布式锁"""
def __init__(self, redis_client):
self.redis = redis_client
def acquire(self, sku_id, timeout=5):
"""获取锁"""
lock_key = f"inventory:lock:{sku_id}"
lock_value = str(uuid.uuid4())
end_time = time.time() + timeout
while time.time() < end_time:
# 使用SET NX EX命令原子性获取锁
if self.redis.set(
lock_key,
lock_value,
ex=30, # 锁过期时间30秒
nx=True # 只在key不存在时设置
):
return Lock(lock_key, lock_value, self.redis)
# 自旋等待
time.sleep(0.01)
raise DistributedLockException(f"获取锁超时: {sku_id}")
def release(self, lock):
"""释放锁(Lua脚本保证原子性)"""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(lua_script, 1, lock.key, lock.value)
class Lock:
"""锁对象"""
def __init__(self, key, value, redis):
self.key = key
self.value = value
self.redis = redis
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
RedisDistributedLock(self.redis).release(self)
# 使用示例
def reserve_with_redis_lock(sku_id, quantity):
"""使用Redis分布式锁预占库存"""
lock_manager = RedisDistributedLock(redis_client)
with lock_manager.acquire(sku_id):
# 在锁保护下执行预占操作
inventory = get_inventory(sku_id)
if inventory.available < quantity:
raise InsufficientStockException()
update_inventory(sku_id, -quantity)
Redis原子操作方案(推荐)
class RedisInventoryService:
"""基于Redis的库存服务"""
def __init__(self, redis_client):
self.redis = redis_client
def reserve(self, sku_id, quantity):
"""
使用Lua脚本原子性预占库存
优势:
1. 原子性:Lua脚本在Redis中原子执行
2. 高性能:单次网络请求
3. 无锁:不需要分布式锁
"""
lua_script = """
local sku_key = KEYS[1]
local quantity = tonumber(ARGV[1])
-- 获取当前可用库存
local available = tonumber(redis.call('HGET', sku_key, 'available') or 0)
-- 检查库存充足性
if available < quantity then
return -1 -- 库存不足
end
-- 扣减可用库存,增加预占库存
redis.call('HINCRBY', sku_key, 'available', -quantity)
redis.call('HINCRBY', sku_key, 'reserved', quantity)
return available - quantity -- 返回剩余库存
"""
result = self.redis.eval(
lua_script,
1, # KEYS数量
f"inventory:{sku_id}", # KEYS[1]
quantity # ARGV[1]
)
if result < 0:
raise InsufficientStockException(
f"SKU {sku_id} 库存不足"
)
return result
def release(self, sku_id, quantity):
"""释放库存"""
lua_script = """
local sku_key = KEYS[1]
local quantity = tonumber(ARGV[1])
-- 增加可用库存,扣减预占库存
redis.call('HINCRBY', sku_key, 'available', quantity)
redis.call('HINCRBY', sku_key, 'reserved', -quantity)
return 1
"""
self.redis.eval(
lua_script,
1,
f"inventory:{sku_id}",
quantity
)
def sync_to_database(self):
"""定期同步Redis库存到数据库"""
# 扫描所有库存key
cursor = 0
while True:
cursor, keys = self.redis.scan(
cursor,
match="inventory:*",
count=100
)
for key in keys:
sku_id = key.split(':')[1]
inventory_data = self.redis.hgetall(key)
# 更新数据库
self.db.execute(
"""
UPDATE inventory
SET available = %s,
reserved = %s
WHERE sku_id = %s
""",
(
inventory_data['available'],
inventory_data['reserved'],
sku_id
)
)
if cursor == 0:
break
实战:秒杀场景的库存预占
秒杀场景特点
- 高并发:瞬时万级QPS
- 库存少:通常几十到几百件
- 时间短:1-2秒决定胜负
- 零容忍:绝对不能超卖
秒杀库存预占方案
class SeckillInventoryService:
"""秒杀库存服务"""
def __init__(self, redis_client):
self.redis = redis_client
def prepare_seckill(self, sku_id, total_stock):
"""
秒杀预热:提前将库存加载到Redis
数据结构:
- String: inventory:seckill:{sku_id} → 剩余库存(原子递减)
- Set: inventory:seckill:{sku_id}:users → 已抢购用户(防重复)
"""
# 1. 设置初始库存
self.redis.set(
f"inventory:seckill:{sku_id}",
total_stock
)
# 2. 设置过期时间(秒杀结束后1小时)
self.redis.expire(
f"inventory:seckill:{sku_id}",
3600
)
def reserve_seckill(self, sku_id, user_id):
"""
秒杀库存预占
使用Lua脚本保证原子性:
1. 检查用户是否已抢购
2. 检查库存是否充足
3. 扣减库存
4. 记录用户
"""
lua_script = """
local stock_key = KEYS[1]
local users_key = KEYS[2]
local user_id = ARGV[1]
-- 1. 检查用户是否已抢购
if redis.call('SISMEMBER', users_key, user_id) == 1 then
return -2 -- 重复抢购
end
-- 2. 检查库存
local stock = tonumber(redis.call('GET', stock_key) or 0)
if stock <= 0 then
return -1 -- 库存不足
end
-- 3. 扣减库存
redis.call('DECR', stock_key)
-- 4. 记录用户
redis.call('SADD', users_key, user_id)
return stock - 1 -- 返回剩余库存
"""
result = self.redis.eval(
lua_script,
2,
f"inventory:seckill:{sku_id}",
f"inventory:seckill:{sku_id}:users",
user_id
)
if result == -1:
raise SoldOutException("商品已抢光")
elif result == -2:
raise DuplicatePurchaseException("您已抢购过该商品")
return result
def fallback_to_queue(self, sku_id, user_id):
"""
降级方案:库存不足时进入等待队列
场景:用户取消订单后,自动通知队列中的用户
"""
queue_key = f"inventory:seckill:{sku_id}:queue"
# 加入队列(带分数的有序集合,分数=时间戳)
self.redis.zadd(
queue_key,
{user_id: time.time()}
)
# 查询队列位置
rank = self.redis.zrank(queue_key, user_id)
return QueueResult(
position=rank + 1,
estimated_wait_time=(rank + 1) * 60 # 假设每分钟处理1个
)
总结
库存预占是OMS系统的核心能力,直接决定了系统的可靠性和用户体验。关键要点:
- 原子性保证:使用数据库事务、Redis Lua脚本保证操作原子性
- 并发控制:根据场景选择悲观锁、乐观锁或分布式锁
- 自动释放:通过定时任务、事件触发实现库存自动释放
- 降级策略:库存不足时提供部分预占、排队等降级方案
- 监控告警:实时监控预占成功率、释放及时性等指标
下一篇文章,我们将探讨订单智能路由与仓库选择,这是库存预占的前置决策环节。
关键词:库存预占、分布式锁、Redis、并发控制、Lua脚本、秒杀系统、库存一致性