引子:一次服务雪崩引发的思考

2020年双11凌晨2点,某电商平台订单服务突然不可用,导致用户无法下单。

故障链路

用户下单 → 订单服务 → 库存服务(超时20秒) → 订单服务线程池耗尽 → 整个系统不可用

问题根源

  • 订单服务同步调用库存服务(HTTP请求)
  • 库存服务压力大,响应慢(20秒超时)
  • 订单服务线程池耗尽(200个线程全部阻塞)
  • 新的订单请求无法处理,系统崩溃

架构师王明的反思

  • “同步调用的问题是什么?”
  • “为什么不用异步消息?”
  • “什么场景用同步,什么场景用异步?”

这个案例揭示了微服务通信的核心问题:如何选择合适的通信模式,平衡性能、可靠性、复杂度?


一、通信模式的本质:耦合度与可靠性的权衡

1.1 通信模式的两个维度

维度1:同步 vs 异步

维度同步通信异步通信
调用方式请求→等待→响应请求→立即返回→回调
阻塞性调用方阻塞等待调用方不阻塞
耦合度强耦合(时间耦合)弱耦合(时间解耦)
响应时间快(毫秒级)慢(秒级或分钟级)
可用性低(被调用方挂了,调用方也挂)高(被调用方挂了,消息不丢失)
复杂度低(简单直接)高(需要消息队列)

维度2:点对点 vs 发布订阅

维度点对点(P2P)发布订阅(Pub/Sub)
调用关系一对一一对多
耦合度强耦合(空间耦合)弱耦合(空间解耦)
扩展性差(新增消费者要修改代码)好(新增消费者不影响发布者)
典型场景RPC调用领域事件

1.2 耦合度的四个维度

1. 时间耦合(Temporal Coupling)

  • 同步调用:调用方必须等待被调用方返回(时间耦合)
  • 异步消息:调用方不等待,发送消息后立即返回(时间解耦)

2. 空间耦合(Spatial Coupling)

  • 直接调用:调用方需要知道被调用方的地址(空间耦合)
  • 消息队列:调用方只知道队列名称,不知道消费者(空间解耦)

3. 平台耦合(Platform Coupling)

  • 语言绑定:gRPC需要生成客户端代码(平台耦合)
  • HTTP REST:任何语言都可以调用(平台解耦)

4. 数据耦合(Data Coupling)

  • 共享数据库:服务间通过数据库通信(数据耦合)
  • API通信:服务间通过API通信(数据解耦)

1.3 可靠性的三个层次

1. 消息可能丢失(At Most Once)

发送方 → 消息 → 网络故障 → 消息丢失

适用场景:日志、监控数据(允许少量丢失)

2. 消息至少一次(At Least Once)

发送方 → 消息 → 重试机制 → 消息可能重复 → 接收方去重

适用场景:订单创建、支付(不能丢失,但可以重复)

3. 消息恰好一次(Exactly Once)

发送方 → 幂等操作 + 去重 → 接收方保证只处理一次

适用场景:金融交易(既不能丢失,也不能重复)


二、同步通信:RESTful API、gRPC、Apache Dubbo

2.1 RESTful API:最流行的通信方式

什么是RESTful API?

**REST(Representational State Transfer)**是一种架构风格,由Roy Fielding在2000年提出。

六大约束

  1. 客户端-服务器分离:前后端分离
  2. 无状态:每个请求包含所有必要信息
  3. 可缓存:响应可以被缓存
  4. 统一接口:使用HTTP标准方法(GET、POST、PUT、DELETE)
  5. 分层系统:客户端不知道是否连接到最终服务器
  6. 按需代码(可选):服务器可以返回可执行代码

RESTful API设计示例

/**
 * 订单服务:RESTful API
 */
@RestController
@RequestMapping("/api/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * 创建订单
     * POST /api/orders
     */
    @PostMapping
    public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
        Order order = orderService.createOrder(request);
        return ResponseEntity.status(HttpStatus.CREATED).body(order);
    }

    /**
     * 查询订单
     * GET /api/orders/{orderNo}
     */
    @GetMapping("/{orderNo}")
    public ResponseEntity<Order> getOrder(@PathVariable String orderNo) {
        Order order = orderService.getOrder(orderNo);
        if (order == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(order);
    }

    /**
     * 更新订单
     * PUT /api/orders/{orderNo}
     */
    @PutMapping("/{orderNo}")
    public ResponseEntity<Order> updateOrder(
            @PathVariable String orderNo,
            @RequestBody UpdateOrderRequest request) {
        Order order = orderService.updateOrder(orderNo, request);
        return ResponseEntity.ok(order);
    }

    /**
     * 删除订单
     * DELETE /api/orders/{orderNo}
     */
    @DeleteMapping("/{orderNo}")
    public ResponseEntity<Void> deleteOrder(@PathVariable String orderNo) {
        orderService.deleteOrder(orderNo);
        return ResponseEntity.noContent().build();
    }

    /**
     * 查询用户的所有订单
     * GET /api/orders?userId=123
     */
    @GetMapping
    public ResponseEntity<List<Order>> getUserOrders(@RequestParam Long userId) {
        List<Order> orders = orderService.getUserOrders(userId);
        return ResponseEntity.ok(orders);
    }
}

RESTful API的调用方式

方式1:RestTemplate(Spring传统方式)

@Service
public class OrderServiceClient {

    @Autowired
    private RestTemplate restTemplate;

    private static final String ORDER_SERVICE_URL = "http://order-service/api/orders";

    /**
     * 创建订单
     */
    public Order createOrder(CreateOrderRequest request) {
        ResponseEntity<Order> response = restTemplate.postForEntity(
            ORDER_SERVICE_URL,
            request,
            Order.class
        );
        return response.getBody();
    }

    /**
     * 查询订单
     */
    public Order getOrder(String orderNo) {
        ResponseEntity<Order> response = restTemplate.getForEntity(
            ORDER_SERVICE_URL + "/" + orderNo,
            Order.class
        );
        return response.getBody();
    }
}

方式2:OpenFeign(声明式HTTP客户端)

/**
 * 订单服务客户端(Feign声明式)
 */
@FeignClient(
    name = "order-service",         // 服务名(从注册中心获取)
    fallback = OrderServiceFallback.class  // 降级策略
)
public interface OrderServiceClient {

    /**
     * 创建订单
     */
    @PostMapping("/api/orders")
    Order createOrder(@RequestBody CreateOrderRequest request);

    /**
     * 查询订单
     */
    @GetMapping("/api/orders/{orderNo}")
    Order getOrder(@PathVariable("orderNo") String orderNo);

    /**
     * 查询用户的所有订单
     */
    @GetMapping("/api/orders")
    List<Order> getUserOrders(@RequestParam("userId") Long userId);
}

/**
 * 降级策略
 */
@Component
public class OrderServiceFallback implements OrderServiceClient {

    @Override
    public Order createOrder(CreateOrderRequest request) {
        throw new BusinessException("订单服务不可用");
    }

    @Override
    public Order getOrder(String orderNo) {
        return Order.builder()
                .orderNo(orderNo)
                .status(OrderStatus.UNKNOWN)
                .build();
    }

    @Override
    public List<Order> getUserOrders(Long userId) {
        return Collections.emptyList();
    }
}

RESTful API的优缺点

优势

  1. 简单易懂:使用HTTP标准方法,学习成本低
  2. 跨语言支持:任何语言都可以调用HTTP API
  3. 可调试性强:可以用curl、Postman测试
  4. 生态成熟:大量工具和库支持

劣势

  1. 性能较低:JSON序列化/反序列化开销大
  2. 类型不安全:没有强类型约束,容易出错
  3. HTTP开销:HTTP头部信息占用带宽
  4. 不支持双向流:只能请求-响应模式

2.2 gRPC:高性能RPC框架

什么是gRPC?

gRPC是Google开源的高性能RPC框架,基于HTTP/2和Protocol Buffers。

核心特性

  1. 高性能:使用Protobuf二进制序列化(比JSON快5-10倍)
  2. 强类型:通过.proto文件定义接口,生成客户端和服务端代码
  3. 多语言支持:支持Java、Go、Python、C++等
  4. 双向流:支持客户端流、服务端流、双向流

gRPC服务定义

1. 定义Protocol Buffer(.proto文件)

syntax = "proto3";

package order;

option java_package = "com.example.order.grpc";
option java_outer_classname = "OrderProto";

// 订单服务接口
service OrderService {
  // 创建订单
  rpc CreateOrder(CreateOrderRequest) returns (Order);

  // 查询订单
  rpc GetOrder(GetOrderRequest) returns (Order);

  // 查询用户的所有订单(服务端流)
  rpc GetUserOrders(GetUserOrdersRequest) returns (stream Order);

  // 批量创建订单(客户端流)
  rpc BatchCreateOrders(stream CreateOrderRequest) returns (BatchCreateOrdersResponse);

  // 订单实时同步(双向流)
  rpc SyncOrders(stream Order) returns (stream Order);
}

// 创建订单请求
message CreateOrderRequest {
  int64 user_id = 1;
  int64 product_id = 2;
  int32 quantity = 3;
}

// 订单
message Order {
  string order_no = 1;
  int64 user_id = 2;
  int64 product_id = 3;
  string product_name = 4;
  int32 quantity = 5;
  double amount = 6;
  string status = 7;
  int64 create_time = 8;
}

// 查询订单请求
message GetOrderRequest {
  string order_no = 1;
}

// 查询用户订单请求
message GetUserOrdersRequest {
  int64 user_id = 1;
}

// 批量创建订单响应
message BatchCreateOrdersResponse {
  int32 success_count = 1;
  int32 failed_count = 2;
}

2. 生成Java代码

# 使用protoc编译器生成Java代码
protoc --java_out=src/main/java \
       --grpc-java_out=src/main/java \
       order.proto

3. 实现服务端

/**
 * 订单服务实现(gRPC服务端)
 */
@GrpcService
public class OrderGrpcService extends OrderServiceGrpc.OrderServiceImplBase {

    @Autowired
    private OrderService orderService;

    /**
     * 创建订单(一元RPC)
     */
    @Override
    public void createOrder(CreateOrderRequest request,
                            StreamObserver<Order> responseObserver) {
        try {
            // 调用业务服务
            com.example.order.entity.Order order = orderService.createOrder(
                convertToEntity(request)
            );

            // 返回响应
            responseObserver.onNext(convertToProto(order));
            responseObserver.onCompleted();

        } catch (Exception e) {
            responseObserver.onError(
                Status.INTERNAL
                    .withDescription(e.getMessage())
                    .asRuntimeException()
            );
        }
    }

    /**
     * 查询订单(一元RPC)
     */
    @Override
    public void getOrder(GetOrderRequest request,
                         StreamObserver<Order> responseObserver) {
        com.example.order.entity.Order order = orderService.getOrder(
            request.getOrderNo()
        );

        if (order == null) {
            responseObserver.onError(
                Status.NOT_FOUND
                    .withDescription("订单不存在")
                    .asRuntimeException()
            );
        } else {
            responseObserver.onNext(convertToProto(order));
            responseObserver.onCompleted();
        }
    }

    /**
     * 查询用户的所有订单(服务端流)
     */
    @Override
    public void getUserOrders(GetUserOrdersRequest request,
                              StreamObserver<Order> responseObserver) {
        List<com.example.order.entity.Order> orders = orderService.getUserOrders(
            request.getUserId()
        );

        // 流式返回多个订单
        for (com.example.order.entity.Order order : orders) {
            responseObserver.onNext(convertToProto(order));
        }

        responseObserver.onCompleted();
    }

    /**
     * 批量创建订单(客户端流)
     */
    @Override
    public StreamObserver<CreateOrderRequest> batchCreateOrders(
            StreamObserver<BatchCreateOrdersResponse> responseObserver) {

        return new StreamObserver<CreateOrderRequest>() {
            private int successCount = 0;
            private int failedCount = 0;

            @Override
            public void onNext(CreateOrderRequest request) {
                try {
                    orderService.createOrder(convertToEntity(request));
                    successCount++;
                } catch (Exception e) {
                    failedCount++;
                }
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                BatchCreateOrdersResponse response = BatchCreateOrdersResponse.newBuilder()
                        .setSuccessCount(successCount)
                        .setFailedCount(failedCount)
                        .build();

                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    }
}

4. 实现客户端

/**
 * 订单服务客户端(gRPC客户端)
 */
@Service
public class OrderGrpcClient {

    @GrpcClient("order-service")
    private OrderServiceGrpc.OrderServiceBlockingStub orderServiceStub;

    /**
     * 创建订单
     */
    public Order createOrder(CreateOrderRequest request) {
        try {
            return orderServiceStub.createOrder(request);
        } catch (StatusRuntimeException e) {
            throw new BusinessException("订单服务不可用: " + e.getStatus());
        }
    }

    /**
     * 查询订单
     */
    public Order getOrder(String orderNo) {
        GetOrderRequest request = GetOrderRequest.newBuilder()
                .setOrderNo(orderNo)
                .build();

        return orderServiceStub.getOrder(request);
    }

    /**
     * 查询用户的所有订单(服务端流)
     */
    public List<Order> getUserOrders(Long userId) {
        GetUserOrdersRequest request = GetUserOrdersRequest.newBuilder()
                .setUserId(userId)
                .build();

        List<Order> orders = new ArrayList<>();

        // 接收流式响应
        Iterator<Order> iterator = orderServiceStub.getUserOrders(request);
        while (iterator.hasNext()) {
            orders.add(iterator.next());
        }

        return orders;
    }
}

gRPC的优缺点

优势

  1. 高性能:Protobuf序列化比JSON快5-10倍
  2. 强类型:编译时检查,减少运行时错误
  3. 双向流:支持客户端流、服务端流、双向流
  4. HTTP/2:多路复用、头部压缩、服务端推送

劣势

  1. 学习成本高:需要学习Protocol Buffers
  2. 可调试性差:二进制协议,无法直接查看
  3. 浏览器支持差:浏览器不能直接调用gRPC
  4. 生态不如REST成熟:工具和库相对较少

2.3 Apache Dubbo:阿里巴巴开源的RPC框架

什么是Dubbo?

Dubbo是阿里巴巴开源的高性能Java RPC框架,已捐献给Apache基金会。

核心特性

  1. 高性能:基于Netty,支持多种序列化协议(Hessian、Protobuf、Kryo)
  2. 服务治理:服务注册、发现、负载均衡、容错
  3. 易用性:基于Spring Boot,零配置快速接入
  4. 扩展性:插件化设计,支持自定义扩展

Dubbo服务定义

1. 定义接口

/**
 * 订单服务接口
 */
public interface OrderService {

    /**
     * 创建订单
     */
    Order createOrder(CreateOrderRequest request);

    /**
     * 查询订单
     */
    Order getOrder(String orderNo);

    /**
     * 查询用户的所有订单
     */
    List<Order> getUserOrders(Long userId);
}

2. 实现服务提供者

/**
 * 订单服务实现(Dubbo服务提供者)
 */
@DubboService(
    version = "1.0.0",
    timeout = 5000,
    retries = 2
)
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Override
    public Order createOrder(CreateOrderRequest request) {
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setQuantity(request.getQuantity());

        return orderRepository.save(order);
    }

    @Override
    public Order getOrder(String orderNo) {
        return orderRepository.findByOrderNo(orderNo);
    }

    @Override
    public List<Order> getUserOrders(Long userId) {
        return orderRepository.findByUserId(userId);
    }
}

3. 配置服务提供者

# application.yml(服务提供者)
dubbo:
  application:
    name: order-service
  protocol:
    name: dubbo
    port: 20880
  registry:
    address: nacos://127.0.0.1:8848
  provider:
    timeout: 5000
    retries: 2

4. 实现服务消费者

/**
 * 订单服务消费者
 */
@Service
public class OrderServiceConsumer {

    @DubboReference(
        version = "1.0.0",
        timeout = 5000,
        retries = 2,
        loadbalance = "random"  // 负载均衡策略
    )
    private OrderService orderService;

    public Order createOrder(CreateOrderRequest request) {
        return orderService.createOrder(request);
    }

    public Order getOrder(String orderNo) {
        return orderService.getOrder(orderNo);
    }
}

5. 配置服务消费者

# application.yml(服务消费者)
dubbo:
  application:
    name: order-consumer
  registry:
    address: nacos://127.0.0.1:8848
  consumer:
    timeout: 5000
    retries: 2
    check: false  # 启动时不检查服务是否可用

Dubbo的优缺点

优势

  1. 高性能:基于Netty,支持多种序列化协议
  2. 服务治理完善:负载均衡、容错、降级、限流
  3. 易用性高:基于Spring Boot,零配置快速接入
  4. 生态成熟:阿里系大规模生产验证

劣势

  1. Java绑定:主要支持Java(虽然有多语言版本,但不成熟)
  2. 学习成本:需要学习Dubbo特有的概念
  3. 社区活跃度:相比gRPC,社区相对较小

三、异步通信:消息队列(RabbitMQ、RocketMQ、Kafka)

3.1 为什么需要消息队列?

同步调用的问题

订单服务 → 库存服务(同步调用,阻塞等待20秒)

问题:
1. 订单服务线程阻塞20秒
2. 库存服务压力大,响应慢
3. 订单服务线程池耗尽,系统崩溃

异步消息的解决方案

订单服务 → 消息队列 → 库存服务

优势:
1. 订单服务发送消息后立即返回(不阻塞)
2. 库存服务异步消费消息(自己的节奏)
3. 消息队列削峰填谷(高峰时缓冲消息)

3.2 消息队列的三种模型

模型1:点对点(Point-to-Point)

特点

  • 一对一通信
  • 消息只能被一个消费者消费
  • 消费后消息删除

典型场景:任务分发

                  消息队列(Queue)
Producer → [M1, M2, M3, M4, M5] → Consumer1(消费M1, M2)
                                → Consumer2(消费M3, M4)
                                → Consumer3(消费M5)

模型2:发布订阅(Publish/Subscribe)

特点

  • 一对多通信
  • 消息可以被多个消费者消费
  • 消费后消息不删除(每个消费者独立消费)

典型场景:领域事件

                   Topic(主题)
Publisher → [OrderCreatedEvent] → Subscriber1(库存服务)
                                 → Subscriber2(积分服务)
                                 → Subscriber3(通知服务)

模型3:请求响应(Request/Response)

特点

  • 双向通信
  • 请求者发送消息,等待响应
  • 响应者处理消息,返回结果

典型场景:异步RPC

Requester → Request Queue → Worker
           ← Reply Queue  ←

3.3 RabbitMQ:基于AMQP协议的消息队列

RabbitMQ的核心概念

Producer → Exchange → Binding → Queue → Consumer
           (交换机)   (绑定)   (队列)

核心组件

  1. Producer:消息生产者
  2. Exchange:交换机(根据路由规则分发消息)
  3. Binding:绑定(交换机与队列的关系)
  4. Queue:队列(存储消息)
  5. Consumer:消息消费者

RabbitMQ的四种交换机类型

1. Direct Exchange(直连交换机)

Producer → Direct Exchange
              |
              ├─ routing_key="order.create" → Queue1
              ├─ routing_key="order.pay" → Queue2
              └─ routing_key="order.cancel" → Queue3

2. Fanout Exchange(扇出交换机)

Producer → Fanout Exchange
              |
              ├─ Queue1(全部消息)
              ├─ Queue2(全部消息)
              └─ Queue3(全部消息)

3. Topic Exchange(主题交换机)

Producer → Topic Exchange
              |
              ├─ routing_key="order.*" → Queue1(匹配order.create、order.pay等)
              ├─ routing_key="*.create" → Queue2(匹配order.create、product.create等)
              └─ routing_key="order.#" → Queue3(匹配order开头的所有消息)

4. Headers Exchange(头交换机)

根据消息头部属性路由(很少使用)

RabbitMQ代码示例

1. 生产者(发送消息)

/**
 * 订单服务:发送订单创建事件
 */
@Service
public class OrderEventPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String EXCHANGE = "order.topic.exchange";
    private static final String ROUTING_KEY = "order.created";

    /**
     * 发布订单创建事件
     */
    public void publishOrderCreatedEvent(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderNo(order.getOrderNo())
                .userId(order.getUserId())
                .amount(order.getAmount())
                .createTime(LocalDateTime.now())
                .build();

        // 发送消息到交换机
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, event);

        log.info("发布订单创建事件: {}", event);
    }
}

2. 消费者(接收消息)

/**
 * 库存服务:监听订单创建事件
 */
@Component
public class OrderCreatedEventListener {

    @Autowired
    private InventoryService inventoryService;

    /**
     * 监听订单创建事件,扣减库存
     */
    @RabbitListener(queues = "order.created.queue")
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        log.info("接收到订单创建事件: {}", event);

        try {
            // 扣减库存
            inventoryService.deduct(event.getProductId(), event.getQuantity());
            log.info("库存扣减成功: orderNo={}", event.getOrderNo());

        } catch (Exception e) {
            log.error("库存扣减失败: orderNo={}, error={}",
                event.getOrderNo(), e.getMessage());
            throw e;  // 重新抛出异常,触发重试
        }
    }
}

3. 配置交换机和队列

/**
 * RabbitMQ配置
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 声明交换机
     */
    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange("order.topic.exchange", true, false);
    }

    /**
     * 声明队列
     */
    @Bean
    public Queue orderCreatedQueue() {
        return new Queue("order.created.queue", true, false, false);
    }

    /**
     * 绑定交换机和队列
     */
    @Bean
    public Binding orderCreatedBinding() {
        return BindingBuilder
                .bind(orderCreatedQueue())
                .to(orderTopicExchange())
                .with("order.created");
    }
}

3.4 RocketMQ:阿里巴巴开源的消息队列

RocketMQ的核心特性

  1. 高性能:单机TPS 10万+
  2. 高可用:主从自动切换
  3. 事务消息:支持分布式事务
  4. 延时消息:支持定时投递
  5. 顺序消息:保证消息顺序
  6. 消息过滤:支持Tag过滤

RocketMQ代码示例

1. 生产者(发送消息)

/**
 * 订单服务:发送订单创建事件(RocketMQ)
 */
@Service
public class OrderEventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private static final String TOPIC = "ORDER_CREATED_TOPIC";

    /**
     * 发布订单创建事件
     */
    public void publishOrderCreatedEvent(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderNo(order.getOrderNo())
                .userId(order.getUserId())
                .amount(order.getAmount())
                .createTime(LocalDateTime.now())
                .build();

        // 同步发送消息
        SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, event);

        log.info("发布订单创建事件成功: msgId={}", sendResult.getMsgId());
    }

    /**
     * 异步发送消息
     */
    public void publishOrderCreatedEventAsync(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderNo(order.getOrderNo())
                .userId(order.getUserId())
                .build();

        // 异步发送消息
        rocketMQTemplate.asyncSend(TOPIC, event, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功: msgId={}", sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                log.error("发送失败: {}", e.getMessage());
            }
        });
    }

    /**
     * 发送延时消息(例如:30分钟后自动取消订单)
     */
    public void publishOrderCancelEvent(Order order) {
        OrderCancelEvent event = OrderCancelEvent.builder()
                .orderNo(order.getOrderNo())
                .build();

        // 发送延时消息(延时级别:1秒、5秒、10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、20分钟、30分钟、1小时、2小时)
        Message<OrderCancelEvent> message = MessageBuilder
                .withPayload(event)
                .build();

        rocketMQTemplate.syncSend(
            "ORDER_CANCEL_TOPIC",
            message,
            3000,  // 发送超时时间
            16     // 延时级别:30分钟
        );
    }
}

2. 消费者(接收消息)

/**
 * 库存服务:监听订单创建事件(RocketMQ)
 */
@Service
@RocketMQMessageListener(
    topic = "ORDER_CREATED_TOPIC",
    consumerGroup = "inventory-consumer-group",
    selectorExpression = "*"  // Tag过滤(*表示所有Tag)
)
public class OrderCreatedEventListener implements RocketMQListener<OrderCreatedEvent> {

    @Autowired
    private InventoryService inventoryService;

    @Override
    public void onMessage(OrderCreatedEvent event) {
        log.info("接收到订单创建事件: {}", event);

        try {
            // 扣减库存
            inventoryService.deduct(event.getProductId(), event.getQuantity());
            log.info("库存扣减成功: orderNo={}", event.getOrderNo());

        } catch (Exception e) {
            log.error("库存扣减失败: orderNo={}, error={}",
                event.getOrderNo(), e.getMessage());
            throw e;  // 重新抛出异常,触发重试
        }
    }
}

3. 事务消息(分布式事务)

/**
 * 订单服务:发送事务消息
 */
@Service
public class OrderTransactionPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     */
    public void publishTransactionMessage(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderNo(order.getOrderNo())
                .build();

        // 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "ORDER_CREATED_TOPIC",
            MessageBuilder.withPayload(event).build(),
            order  // 传递给事务监听器的参数
        );

        log.info("事务消息发送结果: {}", result.getLocalTransactionState());
    }
}

/**
 * 事务监听器
 */
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;

    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = (Order) arg;

            // 执行本地事务(创建订单)
            orderService.createOrder(order);

            // 本地事务成功,提交消息
            return RocketMQLocalTransactionState.COMMIT;

        } catch (Exception e) {
            log.error("本地事务执行失败: {}", e.getMessage());
            // 本地事务失败,回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 检查本地事务状态(用于消息回查)
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 根据消息ID查询本地事务状态
        String orderNo = extractOrderNo(msg);
        Order order = orderService.getOrder(orderNo);

        if (order != null) {
            // 订单存在,提交消息
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 订单不存在,回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

3.5 Apache Kafka:高吞吐量的分布式消息系统

Kafka的核心特性

  1. 高吞吐量:单机TPS 100万+
  2. 持久化:消息持久化到磁盘
  3. 分布式:支持分区、复制
  4. 高可用:故障自动恢复
  5. 流处理:支持Kafka Streams

Kafka的核心概念

Producer → Topic(主题)
            |
            ├─ Partition 0(分区0)→ Consumer Group 1
            ├─ Partition 1(分区1)→ Consumer Group 1
            └─ Partition 2(分区2)→ Consumer Group 1

核心组件

  1. Producer:消息生产者
  2. Topic:主题(消息分类)
  3. Partition:分区(并行处理)
  4. Consumer:消息消费者
  5. Consumer Group:消费者组(负载均衡)

Kafka代码示例

1. 生产者(发送消息)

/**
 * 订单服务:发送订单创建事件(Kafka)
 */
@Service
public class OrderEventPublisher {

    @Autowired
    private KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    private static final String TOPIC = "order-created-topic";

    /**
     * 发布订单创建事件
     */
    public void publishOrderCreatedEvent(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderNo(order.getOrderNo())
                .userId(order.getUserId())
                .amount(order.getAmount())
                .createTime(LocalDateTime.now())
                .build();

        // 发送消息(异步)
        kafkaTemplate.send(TOPIC, order.getOrderNo(), event)
                .addCallback(
                    result -> log.info("发送成功: partition={}, offset={}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                    ex -> log.error("发送失败: {}", ex.getMessage())
                );
    }
}

2. 消费者(接收消息)

/**
 * 库存服务:监听订单创建事件(Kafka)
 */
@Service
public class OrderCreatedEventListener {

    @Autowired
    private InventoryService inventoryService;

    /**
     * 监听订单创建事件
     */
    @KafkaListener(
        topics = "order-created-topic",
        groupId = "inventory-consumer-group"
    )
    public void handleOrderCreatedEvent(ConsumerRecord<String, OrderCreatedEvent> record) {
        OrderCreatedEvent event = record.value();

        log.info("接收到订单创建事件: partition={}, offset={}, event={}",
            record.partition(), record.offset(), event);

        try {
            // 扣减库存
            inventoryService.deduct(event.getProductId(), event.getQuantity());
            log.info("库存扣减成功: orderNo={}", event.getOrderNo());

        } catch (Exception e) {
            log.error("库存扣减失败: orderNo={}, error={}",
                event.getOrderNo(), e.getMessage());
            throw e;
        }
    }
}

3.6 三种消息队列的对比

维度RabbitMQRocketMQKafka
TPS1万+10万+100万+
延时微秒级毫秒级毫秒级
可靠性
顺序性支持支持支持(分区内)
事务消息不支持支持不支持
延时消息需要插件支持不支持
消息查询不支持支持不支持
适用场景中小规模、复杂路由大规模、分布式事务大数据、日志采集

四、事件驱动架构:从请求-响应到发布-订阅

4.1 什么是事件驱动架构?

**事件驱动架构(Event-Driven Architecture, EDA)**是一种软件架构模式,系统通过产生、检测、消费事件来进行通信。

核心概念

  • 事件:系统中发生的重要事情(订单已创建、订单已支付)
  • 事件发布者:产生事件的服务
  • 事件订阅者:消费事件的服务
  • 事件总线:传递事件的中间件

传统请求-响应模式

订单服务 → 库存服务(同步调用)
         → 积分服务(同步调用)
         → 通知服务(同步调用)

问题:
1. 订单服务需要知道所有下游服务
2. 任何一个下游服务挂了,订单创建失败
3. 新增下游服务,需要修改订单服务代码

事件驱动模式

订单服务 → 事件总线(发布"订单已创建"事件)
             ↓
      ┌──────┼──────┬──────┐
      ↓      ↓      ↓      ↓
   库存服务 积分服务 通知服务 新服务

优势:
1. 订单服务不需要知道下游服务
2. 下游服务挂了,不影响订单创建
3. 新增下游服务,不需要修改订单服务

4.2 领域事件的设计

领域事件表示业务领域中发生的重要事情。

命名规范

  • 使用过去式:OrderCreatedEvent、OrderPaidEvent(表示已经发生)
  • 包含关键信息:订单号、用户ID、金额等

示例

/**
 * 订单已创建事件
 */
@Data
@Builder
public class OrderCreatedEvent {
    private String orderNo;           // 订单号
    private Long userId;               // 用户ID
    private Long productId;            // 商品ID
    private Integer quantity;          // 数量
    private BigDecimal amount;         // 金额
    private LocalDateTime createTime;  // 创建时间
}

/**
 * 订单已支付事件
 */
@Data
@Builder
public class OrderPaidEvent {
    private String orderNo;
    private Long userId;
    private BigDecimal amount;
    private LocalDateTime payTime;
}

/**
 * 订单已取消事件
 */
@Data
@Builder
public class OrderCancelledEvent {
    private String orderNo;
    private Long userId;
    private String cancelReason;
    private LocalDateTime cancelTime;
}

4.3 事件溯源(Event Sourcing)

事件溯源是一种数据存储模式,将所有状态变更记录为事件序列。

传统模式

订单表:
| order_no | status  | amount | update_time |
|----------|---------|--------|-------------|
| ORD001   | PAID    | 100.00 | 2025-11-03  |

问题:只能看到当前状态,无法知道历史变更

事件溯源模式

事件表:
| event_id | event_type        | order_no | data    | event_time  |
|----------|-------------------|----------|---------|-------------|
| 1        | OrderCreated      | ORD001   | {...}   | 2025-11-03  |
| 2        | OrderPaid         | ORD001   | {...}   | 2025-11-03  |

订单状态 = 重放所有事件:
OrderCreated → status=UNPAID
OrderPaid → status=PAID

优势:
1. 完整的历史记录(审计、回溯)
2. 可以重放事件(重建状态)
3. 支持时间旅行(查看任意时刻的状态)

4.4 CQRS模式(命令查询职责分离)

**CQRS(Command Query Responsibility Segregation)**将系统分为命令端和查询端。

传统模式

用户 → 订单服务(既处理写请求,又处理读请求)
         ↓
      Order表

CQRS模式

用户 → 命令端(OrderCommandService)→ 写入 → Order表
         ↓ 发布事件
      事件总线
         ↓ 订阅事件
用户 → 查询端(OrderQueryService)→ 读取 → Order视图表

优势

  1. 读写分离:写操作优化事务性,读操作优化查询性能
  2. 独立扩展:读多写少场景,可以只扩展查询端
  3. 灵活性:查询端可以使用不同的数据库(如Elasticsearch)

五、总结与最佳实践

5.1 如何选择通信方式?

场景推荐方案理由
需要立即响应RESTful/gRPC同步调用,立即返回结果
可以异步处理消息队列解耦,削峰填谷
高性能要求gRPC/Dubbo二进制序列化,性能高
跨语言调用RESTfulHTTP标准,任何语言都支持
事件通知消息队列发布订阅,解耦
大数据量传输Kafka高吞吐量
分布式事务RocketMQ事务消息支持事务消息

5.2 幂等性设计

什么是幂等性?

幂等性是指同一个操作执行多次,结果和执行一次相同。

为什么需要幂等性?

在分布式系统中,网络不可靠,消息可能重复发送:

Producer → Message → 网络超时 → 重试 → Message重复 → Consumer处理两次

问题:订单创建两次、库存扣减两次、支付扣款两次

幂等性实现方案

方案1:唯一ID去重

@Service
public class OrderService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public Order createOrder(CreateOrderRequest request) {
        String idempotentKey = request.getIdempotentKey();

        // 检查是否已处理
        Boolean success = redisTemplate.opsForValue()
                .setIfAbsent("idempotent:" + idempotentKey, "1", 24, TimeUnit.HOURS);

        if (Boolean.FALSE.equals(success)) {
            // 已处理,直接返回
            return getOrder(idempotentKey);
        }

        // 未处理,创建订单
        Order order = doCreateOrder(request);

        return order;
    }
}

方案2:数据库唯一索引

CREATE TABLE t_order (
    order_no VARCHAR(32) PRIMARY KEY,  -- 订单号(唯一索引)
    user_id BIGINT NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    ...
);

-- 重复插入会违反唯一索引,抛出异常
INSERT INTO t_order (order_no, user_id, amount) VALUES ('ORD001', 123, 100.00);

方案3:状态机防重

public void payOrder(String orderNo) {
    Order order = getOrder(orderNo);

    // 检查状态(只有未支付的订单才能支付)
    if (order.getStatus() != OrderStatus.UNPAID) {
        throw new BusinessException("订单状态不允许支付");
    }

    // 执行支付
    doPayOrder(order);

    // 更新状态
    order.setStatus(OrderStatus.PAID);
    updateOrder(order);
}

5.3 超时与重试策略

超时设置

@FeignClient(
    name = "order-service",
    configuration = FeignConfig.class
)
public interface OrderServiceClient {
    // ...
}

@Configuration
public class FeignConfig {

    @Bean
    public Request.Options options() {
        return new Request.Options(
            5000,  // 连接超时:5秒
            10000  // 读取超时:10秒
        );
    }
}

重试策略

@Configuration
public class RetryConfig {

    @Bean
    public Retryer retryer() {
        return new Retryer.Default(
            100,   // 初始间隔:100ms
            1000,  // 最大间隔:1秒
            3      // 最大重试次数:3次
        );
    }
}

5.4 服务降级与熔断

降级策略

@FeignClient(
    name = "order-service",
    fallback = OrderServiceFallback.class  // 降级实现
)
public interface OrderServiceClient {
    Order getOrder(String orderNo);
}

@Component
public class OrderServiceFallback implements OrderServiceClient {

    @Override
    public Order getOrder(String orderNo) {
        // 返回默认值或缓存数据
        return Order.builder()
                .orderNo(orderNo)
                .status(OrderStatus.UNKNOWN)
                .build();
    }
}

参考资料

  1. 《微服务架构设计模式》- Chris Richardson
  2. 《Enterprise Integration Patterns》- Gregor Hohpe
  3. 《Kafka权威指南》- Neha Narkhede
  4. RabbitMQ官方文档
  5. RocketMQ官方文档
  6. Apache Kafka官方文档

最后更新时间:2025-11-03