引言:数据孤岛的痛

数字化转型后,企业通常会有多个系统:

  • OMS订单系统
  • WMS仓储系统
  • ERP财务系统
  • TMS物流系统
  • BI报表系统

如果这些系统之间数据不通,就会形成"数据孤岛"

  • 同一个数据,各系统不一致
  • 需要人工在多个系统之间搬运数据
  • 无法获得全局视图

系统集成的目标:让数据自动流转,保持一致。


一、集成架构模式

1.1 三种集成模式对比

模式1:点对点集成

┌─────┐     ┌─────┐
│ OMS │◄───►│ WMS │
└──┬──┘     └──┬──┘
   │           │
   │  ┌─────┐  │
   └─►│ ERP │◄─┘
      └─────┘
优点缺点
简单直接耦合度高
实现快维护难
扩展性差

模式2:ESB企业服务总线

┌─────┐  ┌─────┐  ┌─────┐
│ OMS │  │ WMS │  │ ERP │
└──┬──┘  └──┬──┘  └──┬──┘
   │        │        │
   └────────┼────────┘
            │
      ┌─────▼─────┐
      │    ESB    │
      │ 企业服务总线│
      └───────────┘
优点缺点
统一管理架构复杂
解耦单点风险
可监控成本高

模式3:事件驱动架构(推荐)

┌─────┐  ┌─────┐  ┌─────┐
│ OMS │  │ WMS │  │ ERP │
└──┬──┘  └──┬──┘  └──┬──┘
   │        │        │
   │  发布  │  订阅  │
   └────────┼────────┘
            │
      ┌─────▼─────┐
      │  消息队列  │
      │ (RocketMQ) │
      └───────────┘
优点缺点
高度解耦最终一致
可扩展调试复杂
高可用需要幂等

1.2 推荐架构

5-7亿规模跨境电商,推荐:事件驱动 + 同步调用混合

┌─────────────────────────────────────────────────────┐
│                    API网关                           │
└─────────────────────┬───────────────────────────────┘
                      │
    ┌─────────────────┼─────────────────┐
    │                 │                 │
┌───▼───┐        ┌────▼────┐       ┌────▼────┐
│  OMS  │◄──────►│   WMS   │◄─────►│   ERP   │
└───┬───┘ 同步   └────┬────┘  同步  └────┬────┘
    │                 │                  │
    │     发布事件    │      发布事件    │
    └─────────────────┼──────────────────┘
                      │
              ┌───────▼───────┐
              │   RocketMQ    │
              └───────┬───────┘
                      │
              ┌───────▼───────┐
              │      BI       │
              │   数据分析    │
              └───────────────┘

原则

  • 核心链路用同步:如库存预占、订单下发
  • 非核心链路用异步:如状态同步、数据分析

二、API设计规范

2.1 RESTful API设计

URL命名规范

# 资源命名用名词复数
GET    /api/v1/orders           # 获取订单列表
GET    /api/v1/orders/{id}      # 获取单个订单
POST   /api/v1/orders           # 创建订单
PUT    /api/v1/orders/{id}      # 更新订单
DELETE /api/v1/orders/{id}      # 删除订单

# 子资源
GET    /api/v1/orders/{id}/items    # 获取订单明细
POST   /api/v1/orders/{id}/items    # 添加订单明细

# 操作用动词
POST   /api/v1/orders/{id}/cancel   # 取消订单
POST   /api/v1/orders/{id}/ship     # 发货

HTTP状态码

状态码含义使用场景
200成功GET/PUT/DELETE成功
201已创建POST创建成功
400请求错误参数校验失败
401未授权未登录或token过期
403禁止访问无权限
404未找到资源不存在
500服务器错误系统异常

2.2 统一响应格式

{
    "code": 0,
    "message": "success",
    "data": {
        "orderId": "ORD202401290001",
        "status": "PENDING"
    },
    "timestamp": 1706518800000
}

错误响应

{
    "code": 40001,
    "message": "库存不足",
    "data": null,
    "timestamp": 1706518800000,
    "details": {
        "skuId": "SKU001",
        "required": 10,
        "available": 5
    }
}

2.3 接口安全

认证方式

# 内部系统:API Key + 签名
Authorization: ApiKey xxx
X-Signature: sha256(timestamp + apiKey + body + secret)
X-Timestamp: 1706518800000

# 外部系统:OAuth 2.0
Authorization: Bearer eyJhbGciOiJIUzI1NiIs...

签名算法

public class SignatureUtil {

    public static String sign(String timestamp, String apiKey,
                             String body, String secret) {
        String content = timestamp + apiKey + body + secret;
        return DigestUtils.sha256Hex(content);
    }

    public static boolean verify(String timestamp, String apiKey,
                                String body, String secret, String signature) {
        // 验证时间戳(防重放,5分钟内有效)
        long ts = Long.parseLong(timestamp);
        if (System.currentTimeMillis() - ts > 5 * 60 * 1000) {
            return false;
        }

        String expected = sign(timestamp, apiKey, body, secret);
        return expected.equals(signature);
    }
}

2.4 接口版本管理

URL版本(推荐)

/api/v1/orders
/api/v2/orders

Header版本

Accept: application/vnd.company.v1+json

版本升级策略

  1. 新版本发布后,旧版本保留6个月
  2. 提前3个月通知下游系统升级
  3. 监控旧版本调用量,确认无调用后下线

三、数据同步策略

3.1 同步方式对比

方式实时性复杂度适用场景
同步调用实时核心链路
消息队列秒级状态同步
定时任务分钟级批量同步
CDC秒级数据库同步

3.2 消息队列同步

事件定义

// 订单创建事件
public class OrderCreatedEvent {
    private String eventId;
    private String eventType = "ORDER_CREATED";
    private Long timestamp;
    private OrderDTO data;
}

// 库存变动事件
public class InventoryChangedEvent {
    private String eventId;
    private String eventType = "INVENTORY_CHANGED";
    private Long timestamp;
    private String warehouseId;
    private String skuId;
    private Integer beforeQty;
    private Integer afterQty;
    private String changeType; // IN/OUT/ADJUST
}

发布事件

@Service
public class OrderEventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent();
        event.setEventId(UUID.randomUUID().toString());
        event.setTimestamp(System.currentTimeMillis());
        event.setData(OrderDTO.from(order));

        rocketMQTemplate.convertAndSend("order-topic", event);
    }
}

消费事件

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "wms-consumer-group"
)
public class OrderEventConsumer implements RocketMQListener<OrderCreatedEvent> {

    @Override
    public void onMessage(OrderCreatedEvent event) {
        // 幂等检查
        if (eventRepository.exists(event.getEventId())) {
            return;
        }

        // 处理事件
        processOrderCreated(event);

        // 记录已处理
        eventRepository.save(event.getEventId());
    }
}

3.3 数据一致性保障

问题:消息发送和数据库操作如何保证一致?

方案1:本地消息表

@Service
@Transactional
public class OrderService {

    public void createOrder(Order order) {
        // 1. 保存订单
        orderRepository.save(order);

        // 2. 保存本地消息(同一事务)
        LocalMessage message = new LocalMessage();
        message.setTopic("order-topic");
        message.setBody(JSON.toJSONString(order));
        message.setStatus("PENDING");
        localMessageRepository.save(message);
    }
}

// 定时任务发送消息
@Scheduled(fixedRate = 1000)
public void sendPendingMessages() {
    List<LocalMessage> messages = localMessageRepository.findPending();
    for (LocalMessage msg : messages) {
        try {
            rocketMQTemplate.convertAndSend(msg.getTopic(), msg.getBody());
            msg.setStatus("SENT");
            localMessageRepository.save(msg);
        } catch (Exception e) {
            // 重试
        }
    }
}

方案2:事务消息

@Service
public class OrderService {

    public void createOrder(Order order) {
        // 发送半消息
        rocketMQTemplate.sendMessageInTransaction(
            "order-topic",
            MessageBuilder.withPayload(order).build(),
            order
        );
    }
}

@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = (Order) arg;
            orderRepository.save(order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务是否成功
        Order order = JSON.parseObject(msg.getBody(), Order.class);
        if (orderRepository.exists(order.getOrderId())) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

四、异常处理机制

4.1 异常分类

类型说明处理策略
业务异常库存不足、订单不存在返回错误码,不重试
系统异常数据库超时、网络错误重试
第三方异常物流接口超时重试+降级

4.2 重试机制

@Service
public class RetryableService {

    @Retryable(
        value = {SystemException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void callExternalService() {
        // 调用外部服务
    }

    @Recover
    public void recover(SystemException e) {
        // 重试失败后的处理
        log.error("重试失败", e);
        alertService.send("外部服务调用失败");
    }
}

4.3 熔断降级

@Service
public class CircuitBreakerService {

    @CircuitBreaker(name = "logistics", fallbackMethod = "fallback")
    public TrackingInfo getTracking(String trackingNo) {
        return logisticsClient.getTracking(trackingNo);
    }

    public TrackingInfo fallback(String trackingNo, Exception e) {
        // 降级处理:返回缓存数据或默认值
        return trackingCache.get(trackingNo);
    }
}

4.4 对账机制

@Service
public class ReconciliationService {

    /**
     * 每日对账:比对OMS和WMS的库存
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
    public void reconcileInventory() {
        List<String> skuIds = skuService.getAllSkuIds();

        for (String skuId : skuIds) {
            int omsQty = omsClient.getAvailableQty(skuId);
            int wmsQty = wmsClient.getTotalQty(skuId);

            if (omsQty != wmsQty) {
                // 记录差异
                ReconciliationRecord record = new ReconciliationRecord();
                record.setSkuId(skuId);
                record.setOmsQty(omsQty);
                record.setWmsQty(wmsQty);
                record.setDifference(omsQty - wmsQty);
                reconciliationRepository.save(record);

                // 告警
                if (Math.abs(omsQty - wmsQty) > 10) {
                    alertService.send("库存差异告警: " + skuId);
                }
            }
        }
    }
}

五、监控告警

5.1 监控指标

指标说明告警阈值
接口响应时间P99延迟> 1s
接口成功率成功请求占比< 99%
消息积压未消费消息数> 10000
同步延迟数据同步延迟> 5min

5.2 日志规范

// 接口调用日志
log.info("调用WMS接口 | method={} | request={} | response={} | cost={}ms",
    "pushOutboundOrder", request, response, cost);

// 异常日志
log.error("调用WMS接口失败 | method={} | request={} | error={}",
    "pushOutboundOrder", request, e.getMessage(), e);

六、总结

6.1 核心要点

  1. 架构选择:事件驱动 + 同步调用混合
  2. API规范:RESTful设计、统一响应格式、安全认证
  3. 数据同步:核心链路同步、非核心异步
  4. 一致性保障:本地消息表或事务消息
  5. 异常处理:重试、熔断、降级、对账

6.2 下一步

  • 设计系统间接口规范
  • 搭建消息队列基础设施
  • 实现核心集成链路
  • 建立监控告警体系

系列文章导航

本文是《跨境电商数字化转型指南》系列的第25篇

  • 01-24. 前序文章
  • 25. 系统集成架构设计(本文)
  • 26. API设计最佳实践
  • 27. 数据同步策略详解