WMS与OMS集成——打通订单履约的关键链路

引言:WMS与OMS的关系 OMS是大脑,WMS是手脚: OMS负责订单决策:接单、拆单、分仓 WMS负责实物执行:拣货、打包、发货 集成的核心目标: 指令准确传递:OMS的出库指令准确下发到WMS 库存实时同步:WMS的实物库存实时同步到OMS 状态及时回传:WMS的执行状态及时回传OMS 一、集成架构 1.1 系统边界 ┌─────────────────────────────────────────────────────────────┐ │ OMS │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │订单管理 │ │库存预占 │ │履约路由 │ │状态跟踪 │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ └───────┼───────────┼───────────┼───────────┼─────────────────┘ │ │ │ │ │ ┌──────┴───────────┴───────────┴──────┐ │ │ 消息队列/API │ │ └──────┬───────────┬───────────┬──────┘ │ │ │ │ ┌───────┼───────────┼───────────┼───────────┼─────────────────┐ │ ▼ ▼ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │出库管理 │ │库存管理 │ │拣货管理 │ │发货管理 │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ WMS │ └─────────────────────────────────────────────────────────────┘ 1.2 数据流向 数据 方向 说明 出库指令 OMS → WMS 订单履约指令 可售库存 WMS → OMS 库存同步 出库状态 WMS → OMS 拣货、发货状态 库存变动 WMS → OMS 入库、盘点等 二、出库指令下发 2.1 出库单数据结构 /** * OMS下发给WMS的出库指令 */ public class OutboundCommand { private String commandId; // 指令ID private String orderNo; // OMS订单号 private String warehouseId; // 仓库ID // 收货信息 private String consignee; // 收货人 private String phone; // 电话 private String province; // 省 private String city; // 市 private String district; // 区 private String address; // 详细地址 // 物流信息 private String carrierCode; // 承运商编码 private String serviceType; // 服务类型 // 时效要求 private Integer priority; // 优先级 private LocalDateTime deadline; // 截止时间 // 商品明细 private List<OutboundItem> items; // 扩展信息 private Map<String, String> extInfo; } public class OutboundItem { private String skuId; // SKU编码 private String skuName; // SKU名称 private Integer quantity; // 数量 private String barcode; // 条码 } 2.2 指令下发服务 @Service public class OutboundCommandService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * OMS调用:下发出库指令 */ public void sendOutboundCommand(OutboundCommand command) { // 1. 参数校验 validateCommand(command); // 2. 幂等检查 if (commandRepository.existsByCommandId(command.getCommandId())) { log.warn("重复指令: {}", command.getCommandId()); return; } // 3. 保存指令记录 OutboundCommandRecord record = new OutboundCommandRecord(); record.setCommandId(command.getCommandId()); record.setOrderNo(command.getOrderNo()); record.setWarehouseId(command.getWarehouseId()); record.setStatus(CommandStatus.SENT); record.setContent(JSON.toJSONString(command)); record.setCreatedAt(LocalDateTime.now()); commandRepository.save(record); // 4. 发送消息到WMS String topic = "WMS_OUTBOUND_COMMAND_" + command.getWarehouseId(); rocketMQTemplate.syncSend(topic, command); log.info("出库指令已下发: {}", command.getCommandId()); } } 2.3 WMS接收处理 @Service @RocketMQMessageListener( topic = "WMS_OUTBOUND_COMMAND_${warehouse.id}", consumerGroup = "wms-outbound-consumer" ) public class OutboundCommandConsumer implements RocketMQListener<OutboundCommand> { @Override public void onMessage(OutboundCommand command) { try { // 1. 幂等检查 if (outboundOrderRepository.existsByOmsOrderNo(command.getOrderNo())) { log.warn("订单已存在: {}", command.getOrderNo()); return; } // 2. 创建WMS出库单 OutboundOrder order = createOutboundOrder(command); // 3. 校验库存 boolean stockAvailable = checkStock(order); if (!stockAvailable) { // 库存不足,回传异常 sendStockShortageCallback(command, order); return; } // 4. 锁定库存 lockStock(order); // 5. 回传接收成功 sendReceiveCallback(command.getCommandId(), "SUCCESS"); } catch (Exception e) { log.error("处理出库指令失败: {}", command.getCommandId(), e); sendReceiveCallback(command.getCommandId(), "FAIL", e.getMessage()); } } private OutboundOrder createOutboundOrder(OutboundCommand command) { OutboundOrder order = new OutboundOrder(); order.setOutboundNo(generateOutboundNo()); order.setOmsOrderNo(command.getOrderNo()); order.setWarehouseId(command.getWarehouseId()); order.setConsignee(command.getConsignee()); order.setPhone(command.getPhone()); order.setAddress(buildFullAddress(command)); order.setCarrierCode(command.getCarrierCode()); order.setPriority(command.getPriority()); order.setDeadline(command.getDeadline()); order.setStatus(OutboundStatus.CREATED); outboundOrderRepository.save(order); // 保存明细 for (OutboundItem item : command.getItems()) { OutboundOrderItem orderItem = new OutboundOrderItem(); orderItem.setOutboundNo(order.getOutboundNo()); orderItem.setSkuId(item.getSkuId()); orderItem.setQuantity(item.getQuantity()); outboundItemRepository.save(orderItem); } return order; } } 三、库存同步 3.1 库存同步策略 策略 说明 适用场景 实时同步 每次变动立即同步 库存紧张、高并发 定时同步 定时批量同步 库存充足 增量同步 只同步变化部分 数据量大 全量同步 定期全量覆盖 数据校准 3.2 实时库存同步 @Service public class InventorySyncService { /** * 库存变动时触发同步 */ @EventListener public void onInventoryChange(InventoryChangeEvent event) { // 计算可售库存 int availableQty = calculateAvailableQty( event.getWarehouseId(), event.getSkuId() ); // 发送同步消息 InventorySyncMessage message = new InventorySyncMessage(); message.setWarehouseId(event.getWarehouseId()); message.setSkuId(event.getSkuId()); message.setAvailableQty(availableQty); message.setChangeType(event.getChangeType()); message.setChangeQty(event.getChangeQty()); message.setTimestamp(LocalDateTime.now()); rocketMQTemplate.asyncSend("OMS_INVENTORY_SYNC", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("库存同步成功: {} {}", event.getSkuId(), availableQty); } @Override public void onException(Throwable e) { log.error("库存同步失败: {}", event.getSkuId(), e); // 加入重试队列 retrySyncQueue.add(message); } }); } /** * 计算可售库存 * 可售 = 实物库存 - 锁定库存 - 安全库存 */ private int calculateAvailableQty(String warehouseId, String skuId) { // 查询实物库存 int physicalQty = inventoryRepository.sumQuantity(warehouseId, skuId); // 查询锁定库存 int lockedQty = inventoryRepository.sumLockedQty(warehouseId, skuId); // 查询安全库存 int safetyStock = skuService.getSafetyStock(skuId); return Math.max(0, physicalQty - lockedQty - safetyStock); } } 3.3 OMS接收库存同步 @Service @RocketMQMessageListener( topic = "OMS_INVENTORY_SYNC", consumerGroup = "oms-inventory-consumer" ) public class InventorySyncConsumer implements RocketMQListener<InventorySyncMessage> { @Override public void onMessage(InventorySyncMessage message) { // 1. 更新OMS库存 OmsInventory inventory = omsInventoryRepository .findByWarehouseAndSku(message.getWarehouseId(), message.getSkuId()); if (inventory == null) { inventory = new OmsInventory(); inventory.setWarehouseId(message.getWarehouseId()); inventory.setSkuId(message.getSkuId()); } inventory.setAvailableQty(message.getAvailableQty()); inventory.setLastSyncTime(message.getTimestamp()); omsInventoryRepository.save(inventory); // 2. 更新渠道库存(如果需要) channelInventoryService.syncToChannels(message.getSkuId()); // 3. 检查库存预警 checkInventoryAlert(message); } } 3.4 定时全量同步 @Service public class FullInventorySyncService { /** * 每天凌晨全量同步库存 */ @Scheduled(cron = "0 0 2 * * ?") public void fullSync() { log.info("开始全量库存同步"); List<String> warehouses = warehouseService.getAllActiveWarehouses(); for (String warehouseId : warehouses) { // 获取WMS所有库存 List<InventorySummary> wmsInventories = wmsClient.getAllInventory(warehouseId); // 批量同步到OMS List<InventorySyncMessage> messages = wmsInventories.stream() .map(inv -> { InventorySyncMessage msg = new InventorySyncMessage(); msg.setWarehouseId(warehouseId); msg.setSkuId(inv.getSkuId()); msg.setAvailableQty(inv.getAvailableQty()); msg.setChangeType("FULL_SYNC"); return msg; }) .collect(Collectors.toList()); // 批量发送 rocketMQTemplate.syncSend("OMS_INVENTORY_SYNC_BATCH", messages); } log.info("全量库存同步完成"); } } 四、状态回传 4.1 状态流转 OMS订单状态: 待发货 ──> 拣货中 ──> 已发货 ──> 已签收 │ │ │ │ WMS状态回传: │ │ │ │ ▼ ▼ ▼ ▼ 接收成功 开始拣货 发货完成 物流回传 4.2 状态回传消息 /** * WMS回传给OMS的状态消息 */ public class OutboundStatusCallback { private String callbackId; // 回传ID private String omsOrderNo; // OMS订单号 private String wmsOutboundNo; // WMS出库单号 private String status; // 状态 private LocalDateTime statusTime; // 状态时间 // 发货信息(发货状态时填写) private String trackingNo; // 物流单号 private String carrierCode; // 承运商 private Integer packageCount; // 包裹数 private BigDecimal weight; // 重量 // 异常信息 private String errorCode; // 错误码 private String errorMessage; // 错误信息 } 4.3 状态回传服务 @Service public class StatusCallbackService { /** * 拣货开始回传 */ public void callbackPickStart(OutboundOrder order) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("PICKING"); callback.setStatusTime(LocalDateTime.now()); sendCallback(callback); } /** * 发货完成回传 */ public void callbackShipped(OutboundOrder order, ShipmentInfo shipment) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("SHIPPED"); callback.setStatusTime(LocalDateTime.now()); callback.setTrackingNo(shipment.getTrackingNo()); callback.setCarrierCode(shipment.getCarrierCode()); callback.setPackageCount(shipment.getPackageCount()); callback.setWeight(shipment.getWeight()); sendCallback(callback); } /** * 异常回传 */ public void callbackException(OutboundOrder order, String errorCode, String errorMessage) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("EXCEPTION"); callback.setStatusTime(LocalDateTime.now()); callback.setErrorCode(errorCode); callback.setErrorMessage(errorMessage); sendCallback(callback); } private void sendCallback(OutboundStatusCallback callback) { // 保存回传记录 CallbackRecord record = new CallbackRecord(); record.setCallbackId(callback.getCallbackId()); record.setContent(JSON.toJSONString(callback)); record.setStatus(CallbackStatus.PENDING); callbackRepository.save(record); // 发送消息 rocketMQTemplate.asyncSend("OMS_STATUS_CALLBACK", callback, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { record.setStatus(CallbackStatus.SUCCESS); callbackRepository.save(record); } @Override public void onException(Throwable e) { record.setStatus(CallbackStatus.FAIL); record.setErrorMessage(e.getMessage()); callbackRepository.save(record); // 加入重试队列 retryQueue.add(callback); } }); } } 4.4 OMS接收状态回传 @Service @RocketMQMessageListener( topic = "OMS_STATUS_CALLBACK", consumerGroup = "oms-callback-consumer" ) public class StatusCallbackConsumer implements RocketMQListener<OutboundStatusCallback> { @Override public void onMessage(OutboundStatusCallback callback) { // 1. 幂等检查 if (callbackRepository.existsByCallbackId(callback.getCallbackId())) { return; } // 2. 查找订单 Order order = orderRepository.findByOrderNo(callback.getOmsOrderNo()); if (order == null) { log.error("订单不存在: {}", callback.getOmsOrderNo()); return; } // 3. 更新订单状态 switch (callback.getStatus()) { case "PICKING": order.setStatus(OrderStatus.PICKING); order.setPickStartTime(callback.getStatusTime()); break; case "SHIPPED": order.setStatus(OrderStatus.SHIPPED); order.setShipTime(callback.getStatusTime()); order.setTrackingNo(callback.getTrackingNo()); order.setCarrierCode(callback.getCarrierCode()); // 释放库存预占 inventoryService.releaseReservation(order.getOrderNo()); break; case "EXCEPTION": order.setStatus(OrderStatus.EXCEPTION); order.setExceptionCode(callback.getErrorCode()); order.setExceptionMessage(callback.getErrorMessage()); // 触发异常处理流程 exceptionHandler.handle(order, callback); break; } orderRepository.save(order); // 4. 通知下游(如通知客户) notifyService.notifyOrderStatusChange(order); } } 五、异常处理 5.1 常见异常场景 异常 原因 处理方式 库存不足 WMS实际库存不够 重新分仓或取消 地址异常 地址无法配送 人工处理 SKU不存在 WMS没有该商品 检查主数据 超时未处理 WMS未及时处理 催促或转仓 5.2 异常处理服务 @Service public class IntegrationExceptionHandler { /** * 处理库存不足异常 */ public void handleStockShortage(Order order, OutboundStatusCallback callback) { // 1. 尝试重新分仓 String newWarehouse = fulfillmentRouter.findAlternativeWarehouse(order); if (newWarehouse != null) { // 取消原出库指令 wmsClient.cancelOutbound(order.getOrderNo(), order.getWarehouseId()); // 下发新出库指令 order.setWarehouseId(newWarehouse); outboundCommandService.sendOutboundCommand(buildCommand(order)); log.info("订单{}重新分仓到{}", order.getOrderNo(), newWarehouse); } else { // 无可用仓库,标记异常 order.setStatus(OrderStatus.STOCK_SHORTAGE); orderRepository.save(order); // 通知客服 alertService.notifyCustomerService(order, "库存不足,需人工处理"); } } /** * 处理超时未处理 */ @Scheduled(fixedRate = 300000) // 每5分钟检查 public void checkTimeoutOrders() { // 查找超时订单 LocalDateTime timeout = LocalDateTime.now().minusHours(4); List<Order> timeoutOrders = orderRepository .findByStatusAndCreatedBefore(OrderStatus.PENDING_SHIP, timeout); for (Order order : timeoutOrders) { // 查询WMS状态 OutboundOrder wmsOrder = wmsClient.getOutboundOrder(order.getOrderNo()); if (wmsOrder == null || wmsOrder.getStatus() == OutboundStatus.CREATED) { // WMS未处理,发送催促 alertService.notifyWarehouse(order.getWarehouseId(), "订单" + order.getOrderNo() + "超时未处理"); } } } } 六、接口设计 6.1 REST API接口 @RestController @RequestMapping("/api/wms") public class WmsIntegrationController { /** * OMS调用:下发出库指令 */ @PostMapping("/outbound/command") public Result<Void> sendOutboundCommand(@RequestBody OutboundCommand command) { outboundCommandService.sendOutboundCommand(command); return Result.success(); } /** * OMS调用:取消出库指令 */ @PostMapping("/outbound/cancel") public Result<Void> cancelOutbound(@RequestBody CancelRequest request) { wmsService.cancelOutbound(request.getOrderNo(), request.getReason()); return Result.success(); } /** * OMS调用:查询库存 */ @GetMapping("/inventory") public Result<InventoryVO> queryInventory( @RequestParam String warehouseId, @RequestParam String skuId) { InventoryVO inventory = inventoryService.queryInventory(warehouseId, skuId); return Result.success(inventory); } /** * WMS调用:状态回传 */ @PostMapping("/callback/status") public Result<Void> statusCallback(@RequestBody OutboundStatusCallback callback) { callbackService.handleCallback(callback); return Result.success(); } /** * WMS调用:库存同步 */ @PostMapping("/inventory/sync") public Result<Void> syncInventory(@RequestBody InventorySyncRequest request) { inventorySyncService.sync(request); return Result.success(); } } 6.2 接口幂等设计 @Aspect @Component public class IdempotentAspect { @Around("@annotation(idempotent)") public Object checkIdempotent(ProceedingJoinPoint point, Idempotent idempotent) throws Throwable { // 获取幂等键 String key = getIdempotentKey(point, idempotent); // 检查是否已处理 String result = redisTemplate.opsForValue().get(key); if (result != null) { log.info("重复请求,返回缓存结果: {}", key); return JSON.parseObject(result, point.getSignature().getDeclaringType()); } // 执行业务 Object response = point.proceed(); // 缓存结果 redisTemplate.opsForValue().set(key, JSON.toJSONString(response), idempotent.expireSeconds(), TimeUnit.SECONDS); return response; } } 七、监控与告警 7.1 集成监控指标 指标 说明 告警阈值 指令下发成功率 成功/总数 < 99% 库存同步延迟 同步耗时 > 5秒 状态回传延迟 回传耗时 > 10秒 消息积压数 队列积压 > 1000 7.2 监控实现 @Service public class IntegrationMonitorService { @Scheduled(fixedRate = 60000) public void monitor() { // 检查消息积压 long pendingCount = messageRepository.countPending(); if (pendingCount > 1000) { alertService.sendAlert("消息积压告警", "待处理消息数: " + pendingCount); } // 检查同步延迟 LocalDateTime lastSync = inventorySyncRepository.getLastSyncTime(); if (lastSync.isBefore(LocalDateTime.now().minusMinutes(5))) { alertService.sendAlert("库存同步延迟", "最后同步时间: " + lastSync); } // 检查失败率 long totalCount = callbackRepository.countByHour(); long failCount = callbackRepository.countFailByHour(); double failRate = failCount * 100.0 / totalCount; if (failRate > 1) { alertService.sendAlert("回传失败率告警", String.format("失败率: %.2f%%", failRate)); } } } 八、总结 8.1 集成核心要点 指令下发:幂等、可靠、可追溯 库存同步:实时、准确、有兜底 状态回传:及时、完整、可重试 异常处理:自动重试、人工兜底 8.2 最佳实践 实践 说明 消息队列解耦 异步处理,削峰填谷 幂等设计 防止重复处理 重试机制 失败自动重试 监控告警 及时发现问题 系列文章导航 ...

2026-02-02 · maneng

订单履约管理与物流追踪:从仓库到用户手中的全链路

引言 订单履约(Order Fulfillment)是将用户的购买意图转化为实物交付的关键环节。从用户点击"确认下单"的那一刻起,订单就进入了履约流程:库存预占、下发仓库、拣货打包、物流配送,直到用户签收。 这个过程看似简单,实则涉及多个系统的协同:OMS、WMS、TMS、物流商系统。任何一个环节出现问题,都可能导致发货延迟、错发漏发、物流异常等问题,直接影响用户体验。 本文将从第一性原理出发,系统性地探讨订单履约的全流程管理和物流追踪系统的设计与实现。 订单履约全流程 履约流程概览 订单履约流程(7个阶段): 1. 订单创建 → 2. 支付确认 → 3. 下发WMS → 4. 仓库处理 → 5. 发货出库 → 6. 物流配送 → 7. 用户签收 每个阶段的时间要求: - 支付确认:实时(秒级) - 下发WMS:5分钟内 - 仓库处理:2-24小时(视订单量) - 物流配送:1-3天(视距离) - 用户签收:配送当天 履约流程设计 class OrderFulfillmentService: """订单履约服务""" def __init__(self, wms_client, tms_client, logistics_client): self.wms_client = wms_client self.tms_client = tms_client self.logistics_client = logistics_client def fulfill_order(self, order): """ 订单履约主流程 流程: 1. 校验订单状态 2. 下发WMS(仓库管理系统) 3. 等待WMS拣货打包 4. 创建物流单 5. 推送物流信息 6. 监控配送状态 """ try: # 1. 校验订单 self._validate_order_for_fulfillment(order) # 2. 下发WMS wms_result = self._dispatch_to_wms(order) # 3. 更新订单状态 self._update_order_status( order, status='DISPATCHED_TO_WMS', wms_order_id=wms_result.wms_order_id ) # 4. 异步等待WMS完成 self._register_wms_callback(order) return FulfillmentResult( success=True, wms_order_id=wms_result.wms_order_id ) except Exception as e: self._handle_fulfillment_error(order, e) raise def _validate_order_for_fulfillment(self, order): """校验订单可履约性""" # 检查订单状态 if order.status not in ['PAID', 'CONFIRMED']: raise InvalidOrderStatusException( f"订单状态不正确: {order.status}" ) # 检查库存预占 if not self._check_inventory_reserved(order): raise InventoryNotReservedException( f"订单库存未预占: {order.order_id}" ) # 检查收货地址 if not self._validate_delivery_address(order.delivery_address): raise InvalidDeliveryAddressException( "收货地址不完整或不可达" ) def _dispatch_to_wms(self, order): """ 下发WMS WMS单据结构: { "wms_order_id": "WO123456", "oms_order_id": "O123456", "warehouse_id": "WH001", "items": [ { "sku_id": "SKU001", "quantity": 2, "location": "A-01-02" } ], "delivery_info": {...}, "priority": "NORMAL" # URGENT, NORMAL, LOW } """ wms_order = self._build_wms_order(order) # 调用WMS接口 result = self.wms_client.create_order(wms_order) # 记录下发日志 self._log_wms_dispatch( order_id=order.order_id, wms_order_id=result.wms_order_id, warehouse_id=order.warehouse_id ) return result def _build_wms_order(self, order): """构建WMS订单""" return WMSOrder( wms_order_id=self._generate_wms_order_id(), oms_order_id=order.order_id, warehouse_id=order.warehouse_id, items=[ WMSOrderItem( sku_id=item.sku_id, quantity=item.quantity, # 查询SKU在仓库中的位置 location=self._get_sku_location( order.warehouse_id, item.sku_id ) ) for item in order.items ], delivery_info=DeliveryInfo( recipient=order.delivery_address.recipient, phone=order.delivery_address.phone, address=order.delivery_address.full_address, postcode=order.delivery_address.postcode ), # 根据订单类型设置优先级 priority=self._determine_priority(order), # 特殊要求 special_requirements=order.special_requirements, # 预计发货时间 expected_ship_time=order.expected_ship_time ) 订单下发WMS流程 WMS接口设计 class WMSClient: """WMS客户端""" def __init__(self, base_url, api_key): self.base_url = base_url self.api_key = api_key def create_order(self, wms_order): """ 创建WMS订单 接口:POST /api/v1/orders """ try: response = self._request( method='POST', endpoint='/api/v1/orders', data=wms_order.to_dict(), timeout=10 ) if response.status_code == 200: return WMSOrderResult( success=True, wms_order_id=response.json()['wms_order_id'], estimated_ship_time=response.json()['estimated_ship_time'] ) else: raise WMSException( f"WMS创建订单失败: {response.json()}" ) except requests.RequestException as e: # 网络异常,进入重试队列 self._enqueue_retry(wms_order) raise WMSNetworkException(str(e)) def query_order_status(self, wms_order_id): """ 查询WMS订单状态 接口:GET /api/v1/orders/{wms_order_id} """ response = self._request( method='GET', endpoint=f'/api/v1/orders/{wms_order_id}' ) return WMSOrderStatus( wms_order_id=wms_order_id, status=response.json()['status'], picked_at=response.json().get('picked_at'), packed_at=response.json().get('packed_at'), shipped_at=response.json().get('shipped_at'), tracking_number=response.json().get('tracking_number') ) def cancel_order(self, wms_order_id, reason): """ 取消WMS订单 接口:POST /api/v1/orders/{wms_order_id}/cancel """ response = self._request( method='POST', endpoint=f'/api/v1/orders/{wms_order_id}/cancel', data={'reason': reason} ) return response.json()['success'] def _request(self, method, endpoint, data=None, timeout=30): """统一请求方法""" url = f"{self.base_url}{endpoint}" headers = { 'Content-Type': 'application/json', 'X-API-Key': self.api_key } response = requests.request( method=method, url=url, headers=headers, json=data, timeout=timeout ) return response WMS回调处理 class WMSCallbackHandler: """WMS回调处理器""" def handle_callback(self, callback_data): """ 处理WMS回调 WMS会在以下节点回调OMS: 1. 拣货完成 2. 打包完成 3. 出库发货 4. 发货异常(缺货、商品损坏等) """ event_type = callback_data['event_type'] if event_type == 'PICKED': return self._handle_picked(callback_data) elif event_type == 'PACKED': return self._handle_packed(callback_data) elif event_type == 'SHIPPED': return self._handle_shipped(callback_data) elif event_type == 'EXCEPTION': return self._handle_exception(callback_data) else: raise UnknownCallbackEventException(event_type) def _handle_picked(self, data): """处理拣货完成回调""" order = self._get_order(data['oms_order_id']) # 更新订单状态 order.status = 'PICKED' order.picked_at = datetime.now() # 记录拣货信息 self._log_picking_info( order_id=order.order_id, picker=data['picker'], picked_at=data['picked_at'] ) self.order_repo.update(order) def _handle_packed(self, data): """处理打包完成回调""" order = self._get_order(data['oms_order_id']) # 更新订单状态 order.status = 'PACKED' order.packed_at = datetime.now() # 记录包裹信息 package_info = PackageInfo( package_id=data['package_id'], weight=data['weight'], dimensions=data['dimensions'], packer=data['packer'] ) order.package_info = package_info self.order_repo.update(order) def _handle_shipped(self, data): """处理发货出库回调""" order = self._get_order(data['oms_order_id']) # 更新订单状态 order.status = 'SHIPPED' order.shipped_at = datetime.now() order.tracking_number = data['tracking_number'] order.logistics_company = data['logistics_company'] # 消费库存预占 self._consume_inventory_reservation(order) # 创建物流追踪记录 self._create_tracking_record(order) # 发送发货通知 self._send_shipment_notification(order) self.order_repo.update(order) def _handle_exception(self, data): """处理异常回调""" order = self._get_order(data['oms_order_id']) exception_type = data['exception_type'] if exception_type == 'OUT_OF_STOCK': # 库存不足 self._handle_out_of_stock(order, data) elif exception_type == 'DAMAGED_GOODS': # 商品损坏 self._handle_damaged_goods(order, data) elif exception_type == 'ADDRESS_INCOMPLETE': # 地址不完整 self._handle_address_issue(order, data) else: # 其他异常 self._handle_unknown_exception(order, data) def _consume_inventory_reservation(self, order): """消费库存预占""" for item in order.items: # 更新库存:预占转为已售 self.inventory_service.consume_reservation( sku_id=item.sku_id, warehouse_id=order.warehouse_id, quantity=item.quantity, order_id=order.order_id ) 物流追踪系统对接 物流公司接口对接 class LogisticsProvider: """物流服务商抽象接口""" def create_waybill(self, order): """创建物流运单""" raise NotImplementedError def query_tracking(self, tracking_number): """查询物流轨迹""" raise NotImplementedError def cancel_waybill(self, tracking_number): """取消物流单""" raise NotImplementedError class SFExpressProvider(LogisticsProvider): """顺丰速运对接""" def create_waybill(self, order): """ 调用顺丰API创建运单 接口文档:https://qiao.sf-express.com/pages/developDoc/index.html """ request_data = { "orderId": order.order_id, "cargoDetails": [ { "name": item.product_name, "count": item.quantity } for item in order.items ], "consigneeInfo": { "name": order.delivery_address.recipient, "mobile": order.delivery_address.phone, "province": order.delivery_address.province, "city": order.delivery_address.city, "county": order.delivery_address.district, "address": order.delivery_address.detail } } # 调用顺丰API response = self._call_sf_api( api='EXP_RECE_CREATE_ORDER', data=request_data ) return WaybillResult( tracking_number=response['waybillNoInfoList'][0]['waybillNo'], routing_code=response['routeLabelInfo']['routeCode'] ) def query_tracking(self, tracking_number): """查询顺丰物流轨迹""" response = self._call_sf_api( api='EXP_RECE_SEARCH_ROUTES', data={'trackingNumber': tracking_number} ) # 解析物流轨迹 routes = [] for route in response['routeResps']: routes.append(TrackingNode( time=route['acceptTime'], location=route['acceptAddress'], description=route['remark'], operator=route['opCode'] )) return TrackingInfo( tracking_number=tracking_number, status=response['orderStatus'], routes=routes ) class LogisticsAdapter: """物流适配器(统一不同物流商的接口)""" def __init__(self): self.providers = { 'SF': SFExpressProvider(), 'YTO': YTOExpressProvider(), 'ZTO': ZTOExpressProvider(), # ... 其他物流商 } def get_provider(self, logistics_code): """获取物流服务商""" return self.providers.get(logistics_code) def create_waybill(self, order, logistics_code): """统一创建运单接口""" provider = self.get_provider(logistics_code) if not provider: raise UnsupportedLogisticsException(logistics_code) return provider.create_waybill(order) def query_tracking(self, tracking_number, logistics_code): """统一查询物流接口""" provider = self.get_provider(logistics_code) if not provider: raise UnsupportedLogisticsException(logistics_code) return provider.query_tracking(tracking_number) 物流轨迹实时更新 class TrackingUpdateService: """物流轨迹更新服务""" def __init__(self, logistics_adapter): self.logistics_adapter = logistics_adapter def start_tracking(self, order): """ 开始追踪订单物流 策略: 1. 订单发货后,立即查询一次物流信息 2. 每小时轮询一次物流API 3. 收到物流商推送时,实时更新 4. 签收后停止追踪 """ # 立即查询一次 self._update_tracking_info(order) # 加入定时任务队列 self._schedule_tracking_job(order) def _update_tracking_info(self, order): """更新物流信息""" try: # 调用物流API tracking_info = self.logistics_adapter.query_tracking( tracking_number=order.tracking_number, logistics_code=order.logistics_company ) # 保存物流轨迹 self._save_tracking_routes(order, tracking_info.routes) # 更新订单状态 self._update_order_delivery_status(order, tracking_info.status) # 推送用户通知 if self._should_notify_user(tracking_info): self._send_tracking_notification(order, tracking_info) except Exception as e: self.logger.error( f"更新物流信息失败: {order.order_id}", exc_info=e ) def _save_tracking_routes(self, order, routes): """保存物流轨迹""" for route in routes: # 去重:避免重复保存同一个节点 existing = self.tracking_repo.find_by_time( order_id=order.order_id, time=route.time ) if not existing: tracking_node = TrackingNode( order_id=order.order_id, tracking_number=order.tracking_number, time=route.time, location=route.location, description=route.description, operator=route.operator ) self.tracking_repo.save(tracking_node) def _update_order_delivery_status(self, order, logistics_status): """根据物流状态更新订单状态""" status_mapping = { 'IN_TRANSIT': 'DELIVERING', # 运输中 'OUT_FOR_DELIVERY': 'DELIVERING', # 派送中 'DELIVERED': 'DELIVERED', # 已签收 'FAILED': 'DELIVERY_FAILED' # 派送失败 } new_status = status_mapping.get(logistics_status) if new_status and order.status != new_status: order.status = new_status if new_status == 'DELIVERED': order.delivered_at = datetime.now() self.order_repo.update(order) def handle_logistics_push(self, push_data): """ 处理物流商主动推送 物流商会在关键节点推送: 1. 揽件 2. 到达转运中心 3. 派送中 4. 签收 5. 异常(拒收、破损等) """ tracking_number = push_data['tracking_number'] order = self.order_repo.find_by_tracking_number(tracking_number) if not order: self.logger.warning( f"未找到物流单对应的订单: {tracking_number}" ) return # 保存推送的物流节点 route = TrackingNode( order_id=order.order_id, tracking_number=tracking_number, time=push_data['time'], location=push_data['location'], description=push_data['description'] ) self.tracking_repo.save(route) # 更新订单状态 self._update_order_delivery_status( order, push_data['status'] ) # 发送用户通知 self._send_tracking_notification(order, push_data) 订单异常处理 异常场景分类 class FulfillmentExceptionHandler: """履约异常处理器""" def handle_exception(self, order, exception): """处理履约异常""" if isinstance(exception, OutOfStockException): # 场景1:仓库缺货 return self._handle_out_of_stock(order, exception) elif isinstance(exception, DelayedShipmentException): # 场景2:延迟发货 return self._handle_delayed_shipment(order, exception) elif isinstance(exception, DamagedGoodsException): # 场景3:商品损坏 return self._handle_damaged_goods(order, exception) elif isinstance(exception, DeliveryFailedException): # 场景4:配送失败 return self._handle_delivery_failed(order, exception) elif isinstance(exception, AddressIssueException): # 场景5:地址问题 return self._handle_address_issue(order, exception) def _handle_out_of_stock(self, order, exception): """处理缺货异常""" # 方案1:尝试跨仓调拨 transfer_result = self._try_warehouse_transfer( order, exception.missing_items ) if transfer_result.success: # 重新下发WMS return self._retry_fulfillment(order) # 方案2:部分发货 available_items = [ item for item in order.items if item not in exception.missing_items ] if available_items and order.allow_partial_shipment: return self._create_partial_shipment(order, available_items) # 方案3:取消订单并退款 return self._cancel_and_refund( order, reason="库存不足", compensation=True # 给予补偿券 ) def _handle_delayed_shipment(self, order, exception): """处理延迟发货""" # 更新预计发货时间 order.expected_ship_time = exception.new_expected_time # 发送延迟通知 self._send_delay_notification( order, delay_reason=exception.reason, new_time=exception.new_expected_time ) # 如果延迟超过24小时,提供补偿 delay_hours = ( exception.new_expected_time - order.original_ship_time ).total_seconds() / 3600 if delay_hours > 24: self._offer_compensation( order, compensation_type='COUPON', amount=10 # 10元优惠券 ) self.order_repo.update(order) def _handle_delivery_failed(self, order, exception): """处理配送失败""" failure_reason = exception.reason if failure_reason == 'RECIPIENT_REFUSED': # 收件人拒收 return self._handle_refusal(order) elif failure_reason == 'ADDRESS_NOT_FOUND': # 地址找不到 return self._request_address_correction(order) elif failure_reason == 'RECIPIENT_NOT_AVAILABLE': # 收件人不在 return self._schedule_redelivery(order) elif failure_reason == 'DAMAGED_IN_TRANSIT': # 运输途中损坏 return self._handle_transit_damage(order) 异常监控与告警 class FulfillmentMonitor: """履约监控""" def __init__(self): # 定义监控指标 self.metrics = { 'delayed_shipment_rate': 0.0, # 延迟发货率 'out_of_stock_rate': 0.0, # 缺货率 'delivery_failure_rate': 0.0, # 配送失败率 'avg_fulfillment_time': 0.0, # 平均履约时间 'on_time_delivery_rate': 0.0 # 按时送达率 } # 定义告警阈值 self.alert_thresholds = { 'delayed_shipment_rate': 0.05, # 5% 'out_of_stock_rate': 0.02, # 2% 'delivery_failure_rate': 0.03, # 3% 'avg_fulfillment_time': 48, # 48小时 'on_time_delivery_rate': 0.95 # 95% } def collect_metrics(self): """收集履约指标""" now = datetime.now() today_start = now.replace(hour=0, minute=0, second=0) # 今日订单总数 total_orders = self.order_repo.count_by_date(today_start) # 延迟发货订单数 delayed_orders = self.order_repo.count_delayed_shipment(today_start) self.metrics['delayed_shipment_rate'] = delayed_orders / total_orders # 缺货订单数 out_of_stock_orders = self.order_repo.count_out_of_stock(today_start) self.metrics['out_of_stock_rate'] = out_of_stock_orders / total_orders # 配送失败订单数 failed_deliveries = self.order_repo.count_delivery_failed(today_start) self.metrics['delivery_failure_rate'] = failed_deliveries / total_orders # 平均履约时间 avg_time = self.order_repo.calculate_avg_fulfillment_time(today_start) self.metrics['avg_fulfillment_time'] = avg_time # 按时送达率 on_time = self.order_repo.count_on_time_delivery(today_start) self.metrics['on_time_delivery_rate'] = on_time / total_orders # 检查告警 self._check_alerts() def _check_alerts(self): """检查告警阈值""" for metric, value in self.metrics.items(): threshold = self.alert_thresholds.get(metric) if not threshold: continue # 比率类指标:超过阈值告警 if metric.endswith('_rate'): if value > threshold: self._send_alert( metric=metric, current=value, threshold=threshold, level='WARNING' ) # 时间类指标:超过阈值告警 elif metric == 'avg_fulfillment_time': if value > threshold: self._send_alert( metric=metric, current=value, threshold=threshold, level='WARNING' ) # 达成率指标:低于阈值告警 elif metric == 'on_time_delivery_rate': if value < threshold: self._send_alert( metric=metric, current=value, threshold=threshold, level='CRITICAL' ) 实时订单追踪系统设计 用户端追踪界面 class OrderTrackingAPI: """订单追踪API""" def get_tracking_info(self, order_id, user_id): """ 获取订单追踪信息 返回结构: { "order_id": "O123456", "status": "DELIVERING", "current_location": "北京市朝阳区XX配送站", "estimated_delivery": "今日18:00前", "tracking_number": "SF1234567890", "courier": { "name": "张师傅", "phone": "138****1234" }, "timeline": [ { "time": "2025-11-22 08:00:00", "status": "SHIPPED", "description": "您的订单已从【北京仓】发出" }, { "time": "2025-11-22 10:00:00", "status": "IN_TRANSIT", "description": "快件已到达【北京转运中心】" }, { "time": "2025-11-22 14:00:00", "status": "OUT_FOR_DELIVERY", "description": "快件正在派送中,配送员【张师傅】" } ] } """ # 1. 查询订单 order = self.order_repo.find_by_id(order_id) # 2. 权限校验 if order.user_id != user_id: raise UnauthorizedException("无权查看该订单") # 3. 查询物流轨迹 tracking_routes = self.tracking_repo.find_by_order(order_id) # 4. 查询配送员信息 courier_info = None if order.status == 'OUT_FOR_DELIVERY': courier_info = self._get_courier_info(order.tracking_number) # 5. 组装返回数据 return { "order_id": order.order_id, "status": order.status, "current_location": self._get_current_location(tracking_routes), "estimated_delivery": self._estimate_delivery_time(order), "tracking_number": order.tracking_number, "courier": courier_info, "timeline": [ { "time": route.time.strftime("%Y-%m-%d %H:%M:%S"), "status": route.status, "description": route.description } for route in tracking_routes ] } WebSocket实时推送 class RealtimeTrackingService: """实时追踪服务""" def __init__(self, websocket_server): self.websocket_server = websocket_server # 订阅物流更新事件 self.event_bus.subscribe('logistics.updated', self._handle_update) def _handle_update(self, event): """处理物流更新事件""" order_id = event['order_id'] tracking_info = event['tracking_info'] # 推送给订阅该订单的用户 self.websocket_server.send_to_room( room=f"order:{order_id}", event='tracking_update', data=tracking_info ) 总结 订单履约管理是连接线上订单与线下交付的关键环节。关键要点: ...

2025-11-22 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...