引子:一次服务雪崩引发的思考
2020年双11凌晨2点,某电商平台订单服务突然不可用,导致用户无法下单。
故障链路:
用户下单 → 订单服务 → 库存服务(超时20秒) → 订单服务线程池耗尽 → 整个系统不可用
问题根源:
- 订单服务同步调用库存服务(HTTP请求)
- 库存服务压力大,响应慢(20秒超时)
- 订单服务线程池耗尽(200个线程全部阻塞)
- 新的订单请求无法处理,系统崩溃
架构师王明的反思:
- “同步调用的问题是什么?”
- “为什么不用异步消息?”
- “什么场景用同步,什么场景用异步?”
这个案例揭示了微服务通信的核心问题:如何选择合适的通信模式,平衡性能、可靠性、复杂度?
一、通信模式的本质:耦合度与可靠性的权衡
1.1 通信模式的两个维度
维度1:同步 vs 异步
| 维度 | 同步通信 | 异步通信 |
|---|---|---|
| 调用方式 | 请求→等待→响应 | 请求→立即返回→回调 |
| 阻塞性 | 调用方阻塞等待 | 调用方不阻塞 |
| 耦合度 | 强耦合(时间耦合) | 弱耦合(时间解耦) |
| 响应时间 | 快(毫秒级) | 慢(秒级或分钟级) |
| 可用性 | 低(被调用方挂了,调用方也挂) | 高(被调用方挂了,消息不丢失) |
| 复杂度 | 低(简单直接) | 高(需要消息队列) |
维度2:点对点 vs 发布订阅
| 维度 | 点对点(P2P) | 发布订阅(Pub/Sub) |
|---|---|---|
| 调用关系 | 一对一 | 一对多 |
| 耦合度 | 强耦合(空间耦合) | 弱耦合(空间解耦) |
| 扩展性 | 差(新增消费者要修改代码) | 好(新增消费者不影响发布者) |
| 典型场景 | RPC调用 | 领域事件 |
1.2 耦合度的四个维度
1. 时间耦合(Temporal Coupling)
- 同步调用:调用方必须等待被调用方返回(时间耦合)
- 异步消息:调用方不等待,发送消息后立即返回(时间解耦)
2. 空间耦合(Spatial Coupling)
- 直接调用:调用方需要知道被调用方的地址(空间耦合)
- 消息队列:调用方只知道队列名称,不知道消费者(空间解耦)
3. 平台耦合(Platform Coupling)
- 语言绑定:gRPC需要生成客户端代码(平台耦合)
- HTTP REST:任何语言都可以调用(平台解耦)
4. 数据耦合(Data Coupling)
- 共享数据库:服务间通过数据库通信(数据耦合)
- API通信:服务间通过API通信(数据解耦)
1.3 可靠性的三个层次
1. 消息可能丢失(At Most Once)
发送方 → 消息 → 网络故障 → 消息丢失
适用场景:日志、监控数据(允许少量丢失)
2. 消息至少一次(At Least Once)
发送方 → 消息 → 重试机制 → 消息可能重复 → 接收方去重
适用场景:订单创建、支付(不能丢失,但可以重复)
3. 消息恰好一次(Exactly Once)
发送方 → 幂等操作 + 去重 → 接收方保证只处理一次
适用场景:金融交易(既不能丢失,也不能重复)
二、同步通信:RESTful API、gRPC、Apache Dubbo
2.1 RESTful API:最流行的通信方式
什么是RESTful API?
**REST(Representational State Transfer)**是一种架构风格,由Roy Fielding在2000年提出。
六大约束:
- 客户端-服务器分离:前后端分离
- 无状态:每个请求包含所有必要信息
- 可缓存:响应可以被缓存
- 统一接口:使用HTTP标准方法(GET、POST、PUT、DELETE)
- 分层系统:客户端不知道是否连接到最终服务器
- 按需代码(可选):服务器可以返回可执行代码
RESTful API设计示例
/**
* 订单服务:RESTful API
*/
@RestController
@RequestMapping("/api/orders")
public class OrderController {
@Autowired
private OrderService orderService;
/**
* 创建订单
* POST /api/orders
*/
@PostMapping
public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
Order order = orderService.createOrder(request);
return ResponseEntity.status(HttpStatus.CREATED).body(order);
}
/**
* 查询订单
* GET /api/orders/{orderNo}
*/
@GetMapping("/{orderNo}")
public ResponseEntity<Order> getOrder(@PathVariable String orderNo) {
Order order = orderService.getOrder(orderNo);
if (order == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(order);
}
/**
* 更新订单
* PUT /api/orders/{orderNo}
*/
@PutMapping("/{orderNo}")
public ResponseEntity<Order> updateOrder(
@PathVariable String orderNo,
@RequestBody UpdateOrderRequest request) {
Order order = orderService.updateOrder(orderNo, request);
return ResponseEntity.ok(order);
}
/**
* 删除订单
* DELETE /api/orders/{orderNo}
*/
@DeleteMapping("/{orderNo}")
public ResponseEntity<Void> deleteOrder(@PathVariable String orderNo) {
orderService.deleteOrder(orderNo);
return ResponseEntity.noContent().build();
}
/**
* 查询用户的所有订单
* GET /api/orders?userId=123
*/
@GetMapping
public ResponseEntity<List<Order>> getUserOrders(@RequestParam Long userId) {
List<Order> orders = orderService.getUserOrders(userId);
return ResponseEntity.ok(orders);
}
}
RESTful API的调用方式
方式1:RestTemplate(Spring传统方式)
@Service
public class OrderServiceClient {
@Autowired
private RestTemplate restTemplate;
private static final String ORDER_SERVICE_URL = "http://order-service/api/orders";
/**
* 创建订单
*/
public Order createOrder(CreateOrderRequest request) {
ResponseEntity<Order> response = restTemplate.postForEntity(
ORDER_SERVICE_URL,
request,
Order.class
);
return response.getBody();
}
/**
* 查询订单
*/
public Order getOrder(String orderNo) {
ResponseEntity<Order> response = restTemplate.getForEntity(
ORDER_SERVICE_URL + "/" + orderNo,
Order.class
);
return response.getBody();
}
}
方式2:OpenFeign(声明式HTTP客户端)
/**
* 订单服务客户端(Feign声明式)
*/
@FeignClient(
name = "order-service", // 服务名(从注册中心获取)
fallback = OrderServiceFallback.class // 降级策略
)
public interface OrderServiceClient {
/**
* 创建订单
*/
@PostMapping("/api/orders")
Order createOrder(@RequestBody CreateOrderRequest request);
/**
* 查询订单
*/
@GetMapping("/api/orders/{orderNo}")
Order getOrder(@PathVariable("orderNo") String orderNo);
/**
* 查询用户的所有订单
*/
@GetMapping("/api/orders")
List<Order> getUserOrders(@RequestParam("userId") Long userId);
}
/**
* 降级策略
*/
@Component
public class OrderServiceFallback implements OrderServiceClient {
@Override
public Order createOrder(CreateOrderRequest request) {
throw new BusinessException("订单服务不可用");
}
@Override
public Order getOrder(String orderNo) {
return Order.builder()
.orderNo(orderNo)
.status(OrderStatus.UNKNOWN)
.build();
}
@Override
public List<Order> getUserOrders(Long userId) {
return Collections.emptyList();
}
}
RESTful API的优缺点
优势:
- ✅ 简单易懂:使用HTTP标准方法,学习成本低
- ✅ 跨语言支持:任何语言都可以调用HTTP API
- ✅ 可调试性强:可以用curl、Postman测试
- ✅ 生态成熟:大量工具和库支持
劣势:
- ❌ 性能较低:JSON序列化/反序列化开销大
- ❌ 类型不安全:没有强类型约束,容易出错
- ❌ HTTP开销:HTTP头部信息占用带宽
- ❌ 不支持双向流:只能请求-响应模式
2.2 gRPC:高性能RPC框架
什么是gRPC?
gRPC是Google开源的高性能RPC框架,基于HTTP/2和Protocol Buffers。
核心特性:
- 高性能:使用Protobuf二进制序列化(比JSON快5-10倍)
- 强类型:通过.proto文件定义接口,生成客户端和服务端代码
- 多语言支持:支持Java、Go、Python、C++等
- 双向流:支持客户端流、服务端流、双向流
gRPC服务定义
1. 定义Protocol Buffer(.proto文件)
syntax = "proto3";
package order;
option java_package = "com.example.order.grpc";
option java_outer_classname = "OrderProto";
// 订单服务接口
service OrderService {
// 创建订单
rpc CreateOrder(CreateOrderRequest) returns (Order);
// 查询订单
rpc GetOrder(GetOrderRequest) returns (Order);
// 查询用户的所有订单(服务端流)
rpc GetUserOrders(GetUserOrdersRequest) returns (stream Order);
// 批量创建订单(客户端流)
rpc BatchCreateOrders(stream CreateOrderRequest) returns (BatchCreateOrdersResponse);
// 订单实时同步(双向流)
rpc SyncOrders(stream Order) returns (stream Order);
}
// 创建订单请求
message CreateOrderRequest {
int64 user_id = 1;
int64 product_id = 2;
int32 quantity = 3;
}
// 订单
message Order {
string order_no = 1;
int64 user_id = 2;
int64 product_id = 3;
string product_name = 4;
int32 quantity = 5;
double amount = 6;
string status = 7;
int64 create_time = 8;
}
// 查询订单请求
message GetOrderRequest {
string order_no = 1;
}
// 查询用户订单请求
message GetUserOrdersRequest {
int64 user_id = 1;
}
// 批量创建订单响应
message BatchCreateOrdersResponse {
int32 success_count = 1;
int32 failed_count = 2;
}
2. 生成Java代码
# 使用protoc编译器生成Java代码
protoc --java_out=src/main/java \
--grpc-java_out=src/main/java \
order.proto
3. 实现服务端
/**
* 订单服务实现(gRPC服务端)
*/
@GrpcService
public class OrderGrpcService extends OrderServiceGrpc.OrderServiceImplBase {
@Autowired
private OrderService orderService;
/**
* 创建订单(一元RPC)
*/
@Override
public void createOrder(CreateOrderRequest request,
StreamObserver<Order> responseObserver) {
try {
// 调用业务服务
com.example.order.entity.Order order = orderService.createOrder(
convertToEntity(request)
);
// 返回响应
responseObserver.onNext(convertToProto(order));
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(
Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException()
);
}
}
/**
* 查询订单(一元RPC)
*/
@Override
public void getOrder(GetOrderRequest request,
StreamObserver<Order> responseObserver) {
com.example.order.entity.Order order = orderService.getOrder(
request.getOrderNo()
);
if (order == null) {
responseObserver.onError(
Status.NOT_FOUND
.withDescription("订单不存在")
.asRuntimeException()
);
} else {
responseObserver.onNext(convertToProto(order));
responseObserver.onCompleted();
}
}
/**
* 查询用户的所有订单(服务端流)
*/
@Override
public void getUserOrders(GetUserOrdersRequest request,
StreamObserver<Order> responseObserver) {
List<com.example.order.entity.Order> orders = orderService.getUserOrders(
request.getUserId()
);
// 流式返回多个订单
for (com.example.order.entity.Order order : orders) {
responseObserver.onNext(convertToProto(order));
}
responseObserver.onCompleted();
}
/**
* 批量创建订单(客户端流)
*/
@Override
public StreamObserver<CreateOrderRequest> batchCreateOrders(
StreamObserver<BatchCreateOrdersResponse> responseObserver) {
return new StreamObserver<CreateOrderRequest>() {
private int successCount = 0;
private int failedCount = 0;
@Override
public void onNext(CreateOrderRequest request) {
try {
orderService.createOrder(convertToEntity(request));
successCount++;
} catch (Exception e) {
failedCount++;
}
}
@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}
@Override
public void onCompleted() {
BatchCreateOrdersResponse response = BatchCreateOrdersResponse.newBuilder()
.setSuccessCount(successCount)
.setFailedCount(failedCount)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}
}
4. 实现客户端
/**
* 订单服务客户端(gRPC客户端)
*/
@Service
public class OrderGrpcClient {
@GrpcClient("order-service")
private OrderServiceGrpc.OrderServiceBlockingStub orderServiceStub;
/**
* 创建订单
*/
public Order createOrder(CreateOrderRequest request) {
try {
return orderServiceStub.createOrder(request);
} catch (StatusRuntimeException e) {
throw new BusinessException("订单服务不可用: " + e.getStatus());
}
}
/**
* 查询订单
*/
public Order getOrder(String orderNo) {
GetOrderRequest request = GetOrderRequest.newBuilder()
.setOrderNo(orderNo)
.build();
return orderServiceStub.getOrder(request);
}
/**
* 查询用户的所有订单(服务端流)
*/
public List<Order> getUserOrders(Long userId) {
GetUserOrdersRequest request = GetUserOrdersRequest.newBuilder()
.setUserId(userId)
.build();
List<Order> orders = new ArrayList<>();
// 接收流式响应
Iterator<Order> iterator = orderServiceStub.getUserOrders(request);
while (iterator.hasNext()) {
orders.add(iterator.next());
}
return orders;
}
}
gRPC的优缺点
优势:
- ✅ 高性能:Protobuf序列化比JSON快5-10倍
- ✅ 强类型:编译时检查,减少运行时错误
- ✅ 双向流:支持客户端流、服务端流、双向流
- ✅ HTTP/2:多路复用、头部压缩、服务端推送
劣势:
- ❌ 学习成本高:需要学习Protocol Buffers
- ❌ 可调试性差:二进制协议,无法直接查看
- ❌ 浏览器支持差:浏览器不能直接调用gRPC
- ❌ 生态不如REST成熟:工具和库相对较少
2.3 Apache Dubbo:阿里巴巴开源的RPC框架
什么是Dubbo?
Dubbo是阿里巴巴开源的高性能Java RPC框架,已捐献给Apache基金会。
核心特性:
- 高性能:基于Netty,支持多种序列化协议(Hessian、Protobuf、Kryo)
- 服务治理:服务注册、发现、负载均衡、容错
- 易用性:基于Spring Boot,零配置快速接入
- 扩展性:插件化设计,支持自定义扩展
Dubbo服务定义
1. 定义接口
/**
* 订单服务接口
*/
public interface OrderService {
/**
* 创建订单
*/
Order createOrder(CreateOrderRequest request);
/**
* 查询订单
*/
Order getOrder(String orderNo);
/**
* 查询用户的所有订单
*/
List<Order> getUserOrders(Long userId);
}
2. 实现服务提供者
/**
* 订单服务实现(Dubbo服务提供者)
*/
@DubboService(
version = "1.0.0",
timeout = 5000,
retries = 2
)
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Override
public Order createOrder(CreateOrderRequest request) {
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
return orderRepository.save(order);
}
@Override
public Order getOrder(String orderNo) {
return orderRepository.findByOrderNo(orderNo);
}
@Override
public List<Order> getUserOrders(Long userId) {
return orderRepository.findByUserId(userId);
}
}
3. 配置服务提供者
# application.yml(服务提供者)
dubbo:
application:
name: order-service
protocol:
name: dubbo
port: 20880
registry:
address: nacos://127.0.0.1:8848
provider:
timeout: 5000
retries: 2
4. 实现服务消费者
/**
* 订单服务消费者
*/
@Service
public class OrderServiceConsumer {
@DubboReference(
version = "1.0.0",
timeout = 5000,
retries = 2,
loadbalance = "random" // 负载均衡策略
)
private OrderService orderService;
public Order createOrder(CreateOrderRequest request) {
return orderService.createOrder(request);
}
public Order getOrder(String orderNo) {
return orderService.getOrder(orderNo);
}
}
5. 配置服务消费者
# application.yml(服务消费者)
dubbo:
application:
name: order-consumer
registry:
address: nacos://127.0.0.1:8848
consumer:
timeout: 5000
retries: 2
check: false # 启动时不检查服务是否可用
Dubbo的优缺点
优势:
- ✅ 高性能:基于Netty,支持多种序列化协议
- ✅ 服务治理完善:负载均衡、容错、降级、限流
- ✅ 易用性高:基于Spring Boot,零配置快速接入
- ✅ 生态成熟:阿里系大规模生产验证
劣势:
- ❌ Java绑定:主要支持Java(虽然有多语言版本,但不成熟)
- ❌ 学习成本:需要学习Dubbo特有的概念
- ❌ 社区活跃度:相比gRPC,社区相对较小
三、异步通信:消息队列(RabbitMQ、RocketMQ、Kafka)
3.1 为什么需要消息队列?
同步调用的问题:
订单服务 → 库存服务(同步调用,阻塞等待20秒)
问题:
1. 订单服务线程阻塞20秒
2. 库存服务压力大,响应慢
3. 订单服务线程池耗尽,系统崩溃
异步消息的解决方案:
订单服务 → 消息队列 → 库存服务
优势:
1. 订单服务发送消息后立即返回(不阻塞)
2. 库存服务异步消费消息(自己的节奏)
3. 消息队列削峰填谷(高峰时缓冲消息)
3.2 消息队列的三种模型
模型1:点对点(Point-to-Point)
特点:
- 一对一通信
- 消息只能被一个消费者消费
- 消费后消息删除
典型场景:任务分发
消息队列(Queue)
Producer → [M1, M2, M3, M4, M5] → Consumer1(消费M1, M2)
→ Consumer2(消费M3, M4)
→ Consumer3(消费M5)
模型2:发布订阅(Publish/Subscribe)
特点:
- 一对多通信
- 消息可以被多个消费者消费
- 消费后消息不删除(每个消费者独立消费)
典型场景:领域事件
Topic(主题)
Publisher → [OrderCreatedEvent] → Subscriber1(库存服务)
→ Subscriber2(积分服务)
→ Subscriber3(通知服务)
模型3:请求响应(Request/Response)
特点:
- 双向通信
- 请求者发送消息,等待响应
- 响应者处理消息,返回结果
典型场景:异步RPC
Requester → Request Queue → Worker
← Reply Queue ←
3.3 RabbitMQ:基于AMQP协议的消息队列
RabbitMQ的核心概念
Producer → Exchange → Binding → Queue → Consumer
(交换机) (绑定) (队列)
核心组件:
- Producer:消息生产者
- Exchange:交换机(根据路由规则分发消息)
- Binding:绑定(交换机与队列的关系)
- Queue:队列(存储消息)
- Consumer:消息消费者
RabbitMQ的四种交换机类型
1. Direct Exchange(直连交换机)
Producer → Direct Exchange
|
├─ routing_key="order.create" → Queue1
├─ routing_key="order.pay" → Queue2
└─ routing_key="order.cancel" → Queue3
2. Fanout Exchange(扇出交换机)
Producer → Fanout Exchange
|
├─ Queue1(全部消息)
├─ Queue2(全部消息)
└─ Queue3(全部消息)
3. Topic Exchange(主题交换机)
Producer → Topic Exchange
|
├─ routing_key="order.*" → Queue1(匹配order.create、order.pay等)
├─ routing_key="*.create" → Queue2(匹配order.create、product.create等)
└─ routing_key="order.#" → Queue3(匹配order开头的所有消息)
4. Headers Exchange(头交换机)
根据消息头部属性路由(很少使用)
RabbitMQ代码示例
1. 生产者(发送消息)
/**
* 订单服务:发送订单创建事件
*/
@Service
public class OrderEventPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String EXCHANGE = "order.topic.exchange";
private static final String ROUTING_KEY = "order.created";
/**
* 发布订单创建事件
*/
public void publishOrderCreatedEvent(Order order) {
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderNo(order.getOrderNo())
.userId(order.getUserId())
.amount(order.getAmount())
.createTime(LocalDateTime.now())
.build();
// 发送消息到交换机
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, event);
log.info("发布订单创建事件: {}", event);
}
}
2. 消费者(接收消息)
/**
* 库存服务:监听订单创建事件
*/
@Component
public class OrderCreatedEventListener {
@Autowired
private InventoryService inventoryService;
/**
* 监听订单创建事件,扣减库存
*/
@RabbitListener(queues = "order.created.queue")
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
log.info("接收到订单创建事件: {}", event);
try {
// 扣减库存
inventoryService.deduct(event.getProductId(), event.getQuantity());
log.info("库存扣减成功: orderNo={}", event.getOrderNo());
} catch (Exception e) {
log.error("库存扣减失败: orderNo={}, error={}",
event.getOrderNo(), e.getMessage());
throw e; // 重新抛出异常,触发重试
}
}
}
3. 配置交换机和队列
/**
* RabbitMQ配置
*/
@Configuration
public class RabbitMQConfig {
/**
* 声明交换机
*/
@Bean
public TopicExchange orderTopicExchange() {
return new TopicExchange("order.topic.exchange", true, false);
}
/**
* 声明队列
*/
@Bean
public Queue orderCreatedQueue() {
return new Queue("order.created.queue", true, false, false);
}
/**
* 绑定交换机和队列
*/
@Bean
public Binding orderCreatedBinding() {
return BindingBuilder
.bind(orderCreatedQueue())
.to(orderTopicExchange())
.with("order.created");
}
}
3.4 RocketMQ:阿里巴巴开源的消息队列
RocketMQ的核心特性
- 高性能:单机TPS 10万+
- 高可用:主从自动切换
- 事务消息:支持分布式事务
- 延时消息:支持定时投递
- 顺序消息:保证消息顺序
- 消息过滤:支持Tag过滤
RocketMQ代码示例
1. 生产者(发送消息)
/**
* 订单服务:发送订单创建事件(RocketMQ)
*/
@Service
public class OrderEventPublisher {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final String TOPIC = "ORDER_CREATED_TOPIC";
/**
* 发布订单创建事件
*/
public void publishOrderCreatedEvent(Order order) {
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderNo(order.getOrderNo())
.userId(order.getUserId())
.amount(order.getAmount())
.createTime(LocalDateTime.now())
.build();
// 同步发送消息
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, event);
log.info("发布订单创建事件成功: msgId={}", sendResult.getMsgId());
}
/**
* 异步发送消息
*/
public void publishOrderCreatedEventAsync(Order order) {
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderNo(order.getOrderNo())
.userId(order.getUserId())
.build();
// 异步发送消息
rocketMQTemplate.asyncSend(TOPIC, event, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功: msgId={}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("发送失败: {}", e.getMessage());
}
});
}
/**
* 发送延时消息(例如:30分钟后自动取消订单)
*/
public void publishOrderCancelEvent(Order order) {
OrderCancelEvent event = OrderCancelEvent.builder()
.orderNo(order.getOrderNo())
.build();
// 发送延时消息(延时级别:1秒、5秒、10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、20分钟、30分钟、1小时、2小时)
Message<OrderCancelEvent> message = MessageBuilder
.withPayload(event)
.build();
rocketMQTemplate.syncSend(
"ORDER_CANCEL_TOPIC",
message,
3000, // 发送超时时间
16 // 延时级别:30分钟
);
}
}
2. 消费者(接收消息)
/**
* 库存服务:监听订单创建事件(RocketMQ)
*/
@Service
@RocketMQMessageListener(
topic = "ORDER_CREATED_TOPIC",
consumerGroup = "inventory-consumer-group",
selectorExpression = "*" // Tag过滤(*表示所有Tag)
)
public class OrderCreatedEventListener implements RocketMQListener<OrderCreatedEvent> {
@Autowired
private InventoryService inventoryService;
@Override
public void onMessage(OrderCreatedEvent event) {
log.info("接收到订单创建事件: {}", event);
try {
// 扣减库存
inventoryService.deduct(event.getProductId(), event.getQuantity());
log.info("库存扣减成功: orderNo={}", event.getOrderNo());
} catch (Exception e) {
log.error("库存扣减失败: orderNo={}, error={}",
event.getOrderNo(), e.getMessage());
throw e; // 重新抛出异常,触发重试
}
}
}
3. 事务消息(分布式事务)
/**
* 订单服务:发送事务消息
*/
@Service
public class OrderTransactionPublisher {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*/
public void publishTransactionMessage(Order order) {
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderNo(order.getOrderNo())
.build();
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"ORDER_CREATED_TOPIC",
MessageBuilder.withPayload(event).build(),
order // 传递给事务监听器的参数
);
log.info("事务消息发送结果: {}", result.getLocalTransactionState());
}
}
/**
* 事务监听器
*/
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;
// 执行本地事务(创建订单)
orderService.createOrder(order);
// 本地事务成功,提交消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务执行失败: {}", e.getMessage());
// 本地事务失败,回滚消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 检查本地事务状态(用于消息回查)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 根据消息ID查询本地事务状态
String orderNo = extractOrderNo(msg);
Order order = orderService.getOrder(orderNo);
if (order != null) {
// 订单存在,提交消息
return RocketMQLocalTransactionState.COMMIT;
} else {
// 订单不存在,回滚消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
3.5 Apache Kafka:高吞吐量的分布式消息系统
Kafka的核心特性
- 高吞吐量:单机TPS 100万+
- 持久化:消息持久化到磁盘
- 分布式:支持分区、复制
- 高可用:故障自动恢复
- 流处理:支持Kafka Streams
Kafka的核心概念
Producer → Topic(主题)
|
├─ Partition 0(分区0)→ Consumer Group 1
├─ Partition 1(分区1)→ Consumer Group 1
└─ Partition 2(分区2)→ Consumer Group 1
核心组件:
- Producer:消息生产者
- Topic:主题(消息分类)
- Partition:分区(并行处理)
- Consumer:消息消费者
- Consumer Group:消费者组(负载均衡)
Kafka代码示例
1. 生产者(发送消息)
/**
* 订单服务:发送订单创建事件(Kafka)
*/
@Service
public class OrderEventPublisher {
@Autowired
private KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
private static final String TOPIC = "order-created-topic";
/**
* 发布订单创建事件
*/
public void publishOrderCreatedEvent(Order order) {
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderNo(order.getOrderNo())
.userId(order.getUserId())
.amount(order.getAmount())
.createTime(LocalDateTime.now())
.build();
// 发送消息(异步)
kafkaTemplate.send(TOPIC, order.getOrderNo(), event)
.addCallback(
result -> log.info("发送成功: partition={}, offset={}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset()),
ex -> log.error("发送失败: {}", ex.getMessage())
);
}
}
2. 消费者(接收消息)
/**
* 库存服务:监听订单创建事件(Kafka)
*/
@Service
public class OrderCreatedEventListener {
@Autowired
private InventoryService inventoryService;
/**
* 监听订单创建事件
*/
@KafkaListener(
topics = "order-created-topic",
groupId = "inventory-consumer-group"
)
public void handleOrderCreatedEvent(ConsumerRecord<String, OrderCreatedEvent> record) {
OrderCreatedEvent event = record.value();
log.info("接收到订单创建事件: partition={}, offset={}, event={}",
record.partition(), record.offset(), event);
try {
// 扣减库存
inventoryService.deduct(event.getProductId(), event.getQuantity());
log.info("库存扣减成功: orderNo={}", event.getOrderNo());
} catch (Exception e) {
log.error("库存扣减失败: orderNo={}, error={}",
event.getOrderNo(), e.getMessage());
throw e;
}
}
}
3.6 三种消息队列的对比
| 维度 | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|
| TPS | 1万+ | 10万+ | 100万+ |
| 延时 | 微秒级 | 毫秒级 | 毫秒级 |
| 可靠性 | 高 | 高 | 高 |
| 顺序性 | 支持 | 支持 | 支持(分区内) |
| 事务消息 | 不支持 | 支持 | 不支持 |
| 延时消息 | 需要插件 | 支持 | 不支持 |
| 消息查询 | 不支持 | 支持 | 不支持 |
| 适用场景 | 中小规模、复杂路由 | 大规模、分布式事务 | 大数据、日志采集 |
四、事件驱动架构:从请求-响应到发布-订阅
4.1 什么是事件驱动架构?
**事件驱动架构(Event-Driven Architecture, EDA)**是一种软件架构模式,系统通过产生、检测、消费事件来进行通信。
核心概念:
- 事件:系统中发生的重要事情(订单已创建、订单已支付)
- 事件发布者:产生事件的服务
- 事件订阅者:消费事件的服务
- 事件总线:传递事件的中间件
传统请求-响应模式:
订单服务 → 库存服务(同步调用)
→ 积分服务(同步调用)
→ 通知服务(同步调用)
问题:
1. 订单服务需要知道所有下游服务
2. 任何一个下游服务挂了,订单创建失败
3. 新增下游服务,需要修改订单服务代码
事件驱动模式:
订单服务 → 事件总线(发布"订单已创建"事件)
↓
┌──────┼──────┬──────┐
↓ ↓ ↓ ↓
库存服务 积分服务 通知服务 新服务
优势:
1. 订单服务不需要知道下游服务
2. 下游服务挂了,不影响订单创建
3. 新增下游服务,不需要修改订单服务
4.2 领域事件的设计
领域事件表示业务领域中发生的重要事情。
命名规范:
- 使用过去式:OrderCreatedEvent、OrderPaidEvent(表示已经发生)
- 包含关键信息:订单号、用户ID、金额等
示例:
/**
* 订单已创建事件
*/
@Data
@Builder
public class OrderCreatedEvent {
private String orderNo; // 订单号
private Long userId; // 用户ID
private Long productId; // 商品ID
private Integer quantity; // 数量
private BigDecimal amount; // 金额
private LocalDateTime createTime; // 创建时间
}
/**
* 订单已支付事件
*/
@Data
@Builder
public class OrderPaidEvent {
private String orderNo;
private Long userId;
private BigDecimal amount;
private LocalDateTime payTime;
}
/**
* 订单已取消事件
*/
@Data
@Builder
public class OrderCancelledEvent {
private String orderNo;
private Long userId;
private String cancelReason;
private LocalDateTime cancelTime;
}
4.3 事件溯源(Event Sourcing)
事件溯源是一种数据存储模式,将所有状态变更记录为事件序列。
传统模式:
订单表:
| order_no | status | amount | update_time |
|----------|---------|--------|-------------|
| ORD001 | PAID | 100.00 | 2025-11-03 |
问题:只能看到当前状态,无法知道历史变更
事件溯源模式:
事件表:
| event_id | event_type | order_no | data | event_time |
|----------|-------------------|----------|---------|-------------|
| 1 | OrderCreated | ORD001 | {...} | 2025-11-03 |
| 2 | OrderPaid | ORD001 | {...} | 2025-11-03 |
订单状态 = 重放所有事件:
OrderCreated → status=UNPAID
OrderPaid → status=PAID
优势:
1. 完整的历史记录(审计、回溯)
2. 可以重放事件(重建状态)
3. 支持时间旅行(查看任意时刻的状态)
4.4 CQRS模式(命令查询职责分离)
**CQRS(Command Query Responsibility Segregation)**将系统分为命令端和查询端。
传统模式:
用户 → 订单服务(既处理写请求,又处理读请求)
↓
Order表
CQRS模式:
用户 → 命令端(OrderCommandService)→ 写入 → Order表
↓ 发布事件
事件总线
↓ 订阅事件
用户 → 查询端(OrderQueryService)→ 读取 → Order视图表
优势:
- 读写分离:写操作优化事务性,读操作优化查询性能
- 独立扩展:读多写少场景,可以只扩展查询端
- 灵活性:查询端可以使用不同的数据库(如Elasticsearch)
五、总结与最佳实践
5.1 如何选择通信方式?
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 需要立即响应 | RESTful/gRPC | 同步调用,立即返回结果 |
| 可以异步处理 | 消息队列 | 解耦,削峰填谷 |
| 高性能要求 | gRPC/Dubbo | 二进制序列化,性能高 |
| 跨语言调用 | RESTful | HTTP标准,任何语言都支持 |
| 事件通知 | 消息队列 | 发布订阅,解耦 |
| 大数据量传输 | Kafka | 高吞吐量 |
| 分布式事务 | RocketMQ事务消息 | 支持事务消息 |
5.2 幂等性设计
什么是幂等性?
幂等性是指同一个操作执行多次,结果和执行一次相同。
为什么需要幂等性?
在分布式系统中,网络不可靠,消息可能重复发送:
Producer → Message → 网络超时 → 重试 → Message重复 → Consumer处理两次
问题:订单创建两次、库存扣减两次、支付扣款两次
幂等性实现方案:
方案1:唯一ID去重
@Service
public class OrderService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public Order createOrder(CreateOrderRequest request) {
String idempotentKey = request.getIdempotentKey();
// 检查是否已处理
Boolean success = redisTemplate.opsForValue()
.setIfAbsent("idempotent:" + idempotentKey, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(success)) {
// 已处理,直接返回
return getOrder(idempotentKey);
}
// 未处理,创建订单
Order order = doCreateOrder(request);
return order;
}
}
方案2:数据库唯一索引
CREATE TABLE t_order (
order_no VARCHAR(32) PRIMARY KEY, -- 订单号(唯一索引)
user_id BIGINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
...
);
-- 重复插入会违反唯一索引,抛出异常
INSERT INTO t_order (order_no, user_id, amount) VALUES ('ORD001', 123, 100.00);
方案3:状态机防重
public void payOrder(String orderNo) {
Order order = getOrder(orderNo);
// 检查状态(只有未支付的订单才能支付)
if (order.getStatus() != OrderStatus.UNPAID) {
throw new BusinessException("订单状态不允许支付");
}
// 执行支付
doPayOrder(order);
// 更新状态
order.setStatus(OrderStatus.PAID);
updateOrder(order);
}
5.3 超时与重试策略
超时设置:
@FeignClient(
name = "order-service",
configuration = FeignConfig.class
)
public interface OrderServiceClient {
// ...
}
@Configuration
public class FeignConfig {
@Bean
public Request.Options options() {
return new Request.Options(
5000, // 连接超时:5秒
10000 // 读取超时:10秒
);
}
}
重试策略:
@Configuration
public class RetryConfig {
@Bean
public Retryer retryer() {
return new Retryer.Default(
100, // 初始间隔:100ms
1000, // 最大间隔:1秒
3 // 最大重试次数:3次
);
}
}
5.4 服务降级与熔断
降级策略:
@FeignClient(
name = "order-service",
fallback = OrderServiceFallback.class // 降级实现
)
public interface OrderServiceClient {
Order getOrder(String orderNo);
}
@Component
public class OrderServiceFallback implements OrderServiceClient {
@Override
public Order getOrder(String orderNo) {
// 返回默认值或缓存数据
return Order.builder()
.orderNo(orderNo)
.status(OrderStatus.UNKNOWN)
.build();
}
}
参考资料:
- 《微服务架构设计模式》- Chris Richardson
- 《Enterprise Integration Patterns》- Gregor Hohpe
- 《Kafka权威指南》- Neha Narkhede
- RabbitMQ官方文档
- RocketMQ官方文档
- Apache Kafka官方文档
最后更新时间:2025-11-03