WMS与OMS集成——打通订单履约的关键链路

引言:WMS与OMS的关系 OMS是大脑,WMS是手脚: OMS负责订单决策:接单、拆单、分仓 WMS负责实物执行:拣货、打包、发货 集成的核心目标: 指令准确传递:OMS的出库指令准确下发到WMS 库存实时同步:WMS的实物库存实时同步到OMS 状态及时回传:WMS的执行状态及时回传OMS 一、集成架构 1.1 系统边界 ┌─────────────────────────────────────────────────────────────┐ │ OMS │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │订单管理 │ │库存预占 │ │履约路由 │ │状态跟踪 │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ └───────┼───────────┼───────────┼───────────┼─────────────────┘ │ │ │ │ │ ┌──────┴───────────┴───────────┴──────┐ │ │ 消息队列/API │ │ └──────┬───────────┬───────────┬──────┘ │ │ │ │ ┌───────┼───────────┼───────────┼───────────┼─────────────────┐ │ ▼ ▼ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │出库管理 │ │库存管理 │ │拣货管理 │ │发货管理 │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ WMS │ └─────────────────────────────────────────────────────────────┘ 1.2 数据流向 数据 方向 说明 出库指令 OMS → WMS 订单履约指令 可售库存 WMS → OMS 库存同步 出库状态 WMS → OMS 拣货、发货状态 库存变动 WMS → OMS 入库、盘点等 二、出库指令下发 2.1 出库单数据结构 /** * OMS下发给WMS的出库指令 */ public class OutboundCommand { private String commandId; // 指令ID private String orderNo; // OMS订单号 private String warehouseId; // 仓库ID // 收货信息 private String consignee; // 收货人 private String phone; // 电话 private String province; // 省 private String city; // 市 private String district; // 区 private String address; // 详细地址 // 物流信息 private String carrierCode; // 承运商编码 private String serviceType; // 服务类型 // 时效要求 private Integer priority; // 优先级 private LocalDateTime deadline; // 截止时间 // 商品明细 private List<OutboundItem> items; // 扩展信息 private Map<String, String> extInfo; } public class OutboundItem { private String skuId; // SKU编码 private String skuName; // SKU名称 private Integer quantity; // 数量 private String barcode; // 条码 } 2.2 指令下发服务 @Service public class OutboundCommandService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * OMS调用:下发出库指令 */ public void sendOutboundCommand(OutboundCommand command) { // 1. 参数校验 validateCommand(command); // 2. 幂等检查 if (commandRepository.existsByCommandId(command.getCommandId())) { log.warn("重复指令: {}", command.getCommandId()); return; } // 3. 保存指令记录 OutboundCommandRecord record = new OutboundCommandRecord(); record.setCommandId(command.getCommandId()); record.setOrderNo(command.getOrderNo()); record.setWarehouseId(command.getWarehouseId()); record.setStatus(CommandStatus.SENT); record.setContent(JSON.toJSONString(command)); record.setCreatedAt(LocalDateTime.now()); commandRepository.save(record); // 4. 发送消息到WMS String topic = "WMS_OUTBOUND_COMMAND_" + command.getWarehouseId(); rocketMQTemplate.syncSend(topic, command); log.info("出库指令已下发: {}", command.getCommandId()); } } 2.3 WMS接收处理 @Service @RocketMQMessageListener( topic = "WMS_OUTBOUND_COMMAND_${warehouse.id}", consumerGroup = "wms-outbound-consumer" ) public class OutboundCommandConsumer implements RocketMQListener<OutboundCommand> { @Override public void onMessage(OutboundCommand command) { try { // 1. 幂等检查 if (outboundOrderRepository.existsByOmsOrderNo(command.getOrderNo())) { log.warn("订单已存在: {}", command.getOrderNo()); return; } // 2. 创建WMS出库单 OutboundOrder order = createOutboundOrder(command); // 3. 校验库存 boolean stockAvailable = checkStock(order); if (!stockAvailable) { // 库存不足,回传异常 sendStockShortageCallback(command, order); return; } // 4. 锁定库存 lockStock(order); // 5. 回传接收成功 sendReceiveCallback(command.getCommandId(), "SUCCESS"); } catch (Exception e) { log.error("处理出库指令失败: {}", command.getCommandId(), e); sendReceiveCallback(command.getCommandId(), "FAIL", e.getMessage()); } } private OutboundOrder createOutboundOrder(OutboundCommand command) { OutboundOrder order = new OutboundOrder(); order.setOutboundNo(generateOutboundNo()); order.setOmsOrderNo(command.getOrderNo()); order.setWarehouseId(command.getWarehouseId()); order.setConsignee(command.getConsignee()); order.setPhone(command.getPhone()); order.setAddress(buildFullAddress(command)); order.setCarrierCode(command.getCarrierCode()); order.setPriority(command.getPriority()); order.setDeadline(command.getDeadline()); order.setStatus(OutboundStatus.CREATED); outboundOrderRepository.save(order); // 保存明细 for (OutboundItem item : command.getItems()) { OutboundOrderItem orderItem = new OutboundOrderItem(); orderItem.setOutboundNo(order.getOutboundNo()); orderItem.setSkuId(item.getSkuId()); orderItem.setQuantity(item.getQuantity()); outboundItemRepository.save(orderItem); } return order; } } 三、库存同步 3.1 库存同步策略 策略 说明 适用场景 实时同步 每次变动立即同步 库存紧张、高并发 定时同步 定时批量同步 库存充足 增量同步 只同步变化部分 数据量大 全量同步 定期全量覆盖 数据校准 3.2 实时库存同步 @Service public class InventorySyncService { /** * 库存变动时触发同步 */ @EventListener public void onInventoryChange(InventoryChangeEvent event) { // 计算可售库存 int availableQty = calculateAvailableQty( event.getWarehouseId(), event.getSkuId() ); // 发送同步消息 InventorySyncMessage message = new InventorySyncMessage(); message.setWarehouseId(event.getWarehouseId()); message.setSkuId(event.getSkuId()); message.setAvailableQty(availableQty); message.setChangeType(event.getChangeType()); message.setChangeQty(event.getChangeQty()); message.setTimestamp(LocalDateTime.now()); rocketMQTemplate.asyncSend("OMS_INVENTORY_SYNC", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("库存同步成功: {} {}", event.getSkuId(), availableQty); } @Override public void onException(Throwable e) { log.error("库存同步失败: {}", event.getSkuId(), e); // 加入重试队列 retrySyncQueue.add(message); } }); } /** * 计算可售库存 * 可售 = 实物库存 - 锁定库存 - 安全库存 */ private int calculateAvailableQty(String warehouseId, String skuId) { // 查询实物库存 int physicalQty = inventoryRepository.sumQuantity(warehouseId, skuId); // 查询锁定库存 int lockedQty = inventoryRepository.sumLockedQty(warehouseId, skuId); // 查询安全库存 int safetyStock = skuService.getSafetyStock(skuId); return Math.max(0, physicalQty - lockedQty - safetyStock); } } 3.3 OMS接收库存同步 @Service @RocketMQMessageListener( topic = "OMS_INVENTORY_SYNC", consumerGroup = "oms-inventory-consumer" ) public class InventorySyncConsumer implements RocketMQListener<InventorySyncMessage> { @Override public void onMessage(InventorySyncMessage message) { // 1. 更新OMS库存 OmsInventory inventory = omsInventoryRepository .findByWarehouseAndSku(message.getWarehouseId(), message.getSkuId()); if (inventory == null) { inventory = new OmsInventory(); inventory.setWarehouseId(message.getWarehouseId()); inventory.setSkuId(message.getSkuId()); } inventory.setAvailableQty(message.getAvailableQty()); inventory.setLastSyncTime(message.getTimestamp()); omsInventoryRepository.save(inventory); // 2. 更新渠道库存(如果需要) channelInventoryService.syncToChannels(message.getSkuId()); // 3. 检查库存预警 checkInventoryAlert(message); } } 3.4 定时全量同步 @Service public class FullInventorySyncService { /** * 每天凌晨全量同步库存 */ @Scheduled(cron = "0 0 2 * * ?") public void fullSync() { log.info("开始全量库存同步"); List<String> warehouses = warehouseService.getAllActiveWarehouses(); for (String warehouseId : warehouses) { // 获取WMS所有库存 List<InventorySummary> wmsInventories = wmsClient.getAllInventory(warehouseId); // 批量同步到OMS List<InventorySyncMessage> messages = wmsInventories.stream() .map(inv -> { InventorySyncMessage msg = new InventorySyncMessage(); msg.setWarehouseId(warehouseId); msg.setSkuId(inv.getSkuId()); msg.setAvailableQty(inv.getAvailableQty()); msg.setChangeType("FULL_SYNC"); return msg; }) .collect(Collectors.toList()); // 批量发送 rocketMQTemplate.syncSend("OMS_INVENTORY_SYNC_BATCH", messages); } log.info("全量库存同步完成"); } } 四、状态回传 4.1 状态流转 OMS订单状态: 待发货 ──> 拣货中 ──> 已发货 ──> 已签收 │ │ │ │ WMS状态回传: │ │ │ │ ▼ ▼ ▼ ▼ 接收成功 开始拣货 发货完成 物流回传 4.2 状态回传消息 /** * WMS回传给OMS的状态消息 */ public class OutboundStatusCallback { private String callbackId; // 回传ID private String omsOrderNo; // OMS订单号 private String wmsOutboundNo; // WMS出库单号 private String status; // 状态 private LocalDateTime statusTime; // 状态时间 // 发货信息(发货状态时填写) private String trackingNo; // 物流单号 private String carrierCode; // 承运商 private Integer packageCount; // 包裹数 private BigDecimal weight; // 重量 // 异常信息 private String errorCode; // 错误码 private String errorMessage; // 错误信息 } 4.3 状态回传服务 @Service public class StatusCallbackService { /** * 拣货开始回传 */ public void callbackPickStart(OutboundOrder order) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("PICKING"); callback.setStatusTime(LocalDateTime.now()); sendCallback(callback); } /** * 发货完成回传 */ public void callbackShipped(OutboundOrder order, ShipmentInfo shipment) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("SHIPPED"); callback.setStatusTime(LocalDateTime.now()); callback.setTrackingNo(shipment.getTrackingNo()); callback.setCarrierCode(shipment.getCarrierCode()); callback.setPackageCount(shipment.getPackageCount()); callback.setWeight(shipment.getWeight()); sendCallback(callback); } /** * 异常回传 */ public void callbackException(OutboundOrder order, String errorCode, String errorMessage) { OutboundStatusCallback callback = new OutboundStatusCallback(); callback.setCallbackId(generateCallbackId()); callback.setOmsOrderNo(order.getOmsOrderNo()); callback.setWmsOutboundNo(order.getOutboundNo()); callback.setStatus("EXCEPTION"); callback.setStatusTime(LocalDateTime.now()); callback.setErrorCode(errorCode); callback.setErrorMessage(errorMessage); sendCallback(callback); } private void sendCallback(OutboundStatusCallback callback) { // 保存回传记录 CallbackRecord record = new CallbackRecord(); record.setCallbackId(callback.getCallbackId()); record.setContent(JSON.toJSONString(callback)); record.setStatus(CallbackStatus.PENDING); callbackRepository.save(record); // 发送消息 rocketMQTemplate.asyncSend("OMS_STATUS_CALLBACK", callback, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { record.setStatus(CallbackStatus.SUCCESS); callbackRepository.save(record); } @Override public void onException(Throwable e) { record.setStatus(CallbackStatus.FAIL); record.setErrorMessage(e.getMessage()); callbackRepository.save(record); // 加入重试队列 retryQueue.add(callback); } }); } } 4.4 OMS接收状态回传 @Service @RocketMQMessageListener( topic = "OMS_STATUS_CALLBACK", consumerGroup = "oms-callback-consumer" ) public class StatusCallbackConsumer implements RocketMQListener<OutboundStatusCallback> { @Override public void onMessage(OutboundStatusCallback callback) { // 1. 幂等检查 if (callbackRepository.existsByCallbackId(callback.getCallbackId())) { return; } // 2. 查找订单 Order order = orderRepository.findByOrderNo(callback.getOmsOrderNo()); if (order == null) { log.error("订单不存在: {}", callback.getOmsOrderNo()); return; } // 3. 更新订单状态 switch (callback.getStatus()) { case "PICKING": order.setStatus(OrderStatus.PICKING); order.setPickStartTime(callback.getStatusTime()); break; case "SHIPPED": order.setStatus(OrderStatus.SHIPPED); order.setShipTime(callback.getStatusTime()); order.setTrackingNo(callback.getTrackingNo()); order.setCarrierCode(callback.getCarrierCode()); // 释放库存预占 inventoryService.releaseReservation(order.getOrderNo()); break; case "EXCEPTION": order.setStatus(OrderStatus.EXCEPTION); order.setExceptionCode(callback.getErrorCode()); order.setExceptionMessage(callback.getErrorMessage()); // 触发异常处理流程 exceptionHandler.handle(order, callback); break; } orderRepository.save(order); // 4. 通知下游(如通知客户) notifyService.notifyOrderStatusChange(order); } } 五、异常处理 5.1 常见异常场景 异常 原因 处理方式 库存不足 WMS实际库存不够 重新分仓或取消 地址异常 地址无法配送 人工处理 SKU不存在 WMS没有该商品 检查主数据 超时未处理 WMS未及时处理 催促或转仓 5.2 异常处理服务 @Service public class IntegrationExceptionHandler { /** * 处理库存不足异常 */ public void handleStockShortage(Order order, OutboundStatusCallback callback) { // 1. 尝试重新分仓 String newWarehouse = fulfillmentRouter.findAlternativeWarehouse(order); if (newWarehouse != null) { // 取消原出库指令 wmsClient.cancelOutbound(order.getOrderNo(), order.getWarehouseId()); // 下发新出库指令 order.setWarehouseId(newWarehouse); outboundCommandService.sendOutboundCommand(buildCommand(order)); log.info("订单{}重新分仓到{}", order.getOrderNo(), newWarehouse); } else { // 无可用仓库,标记异常 order.setStatus(OrderStatus.STOCK_SHORTAGE); orderRepository.save(order); // 通知客服 alertService.notifyCustomerService(order, "库存不足,需人工处理"); } } /** * 处理超时未处理 */ @Scheduled(fixedRate = 300000) // 每5分钟检查 public void checkTimeoutOrders() { // 查找超时订单 LocalDateTime timeout = LocalDateTime.now().minusHours(4); List<Order> timeoutOrders = orderRepository .findByStatusAndCreatedBefore(OrderStatus.PENDING_SHIP, timeout); for (Order order : timeoutOrders) { // 查询WMS状态 OutboundOrder wmsOrder = wmsClient.getOutboundOrder(order.getOrderNo()); if (wmsOrder == null || wmsOrder.getStatus() == OutboundStatus.CREATED) { // WMS未处理,发送催促 alertService.notifyWarehouse(order.getWarehouseId(), "订单" + order.getOrderNo() + "超时未处理"); } } } } 六、接口设计 6.1 REST API接口 @RestController @RequestMapping("/api/wms") public class WmsIntegrationController { /** * OMS调用:下发出库指令 */ @PostMapping("/outbound/command") public Result<Void> sendOutboundCommand(@RequestBody OutboundCommand command) { outboundCommandService.sendOutboundCommand(command); return Result.success(); } /** * OMS调用:取消出库指令 */ @PostMapping("/outbound/cancel") public Result<Void> cancelOutbound(@RequestBody CancelRequest request) { wmsService.cancelOutbound(request.getOrderNo(), request.getReason()); return Result.success(); } /** * OMS调用:查询库存 */ @GetMapping("/inventory") public Result<InventoryVO> queryInventory( @RequestParam String warehouseId, @RequestParam String skuId) { InventoryVO inventory = inventoryService.queryInventory(warehouseId, skuId); return Result.success(inventory); } /** * WMS调用:状态回传 */ @PostMapping("/callback/status") public Result<Void> statusCallback(@RequestBody OutboundStatusCallback callback) { callbackService.handleCallback(callback); return Result.success(); } /** * WMS调用:库存同步 */ @PostMapping("/inventory/sync") public Result<Void> syncInventory(@RequestBody InventorySyncRequest request) { inventorySyncService.sync(request); return Result.success(); } } 6.2 接口幂等设计 @Aspect @Component public class IdempotentAspect { @Around("@annotation(idempotent)") public Object checkIdempotent(ProceedingJoinPoint point, Idempotent idempotent) throws Throwable { // 获取幂等键 String key = getIdempotentKey(point, idempotent); // 检查是否已处理 String result = redisTemplate.opsForValue().get(key); if (result != null) { log.info("重复请求,返回缓存结果: {}", key); return JSON.parseObject(result, point.getSignature().getDeclaringType()); } // 执行业务 Object response = point.proceed(); // 缓存结果 redisTemplate.opsForValue().set(key, JSON.toJSONString(response), idempotent.expireSeconds(), TimeUnit.SECONDS); return response; } } 七、监控与告警 7.1 集成监控指标 指标 说明 告警阈值 指令下发成功率 成功/总数 < 99% 库存同步延迟 同步耗时 > 5秒 状态回传延迟 回传耗时 > 10秒 消息积压数 队列积压 > 1000 7.2 监控实现 @Service public class IntegrationMonitorService { @Scheduled(fixedRate = 60000) public void monitor() { // 检查消息积压 long pendingCount = messageRepository.countPending(); if (pendingCount > 1000) { alertService.sendAlert("消息积压告警", "待处理消息数: " + pendingCount); } // 检查同步延迟 LocalDateTime lastSync = inventorySyncRepository.getLastSyncTime(); if (lastSync.isBefore(LocalDateTime.now().minusMinutes(5))) { alertService.sendAlert("库存同步延迟", "最后同步时间: " + lastSync); } // 检查失败率 long totalCount = callbackRepository.countByHour(); long failCount = callbackRepository.countFailByHour(); double failRate = failCount * 100.0 / totalCount; if (failRate > 1) { alertService.sendAlert("回传失败率告警", String.format("失败率: %.2f%%", failRate)); } } } 八、总结 8.1 集成核心要点 指令下发:幂等、可靠、可追溯 库存同步:实时、准确、有兜底 状态回传:及时、完整、可重试 异常处理:自动重试、人工兜底 8.2 最佳实践 实践 说明 消息队列解耦 异步处理,削峰填谷 幂等设计 防止重复处理 重试机制 失败自动重试 监控告警 及时发现问题 系列文章导航 ...

2026-02-02 · maneng

履约路由与调度——智能选仓选物流

引言:履约路由的价值 履约路由:决定订单从哪个仓库发货、用哪个物流承运商。 好的履约路由能带来: 降低物流成本(选择最优物流) 提升时效(就近发货) 提高客户满意度 均衡仓库负载 差的履约路由会导致: 物流成本高 时效差 某些仓库爆仓,某些仓库闲置 一、履约路由架构 1.1 路由流程 订单 ──> 库存检查 ──> 选仓 ──> 选物流 ──> 生成履约单 ──> 下发WMS │ │ │ │ │ │ ▼ ▼ ▼ 有货仓库 最优仓库 最优物流 1.2 路由因素 因素 说明 权重 库存 仓库是否有货 必要条件 距离 仓库到收货地的距离 影响时效 成本 物流费用 影响利润 时效 物流时效要求 影响体验 仓库负载 仓库当前处理能力 影响发货速度 承运商能力 承运商覆盖范围、服务质量 影响可选项 二、选仓策略 2.1 策略类型 策略 说明 适用场景 就近发货 选择离收货地最近的仓库 时效优先 成本最优 选择物流成本最低的仓库 成本优先 库存均衡 优先选择库存多的仓库 均衡库存 负载均衡 优先选择负载低的仓库 均衡产能 综合评分 多因素加权评分 综合考虑 2.2 就近发货策略 @Component public class NearestWarehouseStrategy implements WarehouseStrategy { @Override public String getStrategyType() { return "NEAREST"; } @Override public String selectWarehouse(Order order, List<WarehouseInventory> candidates) { Address destination = order.getShippingAddress(); return candidates.stream() .filter(w -> w.getAvailableQty() >= getRequiredQty(order, w.getSkuId())) .min(Comparator.comparingDouble(w -> calculateDistance(w.getWarehouseId(), destination))) .map(WarehouseInventory::getWarehouseId) .orElseThrow(() -> new NoAvailableWarehouseException(order.getOrderId())); } /** * 计算仓库到目的地的距离 * 简化版:使用经纬度计算直线距离 */ private double calculateDistance(String warehouseId, Address destination) { Warehouse warehouse = warehouseService.getWarehouse(warehouseId); double lat1 = warehouse.getLatitude(); double lon1 = warehouse.getLongitude(); double lat2 = getLatitude(destination); double lon2 = getLongitude(destination); // Haversine公式计算球面距离 double R = 6371; // 地球半径(公里) double dLat = Math.toRadians(lat2 - lat1); double dLon = Math.toRadians(lon2 - lon1); double a = Math.sin(dLat/2) * Math.sin(dLat/2) + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * Math.sin(dLon/2) * Math.sin(dLon/2); double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a)); return R * c; } } 2.3 成本最优策略 @Component public class CostOptimalWarehouseStrategy implements WarehouseStrategy { @Override public String getStrategyType() { return "COST_OPTIMAL"; } @Override public String selectWarehouse(Order order, List<WarehouseInventory> candidates) { Address destination = order.getShippingAddress(); double totalWeight = calculateTotalWeight(order); return candidates.stream() .filter(w -> w.getAvailableQty() >= getRequiredQty(order, w.getSkuId())) .min(Comparator.comparingDouble(w -> calculateShippingCost(w.getWarehouseId(), destination, totalWeight))) .map(WarehouseInventory::getWarehouseId) .orElseThrow(() -> new NoAvailableWarehouseException(order.getOrderId())); } /** * 计算物流成本 */ private double calculateShippingCost(String warehouseId, Address destination, double weight) { // 获取该仓库可用的物流商 List<Carrier> carriers = carrierService.getCarriersForWarehouse(warehouseId); // 计算每个物流商的费用,取最低 return carriers.stream() .filter(c -> c.canDeliver(destination)) .mapToDouble(c -> c.calculateFee(warehouseId, destination, weight)) .min() .orElse(Double.MAX_VALUE); } } 2.4 综合评分策略(推荐) @Component public class ScoringWarehouseStrategy implements WarehouseStrategy { // 权重配置 private static final double DISTANCE_WEIGHT = 0.3; private static final double COST_WEIGHT = 0.4; private static final double INVENTORY_WEIGHT = 0.2; private static final double LOAD_WEIGHT = 0.1; @Override public String getStrategyType() { return "SCORING"; } @Override public String selectWarehouse(Order order, List<WarehouseInventory> candidates) { Address destination = order.getShippingAddress(); // 过滤有库存的仓库 List<WarehouseInventory> available = candidates.stream() .filter(w -> w.getAvailableQty() >= getRequiredQty(order, w.getSkuId())) .collect(Collectors.toList()); if (available.isEmpty()) { throw new NoAvailableWarehouseException(order.getOrderId()); } // 计算各维度的归一化值 double maxDistance = available.stream() .mapToDouble(w -> calculateDistance(w.getWarehouseId(), destination)) .max().orElse(1); double maxCost = available.stream() .mapToDouble(w -> calculateShippingCost(w.getWarehouseId(), destination, order)) .max().orElse(1); double maxInventory = available.stream() .mapToDouble(WarehouseInventory::getAvailableQty) .max().orElse(1); double maxLoad = available.stream() .mapToDouble(w -> getWarehouseLoad(w.getWarehouseId())) .max().orElse(1); // 计算综合得分 return available.stream() .max(Comparator.comparingDouble(w -> { double distanceScore = 1 - calculateDistance(w.getWarehouseId(), destination) / maxDistance; double costScore = 1 - calculateShippingCost(w.getWarehouseId(), destination, order) / maxCost; double inventoryScore = w.getAvailableQty() / maxInventory; double loadScore = 1 - getWarehouseLoad(w.getWarehouseId()) / maxLoad; return distanceScore * DISTANCE_WEIGHT + costScore * COST_WEIGHT + inventoryScore * INVENTORY_WEIGHT + loadScore * LOAD_WEIGHT; })) .map(WarehouseInventory::getWarehouseId) .orElseThrow(); } } 三、选物流策略 3.1 物流选择因素 因素 说明 覆盖范围 物流商是否能送达目的地 时效 预计送达时间 费用 物流费用 服务质量 丢包率、破损率、投诉率 追踪能力 是否支持物流追踪 3.2 物流选择实现 @Service public class CarrierSelectionService { /** * 选择最优物流 */ public Carrier selectCarrier(String warehouseId, Address destination, double weight, String serviceLevel) { // 1. 获取仓库可用的物流商 List<Carrier> carriers = carrierService.getCarriersForWarehouse(warehouseId); // 2. 过滤能送达的物流商 List<Carrier> available = carriers.stream() .filter(c -> c.canDeliver(destination)) .filter(c -> c.getMaxWeight() >= weight) .collect(Collectors.toList()); if (available.isEmpty()) { throw new NoAvailableCarrierException(warehouseId, destination); } // 3. 根据服务等级选择 switch (serviceLevel) { case "EXPRESS": // 时效优先 return selectByTimeEfficiency(available, destination); case "ECONOMY": // 成本优先 return selectByCost(available, warehouseId, destination, weight); default: // 综合评分 return selectByScore(available, warehouseId, destination, weight); } } /** * 时效优先选择 */ private Carrier selectByTimeEfficiency(List<Carrier> carriers, Address destination) { return carriers.stream() .min(Comparator.comparingInt(c -> c.getEstimatedDays(destination))) .orElseThrow(); } /** * 成本优先选择 */ private Carrier selectByCost(List<Carrier> carriers, String warehouseId, Address destination, double weight) { return carriers.stream() .min(Comparator.comparingDouble(c -> c.calculateFee(warehouseId, destination, weight))) .orElseThrow(); } /** * 综合评分选择 */ private Carrier selectByScore(List<Carrier> carriers, String warehouseId, Address destination, double weight) { return carriers.stream() .max(Comparator.comparingDouble(c -> { double costScore = 100 - c.calculateFee(warehouseId, destination, weight); double timeScore = 100 - c.getEstimatedDays(destination) * 10; double qualityScore = c.getQualityScore(); // 0-100 return costScore * 0.4 + timeScore * 0.3 + qualityScore * 0.3; })) .orElseThrow(); } } 四、规则引擎 4.1 规则配置 -- 路由规则表 CREATE TABLE t_routing_rule ( id BIGINT PRIMARY KEY AUTO_INCREMENT, rule_name VARCHAR(64) NOT NULL COMMENT '规则名称', rule_type VARCHAR(32) NOT NULL COMMENT '类型:WAREHOUSE/CARRIER', priority INT DEFAULT 0 COMMENT '优先级(越大越优先)', -- 条件 condition_channel VARCHAR(32) COMMENT '渠道条件', condition_country VARCHAR(32) COMMENT '国家条件', condition_sku_category VARCHAR(32) COMMENT 'SKU分类条件', condition_weight_min DECIMAL(10,2) COMMENT '最小重量', condition_weight_max DECIMAL(10,2) COMMENT '最大重量', condition_amount_min DECIMAL(12,2) COMMENT '最小金额', -- 动作 action_warehouse_id VARCHAR(32) COMMENT '指定仓库', action_carrier_id VARCHAR(32) COMMENT '指定物流', action_strategy VARCHAR(32) COMMENT '选择策略', enabled TINYINT DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, KEY idx_type_priority (rule_type, priority DESC) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='路由规则表'; -- 示例规则 INSERT INTO t_routing_rule (rule_name, rule_type, priority, condition_country, action_warehouse_id) VALUES ('美国订单走美西仓', 'WAREHOUSE', 100, 'US', 'WH_US_WEST'), ('欧洲订单走德国仓', 'WAREHOUSE', 100, 'DE,FR,IT,ES', 'WH_DE'); INSERT INTO t_routing_rule (rule_name, rule_type, priority, condition_channel, action_carrier_id) VALUES ('Amazon订单用UPS', 'CARRIER', 100, 'AMAZON', 'UPS'), ('eBay订单用USPS', 'CARRIER', 90, 'EBAY', 'USPS'); 4.2 规则引擎实现 @Service public class RoutingRuleEngine { @Autowired private RoutingRuleRepository ruleRepository; /** * 匹配仓库规则 */ public String matchWarehouseRule(Order order) { List<RoutingRule> rules = ruleRepository.findByTypeAndEnabled("WAREHOUSE", true); // 按优先级排序 rules.sort(Comparator.comparingInt(RoutingRule::getPriority).reversed()); for (RoutingRule rule : rules) { if (matchCondition(rule, order)) { if (rule.getActionWarehouseId() != null) { return rule.getActionWarehouseId(); } if (rule.getActionStrategy() != null) { return rule.getActionStrategy(); // 返回策略名称 } } } return "SCORING"; // 默认使用评分策略 } /** * 匹配物流规则 */ public String matchCarrierRule(Order order, String warehouseId) { List<RoutingRule> rules = ruleRepository.findByTypeAndEnabled("CARRIER", true); rules.sort(Comparator.comparingInt(RoutingRule::getPriority).reversed()); for (RoutingRule rule : rules) { if (matchCondition(rule, order)) { if (rule.getActionCarrierId() != null) { return rule.getActionCarrierId(); } } } return null; // 无匹配规则,使用默认策略 } /** * 条件匹配 */ private boolean matchCondition(RoutingRule rule, Order order) { // 渠道匹配 if (rule.getConditionChannel() != null && !rule.getConditionChannel().contains(order.getChannel())) { return false; } // 国家匹配 if (rule.getConditionCountry() != null) { String country = order.getShippingAddress().getCountryCode(); if (!rule.getConditionCountry().contains(country)) { return false; } } // 重量匹配 double weight = calculateWeight(order); if (rule.getConditionWeightMin() != null && weight < rule.getConditionWeightMin()) { return false; } if (rule.getConditionWeightMax() != null && weight > rule.getConditionWeightMax()) { return false; } // 金额匹配 if (rule.getConditionAmountMin() != null && order.getTotalAmount().compareTo(rule.getConditionAmountMin()) < 0) { return false; } return true; } } 五、履约调度服务 5.1 完整调度流程 @Service @Slf4j public class FulfillmentRoutingService { @Autowired private RoutingRuleEngine ruleEngine; @Autowired private Map<String, WarehouseStrategy> warehouseStrategies; @Autowired private CarrierSelectionService carrierSelectionService; /** * 订单履约路由 */ @Transactional public FulfillmentOrder route(Order order) { log.info("开始履约路由: orderId={}", order.getOrderId()); // 1. 匹配仓库规则 String warehouseResult = ruleEngine.matchWarehouseRule(order); String warehouseId; if (isWarehouseId(warehouseResult)) { // 规则指定了具体仓库 warehouseId = warehouseResult; log.info("规则指定仓库: {}", warehouseId); } else { // 规则指定了策略,执行策略选仓 WarehouseStrategy strategy = warehouseStrategies.get(warehouseResult); List<WarehouseInventory> candidates = getWarehouseCandidates(order); warehouseId = strategy.selectWarehouse(order, candidates); log.info("策略选仓: strategy={}, warehouse={}", warehouseResult, warehouseId); } // 2. 匹配物流规则 String carrierId = ruleEngine.matchCarrierRule(order, warehouseId); if (carrierId == null) { // 无规则匹配,使用策略选物流 Carrier carrier = carrierSelectionService.selectCarrier( warehouseId, order.getShippingAddress(), calculateWeight(order), order.getServiceLevel() ); carrierId = carrier.getCarrierId(); } log.info("选择物流: {}", carrierId); // 3. 创建履约订单 FulfillmentOrder fulfillmentOrder = new FulfillmentOrder(); fulfillmentOrder.setFulfillmentOrderId(generateFulfillmentOrderId()); fulfillmentOrder.setSourceOrderId(order.getOrderId()); fulfillmentOrder.setWarehouseId(warehouseId); fulfillmentOrder.setCarrierId(carrierId); fulfillmentOrder.setShippingAddress(order.getShippingAddress()); fulfillmentOrder.setItems(convertItems(order.getItems())); fulfillmentOrder.setStatus(FulfillmentStatus.PENDING); fulfillmentOrderRepository.save(fulfillmentOrder); log.info("履约路由完成: orderId={}, fulfillmentOrderId={}, warehouse={}, carrier={}", order.getOrderId(), fulfillmentOrder.getFulfillmentOrderId(), warehouseId, carrierId); return fulfillmentOrder; } } 六、总结 6.1 核心要点 选仓策略:就近、成本最优、综合评分 选物流策略:时效优先、成本优先、综合评分 规则引擎:支持灵活配置路由规则 评分模型:多因素加权,可调整权重 6.2 实施建议 先实现基础的就近发货策略 再实现规则引擎,支持特殊场景 最后实现综合评分策略 持续优化权重参数 系列文章导航 ...

2026-01-29 · maneng

库存预占与释放——防止超卖的核心机制

引言:超卖的代价 超卖:卖出的数量超过实际库存,导致无法发货。 超卖的后果: 客户投诉、差评 平台处罚(Amazon可能封店) 紧急采购成本高 品牌信誉受损 防止超卖的核心:库存预占机制。 一、库存模型设计 1.1 库存类型 ┌─────────────────────────────────────────────────────┐ │ 库存类型 │ ├─────────────────────────────────────────────────────┤ │ 实物库存 = WMS系统中的实际库存数量 │ │ │ │ 可售库存 = 实物库存 - 预占库存 - 锁定库存 │ │ │ │ 预占库存 = 订单已预占但未发货的库存 │ │ │ │ 锁定库存 = 因其他原因锁定的库存(盘点、质量问题等) │ │ │ │ 在途库存 = 采购已下单但未入库的库存 │ └─────────────────────────────────────────────────────┘ 1.2 库存计算公式 可售库存 = 实物库存 - 预占库存 - 锁定库存 其中: - 实物库存:从WMS同步 - 预占库存:OMS计算(订单预占) - 锁定库存:手动锁定或系统锁定 1.3 数据模型 -- SKU库存汇总表(OMS) CREATE TABLE t_sku_inventory ( id BIGINT PRIMARY KEY AUTO_INCREMENT, sku_id VARCHAR(32) NOT NULL COMMENT 'SKU编码', warehouse_id VARCHAR(32) NOT NULL COMMENT '仓库ID', physical_qty INT NOT NULL DEFAULT 0 COMMENT '实物库存(从WMS同步)', reserved_qty INT NOT NULL DEFAULT 0 COMMENT '预占库存', locked_qty INT NOT NULL DEFAULT 0 COMMENT '锁定库存', available_qty INT NOT NULL DEFAULT 0 COMMENT '可售库存(计算字段)', in_transit_qty INT NOT NULL DEFAULT 0 COMMENT '在途库存', version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本', updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_sku_warehouse (sku_id, warehouse_id), KEY idx_sku_id (sku_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='SKU库存汇总表'; -- 库存预占明细表 CREATE TABLE t_inventory_reservation ( id BIGINT PRIMARY KEY AUTO_INCREMENT, reservation_id VARCHAR(32) NOT NULL COMMENT '预占ID', order_id VARCHAR(32) NOT NULL COMMENT '订单号', sku_id VARCHAR(32) NOT NULL COMMENT 'SKU编码', warehouse_id VARCHAR(32) NOT NULL COMMENT '仓库ID', reserved_qty INT NOT NULL COMMENT '预占数量', status VARCHAR(16) NOT NULL DEFAULT 'RESERVED' COMMENT '状态:RESERVED/RELEASED/DEDUCTED', reserved_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '预占时间', released_at DATETIME COMMENT '释放时间', UNIQUE KEY uk_reservation_id (reservation_id), KEY idx_order_id (order_id), KEY idx_sku_warehouse (sku_id, warehouse_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存预占明细表'; 二、预占流程设计 2.1 预占时机 下单 ──> 支付 ──> 审核 ──> 拆单 ──> 下发WMS ──> 发货 │ │ └──────── 预占库存 ─────────────────>│ │ 扣减库存 两种预占策略: ...

2026-01-29 · maneng

订单拆分与合并策略——复杂场景的处理逻辑

引言:为什么需要拆单和合单 拆单场景: 一个订单的商品分布在不同仓库,需要从多个仓库发货 部分商品有货,部分商品需要预售 订单重量超过物流限制,需要拆成多个包裹 合单场景: 同一客户短时间内下了多个订单,可以合并发货节省运费 促销活动中的凑单场景 这些逻辑看似简单,但实现起来非常复杂,涉及库存、物流、财务等多个方面。 一、订单模型设计 1.1 三层订单模型 ┌─────────────────────────────────────────────────────┐ │ 原始订单 │ │ (Source Order) │ │ 客户下单时的原始订单 │ └─────────────────────┬───────────────────────────────┘ │ 拆分/合并 ▼ ┌─────────────────────────────────────────────────────┐ │ 履约订单 │ │ (Fulfillment Order) │ │ 实际执行发货的订单单元 │ └─────────────────────┬───────────────────────────────┘ │ 下发 ▼ ┌─────────────────────────────────────────────────────┐ │ 出库单 │ │ (Outbound Order) │ │ WMS执行的出库指令 │ └─────────────────────────────────────────────────────┘ 1.2 数据模型 /** * 原始订单 */ @Data public class SourceOrder { private String sourceOrderId; // 原始订单号 private String channelOrderId; // 渠道订单号 private String channel; // 渠道 private String buyerId; // 买家ID private Address shippingAddress; // 收货地址 private List<SourceOrderItem> items; // 订单明细 private Money totalAmount; // 订单总额 private OrderStatus status; // 状态 } /** * 履约订单 */ @Data public class FulfillmentOrder { private String fulfillmentOrderId; // 履约订单号 private String sourceOrderId; // 关联原始订单号 private String warehouseId; // 发货仓库 private String carrierId; // 承运商 private Address shippingAddress; // 收货地址 private List<FulfillmentOrderItem> items; // 履约明细 private Money shippingFee; // 运费 private FulfillmentStatus status; // 状态 } /** * 订单关系表 */ @Data public class OrderRelation { private String sourceOrderId; // 原始订单号 private String fulfillmentOrderId; // 履约订单号 private String relationType; // 关系类型:SPLIT/MERGE } 1.3 数据库设计 -- 原始订单表 CREATE TABLE t_source_order ( id BIGINT PRIMARY KEY AUTO_INCREMENT, source_order_id VARCHAR(32) NOT NULL, channel_order_id VARCHAR(64), channel VARCHAR(32), buyer_id VARCHAR(64), total_amount DECIMAL(12,2), currency VARCHAR(8), status VARCHAR(32), created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_source_order_id (source_order_id) ); -- 履约订单表 CREATE TABLE t_fulfillment_order ( id BIGINT PRIMARY KEY AUTO_INCREMENT, fulfillment_order_id VARCHAR(32) NOT NULL, source_order_id VARCHAR(32) NOT NULL, warehouse_id VARCHAR(32), carrier_id VARCHAR(32), shipping_fee DECIMAL(12,2), status VARCHAR(32), created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_fulfillment_order_id (fulfillment_order_id), KEY idx_source_order_id (source_order_id) ); -- 订单关系表 CREATE TABLE t_order_relation ( id BIGINT PRIMARY KEY AUTO_INCREMENT, source_order_id VARCHAR(32) NOT NULL, fulfillment_order_id VARCHAR(32) NOT NULL, relation_type VARCHAR(16) NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, KEY idx_source_order_id (source_order_id), KEY idx_fulfillment_order_id (fulfillment_order_id) ); 二、拆单策略 2.1 拆单场景分类 场景 触发条件 处理方式 多仓拆单 商品分布在不同仓库 按仓库拆分 预售拆单 部分商品无货 有货先发,无货等待 超重拆单 包裹超过物流限重 按重量拆分 赠品拆单 赠品从不同仓库发 赠品单独发货 组合商品拆单 组合商品需要拆开 拆成单品发货 2.2 多仓拆单 场景:订单包含SKU-A和SKU-B,SKU-A在深圳仓,SKU-B在义乌仓 ...

2026-01-29 · maneng

多渠道订单接入——Amazon、eBay、独立站统一处理

引言:多渠道的挑战 跨境电商通常在多个平台销售: Amazon(美国、欧洲、日本) eBay Shopify独立站 速卖通 Wish TikTok Shop 每个渠道的订单格式和API都不同,如果不做统一处理: 每个渠道单独处理,代码重复 下游系统(WMS、TMS)要对接多种格式 数据分析困难 解决方案:设计统一的订单模型,通过适配器模式对接各渠道。 一、各渠道订单差异分析 1.1 Amazon订单特点 订单结构: { "AmazonOrderId": "111-1234567-1234567", "PurchaseDate": "2024-01-29T10:00:00Z", "OrderStatus": "Unshipped", "FulfillmentChannel": "MFN", "SalesChannel": "Amazon.com", "OrderTotal": { "CurrencyCode": "USD", "Amount": "99.99" }, "ShippingAddress": { "Name": "John Doe", "AddressLine1": "123 Main St", "City": "Seattle", "StateOrRegion": "WA", "PostalCode": "98101", "CountryCode": "US" } } 特殊点: FBA/FBM区分 多站点(US、UK、DE、JP等) 订单状态较多 有Prime标识 1.2 eBay订单特点 订单结构: { "orderId": "12-12345-12345", "creationDate": "2024-01-29T10:00:00.000Z", "orderFulfillmentStatus": "NOT_STARTED", "pricingSummary": { "total": { "value": "99.99", "currency": "USD" } }, "fulfillmentStartInstructions": [{ "shippingStep": { "shipTo": { "fullName": "John Doe", "contactAddress": { "addressLine1": "123 Main St", "city": "Seattle", "stateOrProvince": "WA", "postalCode": "98101", "countryCode": "US" } } } }] } 特殊点: ...

2026-01-29 · maneng

哪些系统适合自研?——跨境电商的特殊性分析

引言:不是所有系统都要自研 上一篇我们讲了自研vs采购的决策框架。但具体到跨境电商,哪些系统适合自研? 跨境电商有其特殊性: 多渠道、多币种、多仓库 复杂的订单逻辑 特殊的仓储需求 跨境合规要求 这些特殊性决定了,有些系统必须自研,有些系统采购更划算。 一、跨境电商的业务特殊性 1.1 多渠道复杂性 渠道类型: 平台渠道:Amazon、eBay、Wish、速卖通 独立站:Shopify、自建站 社交电商:TikTok Shop、Facebook Shop 每个渠道的差异: 渠道 订单结构 API方式 结算周期 特殊规则 Amazon 复杂 REST API 14天 FBA/FBM eBay 中等 REST API 即时 拍卖模式 Shopify 简单 GraphQL 即时 Webhook 速卖通 复杂 REST API 15天 纠纷规则 挑战: 每个渠道的订单格式不同 每个渠道的API不同 每个渠道的规则不同 需要统一管理 1.2 多币种复杂性 涉及的币种: 销售币种:USD、EUR、GBP、JPY、AUD… 采购币种:CNY、USD 结算币种:USD、CNY 挑战: 汇率波动影响利润 多币种成本核算 汇兑损益处理 1.3 多仓库复杂性 仓库类型: 国内仓:深圳、义乌 海外仓:美国、欧洲、日本 FBA仓:亚马逊仓库 第三方仓:海外第三方仓 挑战: 库存分布在多个仓库 不同仓库的操作流程不同 跨仓调拨 库存同步 1.4 订单逻辑复杂性 复杂场景: ...

2026-01-29 · maneng

OMS订单系统自研实战——架构设计与核心模块

引言:为什么OMS是核心 在跨境电商的系统矩阵中,OMS(Order Management System)是绝对的核心: 连接前端:对接Amazon、eBay、独立站等多个销售渠道 连接后端:驱动WMS仓储、TMS物流、ERP财务 数据中枢:订单数据是最有价值的业务数据 OMS做得好不好,直接决定了整个数字化转型的成败。 本文将详细讲解如何从0到1自研一套OMS订单系统。 一、OMS系统定位 1.1 OMS在系统矩阵中的位置 ┌─────────────────────────────────────────────────────┐ │ 销售渠道 │ │ Amazon │ eBay │ Shopify │ 独立站 │ ... │ └─────────────────────┬───────────────────────────────┘ │ 订单 ▼ ┌─────────────────────────────────────────────────────┐ │ OMS │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │订单接入 │ │订单处理 │ │库存管理 │ │履约调度 │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ └───────┬─────────────┬─────────────┬─────────────────┘ │ │ │ ▼ ▼ ▼ ┌───────┐ ┌───────┐ ┌───────┐ │ WMS │ │ TMS │ │ ERP │ │ 仓储 │ │ 物流 │ │ 财务 │ └───────┘ └───────┘ └───────┘ 1.2 OMS的核心职责 职责 说明 订单接入 从各渠道获取订单,统一格式 订单处理 审核、拆分、合并、取消 库存管理 可售库存计算、库存预占 履约调度 选仓、选物流、下发执行 状态管理 订单全生命周期状态跟踪 异常处理 缺货、超时、取消等异常 1.3 OMS与其他系统的边界 功能 OMS WMS TMS ERP 订单创建 ✓ 库存预占 ✓ 实物库存 ✓ 拣货发货 ✓ 物流跟踪 ✓ 财务核算 ✓ 二、架构设计 2.1 整体架构 ┌─────────────────────────────────────────────────────┐ │ 接入层 │ │ Amazon适配器 │ eBay适配器 │ Shopify适配器 │ ... │ └─────────────────────┬───────────────────────────────┘ │ ┌─────────────────────▼───────────────────────────────┐ │ 服务层 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 订单服务 │ │ 库存服务 │ │ 履约服务 │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 规则引擎 │ │ 消息服务 │ │ 调度服务 │ │ │ └──────────┘ └──────────┘ └──────────┘ │ └─────────────────────┬───────────────────────────────┘ │ ┌─────────────────────▼───────────────────────────────┐ │ 数据层 │ │ MySQL │ Redis │ RocketMQ │ Elasticsearch │ └─────────────────────────────────────────────────────┘ 2.2 核心服务拆分 订单服务(Order Service): ...

2026-01-29 · maneng

OMS-WMS集成:从订单到出库的无缝衔接

引言 OMS与WMS的集成是供应链系统的核心环节,直接影响订单履约效率。本文将深入探讨OMS-WMS集成的方案设计和技术实现。 1. 集成场景 1.1 出库场景 流程: 1. OMS推送出库指令 ↓ 2. WMS创建出库单 ↓ 3. WMS拣货打包 ↓ 4. WMS出库确认 ↓ 5. WMS回调OMS(运单号、发货时间) 1.2 入库场景 采购入库: 采购系统 → OMS → WMS创建入库单 退货入库: OMS创建退货单 → WMS验收入库 2. 接口设计 2.1 出库指令接口 接口定义: POST /api/wms/outbound/create Content-Type: application/json 请求体: { "orderNo": "OMS202511220001", "warehouseCode": "WH001", "priority": "HIGH/NORMAL/LOW", "timeRequirement": "STANDARD/EXPRESS", "items": [ { "sku": "SKU001", "quantity": 2, "batchNo": "BATCH001" } ], "consignee": { "name": "张三", "phone": "13800138000", "province": "北京市", "city": "北京市", "district": "朝阳区", "address": "XX街道XX号" } } 响应体: { "code": 200, "message": "success", "data": { "outboundNo": "WMS202511220001", "status": "PENDING" } } 2.2 出库确认回调 接口定义: ...

2025-11-22 · maneng

供应链系统集成全景图:打通OMS、WMS、TMS、库存的完整方案

引言 供应链系统集成是打通各个独立系统,实现数据互通和业务协同的关键。本文将系统化介绍供应链集成的架构设计和技术实现。 1. 供应链系统集成概述 1.1 什么是供应链集成 定义:将独立的供应链系统有机整合,实现数据互通和业务协同。 核心目标: 消除信息孤岛 提升协同效率 降低人工成本 1.2 集成的价值 效率提升: 订单处理时间缩短50%+ 人工录入减少80%+ 数据准确率提升至99%+ 成本降低: 人工成本降低30% 库存成本降低20% 运输成本降低15% 体验改善: 订单全程可视化 实时库存查询 配送状态透明化 2. 供应链系统关系图 ┌─────────────────────────────────────────────┐ │ 供应链系统全景 │ ├─────────────────────────────────────────────┤ │ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │ OMS │───→│ WMS │───→│ TMS │ │ │ │订单管理 │ │仓储管理 │ │运输管理 │ │ │ └────────┘ └────────┘ └────────┘ │ │ ↓ ↓ ↓ │ │ ┌──────────────────────────────────────┐ │ │ │ 库存中心 │ │ │ │ 实时查询 | 预占扣减 | 数据同步 │ │ │ └──────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────┐ │ │ │ ERP财务系统 │ │ │ │ 成本核算 | 账单结算 | 财务报表 │ │ │ └──────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 系统职责: ...

2025-11-22 · maneng

OMS未来趋势:智能化与自动化的演进之路

引言 从最早的人工记录订单,到Excel管理订单,再到今天的OMS系统,订单管理已经经历了数次技术革命。然而,随着人工智能、大数据、云计算等技术的成熟,OMS系统正站在新一轮变革的起点。 未来的OMS系统将不再是简单的订单管理工具,而是具备自主决策、智能优化、预测分析能力的智能大脑。本文将探讨OMS系统的未来发展趋势,展望智能化与自动化的演进之路。 OMS智能化发展趋势 智能化演进路线图 OMS智能化演进路线(3个阶段): 第一阶段:规则驱动(Rule-Based) - 特点:基于预设规则执行 - 案例:订单超时自动取消、库存不足自动下架 - 局限:规则固定,无法应对复杂场景 第二阶段:数据驱动(Data-Driven) - 特点:基于历史数据分析决策 - 案例:根据历史销量预测库存、根据用户行为推荐商品 - 优势:决策更准确,但需要大量数据 第三阶段:智能驱动(AI-Driven) - 特点:机器学习模型自主决策 - 案例:智能路由、智能定价、智能客服 - 优势:持续优化,自适应环境变化 智能化能力框架 class IntelligentOMSFramework: """智能化OMS框架""" def __init__(self): self.capabilities = { # 1. 智能预测 'prediction': { 'order_volume_forecast': '订单量预测', 'demand_forecast': '需求预测', 'return_rate_prediction': '退货率预测', 'delivery_time_prediction': '配送时效预测' }, # 2. 智能决策 'decision': { 'smart_routing': '智能路由', 'dynamic_pricing': '动态定价', 'auto_approval': '智能审核', 'risk_control': '风险控制' }, # 3. 智能优化 'optimization': { 'inventory_optimization': '库存优化', 'logistics_optimization': '物流优化', 'resource_allocation': '资源分配优化', 'cost_optimization': '成本优化' }, # 4. 智能交互 'interaction': { 'smart_customer_service': '智能客服', 'voice_assistant': '语音助手', 'chatbot': '聊天机器人', 'auto_reply': '自动回复' } } AI在订单处理中的应用 订单量预测 class OrderVolumeForecastModel: """订单量预测模型""" def __init__(self): # 使用时间序列模型(如LSTM) self.model = self._build_lstm_model() def predict_next_week(self, historical_data): """ 预测未来7天的订单量 输入特征: - 历史订单量(过去30天) - 星期几(1-7) - 是否节假日 - 促销活动 - 天气数据 - 历史同期数据(去年同期) 输出: - 未来7天每天的预测订单量 - 预测置信区间 """ # 1. 特征工程 features = self._extract_features(historical_data) # 2. 模型预测 predictions = self.model.predict(features) # 3. 后处理(平滑、异常检测) smoothed_predictions = self._smooth_predictions(predictions) return ForecastResult( predictions=smoothed_predictions, confidence_interval=self._calculate_confidence_interval( predictions ), influential_factors=self._analyze_influential_factors(features) ) def _extract_features(self, data): """特征提取""" features = [] for date in data.dates: feature = { # 时间特征 'day_of_week': date.weekday(), 'day_of_month': date.day, 'month': date.month, 'is_weekend': date.weekday() >= 5, 'is_holiday': self._is_holiday(date), # 历史订单量 'orders_last_7days': data.get_orders_range(date, -7, 0), 'orders_last_30days': data.get_orders_range(date, -30, 0), 'orders_same_day_last_year': data.get_orders_same_day_last_year(date), # 促销活动 'has_promotion': self._has_promotion(date), 'promotion_intensity': self._get_promotion_intensity(date), # 外部因素 'weather': self._get_weather(date), 'temperature': self._get_temperature(date) } features.append(feature) return features def optimize_inventory(self, forecast_result): """ 根据预测结果优化库存 策略: 1. 预测订单量 × 安全系数 = 建议备货量 2. 考虑库存周转率 3. 考虑资金占用成本 """ recommended_inventory = {} for sku_id, predicted_sales in forecast_result.items(): # 计算建议备货量 safety_factor = 1.2 # 安全系数20% recommended_qty = int(predicted_sales * safety_factor) # 考虑库存周转率 turnover_rate = self._get_turnover_rate(sku_id) if turnover_rate < 0.5: # 周转率低于50% recommended_qty = int(recommended_qty * 0.8) # 减少20% recommended_inventory[sku_id] = recommended_qty return recommended_inventory 智能路由算法优化 class AIRoutingOptimizer: """AI智能路由优化器""" def __init__(self): # 使用强化学习模型(如DQN) self.model = self._build_dqn_model() # 定义状态空间 self.state_space = [ 'order_value', # 订单价值 'user_level', # 用户等级 'delivery_distance', # 配送距离 'warehouse_load', # 仓库负载 'inventory_level', # 库存水位 'weather_condition', # 天气状况 'time_of_day' # 时段 ] # 定义动作空间(选择哪个仓库) self.action_space = self._get_available_warehouses() def route(self, order): """ 使用强化学习模型进行路由决策 强化学习框架: - 状态(State):订单特征、仓库状态 - 动作(Action):选择哪个仓库 - 奖励(Reward):综合成本、时效、用户满意度 """ # 1. 构建当前状态 state = self._build_state(order) # 2. 模型预测(选择动作) warehouse_scores = self.model.predict(state) # 3. 选择得分最高的仓库 best_warehouse_id = np.argmax(warehouse_scores) # 4. 执行路由 routing_result = self._execute_routing( order, best_warehouse_id ) # 5. 记录结果(用于模型训练) self._record_routing_result( state, best_warehouse_id, routing_result ) return routing_result def calculate_reward(self, routing_result): """ 计算奖励(用于模型训练) 奖励函数: reward = -cost × w1 + satisfaction × w2 - delivery_time × w3 权重配置: - w1 = 0.3(成本) - w2 = 0.5(满意度) - w3 = 0.2(时效) """ # 成本归一化(0-1) cost_normalized = routing_result.cost / 100 # 用户满意度(基于实际配送时间 vs 预期) satisfaction = 1.0 if routing_result.on_time else 0.5 # 配送时间归一化 delivery_time_normalized = routing_result.delivery_hours / 72 # 计算综合奖励 reward = ( -cost_normalized * 0.3 + satisfaction * 0.5 + -delivery_time_normalized * 0.2 ) return reward def train(self, episodes=10000): """ 训练路由模型 使用历史订单数据进行离线训练 """ for episode in range(episodes): # 1. 采样历史订单 historical_orders = self._sample_historical_orders(batch_size=32) # 2. 执行路由决策 for order in historical_orders: state = self._build_state(order) action = self.model.predict(state) routing_result = self._execute_routing(order, action) # 3. 计算奖励 reward = self.calculate_reward(routing_result) # 4. 更新模型 next_state = self._build_state(order, after_routing=True) self.model.update(state, action, reward, next_state) # 5. 定期评估模型 if episode % 100 == 0: evaluation_result = self._evaluate_model() print(f"Episode {episode}, Avg Reward: {evaluation_result}") 智能客服与售后处理 class IntelligentCustomerService: """智能客服系统""" def __init__(self): # NLP模型(意图识别、实体抽取) self.nlp_model = self._load_nlp_model() # 知识库 self.knowledge_base = self._load_knowledge_base() # 对话管理器 self.dialogue_manager = DialogueManager() def handle_customer_query(self, user_id, query): """ 处理用户咨询 流程: 1. 意图识别(退货、物流查询、商品咨询等) 2. 实体抽取(订单号、商品名称等) 3. 知识库检索 4. 生成回复 5. 判断是否需要转人工 """ # 1. 意图识别 intent = self.nlp_model.classify_intent(query) # 2. 实体抽取 entities = self.nlp_model.extract_entities(query) # 3. 根据意图处理 if intent == 'track_order': return self._handle_tracking_query(user_id, entities) elif intent == 'apply_refund': return self._handle_refund_request(user_id, entities) elif intent == 'product_inquiry': return self._handle_product_inquiry(entities) elif intent == 'complaint': # 投诉类问题直接转人工 return self._transfer_to_human_agent(user_id, query) else: # 未识别意图,从知识库搜索 return self._search_knowledge_base(query) def _handle_tracking_query(self, user_id, entities): """处理物流查询""" # 提取订单号 order_id = entities.get('order_id') if not order_id: # 没有订单号,列出用户最近订单 recent_orders = self._get_recent_orders(user_id, limit=5) return { 'type': 'order_list', 'message': '请问您要查询哪个订单的物流?', 'orders': recent_orders } # 查询物流信息 tracking_info = self._get_tracking_info(order_id) return { 'type': 'tracking_info', 'message': self._format_tracking_message(tracking_info), 'tracking_info': tracking_info } def _handle_refund_request(self, user_id, entities): """处理退货申请""" order_id = entities.get('order_id') reason = entities.get('refund_reason') # 检查订单是否可退货 order = self._get_order(order_id) if not self._can_refund(order): return { 'type': 'refund_rejected', 'message': '抱歉,该订单不满足退货条件。', 'reason': self._get_refund_rejection_reason(order) } # 自动创建退货单 refund = self._create_refund_request( order_id=order_id, reason=reason, auto_approved=True ) return { 'type': 'refund_approved', 'message': f'退货申请已提交,预计3-5个工作日退款到账。', 'refund': refund } def auto_approve_refund(self, refund_request): """ 智能审核退货申请 规则: 1. 用户信誉良好(退货率<5%) 2. 订单金额<500元 3. 退货原因合理 4. 有完整凭证 → 自动通过 """ # 1. 检查用户信誉 user_credit_score = self._calculate_user_credit( refund_request.user_id ) if user_credit_score < 60: return AutoApprovalResult( approved=False, reason='用户信誉评分过低,需人工审核' ) # 2. 检查订单金额 order = self._get_order(refund_request.order_id) if order.total_amount > 500: return AutoApprovalResult( approved=False, reason='订单金额超过阈值,需人工审核' ) # 3. 检查退货原因 if refund_request.reason in ['QUALITY_ISSUE', 'DAMAGED']: # 质量问题,检查凭证 if not refund_request.evidence: return AutoApprovalResult( approved=False, reason='缺少质量问题凭证' ) # 4. 自动通过 return AutoApprovalResult( approved=True, reason='自动审核通过' ) 大数据分析在OMS中的应用 订单数据分析平台 class OrderDataAnalyticsPlatform: """订单数据分析平台""" def __init__(self): # 数据仓库连接 self.dw_client = DataWarehouseClient() # Spark分析引擎 self.spark = SparkSession.builder.appName("OMS Analytics").getOrCreate() def analyze_user_behavior(self, start_date, end_date): """ 用户行为分析 指标: 1. RFM模型(最近购买、频率、金额) 2. 用户生命周期价值(LTV) 3. 复购率 4. 流失率 """ # 1. 加载订单数据 orders_df = self.spark.read.parquet( f"s3://data-warehouse/orders/date={start_date}..{end_date}" ) # 2. RFM分析 rfm_df = orders_df.groupBy("user_id").agg( F.max("order_date").alias("recency"), # 最近购买 F.count("order_id").alias("frequency"), # 购买频率 F.sum("total_amount").alias("monetary") # 购买金额 ) # 3. RFM评分(1-5分) rfm_scored_df = rfm_df.withColumn( "r_score", F.ntile(5).over(Window.orderBy(F.desc("recency"))) ).withColumn( "f_score", F.ntile(5).over(Window.orderBy(F.desc("frequency"))) ).withColumn( "m_score", F.ntile(5).over(Window.orderBy(F.desc("monetary"))) ) # 4. 用户分层 user_segments = rfm_scored_df.withColumn( "segment", F.when( (F.col("r_score") >= 4) & (F.col("f_score") >= 4), "重要价值客户" ).when( (F.col("r_score") >= 4) & (F.col("f_score") < 4), "重要发展客户" ).when( (F.col("r_score") < 4) & (F.col("f_score") >= 4), "重要保持客户" ).otherwise("一般客户") ) return user_segments def predict_churn_probability(self, user_id): """ 预测用户流失概率 特征: - 最近购买时间(天) - 购买频率 - 平均订单金额 - 最近3次订单间隔 - 客服咨询次数 - 退货率 """ # 1. 提取特征 features = self._extract_churn_features(user_id) # 2. 模型预测 churn_probability = self.churn_model.predict_proba(features)[0][1] # 3. 制定挽留策略 if churn_probability > 0.7: # 高风险用户,发放优惠券 strategy = { 'action': 'send_coupon', 'coupon_amount': 50, 'validity_days': 7 } elif churn_probability > 0.5: # 中风险用户,推送个性化推荐 strategy = { 'action': 'personalized_recommendation', 'products': self._get_recommended_products(user_id) } else: strategy = None return ChurnPredictionResult( user_id=user_id, churn_probability=churn_probability, retention_strategy=strategy ) def analyze_sales_trend(self): """销售趋势分析""" # 使用Prophet进行时间序列预测 from fbprophet import Prophet # 1. 准备历史数据 historical_sales = self._get_historical_sales_data() df = pd.DataFrame({ 'ds': historical_sales['date'], 'y': historical_sales['sales'] }) # 2. 训练模型 model = Prophet( yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False ) model.fit(df) # 3. 预测未来30天 future = model.make_future_dataframe(periods=30) forecast = model.predict(future) return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']] 实时数据大屏 class RealTimeDashboard: """实时数据大屏""" def get_realtime_metrics(self): """ 获取实时指标(每秒更新) 指标: 1. 实时订单量(今日/当前小时) 2. 实时GMV 3. 实时转化率 4. TOP商品 5. 区域分布 6. 异常告警 """ now = datetime.now() # 从Redis获取实时数据 metrics = { # 今日订单量 'today_orders': self._get_today_orders(), # 当前小时订单量 'current_hour_orders': self._get_current_hour_orders(), # 今日GMV 'today_gmv': self._get_today_gmv(), # 实时转化率 'conversion_rate': self._calculate_conversion_rate(), # TOP10商品 'top_products': self._get_top_products(limit=10), # 区域分布 'regional_distribution': self._get_regional_distribution(), # 异常订单数 'abnormal_orders': self._get_abnormal_orders(), # 系统健康度 'system_health': self._check_system_health() } return metrics def _get_today_orders(self): """获取今日订单量(从Redis)""" today = datetime.now().strftime("%Y%m%d") key = f"metrics:orders:daily:{today}" return int(self.redis.get(key) or 0) def _calculate_conversion_rate(self): """计算实时转化率""" # 浏览量 pv = self._get_today_pv() # 订单量 orders = self._get_today_orders() # 转化率 conversion_rate = orders / pv if pv > 0 else 0 return round(conversion_rate * 100, 2) 云原生OMS:SaaS化趋势 SaaS化架构设计 class SaaSOMSArchitecture: """SaaS化OMS架构""" def __init__(self): self.architecture = { # 1. 多租户隔离 'multi_tenancy': { 'data_isolation': '数据隔离(Schema隔离)', 'resource_isolation': '资源隔离(Namespace隔离)', 'performance_isolation': '性能隔离(资源配额)' }, # 2. 弹性伸缩 'auto_scaling': { 'horizontal_scaling': '水平扩展(Pod自动伸缩)', 'vertical_scaling': '垂直扩展(资源动态调整)', 'scheduled_scaling': '定时伸缩(大促预热)' }, # 3. 配置管理 'configuration': { 'centralized_config': '集中配置(Apollo/Nacos)', 'feature_toggle': '功能开关(灰度发布)', 'tenant_customization': '租户定制化配置' }, # 4. 计费系统 'billing': { 'usage_based': '按用量计费', 'subscription': '订阅制', 'tiered_pricing': '阶梯定价' } } def isolate_tenant_data(self, tenant_id): """ 多租户数据隔离 方案1:独立数据库(隔离性最好,成本最高) 方案2:共享数据库,独立Schema(平衡方案) 方案3:共享Schema,数据行级隔离(成本最低) """ # 推荐方案:共享数据库 + 独立Schema schema_name = f"tenant_{tenant_id}" # 动态切换数据源 datasource = DataSourceContextHolder.getDataSource(schema_name) return datasource def calculate_billing(self, tenant_id, billing_period): """ 计费计算 计费维度: 1. 订单量(按单收费) 2. 存储空间(按GB收费) 3. API调用量(按次收费) 4. 增值服务(按功能收费) """ # 1. 订单量费用 order_count = self._get_order_count(tenant_id, billing_period) order_fee = self._calculate_order_fee(order_count) # 2. 存储费用 storage_size = self._get_storage_size(tenant_id) storage_fee = self._calculate_storage_fee(storage_size) # 3. API调用费用 api_calls = self._get_api_calls(tenant_id, billing_period) api_fee = self._calculate_api_fee(api_calls) # 4. 增值服务费用 addon_fees = self._calculate_addon_fees(tenant_id) # 5. 总费用 total_fee = order_fee + storage_fee + api_fee + addon_fees return BillingResult( tenant_id=tenant_id, billing_period=billing_period, order_fee=order_fee, storage_fee=storage_fee, api_fee=api_fee, addon_fees=addon_fees, total_fee=total_fee ) 云原生部署 # Kubernetes部署配置 apiVersion: apps/v1 kind: Deployment metadata: name: oms-order-service namespace: oms-saas spec: replicas: 3 selector: matchLabels: app: oms-order-service template: metadata: labels: app: oms-order-service spec: containers: - name: order-service image: registry.example.com/oms/order-service:1.0.0 ports: - containerPort: 8080 env: - name: SPRING_PROFILES_ACTIVE value: "prod" - name: DB_HOST valueFrom: secretKeyRef: name: db-credentials key: host resources: requests: cpu: "500m" memory: "1Gi" limits: cpu: "2000m" memory: "4Gi" livenessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /actuator/health/readiness port: 8080 initialDelaySeconds: 30 periodSeconds: 10 --- # HPA自动伸缩 apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: oms-order-service-hpa namespace: oms-saas spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: oms-order-service minReplicas: 3 maxReplicas: 20 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: Resource resource: name: memory target: type: Utilization averageUtilization: 80 总结 OMS系统的未来发展方向清晰可见: ...

2025-11-22 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...