引言:数据孤岛的痛
数字化转型后,企业通常会有多个系统:
- OMS订单系统
- WMS仓储系统
- ERP财务系统
- TMS物流系统
- BI报表系统
如果这些系统之间数据不通,就会形成"数据孤岛":
- 同一个数据,各系统不一致
- 需要人工在多个系统之间搬运数据
- 无法获得全局视图
系统集成的目标:让数据自动流转,保持一致。
一、集成架构模式
1.1 三种集成模式对比
模式1:点对点集成
┌─────┐ ┌─────┐
│ OMS │◄───►│ WMS │
└──┬──┘ └──┬──┘
│ │
│ ┌─────┐ │
└─►│ ERP │◄─┘
└─────┘
| 优点 | 缺点 |
|---|---|
| 简单直接 | 耦合度高 |
| 实现快 | 维护难 |
| 扩展性差 |
模式2:ESB企业服务总线
┌─────┐ ┌─────┐ ┌─────┐
│ OMS │ │ WMS │ │ ERP │
└──┬──┘ └──┬──┘ └──┬──┘
│ │ │
└────────┼────────┘
│
┌─────▼─────┐
│ ESB │
│ 企业服务总线│
└───────────┘
| 优点 | 缺点 |
|---|---|
| 统一管理 | 架构复杂 |
| 解耦 | 单点风险 |
| 可监控 | 成本高 |
模式3:事件驱动架构(推荐)
┌─────┐ ┌─────┐ ┌─────┐
│ OMS │ │ WMS │ │ ERP │
└──┬──┘ └──┬──┘ └──┬──┘
│ │ │
│ 发布 │ 订阅 │
└────────┼────────┘
│
┌─────▼─────┐
│ 消息队列 │
│ (RocketMQ) │
└───────────┘
| 优点 | 缺点 |
|---|---|
| 高度解耦 | 最终一致 |
| 可扩展 | 调试复杂 |
| 高可用 | 需要幂等 |
1.2 推荐架构
5-7亿规模跨境电商,推荐:事件驱动 + 同步调用混合
┌─────────────────────────────────────────────────────┐
│ API网关 │
└─────────────────────┬───────────────────────────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌───▼───┐ ┌────▼────┐ ┌────▼────┐
│ OMS │◄──────►│ WMS │◄─────►│ ERP │
└───┬───┘ 同步 └────┬────┘ 同步 └────┬────┘
│ │ │
│ 发布事件 │ 发布事件 │
└─────────────────┼──────────────────┘
│
┌───────▼───────┐
│ RocketMQ │
└───────┬───────┘
│
┌───────▼───────┐
│ BI │
│ 数据分析 │
└───────────────┘
原则:
- 核心链路用同步:如库存预占、订单下发
- 非核心链路用异步:如状态同步、数据分析
二、API设计规范
2.1 RESTful API设计
URL命名规范:
# 资源命名用名词复数
GET /api/v1/orders # 获取订单列表
GET /api/v1/orders/{id} # 获取单个订单
POST /api/v1/orders # 创建订单
PUT /api/v1/orders/{id} # 更新订单
DELETE /api/v1/orders/{id} # 删除订单
# 子资源
GET /api/v1/orders/{id}/items # 获取订单明细
POST /api/v1/orders/{id}/items # 添加订单明细
# 操作用动词
POST /api/v1/orders/{id}/cancel # 取消订单
POST /api/v1/orders/{id}/ship # 发货
HTTP状态码:
| 状态码 | 含义 | 使用场景 |
|---|---|---|
| 200 | 成功 | GET/PUT/DELETE成功 |
| 201 | 已创建 | POST创建成功 |
| 400 | 请求错误 | 参数校验失败 |
| 401 | 未授权 | 未登录或token过期 |
| 403 | 禁止访问 | 无权限 |
| 404 | 未找到 | 资源不存在 |
| 500 | 服务器错误 | 系统异常 |
2.2 统一响应格式
{
"code": 0,
"message": "success",
"data": {
"orderId": "ORD202401290001",
"status": "PENDING"
},
"timestamp": 1706518800000
}
错误响应:
{
"code": 40001,
"message": "库存不足",
"data": null,
"timestamp": 1706518800000,
"details": {
"skuId": "SKU001",
"required": 10,
"available": 5
}
}
2.3 接口安全
认证方式:
# 内部系统:API Key + 签名
Authorization: ApiKey xxx
X-Signature: sha256(timestamp + apiKey + body + secret)
X-Timestamp: 1706518800000
# 外部系统:OAuth 2.0
Authorization: Bearer eyJhbGciOiJIUzI1NiIs...
签名算法:
public class SignatureUtil {
public static String sign(String timestamp, String apiKey,
String body, String secret) {
String content = timestamp + apiKey + body + secret;
return DigestUtils.sha256Hex(content);
}
public static boolean verify(String timestamp, String apiKey,
String body, String secret, String signature) {
// 验证时间戳(防重放,5分钟内有效)
long ts = Long.parseLong(timestamp);
if (System.currentTimeMillis() - ts > 5 * 60 * 1000) {
return false;
}
String expected = sign(timestamp, apiKey, body, secret);
return expected.equals(signature);
}
}
2.4 接口版本管理
URL版本(推荐):
/api/v1/orders
/api/v2/orders
Header版本:
Accept: application/vnd.company.v1+json
版本升级策略:
- 新版本发布后,旧版本保留6个月
- 提前3个月通知下游系统升级
- 监控旧版本调用量,确认无调用后下线
三、数据同步策略
3.1 同步方式对比
| 方式 | 实时性 | 复杂度 | 适用场景 |
|---|---|---|---|
| 同步调用 | 实时 | 低 | 核心链路 |
| 消息队列 | 秒级 | 中 | 状态同步 |
| 定时任务 | 分钟级 | 低 | 批量同步 |
| CDC | 秒级 | 高 | 数据库同步 |
3.2 消息队列同步
事件定义:
// 订单创建事件
public class OrderCreatedEvent {
private String eventId;
private String eventType = "ORDER_CREATED";
private Long timestamp;
private OrderDTO data;
}
// 库存变动事件
public class InventoryChangedEvent {
private String eventId;
private String eventType = "INVENTORY_CHANGED";
private Long timestamp;
private String warehouseId;
private String skuId;
private Integer beforeQty;
private Integer afterQty;
private String changeType; // IN/OUT/ADJUST
}
发布事件:
@Service
public class OrderEventPublisher {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void publishOrderCreated(Order order) {
OrderCreatedEvent event = new OrderCreatedEvent();
event.setEventId(UUID.randomUUID().toString());
event.setTimestamp(System.currentTimeMillis());
event.setData(OrderDTO.from(order));
rocketMQTemplate.convertAndSend("order-topic", event);
}
}
消费事件:
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "wms-consumer-group"
)
public class OrderEventConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
// 幂等检查
if (eventRepository.exists(event.getEventId())) {
return;
}
// 处理事件
processOrderCreated(event);
// 记录已处理
eventRepository.save(event.getEventId());
}
}
3.3 数据一致性保障
问题:消息发送和数据库操作如何保证一致?
方案1:本地消息表
@Service
@Transactional
public class OrderService {
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 保存本地消息(同一事务)
LocalMessage message = new LocalMessage();
message.setTopic("order-topic");
message.setBody(JSON.toJSONString(order));
message.setStatus("PENDING");
localMessageRepository.save(message);
}
}
// 定时任务发送消息
@Scheduled(fixedRate = 1000)
public void sendPendingMessages() {
List<LocalMessage> messages = localMessageRepository.findPending();
for (LocalMessage msg : messages) {
try {
rocketMQTemplate.convertAndSend(msg.getTopic(), msg.getBody());
msg.setStatus("SENT");
localMessageRepository.save(msg);
} catch (Exception e) {
// 重试
}
}
}
方案2:事务消息
@Service
public class OrderService {
public void createOrder(Order order) {
// 发送半消息
rocketMQTemplate.sendMessageInTransaction(
"order-topic",
MessageBuilder.withPayload(order).build(),
order
);
}
}
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;
orderRepository.save(order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务是否成功
Order order = JSON.parseObject(msg.getBody(), Order.class);
if (orderRepository.exists(order.getOrderId())) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
四、异常处理机制
4.1 异常分类
| 类型 | 说明 | 处理策略 |
|---|---|---|
| 业务异常 | 库存不足、订单不存在 | 返回错误码,不重试 |
| 系统异常 | 数据库超时、网络错误 | 重试 |
| 第三方异常 | 物流接口超时 | 重试+降级 |
4.2 重试机制
@Service
public class RetryableService {
@Retryable(
value = {SystemException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void callExternalService() {
// 调用外部服务
}
@Recover
public void recover(SystemException e) {
// 重试失败后的处理
log.error("重试失败", e);
alertService.send("外部服务调用失败");
}
}
4.3 熔断降级
@Service
public class CircuitBreakerService {
@CircuitBreaker(name = "logistics", fallbackMethod = "fallback")
public TrackingInfo getTracking(String trackingNo) {
return logisticsClient.getTracking(trackingNo);
}
public TrackingInfo fallback(String trackingNo, Exception e) {
// 降级处理:返回缓存数据或默认值
return trackingCache.get(trackingNo);
}
}
4.4 对账机制
@Service
public class ReconciliationService {
/**
* 每日对账:比对OMS和WMS的库存
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
public void reconcileInventory() {
List<String> skuIds = skuService.getAllSkuIds();
for (String skuId : skuIds) {
int omsQty = omsClient.getAvailableQty(skuId);
int wmsQty = wmsClient.getTotalQty(skuId);
if (omsQty != wmsQty) {
// 记录差异
ReconciliationRecord record = new ReconciliationRecord();
record.setSkuId(skuId);
record.setOmsQty(omsQty);
record.setWmsQty(wmsQty);
record.setDifference(omsQty - wmsQty);
reconciliationRepository.save(record);
// 告警
if (Math.abs(omsQty - wmsQty) > 10) {
alertService.send("库存差异告警: " + skuId);
}
}
}
}
}
五、监控告警
5.1 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 接口响应时间 | P99延迟 | > 1s |
| 接口成功率 | 成功请求占比 | < 99% |
| 消息积压 | 未消费消息数 | > 10000 |
| 同步延迟 | 数据同步延迟 | > 5min |
5.2 日志规范
// 接口调用日志
log.info("调用WMS接口 | method={} | request={} | response={} | cost={}ms",
"pushOutboundOrder", request, response, cost);
// 异常日志
log.error("调用WMS接口失败 | method={} | request={} | error={}",
"pushOutboundOrder", request, e.getMessage(), e);
六、总结
6.1 核心要点
- 架构选择:事件驱动 + 同步调用混合
- API规范:RESTful设计、统一响应格式、安全认证
- 数据同步:核心链路同步、非核心异步
- 一致性保障:本地消息表或事务消息
- 异常处理:重试、熔断、降级、对账
6.2 下一步
- 设计系统间接口规范
- 搭建消息队列基础设施
- 实现核心集成链路
- 建立监控告警体系
系列文章导航
本文是《跨境电商数字化转型指南》系列的第25篇
- 01-24. 前序文章
- 25. 系统集成架构设计(本文)
- 26. API设计最佳实践
- 27. 数据同步策略详解