引言
当用户在北京下单购买3件商品时,OMS系统面临一个关键决策:这个订单应该从哪个仓库发货?
- 北京仓:距离最近,1天送达,但库存只有2件
- 上海仓:库存充足,但需要2天送达
- 广州仓:库存充足,运费最低,但需要3天送达
这个看似简单的问题,实际上是一个多目标优化问题。订单路由不仅要考虑距离和时效,还要权衡库存分布、物流成本、仓库负载等多个因素。本文将从第一性原理出发,系统性地探讨订单路由的算法设计与实现。
订单路由的目标
核心优化目标
订单路由要在以下三个核心目标之间寻找最优解:
1. 成本最优
总成本 = 物流成本 + 仓储成本 + 操作成本
物流成本:
- 首重成本(如8元/公斤)
- 续重成本(如3元/公斤)
- 偏远地区附加费
仓储成本:
- 库存积压成本(滞销商品优先出库)
- 仓租成本(高租金仓库优先出库)
操作成本:
- 拣货成本(订单密度高的仓库更高效)
- 打包成本
2. 时效最优
配送时效 = 仓库处理时间 + 物流运输时间
仓库处理时间:
- 拣货时间(受订单量影响)
- 打包时间
- 出库时间
物流运输时间:
- 仓库到配送站距离
- 运输方式(陆运、空运)
- 天气、节假日等因素
3. 库存最优
库存健康度 = f(库存周转率, 库存分布均衡度, 安全库存)
优先级:
1. 滞销商品优先出库
2. 临期商品优先出库
3. 均衡各仓库库存水位
4. 保证核心仓库安全库存
目标权重配置
class RoutingObjective:
"""路由目标配置"""
# 默认权重配置
DEFAULT_WEIGHTS = {
'cost': 0.3, # 成本权重
'time': 0.5, # 时效权重
'inventory': 0.2 # 库存权重
}
# 不同场景的权重策略
SCENARIO_WEIGHTS = {
# 大促场景:优先时效
'PROMOTION': {
'cost': 0.2,
'time': 0.6,
'inventory': 0.2
},
# 清仓场景:优先库存
'CLEARANCE': {
'cost': 0.2,
'time': 0.3,
'inventory': 0.5
},
# 成本优化场景
'COST_SAVING': {
'cost': 0.6,
'time': 0.2,
'inventory': 0.2
}
}
@classmethod
def get_weights(cls, scenario='DEFAULT'):
"""获取场景对应的权重"""
return cls.SCENARIO_WEIGHTS.get(
scenario,
cls.DEFAULT_WEIGHTS
)
路由策略详解
策略1:就近原则
核心思想:选择距离用户最近的仓库,优先保证时效。
class NearestWarehouseStrategy:
"""就近仓库策略"""
def route(self, order, available_warehouses):
"""
按距离排序,选择最近的有货仓库
Args:
order: 订单对象
available_warehouses: 有库存的仓库列表
Returns:
selected_warehouse: 选中的仓库
"""
delivery_address = order.delivery_address
# 1. 计算所有仓库的距离
warehouse_distances = []
for warehouse in available_warehouses:
distance = self._calculate_distance(
warehouse.location,
delivery_address
)
warehouse_distances.append((warehouse, distance))
# 2. 按距离排序
warehouse_distances.sort(key=lambda x: x[1])
# 3. 选择最近的有充足库存的仓库
for warehouse, distance in warehouse_distances:
if self._check_inventory_sufficient(warehouse, order.items):
return RoutingResult(
warehouse=warehouse,
distance=distance,
estimated_delivery_days=self._estimate_delivery_time(distance)
)
raise NoAvailableWarehouseException("没有可用仓库")
def _calculate_distance(self, warehouse_location, delivery_address):
"""
计算距离(简化版)
实际生产中可以使用:
1. 高德/百度地图API
2. 预计算的距离矩阵
3. 物流网络图的最短路径
"""
# 简化计算:使用经纬度计算直线距离
lat1, lon1 = warehouse_location
lat2, lon2 = delivery_address.lat, delivery_address.lon
# Haversine公式
R = 6371 # 地球半径(公里)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2)**2 +
math.cos(math.radians(lat1)) *
math.cos(math.radians(lat2)) *
math.sin(dlon/2)**2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = R * c
return distance
def _estimate_delivery_time(self, distance):
"""
估算配送时间
规则:
- 0-200km: 1天
- 200-800km: 2天
- 800-2000km: 3天
- 2000km以上: 4-5天
"""
if distance < 200:
return 1
elif distance < 800:
return 2
elif distance < 2000:
return 3
else:
return 5
策略2:负载均衡
核心思想:将订单均匀分配到各个仓库,避免单仓库过载。
class LoadBalancingStrategy:
"""负载均衡策略"""
def route(self, order, available_warehouses):
"""
按仓库负载排序,选择负载最低的仓库
负载计算:
负载 = (当前订单数 / 最大处理能力) × 0.6 +
(当前库存占用率) × 0.4
"""
warehouse_loads = []
for warehouse in available_warehouses:
# 1. 计算订单处理负载
current_orders = self._get_pending_orders_count(warehouse)
max_capacity = warehouse.max_daily_orders
order_load = current_orders / max_capacity
# 2. 计算库存负载
current_inventory = self._get_current_inventory(warehouse)
max_inventory = warehouse.max_inventory
inventory_load = current_inventory / max_inventory
# 3. 综合负载
total_load = order_load * 0.6 + inventory_load * 0.4
warehouse_loads.append((warehouse, total_load))
# 按负载排序(负载低的优先)
warehouse_loads.sort(key=lambda x: x[1])
# 选择负载最低且有库存的仓库
for warehouse, load in warehouse_loads:
if self._check_inventory_sufficient(warehouse, order.items):
return RoutingResult(
warehouse=warehouse,
load=load,
reason='LOAD_BALANCING'
)
raise NoAvailableWarehouseException("所有仓库负载过高")
策略3:库存优先
核心思想:优先消化滞销库存、临期库存。
class InventoryPriorityStrategy:
"""库存优先策略"""
def route(self, order, available_warehouses):
"""
按库存优先级排序
优先级规则:
1. 滞销商品(周转率低)
2. 临期商品(距离过期时间短)
3. 高库存仓库(库存水位高)
"""
warehouse_scores = []
for warehouse in available_warehouses:
score = self._calculate_inventory_priority_score(
warehouse,
order.items
)
warehouse_scores.append((warehouse, score))
# 按优先级排序(分数高的优先)
warehouse_scores.sort(key=lambda x: x[1], reverse=True)
for warehouse, score in warehouse_scores:
if self._check_inventory_sufficient(warehouse, order.items):
return RoutingResult(
warehouse=warehouse,
priority_score=score,
reason='INVENTORY_PRIORITY'
)
raise NoAvailableWarehouseException("没有库存优先仓库")
def _calculate_inventory_priority_score(self, warehouse, items):
"""
计算库存优先级分数
分数 = 滞销分数 × 0.4 + 临期分数 × 0.3 + 库存水位分数 × 0.3
"""
slow_moving_score = 0
expiring_score = 0
stock_level_score = 0
for item in items:
# 1. 滞销分数(周转率越低分数越高)
turnover_rate = self._get_turnover_rate(
warehouse,
item.sku_id
)
if turnover_rate < 0.5: # 周转率低于50%
slow_moving_score += (1 - turnover_rate) * 100
# 2. 临期分数(距离过期时间越短分数越高)
expiry_date = self._get_expiry_date(
warehouse,
item.sku_id
)
if expiry_date:
days_to_expire = (expiry_date - datetime.now()).days
if days_to_expire < 90: # 3个月内过期
expiring_score += (90 - days_to_expire) / 90 * 100
# 3. 库存水位分数
stock_level = self._get_stock_level(
warehouse,
item.sku_id
)
if stock_level > 0.8: # 库存水位超过80%
stock_level_score += stock_level * 100
# 综合分数
total_score = (
slow_moving_score * 0.4 +
expiring_score * 0.3 +
stock_level_score * 0.3
)
return total_score
多仓库路由算法设计
单仓发货 vs 多仓发货
class MultiWarehouseRouter:
"""多仓库路由器"""
def route(self, order, available_warehouses):
"""
路由决策流程:
1. 尝试单仓发货(用户体验最好)
2. 如果单仓无法满足,尝试多仓发货
3. 评估多仓方案的成本和时效
4. 选择最优方案
"""
# 1. 尝试单仓发货
single_warehouse_plan = self._try_single_warehouse(
order,
available_warehouses
)
if single_warehouse_plan:
return single_warehouse_plan
# 2. 生成多仓发货方案
multi_warehouse_plans = self._generate_multi_warehouse_plans(
order,
available_warehouses
)
# 3. 评估并选择最优方案
best_plan = self._select_best_plan(multi_warehouse_plans)
return best_plan
def _try_single_warehouse(self, order, warehouses):
"""尝试单仓发货"""
for warehouse in warehouses:
# 检查库存充足性
if not self._check_all_items_available(warehouse, order.items):
continue
# 计算单仓方案的得分
score = self._calculate_plan_score(
plan=SingleWarehousePlan(warehouse, order),
order=order
)
return RoutingPlan(
warehouses=[warehouse],
items_allocation={warehouse: order.items},
total_score=score,
plan_type='SINGLE_WAREHOUSE'
)
return None
def _generate_multi_warehouse_plans(self, order, warehouses):
"""
生成多仓发货方案
使用动态规划求解最优商品分配
"""
plans = []
# 生成所有可行的2仓组合
for wh1, wh2 in itertools.combinations(warehouses, 2):
allocation = self._allocate_items_to_warehouses(
order.items,
[wh1, wh2]
)
if allocation:
plan = RoutingPlan(
warehouses=[wh1, wh2],
items_allocation=allocation,
plan_type='TWO_WAREHOUSE'
)
plans.append(plan)
# 如果2仓不够,尝试3仓
if not plans:
for wh1, wh2, wh3 in itertools.combinations(warehouses, 3):
allocation = self._allocate_items_to_warehouses(
order.items,
[wh1, wh2, wh3]
)
if allocation:
plan = RoutingPlan(
warehouses=[wh1, wh2, wh3],
items_allocation=allocation,
plan_type='THREE_WAREHOUSE'
)
plans.append(plan)
return plans
def _allocate_items_to_warehouses(self, items, warehouses):
"""
将商品分配到仓库
目标:最小化物流成本
"""
allocation = defaultdict(list)
for item in items:
# 找到距离最近且有库存的仓库
best_warehouse = None
min_cost = float('inf')
for warehouse in warehouses:
stock = self._get_stock(warehouse, item.sku_id)
if stock >= item.quantity:
cost = self._calculate_shipping_cost(
warehouse,
item
)
if cost < min_cost:
min_cost = cost
best_warehouse = warehouse
if best_warehouse:
allocation[best_warehouse].append(item)
else:
return None # 无法满足所有商品
return allocation
def _select_best_plan(self, plans):
"""选择最优方案"""
if not plans:
raise NoAvailableWarehouseException("无可用路由方案")
# 计算每个方案的综合得分
scored_plans = []
for plan in plans:
score = self._calculate_plan_score(plan)
scored_plans.append((plan, score))
# 选择得分最高的方案
best_plan = max(scored_plans, key=lambda x: x[1])[0]
return best_plan
def _calculate_plan_score(self, plan):
"""
计算方案得分
得分 = 成本得分 × 0.3 + 时效得分 × 0.5 + 体验得分 × 0.2
"""
# 成本得分(成本越低得分越高)
total_cost = self._calculate_total_cost(plan)
cost_score = 1.0 / (1 + total_cost / 100)
# 时效得分(时效越短得分越高)
max_delivery_days = self._calculate_max_delivery_days(plan)
time_score = 1.0 / max_delivery_days
# 体验得分(包裹越少得分越高)
num_packages = len(plan.warehouses)
if num_packages == 1:
experience_score = 1.0
elif num_packages == 2:
experience_score = 0.6
else:
experience_score = 0.3
# 综合得分
total_score = (
cost_score * 0.3 +
time_score * 0.5 +
experience_score * 0.2
)
return total_score
路由规则引擎实现
规则定义
class RoutingRule:
"""路由规则"""
def __init__(self, name, priority, condition, action):
self.name = name
self.priority = priority # 优先级(数字越大越优先)
self.condition = condition # 条件函数
self.action = action # 动作函数
def match(self, context):
"""判断规则是否匹配"""
return self.condition(context)
def execute(self, context):
"""执行规则动作"""
return self.action(context)
class RoutingRuleEngine:
"""路由规则引擎"""
def __init__(self):
self.rules = []
def add_rule(self, rule):
"""添加规则"""
self.rules.append(rule)
# 按优先级排序
self.rules.sort(key=lambda r: r.priority, reverse=True)
def execute(self, order):
"""
执行规则引擎
流程:
1. 按优先级遍历规则
2. 找到第一个匹配的规则
3. 执行规则动作
"""
context = RoutingContext(order)
for rule in self.rules:
if rule.match(context):
result = rule.execute(context)
if result:
return result
# 没有匹配的规则,使用默认策略
return self._default_routing(order)
预定义规则示例
# 规则1:VIP用户优先就近仓库
vip_rule = RoutingRule(
name="VIP用户就近发货",
priority=100,
condition=lambda ctx: ctx.order.user.is_vip,
action=lambda ctx: NearestWarehouseStrategy().route(
ctx.order,
ctx.available_warehouses
)
)
# 规则2:大促期间优先负载均衡
promotion_rule = RoutingRule(
name="大促负载均衡",
priority=90,
condition=lambda ctx: ctx.is_promotion_period(),
action=lambda ctx: LoadBalancingStrategy().route(
ctx.order,
ctx.available_warehouses
)
)
# 规则3:清仓商品优先库存策略
clearance_rule = RoutingRule(
name="清仓商品优先",
priority=80,
condition=lambda ctx: any(
item.is_clearance for item in ctx.order.items
),
action=lambda ctx: InventoryPriorityStrategy().route(
ctx.order,
ctx.available_warehouses
)
)
# 规则4:偏远地区优先省会仓库
remote_area_rule = RoutingRule(
name="偏远地区路由",
priority=70,
condition=lambda ctx: ctx.order.delivery_address.is_remote,
action=lambda ctx: ProvinceCapitalWarehouseStrategy().route(
ctx.order,
ctx.available_warehouses
)
)
# 规则5:生鲜商品优先最近仓库
fresh_rule = RoutingRule(
name="生鲜商品就近",
priority=95,
condition=lambda ctx: any(
item.category == 'FRESH' for item in ctx.order.items
),
action=lambda ctx: NearestWarehouseStrategy().route(
ctx.order,
ctx.available_warehouses
)
)
# 注册规则
engine = RoutingRuleEngine()
engine.add_rule(vip_rule)
engine.add_rule(promotion_rule)
engine.add_rule(clearance_rule)
engine.add_rule(remote_area_rule)
engine.add_rule(fresh_rule)
动态规则配置
class DynamicRuleManager:
"""动态规则管理器"""
def __init__(self, config_service):
self.config_service = config_service
self.engine = RoutingRuleEngine()
def load_rules_from_config(self):
"""从配置中心加载规则"""
rules_config = self.config_service.get('routing_rules')
for rule_config in rules_config:
rule = self._create_rule_from_config(rule_config)
self.engine.add_rule(rule)
def _create_rule_from_config(self, config):
"""
从配置创建规则
配置示例:
{
"name": "大件商品路由",
"priority": 85,
"condition": {
"type": "item_weight",
"operator": ">",
"value": 20
},
"action": {
"strategy": "nearest_warehouse_with_installation"
}
}
"""
condition_func = self._build_condition(config['condition'])
action_func = self._build_action(config['action'])
return RoutingRule(
name=config['name'],
priority=config['priority'],
condition=condition_func,
action=action_func
)
def _build_condition(self, condition_config):
"""构建条件函数"""
condition_type = condition_config['type']
operator = condition_config['operator']
value = condition_config['value']
def condition(ctx):
if condition_type == 'user_level':
return getattr(ctx.order.user.level, operator)(value)
elif condition_type == 'order_amount':
return getattr(ctx.order.total_amount, operator)(value)
elif condition_type == 'item_weight':
total_weight = sum(item.weight for item in ctx.order.items)
return operator_map[operator](total_weight, value)
# ... 更多条件类型
return condition
def _build_action(self, action_config):
"""构建动作函数"""
strategy_name = action_config['strategy']
strategy_class = STRATEGY_MAP[strategy_name]
def action(ctx):
strategy = strategy_class()
return strategy.route(ctx.order, ctx.available_warehouses)
return action
路由失败降级策略
降级场景
class RoutingFallbackStrategy:
"""路由降级策略"""
def handle_routing_failure(self, order, failure_reason):
"""处理路由失败"""
if isinstance(failure_reason, NoInventoryException):
# 场景1:所有仓库都无库存
return self._handle_no_inventory(order)
elif isinstance(failure_reason, AllWarehousesOverloadException):
# 场景2:所有仓库负载过高
return self._handle_overload(order)
elif isinstance(failure_reason, DeliveryAreaNotCoveredException):
# 场景3:收货地址不在配送范围
return self._handle_uncovered_area(order)
else:
# 其他异常
return self._handle_unknown_failure(order)
def _handle_no_inventory(self, order):
"""处理库存不足"""
# 降级方案1:拆分订单,部分发货
partial_result = self._try_partial_fulfillment(order)
if partial_result.is_acceptable():
return partial_result
# 降级方案2:预售转化
presale_result = self._try_convert_to_presale(order)
if presale_result.is_acceptable():
return presale_result
# 降级方案3:取消订单,退款
return self._cancel_order_with_refund(order)
def _handle_overload(self, order):
"""处理仓库过载"""
# 降级方案1:延迟发货
return self._schedule_delayed_shipment(order)
# 降级方案2:外部仓库协作
return self._route_to_partner_warehouse(order)
def _handle_uncovered_area(self, order):
"""处理不可达地址"""
# 降级方案:寻找最近的可达配送点
nearest_pickup_point = self._find_nearest_pickup_point(
order.delivery_address
)
return RoutingResult(
delivery_type='PICKUP',
pickup_point=nearest_pickup_point,
reason='地址不在配送范围,请到自提点取货'
)
实战案例:京东的订单路由
京东路由策略分析
class JDRoutingStrategy:
"""京东式路由策略(简化版)"""
def route(self, order):
"""
京东路由核心逻辑:
1. 优先211限时达(上午11点前下单当日达)
2. 次优次日达
3. 保底隔日达
路由决策因素:
- 用户等级(Plus会员优先)
- 商品类型(生鲜、大家电特殊处理)
- 地理位置(一线城市优先)
- 库存分布(就近原则)
- 仓库负载(负载均衡)
"""
# 1. 判断是否可以211
if self._can_211_delivery(order):
warehouse = self._find_211_warehouse(order)
return RoutingResult(
warehouse=warehouse,
delivery_type='211_SAME_DAY',
estimated_delivery='今日送达'
)
# 2. 判断是否可以次日达
if self._can_next_day_delivery(order):
warehouse = self._find_next_day_warehouse(order)
return RoutingResult(
warehouse=warehouse,
delivery_type='NEXT_DAY',
estimated_delivery='次日送达'
)
# 3. 普通配送
warehouse = self._find_standard_warehouse(order)
return RoutingResult(
warehouse=warehouse,
delivery_type='STANDARD',
estimated_delivery='2-3日送达'
)
def _can_211_delivery(self, order):
"""
判断是否可以211
条件:
1. 当前时间 < 11:00
2. 城市在211覆盖范围
3. 30公里内有仓库且有货
4. 仓库未达到当日配送上限
"""
now = datetime.now()
# 条件1:时间限制
if now.hour >= 11:
return False
# 条件2:城市覆盖
if order.delivery_address.city not in self.JD_211_CITIES:
return False
# 条件3:30公里内有仓库
nearby_warehouses = self._find_warehouses_within(
order.delivery_address,
radius_km=30
)
for warehouse in nearby_warehouses:
# 检查库存和配送能力
if (self._check_inventory(warehouse, order.items) and
self._check_delivery_capacity(warehouse)):
return True
return False
总结
订单路由是OMS系统的智能大脑,决定了履约效率和用户体验。关键要点:
- 多目标优化:在成本、时效、库存之间寻找平衡
- 策略组合:根据场景选择就近、负载均衡、库存优先等策略
- 规则引擎:通过规则引擎实现灵活的路由决策
- 降级方案:提供完善的降级策略,保证系统可用性
- 动态调整:根据实时数据动态调整路由策略
下一篇文章,我们将探讨订单履约管理与物流追踪,这是路由决策的执行环节。
关键词:订单路由、智能调度、多目标优化、规则引擎、负载均衡、就近原则、库存优先