引言
订单履约(Order Fulfillment)是将用户的购买意图转化为实物交付的关键环节。从用户点击"确认下单"的那一刻起,订单就进入了履约流程:库存预占、下发仓库、拣货打包、物流配送,直到用户签收。
这个过程看似简单,实则涉及多个系统的协同:OMS、WMS、TMS、物流商系统。任何一个环节出现问题,都可能导致发货延迟、错发漏发、物流异常等问题,直接影响用户体验。
本文将从第一性原理出发,系统性地探讨订单履约的全流程管理和物流追踪系统的设计与实现。
订单履约全流程
履约流程概览
订单履约流程(7个阶段):
1. 订单创建 → 2. 支付确认 → 3. 下发WMS → 4. 仓库处理 →
5. 发货出库 → 6. 物流配送 → 7. 用户签收
每个阶段的时间要求:
- 支付确认:实时(秒级)
- 下发WMS:5分钟内
- 仓库处理:2-24小时(视订单量)
- 物流配送:1-3天(视距离)
- 用户签收:配送当天
履约流程设计
class OrderFulfillmentService:
"""订单履约服务"""
def __init__(self, wms_client, tms_client, logistics_client):
self.wms_client = wms_client
self.tms_client = tms_client
self.logistics_client = logistics_client
def fulfill_order(self, order):
"""
订单履约主流程
流程:
1. 校验订单状态
2. 下发WMS(仓库管理系统)
3. 等待WMS拣货打包
4. 创建物流单
5. 推送物流信息
6. 监控配送状态
"""
try:
# 1. 校验订单
self._validate_order_for_fulfillment(order)
# 2. 下发WMS
wms_result = self._dispatch_to_wms(order)
# 3. 更新订单状态
self._update_order_status(
order,
status='DISPATCHED_TO_WMS',
wms_order_id=wms_result.wms_order_id
)
# 4. 异步等待WMS完成
self._register_wms_callback(order)
return FulfillmentResult(
success=True,
wms_order_id=wms_result.wms_order_id
)
except Exception as e:
self._handle_fulfillment_error(order, e)
raise
def _validate_order_for_fulfillment(self, order):
"""校验订单可履约性"""
# 检查订单状态
if order.status not in ['PAID', 'CONFIRMED']:
raise InvalidOrderStatusException(
f"订单状态不正确: {order.status}"
)
# 检查库存预占
if not self._check_inventory_reserved(order):
raise InventoryNotReservedException(
f"订单库存未预占: {order.order_id}"
)
# 检查收货地址
if not self._validate_delivery_address(order.delivery_address):
raise InvalidDeliveryAddressException(
"收货地址不完整或不可达"
)
def _dispatch_to_wms(self, order):
"""
下发WMS
WMS单据结构:
{
"wms_order_id": "WO123456",
"oms_order_id": "O123456",
"warehouse_id": "WH001",
"items": [
{
"sku_id": "SKU001",
"quantity": 2,
"location": "A-01-02"
}
],
"delivery_info": {...},
"priority": "NORMAL" # URGENT, NORMAL, LOW
}
"""
wms_order = self._build_wms_order(order)
# 调用WMS接口
result = self.wms_client.create_order(wms_order)
# 记录下发日志
self._log_wms_dispatch(
order_id=order.order_id,
wms_order_id=result.wms_order_id,
warehouse_id=order.warehouse_id
)
return result
def _build_wms_order(self, order):
"""构建WMS订单"""
return WMSOrder(
wms_order_id=self._generate_wms_order_id(),
oms_order_id=order.order_id,
warehouse_id=order.warehouse_id,
items=[
WMSOrderItem(
sku_id=item.sku_id,
quantity=item.quantity,
# 查询SKU在仓库中的位置
location=self._get_sku_location(
order.warehouse_id,
item.sku_id
)
)
for item in order.items
],
delivery_info=DeliveryInfo(
recipient=order.delivery_address.recipient,
phone=order.delivery_address.phone,
address=order.delivery_address.full_address,
postcode=order.delivery_address.postcode
),
# 根据订单类型设置优先级
priority=self._determine_priority(order),
# 特殊要求
special_requirements=order.special_requirements,
# 预计发货时间
expected_ship_time=order.expected_ship_time
)
订单下发WMS流程
WMS接口设计
class WMSClient:
"""WMS客户端"""
def __init__(self, base_url, api_key):
self.base_url = base_url
self.api_key = api_key
def create_order(self, wms_order):
"""
创建WMS订单
接口:POST /api/v1/orders
"""
try:
response = self._request(
method='POST',
endpoint='/api/v1/orders',
data=wms_order.to_dict(),
timeout=10
)
if response.status_code == 200:
return WMSOrderResult(
success=True,
wms_order_id=response.json()['wms_order_id'],
estimated_ship_time=response.json()['estimated_ship_time']
)
else:
raise WMSException(
f"WMS创建订单失败: {response.json()}"
)
except requests.RequestException as e:
# 网络异常,进入重试队列
self._enqueue_retry(wms_order)
raise WMSNetworkException(str(e))
def query_order_status(self, wms_order_id):
"""
查询WMS订单状态
接口:GET /api/v1/orders/{wms_order_id}
"""
response = self._request(
method='GET',
endpoint=f'/api/v1/orders/{wms_order_id}'
)
return WMSOrderStatus(
wms_order_id=wms_order_id,
status=response.json()['status'],
picked_at=response.json().get('picked_at'),
packed_at=response.json().get('packed_at'),
shipped_at=response.json().get('shipped_at'),
tracking_number=response.json().get('tracking_number')
)
def cancel_order(self, wms_order_id, reason):
"""
取消WMS订单
接口:POST /api/v1/orders/{wms_order_id}/cancel
"""
response = self._request(
method='POST',
endpoint=f'/api/v1/orders/{wms_order_id}/cancel',
data={'reason': reason}
)
return response.json()['success']
def _request(self, method, endpoint, data=None, timeout=30):
"""统一请求方法"""
url = f"{self.base_url}{endpoint}"
headers = {
'Content-Type': 'application/json',
'X-API-Key': self.api_key
}
response = requests.request(
method=method,
url=url,
headers=headers,
json=data,
timeout=timeout
)
return response
WMS回调处理
class WMSCallbackHandler:
"""WMS回调处理器"""
def handle_callback(self, callback_data):
"""
处理WMS回调
WMS会在以下节点回调OMS:
1. 拣货完成
2. 打包完成
3. 出库发货
4. 发货异常(缺货、商品损坏等)
"""
event_type = callback_data['event_type']
if event_type == 'PICKED':
return self._handle_picked(callback_data)
elif event_type == 'PACKED':
return self._handle_packed(callback_data)
elif event_type == 'SHIPPED':
return self._handle_shipped(callback_data)
elif event_type == 'EXCEPTION':
return self._handle_exception(callback_data)
else:
raise UnknownCallbackEventException(event_type)
def _handle_picked(self, data):
"""处理拣货完成回调"""
order = self._get_order(data['oms_order_id'])
# 更新订单状态
order.status = 'PICKED'
order.picked_at = datetime.now()
# 记录拣货信息
self._log_picking_info(
order_id=order.order_id,
picker=data['picker'],
picked_at=data['picked_at']
)
self.order_repo.update(order)
def _handle_packed(self, data):
"""处理打包完成回调"""
order = self._get_order(data['oms_order_id'])
# 更新订单状态
order.status = 'PACKED'
order.packed_at = datetime.now()
# 记录包裹信息
package_info = PackageInfo(
package_id=data['package_id'],
weight=data['weight'],
dimensions=data['dimensions'],
packer=data['packer']
)
order.package_info = package_info
self.order_repo.update(order)
def _handle_shipped(self, data):
"""处理发货出库回调"""
order = self._get_order(data['oms_order_id'])
# 更新订单状态
order.status = 'SHIPPED'
order.shipped_at = datetime.now()
order.tracking_number = data['tracking_number']
order.logistics_company = data['logistics_company']
# 消费库存预占
self._consume_inventory_reservation(order)
# 创建物流追踪记录
self._create_tracking_record(order)
# 发送发货通知
self._send_shipment_notification(order)
self.order_repo.update(order)
def _handle_exception(self, data):
"""处理异常回调"""
order = self._get_order(data['oms_order_id'])
exception_type = data['exception_type']
if exception_type == 'OUT_OF_STOCK':
# 库存不足
self._handle_out_of_stock(order, data)
elif exception_type == 'DAMAGED_GOODS':
# 商品损坏
self._handle_damaged_goods(order, data)
elif exception_type == 'ADDRESS_INCOMPLETE':
# 地址不完整
self._handle_address_issue(order, data)
else:
# 其他异常
self._handle_unknown_exception(order, data)
def _consume_inventory_reservation(self, order):
"""消费库存预占"""
for item in order.items:
# 更新库存:预占转为已售
self.inventory_service.consume_reservation(
sku_id=item.sku_id,
warehouse_id=order.warehouse_id,
quantity=item.quantity,
order_id=order.order_id
)
物流追踪系统对接
物流公司接口对接
class LogisticsProvider:
"""物流服务商抽象接口"""
def create_waybill(self, order):
"""创建物流运单"""
raise NotImplementedError
def query_tracking(self, tracking_number):
"""查询物流轨迹"""
raise NotImplementedError
def cancel_waybill(self, tracking_number):
"""取消物流单"""
raise NotImplementedError
class SFExpressProvider(LogisticsProvider):
"""顺丰速运对接"""
def create_waybill(self, order):
"""
调用顺丰API创建运单
接口文档:https://qiao.sf-express.com/pages/developDoc/index.html
"""
request_data = {
"orderId": order.order_id,
"cargoDetails": [
{
"name": item.product_name,
"count": item.quantity
}
for item in order.items
],
"consigneeInfo": {
"name": order.delivery_address.recipient,
"mobile": order.delivery_address.phone,
"province": order.delivery_address.province,
"city": order.delivery_address.city,
"county": order.delivery_address.district,
"address": order.delivery_address.detail
}
}
# 调用顺丰API
response = self._call_sf_api(
api='EXP_RECE_CREATE_ORDER',
data=request_data
)
return WaybillResult(
tracking_number=response['waybillNoInfoList'][0]['waybillNo'],
routing_code=response['routeLabelInfo']['routeCode']
)
def query_tracking(self, tracking_number):
"""查询顺丰物流轨迹"""
response = self._call_sf_api(
api='EXP_RECE_SEARCH_ROUTES',
data={'trackingNumber': tracking_number}
)
# 解析物流轨迹
routes = []
for route in response['routeResps']:
routes.append(TrackingNode(
time=route['acceptTime'],
location=route['acceptAddress'],
description=route['remark'],
operator=route['opCode']
))
return TrackingInfo(
tracking_number=tracking_number,
status=response['orderStatus'],
routes=routes
)
class LogisticsAdapter:
"""物流适配器(统一不同物流商的接口)"""
def __init__(self):
self.providers = {
'SF': SFExpressProvider(),
'YTO': YTOExpressProvider(),
'ZTO': ZTOExpressProvider(),
# ... 其他物流商
}
def get_provider(self, logistics_code):
"""获取物流服务商"""
return self.providers.get(logistics_code)
def create_waybill(self, order, logistics_code):
"""统一创建运单接口"""
provider = self.get_provider(logistics_code)
if not provider:
raise UnsupportedLogisticsException(logistics_code)
return provider.create_waybill(order)
def query_tracking(self, tracking_number, logistics_code):
"""统一查询物流接口"""
provider = self.get_provider(logistics_code)
if not provider:
raise UnsupportedLogisticsException(logistics_code)
return provider.query_tracking(tracking_number)
物流轨迹实时更新
class TrackingUpdateService:
"""物流轨迹更新服务"""
def __init__(self, logistics_adapter):
self.logistics_adapter = logistics_adapter
def start_tracking(self, order):
"""
开始追踪订单物流
策略:
1. 订单发货后,立即查询一次物流信息
2. 每小时轮询一次物流API
3. 收到物流商推送时,实时更新
4. 签收后停止追踪
"""
# 立即查询一次
self._update_tracking_info(order)
# 加入定时任务队列
self._schedule_tracking_job(order)
def _update_tracking_info(self, order):
"""更新物流信息"""
try:
# 调用物流API
tracking_info = self.logistics_adapter.query_tracking(
tracking_number=order.tracking_number,
logistics_code=order.logistics_company
)
# 保存物流轨迹
self._save_tracking_routes(order, tracking_info.routes)
# 更新订单状态
self._update_order_delivery_status(order, tracking_info.status)
# 推送用户通知
if self._should_notify_user(tracking_info):
self._send_tracking_notification(order, tracking_info)
except Exception as e:
self.logger.error(
f"更新物流信息失败: {order.order_id}",
exc_info=e
)
def _save_tracking_routes(self, order, routes):
"""保存物流轨迹"""
for route in routes:
# 去重:避免重复保存同一个节点
existing = self.tracking_repo.find_by_time(
order_id=order.order_id,
time=route.time
)
if not existing:
tracking_node = TrackingNode(
order_id=order.order_id,
tracking_number=order.tracking_number,
time=route.time,
location=route.location,
description=route.description,
operator=route.operator
)
self.tracking_repo.save(tracking_node)
def _update_order_delivery_status(self, order, logistics_status):
"""根据物流状态更新订单状态"""
status_mapping = {
'IN_TRANSIT': 'DELIVERING', # 运输中
'OUT_FOR_DELIVERY': 'DELIVERING', # 派送中
'DELIVERED': 'DELIVERED', # 已签收
'FAILED': 'DELIVERY_FAILED' # 派送失败
}
new_status = status_mapping.get(logistics_status)
if new_status and order.status != new_status:
order.status = new_status
if new_status == 'DELIVERED':
order.delivered_at = datetime.now()
self.order_repo.update(order)
def handle_logistics_push(self, push_data):
"""
处理物流商主动推送
物流商会在关键节点推送:
1. 揽件
2. 到达转运中心
3. 派送中
4. 签收
5. 异常(拒收、破损等)
"""
tracking_number = push_data['tracking_number']
order = self.order_repo.find_by_tracking_number(tracking_number)
if not order:
self.logger.warning(
f"未找到物流单对应的订单: {tracking_number}"
)
return
# 保存推送的物流节点
route = TrackingNode(
order_id=order.order_id,
tracking_number=tracking_number,
time=push_data['time'],
location=push_data['location'],
description=push_data['description']
)
self.tracking_repo.save(route)
# 更新订单状态
self._update_order_delivery_status(
order,
push_data['status']
)
# 发送用户通知
self._send_tracking_notification(order, push_data)
订单异常处理
异常场景分类
class FulfillmentExceptionHandler:
"""履约异常处理器"""
def handle_exception(self, order, exception):
"""处理履约异常"""
if isinstance(exception, OutOfStockException):
# 场景1:仓库缺货
return self._handle_out_of_stock(order, exception)
elif isinstance(exception, DelayedShipmentException):
# 场景2:延迟发货
return self._handle_delayed_shipment(order, exception)
elif isinstance(exception, DamagedGoodsException):
# 场景3:商品损坏
return self._handle_damaged_goods(order, exception)
elif isinstance(exception, DeliveryFailedException):
# 场景4:配送失败
return self._handle_delivery_failed(order, exception)
elif isinstance(exception, AddressIssueException):
# 场景5:地址问题
return self._handle_address_issue(order, exception)
def _handle_out_of_stock(self, order, exception):
"""处理缺货异常"""
# 方案1:尝试跨仓调拨
transfer_result = self._try_warehouse_transfer(
order,
exception.missing_items
)
if transfer_result.success:
# 重新下发WMS
return self._retry_fulfillment(order)
# 方案2:部分发货
available_items = [
item for item in order.items
if item not in exception.missing_items
]
if available_items and order.allow_partial_shipment:
return self._create_partial_shipment(order, available_items)
# 方案3:取消订单并退款
return self._cancel_and_refund(
order,
reason="库存不足",
compensation=True # 给予补偿券
)
def _handle_delayed_shipment(self, order, exception):
"""处理延迟发货"""
# 更新预计发货时间
order.expected_ship_time = exception.new_expected_time
# 发送延迟通知
self._send_delay_notification(
order,
delay_reason=exception.reason,
new_time=exception.new_expected_time
)
# 如果延迟超过24小时,提供补偿
delay_hours = (
exception.new_expected_time - order.original_ship_time
).total_seconds() / 3600
if delay_hours > 24:
self._offer_compensation(
order,
compensation_type='COUPON',
amount=10 # 10元优惠券
)
self.order_repo.update(order)
def _handle_delivery_failed(self, order, exception):
"""处理配送失败"""
failure_reason = exception.reason
if failure_reason == 'RECIPIENT_REFUSED':
# 收件人拒收
return self._handle_refusal(order)
elif failure_reason == 'ADDRESS_NOT_FOUND':
# 地址找不到
return self._request_address_correction(order)
elif failure_reason == 'RECIPIENT_NOT_AVAILABLE':
# 收件人不在
return self._schedule_redelivery(order)
elif failure_reason == 'DAMAGED_IN_TRANSIT':
# 运输途中损坏
return self._handle_transit_damage(order)
异常监控与告警
class FulfillmentMonitor:
"""履约监控"""
def __init__(self):
# 定义监控指标
self.metrics = {
'delayed_shipment_rate': 0.0, # 延迟发货率
'out_of_stock_rate': 0.0, # 缺货率
'delivery_failure_rate': 0.0, # 配送失败率
'avg_fulfillment_time': 0.0, # 平均履约时间
'on_time_delivery_rate': 0.0 # 按时送达率
}
# 定义告警阈值
self.alert_thresholds = {
'delayed_shipment_rate': 0.05, # 5%
'out_of_stock_rate': 0.02, # 2%
'delivery_failure_rate': 0.03, # 3%
'avg_fulfillment_time': 48, # 48小时
'on_time_delivery_rate': 0.95 # 95%
}
def collect_metrics(self):
"""收集履约指标"""
now = datetime.now()
today_start = now.replace(hour=0, minute=0, second=0)
# 今日订单总数
total_orders = self.order_repo.count_by_date(today_start)
# 延迟发货订单数
delayed_orders = self.order_repo.count_delayed_shipment(today_start)
self.metrics['delayed_shipment_rate'] = delayed_orders / total_orders
# 缺货订单数
out_of_stock_orders = self.order_repo.count_out_of_stock(today_start)
self.metrics['out_of_stock_rate'] = out_of_stock_orders / total_orders
# 配送失败订单数
failed_deliveries = self.order_repo.count_delivery_failed(today_start)
self.metrics['delivery_failure_rate'] = failed_deliveries / total_orders
# 平均履约时间
avg_time = self.order_repo.calculate_avg_fulfillment_time(today_start)
self.metrics['avg_fulfillment_time'] = avg_time
# 按时送达率
on_time = self.order_repo.count_on_time_delivery(today_start)
self.metrics['on_time_delivery_rate'] = on_time / total_orders
# 检查告警
self._check_alerts()
def _check_alerts(self):
"""检查告警阈值"""
for metric, value in self.metrics.items():
threshold = self.alert_thresholds.get(metric)
if not threshold:
continue
# 比率类指标:超过阈值告警
if metric.endswith('_rate'):
if value > threshold:
self._send_alert(
metric=metric,
current=value,
threshold=threshold,
level='WARNING'
)
# 时间类指标:超过阈值告警
elif metric == 'avg_fulfillment_time':
if value > threshold:
self._send_alert(
metric=metric,
current=value,
threshold=threshold,
level='WARNING'
)
# 达成率指标:低于阈值告警
elif metric == 'on_time_delivery_rate':
if value < threshold:
self._send_alert(
metric=metric,
current=value,
threshold=threshold,
level='CRITICAL'
)
实时订单追踪系统设计
用户端追踪界面
class OrderTrackingAPI:
"""订单追踪API"""
def get_tracking_info(self, order_id, user_id):
"""
获取订单追踪信息
返回结构:
{
"order_id": "O123456",
"status": "DELIVERING",
"current_location": "北京市朝阳区XX配送站",
"estimated_delivery": "今日18:00前",
"tracking_number": "SF1234567890",
"courier": {
"name": "张师傅",
"phone": "138****1234"
},
"timeline": [
{
"time": "2025-11-22 08:00:00",
"status": "SHIPPED",
"description": "您的订单已从【北京仓】发出"
},
{
"time": "2025-11-22 10:00:00",
"status": "IN_TRANSIT",
"description": "快件已到达【北京转运中心】"
},
{
"time": "2025-11-22 14:00:00",
"status": "OUT_FOR_DELIVERY",
"description": "快件正在派送中,配送员【张师傅】"
}
]
}
"""
# 1. 查询订单
order = self.order_repo.find_by_id(order_id)
# 2. 权限校验
if order.user_id != user_id:
raise UnauthorizedException("无权查看该订单")
# 3. 查询物流轨迹
tracking_routes = self.tracking_repo.find_by_order(order_id)
# 4. 查询配送员信息
courier_info = None
if order.status == 'OUT_FOR_DELIVERY':
courier_info = self._get_courier_info(order.tracking_number)
# 5. 组装返回数据
return {
"order_id": order.order_id,
"status": order.status,
"current_location": self._get_current_location(tracking_routes),
"estimated_delivery": self._estimate_delivery_time(order),
"tracking_number": order.tracking_number,
"courier": courier_info,
"timeline": [
{
"time": route.time.strftime("%Y-%m-%d %H:%M:%S"),
"status": route.status,
"description": route.description
}
for route in tracking_routes
]
}
WebSocket实时推送
class RealtimeTrackingService:
"""实时追踪服务"""
def __init__(self, websocket_server):
self.websocket_server = websocket_server
# 订阅物流更新事件
self.event_bus.subscribe('logistics.updated', self._handle_update)
def _handle_update(self, event):
"""处理物流更新事件"""
order_id = event['order_id']
tracking_info = event['tracking_info']
# 推送给订阅该订单的用户
self.websocket_server.send_to_room(
room=f"order:{order_id}",
event='tracking_update',
data=tracking_info
)
总结
订单履约管理是连接线上订单与线下交付的关键环节。关键要点:
- 流程标准化:定义清晰的履约流程和状态流转
- 系统集成:与WMS、TMS、物流商系统深度对接
- 异常处理:建立完善的异常处理机制和降级方案
- 实时监控:通过监控指标和告警及时发现问题
- 用户体验:提供实时、透明的物流追踪信息
下一篇文章,我们将探讨售后管理:退货退款与换货补发,这是履约闭环的重要组成部分。
关键词:订单履约、WMS对接、物流追踪、实时推送、异常处理、SLA监控