WMS与OMS集成——打通订单履约的关键链路
引言:WMS与OMS的关系 OMS是大脑,WMS是手脚: OMS负责订单决策:接单、拆单、分仓 WMS负责实物执行:拣货、打包、发货 集成的核心目标: 指令准确传递:OMS的出库指令准确下发到WMS 库存实时同步:WMS的实物库存实时同步到OMS 状态及时回传:WMS的执行状态及时回传OMS 一、集成架构 1.1 系统边界 ┌─────────────────────────────────────────────────────────────┐ │ OMS │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │订单管理 │ │库存预占 │ │履约路由 │ │状态跟踪 │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ └───────┼───────────┼───────────┼───────────┼─────────────────┘ │ │ │ │ │ ┌──────┴───────────┴───────────┴──────┐ │ │ 消息队列/API │ │ └──────┬───────────┬───────────┬──────┘ │ │ │ │ ┌───────┼───────────┼───────────┼───────────┼─────────────────┐ │ ▼ ▼ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │出库管理 │ │库存管理 │ │拣货管理 │ │发货管理 │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ WMS │ └─────────────────────────────────────────────────────────────┘ 1.2 数据流向 数据 方向 说明 出库指令 OMS → WMS 订单履约指令 可售库存 WMS → OMS 库存同步 出库状态 WMS → OMS 拣货、发货状态 库存变动 WMS → OMS 入库、盘点等 二、出库指令下发 2.1 出库单数据结构 /** * OMS下发给WMS的出库指令 */ public class OutboundCommand { private String commandId; // 指令ID private String orderNo; // OMS订单号 private String warehouseId; // 仓库ID // 收货信息 private String consignee; // 收货人 private String phone; // 电话 private String province; // 省 private String city; // 市 private String district; // 区 private String address; // 详细地址 // 物流信息 private String carrierCode; // 承运商编码 private String serviceType; // 服务类型 // 时效要求 private Integer priority; // 优先级 private LocalDateTime deadline; // 截止时间 // 商品明细 private List<OutboundItem> items; // 扩展信息 private Map<String, String> extInfo; } public class OutboundItem { private String skuId; // SKU编码 private String skuName; // SKU名称 private Integer quantity; // 数量 private String barcode; // 条码 } 2.2 指令下发服务 @Service public class OutboundCommandService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * OMS调用:下发出库指令 */ public void sendOutboundCommand(OutboundCommand command) { // 1. 参数校验 validateCommand(command); // 2. 幂等检查 if (commandRepository.existsByCommandId(command.getCommandId())) { log.warn("重复指令: {}", command.getCommandId()); return; } // 3. 保存指令记录 OutboundCommandRecord record = new OutboundCommandRecord(); record.setCommandId(command.getCommandId()); record.setOrderNo(command.getOrderNo()); record.setWarehouseId(command.getWarehouseId()); record.setStatus(CommandStatus.SENT); record.setContent(JSON.toJSONString(command)); record.setCreatedAt(LocalDateTime.now()); commandRepository.save(record); // 4. 发送消息到WMS String topic = "WMS_OUTBOUND_COMMAND_" + command.getWarehouseId(); rocketMQTemplate.syncSend(topic, command); log.info("出库指令已下发: {}", command.getCommandId()); } } 2.3 WMS接收处理 @Service @RocketMQMessageListener( topic = "WMS_OUTBOUND_COMMAND_${warehouse.id}", consumerGroup = "wms-outbound-consumer" ) public class OutboundCommandConsumer implements RocketMQListener<OutboundCommand> { @Override public void onMessage(OutboundCommand command) { try { // 1. 幂等检查 if (outboundOrderRepository.existsByOmsOrderNo(command.getOrderNo())) { log.warn("订单已存在: {}", command.getOrderNo()); return; } // 2. 创建WMS出库单 OutboundOrder order = createOutboundOrder(command); // 3. 校验库存 boolean stockAvailable = checkStock(order); if (!stockAvailable) { // 库存不足,回传异常 sendStockShortageCallback(command, order); return; } // 4. 锁定库存 lockStock(order); // 5. 回传接收成功 sendReceiveCallback(command.getCommandId(), "SUCCESS"); } catch (Exception e) { log.error("处理出库指令失败: {}", command.getCommandId(), e); sendReceiveCallback(command.getCommandId(), "FAIL", e.getMessage()); } } private OutboundOrder createOutboundOrder(OutboundCommand command) { OutboundOrder order = new OutboundOrder(); order.setOutboundNo(generateOutboundNo()); order.setOmsOrderNo(command.getOrderNo()); order.setWarehouseId(command.getWarehouseId()); order.setConsignee(command.getConsignee()); order.setPhone(command.getPhone()); order.setAddress(buildFullAddress(command)); order.setCarrierCode(command.getCarrierCode()); order.setPriority(command.getPriority()); order.setDeadline(command.getDeadline()); order.setStatus(OutboundStatus.CREATED); outboundOrderRepository.save(order); // 保存明细 for (OutboundItem item : command.getItems()) { OutboundOrderItem orderItem = new OutboundOrderItem(); orderItem.setOutboundNo(order.getOutboundNo()); orderItem.setSkuId(item.getSkuId()); orderItem.setQuantity(item.getQuantity()); outboundItemRepository.save(orderItem); } return order; } } 三、库存同步 3.1 库存同步策略 策略 说明 适用场景 实时同步 每次变动立即同步 库存紧张、高并发 定时同步 定时批量同步 库存充足 增量同步 只同步变化部分 数据量大 全量同步 定期全量覆盖 数据校准 3.2 实时库存同步 @Service public class InventorySyncService { /** * 库存变动时触发同步 */ @EventListener public void onInventoryChange(InventoryChangeEvent event) { // 计算可售库存 int availableQty = calculateAvailableQty( event.getWarehouseId(), event.getSkuId() ); // 发送同步消息 InventorySyncMessage message = new InventorySyncMessage(); message.setWarehouseId(event.getWarehouseId()); message.setSkuId(event.getSkuId()); message.setAvailableQty(availableQty); message.setChangeType(event.getChangeType()); message.setChangeQty(event.getChangeQty()); message.setTimestamp(LocalDateTime.now()); rocketMQTemplate.asyncSend("OMS_INVENTORY_SYNC", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("库存同步成功: {} {}", event.getSkuId(), availableQty); } @Override public void onException(Throwable e) { log.error("库存同步失败: {}", event.getSkuId(), e); // 加入重试队列 retrySyncQueue.add(message); } }); } /** * 计算可售库存 * 可售 = 实物库存 - 锁定库存 - 安全库存 */ private int calculateAvailableQty(String warehouseId, String skuId) { // 查询实物库存 int physicalQty = inventoryRepository.sumQuantity(warehouseId, skuId); // 查询锁定库存 int lockedQty = inventoryRepository.sumLockedQty(warehouseId, skuId); // 查询安全库存 int safetyStock = skuService.getSafetyStock(skuId); return Math.max(0, physicalQty - lockedQty - safetyStock); } } 3.3 OMS接收库存同步 @Service @RocketMQMessageListener( topic = "OMS_INVENTORY_SYNC", consumerGroup = "oms-inventory-consumer" ) public class InventorySyncConsumer implements RocketMQListener<InventorySyncMessage> { @Override public void onMessage(InventorySyncMessage message) { // 1. 更新OMS库存 OmsInventory inventory = omsInventoryRepository .findByWarehouseAndSku(message.getWarehouseId(), message.getSkuId()); if (inventory == null) { inventory = new OmsInventory(); inventory.setWarehouseId(message.getWarehouseId()); inventory.setSkuId(message.getSkuId()); } inventory.setAvailableQty(message.getAvailableQty()); inventory.setLastSyncTime(message.getTimestamp()); omsInventoryRepository.save(inventory); // 2. 更新渠道库存(如果需要) channelInventoryService.syncToChannels(message.getSkuId()); // 3. 检查库存预警 checkInventoryAlert(message); } } 3.4 定时全量同步 @Service public class FullInventorySyncService { /** * 每天凌晨全量同步库存 */ @Scheduled(cron = "0 0 2 * * ?") public void fullSync() { log.info("开始全量库存同步"); List<String> warehouses = warehouseService.getAllActiveWarehouses(); for (String warehouseId : warehouses) { // 获取WMS所有库存 List<InventorySummary> wmsInventories = wmsClient.getAllInventory(warehouseId); // 批量同步到OMS List<InventorySyncMessage> messages = wmsInventories.stream() .map(inv -> { InventorySyncMessage msg = new InventorySyncMessage(); msg.setWarehouseId(warehouseId); msg.setSkuId(inv.getSkuId()); msg.setAvailableQty(inv.getAvailableQty()); msg.setChangeType("FULL_SYNC"); return msg; }) .collect(Collectors.toList()); // 批量发送 rocketMQTemplate.syncSend("OMS_INVENTORY_SYNC_BATCH", messages); } log.info("全量库存同步完成"); } } 四、状态回传 4.1 状态流转 OMS订单状态: 待发货 ──> 拣货中 ──> 已发货 ──> 已签收 │ │ │ │ WMS状态回传: │ │ │ │ ▼ ▼ ▼ ▼ 接收成功 开始拣货 发货完成 物流回传 4.2 状态回传消息 /** * WMS回传给OMS的状态消息 */ public class OutboundStatusCallback { private String callbackId; // 回传ID private String omsOrderNo; // OMS订单号 private String wmsOutboundNo; // WMS出库单号 private String status; // 状态 private LocalDateTime statusTime; // 状态时间 // 发货信息(发货状态时填写) private String trackingNo; // 物流单号 private String carrierCode; // 承运商 private Integer packageCount; // 包裹数 private BigDecimal weight; // 重量 // 异常信息 private String errorCode; // 错误码 private String errorMessage; // 错误信息 } 4.3 状态回传服务 @Service public class StatusCallbackService { /** * 拣货开始回传 */ public void callbackPickStart(OutboundOrder order) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("PICKING"); callback.setStatusTime(LocalDateTime.now()); sendCallback(callback); } /** * 发货完成回传 */ public void callbackShipped(OutboundOrder order, ShipmentInfo shipment) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("SHIPPED"); callback.setStatusTime(LocalDateTime.now()); callback.setTrackingNo(shipment.getTrackingNo()); callback.setCarrierCode(shipment.getCarrierCode()); callback.setPackageCount(shipment.getPackageCount()); callback.setWeight(shipment.getWeight()); sendCallback(callback); } /** * 异常回传 */ public void callbackException(OutboundOrder order, String errorCode, String errorMessage) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("EXCEPTION"); callback.setStatusTime(LocalDateTime.now()); callback.setErrorCode(errorCode); callback.setErrorMessage(errorMessage); sendCallback(callback); } private void sendCallback(OutboundStatusCallback callback) { // 保存回传记录 CallbackRecord record = new CallbackRecord(); record.setCallbackId(callback.getCallbackId()); record.setContent(JSON.toJSONString(callback)); record.setStatus(CallbackStatus.PENDING); callbackRepository.save(record); // 发送消息 rocketMQTemplate.asyncSend("OMS_STATUS_CALLBACK", callback, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { record.setStatus(CallbackStatus.SUCCESS); callbackRepository.save(record); } @Override public void onException(Throwable e) { record.setStatus(CallbackStatus.FAIL); record.setErrorMessage(e.getMessage()); callbackRepository.save(record); // 加入重试队列 retryQueue.add(callback); } }); } } 4.4 OMS接收状态回传 @Service @RocketMQMessageListener( topic = "OMS_STATUS_CALLBACK", consumerGroup = "oms-callback-consumer" ) public class StatusCallbackConsumer implements RocketMQListener<OutboundStatusCallback> { @Override public void onMessage(OutboundStatusCallback callback) { // 1. 幂等检查 if (callbackRepository.existsByCallbackId(callback.getCallbackId())) { return; } // 2. 查找订单 Order order = orderRepository.findByOrderNo(callback.getOmsOrderNo()); if (order == null) { log.error("订单不存在: {}", callback.getOmsOrderNo()); return; } // 3. 更新订单状态 switch (callback.getStatus()) { case "PICKING": order.setStatus(OrderStatus.PICKING); order.setPickStartTime(callback.getStatusTime()); break; case "SHIPPED": order.setStatus(OrderStatus.SHIPPED); order.setShipTime(callback.getStatusTime()); order.setTrackingNo(callback.getTrackingNo()); order.setCarrierCode(callback.getCarrierCode()); // 释放库存预占 inventoryService.releaseReservation(order.getOrderNo()); break; case "EXCEPTION": order.setStatus(OrderStatus.EXCEPTION); order.setExceptionCode(callback.getErrorCode()); order.setExceptionMessage(callback.getErrorMessage()); // 触发异常处理流程 exceptionHandler.handle(order, callback); break; } orderRepository.save(order); // 4. 通知下游(如通知客户) notifyService.notifyOrderStatusChange(order); } } 五、异常处理 5.1 常见异常场景 异常 原因 处理方式 库存不足 WMS实际库存不够 重新分仓或取消 地址异常 地址无法配送 人工处理 SKU不存在 WMS没有该商品 检查主数据 超时未处理 WMS未及时处理 催促或转仓 5.2 异常处理服务 @Service public class IntegrationExceptionHandler { /** * 处理库存不足异常 */ public void handleStockShortage(Order order, OutboundStatusCallback callback) { // 1. 尝试重新分仓 String newWarehouse = fulfillmentRouter.findAlternativeWarehouse(order); if (newWarehouse != null) { // 取消原出库指令 wmsClient.cancelOutbound(order.getOrderNo(), order.getWarehouseId()); // 下发新出库指令 order.setWarehouseId(newWarehouse); outboundCommandService.sendOutboundCommand(buildCommand(order)); log.info("订单{}重新分仓到{}", order.getOrderNo(), newWarehouse); } else { // 无可用仓库,标记异常 order.setStatus(OrderStatus.STOCK_SHORTAGE); orderRepository.save(order); // 通知客服 alertService.notifyCustomerService(order, "库存不足,需人工处理"); } } /** * 处理超时未处理 */ @Scheduled(fixedRate = 300000) // 每5分钟检查 public void checkTimeoutOrders() { // 查找超时订单 LocalDateTime timeout = LocalDateTime.now().minusHours(4); List<Order> timeoutOrders = orderRepository .findByStatusAndCreatedBefore(OrderStatus.PENDING_SHIP, timeout); for (Order order : timeoutOrders) { // 查询WMS状态 OutboundOrder wmsOrder = wmsClient.getOutboundOrder(order.getOrderNo()); if (wmsOrder == null || wmsOrder.getStatus() == OutboundStatus.CREATED) { // WMS未处理,发送催促 alertService.notifyWarehouse(order.getWarehouseId(), "订单" + order.getOrderNo() + "超时未处理"); } } } } 六、接口设计 6.1 REST API接口 @RestController @RequestMapping("/api/wms") public class WmsIntegrationController { /** * OMS调用:下发出库指令 */ @PostMapping("/outbound/command") public Result<Void> sendOutboundCommand(@RequestBody OutboundCommand command) { outboundCommandService.sendOutboundCommand(command); return Result.success(); } /** * OMS调用:取消出库指令 */ @PostMapping("/outbound/cancel") public Result<Void> cancelOutbound(@RequestBody CancelRequest request) { wmsService.cancelOutbound(request.getOrderNo(), request.getReason()); return Result.success(); } /** * OMS调用:查询库存 */ @GetMapping("/inventory") public Result<InventoryVO> queryInventory( @RequestParam String warehouseId, @RequestParam String skuId) { InventoryVO inventory = inventoryService.queryInventory(warehouseId, skuId); return Result.success(inventory); } /** * WMS调用:状态回传 */ @PostMapping("/callback/status") public Result<Void> statusCallback(@RequestBody OutboundStatusCallback callback) { callbackService.handleCallback(callback); return Result.success(); } /** * WMS调用:库存同步 */ @PostMapping("/inventory/sync") public Result<Void> syncInventory(@RequestBody InventorySyncRequest request) { inventorySyncService.sync(request); return Result.success(); } } 6.2 接口幂等设计 @Aspect @Component public class IdempotentAspect { @Around("@annotation(idempotent)") public Object checkIdempotent(ProceedingJoinPoint point, Idempotent idempotent) throws Throwable { // 获取幂等键 String key = getIdempotentKey(point, idempotent); // 检查是否已处理 String result = redisTemplate.opsForValue().get(key); if (result != null) { log.info("重复请求,返回缓存结果: {}", key); return JSON.parseObject(result, point.getSignature().getDeclaringType()); } // 执行业务 Object response = point.proceed(); // 缓存结果 redisTemplate.opsForValue().set(key, JSON.toJSONString(response), idempotent.expireSeconds(), TimeUnit.SECONDS); return response; } } 七、监控与告警 7.1 集成监控指标 指标 说明 告警阈值 指令下发成功率 成功/总数 < 99% 库存同步延迟 同步耗时 > 5秒 状态回传延迟 回传耗时 > 10秒 消息积压数 队列积压 > 1000 7.2 监控实现 @Service public class IntegrationMonitorService { @Scheduled(fixedRate = 60000) public void monitor() { // 检查消息积压 long pendingCount = messageRepository.countPending(); if (pendingCount > 1000) { alertService.sendAlert("消息积压告警", "待处理消息数: " + pendingCount); } // 检查同步延迟 LocalDateTime lastSync = inventorySyncRepository.getLastSyncTime(); if (lastSync.isBefore(LocalDateTime.now().minusMinutes(5))) { alertService.sendAlert("库存同步延迟", "最后同步时间: " + lastSync); } // 检查失败率 long totalCount = callbackRepository.countByHour(); long failCount = callbackRepository.countFailByHour(); double failRate = failCount * 100.0 / totalCount; if (failRate > 1) { alertService.sendAlert("回传失败率告警", String.format("失败率: %.2f%%", failRate)); } } } 八、总结 8.1 集成核心要点 指令下发:幂等、可靠、可追溯 库存同步:实时、准确、有兜底 状态回传:及时、完整、可重试 异常处理:自动重试、人工兜底 8.2 最佳实践 实践 说明 消息队列解耦 异步处理,削峰填谷 幂等设计 防止重复处理 重试机制 失败自动重试 监控告警 及时发现问题 系列文章导航 ...