引言:事件驱动架构的演进

传统点对点模式

订单服务 ──────> 库存服务
订单服务 ──────> 积分服务
订单服务 ──────> 通知服务
订单服务 ──────> 物流服务

问题:紧耦合,订单服务需要知道所有下游服务

EventBridge 事件总线模式

订单服务 ──> EventBridge ──> 库存服务
                │          ──> 积分服务
                │          ──> 通知服务
                └──────────> 物流服务

优势:解耦,订单服务只需发布事件,无需关心谁消费

本文目标

  • 理解事件驱动架构核心理念
  • 掌握 EventBridge 使用方法
  • 实现事件路由和过滤
  • 构建完整的事件驱动系统

一、EventBridge 核心概念

1.1 什么是 EventBridge?

定义:事件总线(Event Bus),用于接收、路由、分发事件的中心化服务。

核心组件

┌─────────────────────────────────────────────────┐
│              EventBridge                        │
│                                                 │
│  ┌────────┐    ┌─────────┐    ┌──────────┐    │
│  │ Event  │───>│ Event   │───>│  Rule    │    │
│  │ Source │    │  Bus    │    │ (路由规则)│    │
│  └────────┘    └─────────┘    └────┬─────┘    │
│                                     │          │
│                              ┌──────▼──────┐   │
│                              │   Target    │   │
│                              │  (目标服务)  │   │
│                              └─────────────┘   │
└─────────────────────────────────────────────────┘

1.2 核心概念

概念说明示例
Event事件订单创建事件
Event Source事件源订单服务
Event Bus事件总线中心化的事件分发器
Rule路由规则订单金额 > 1000 的事件
Target目标库存服务、积分服务

二、阿里云 EventBridge

2.1 快速开始

1. 创建事件总线

控制台 → 事件总线 EventBridge → 创建事件总线
→ 名称:order-event-bus
→ 描述:订单事件总线
→ 创建

2. 创建事件源

事件总线详情 → 事件源 → 创建事件源
→ 名称:order-service-source
→ 类型:自定义应用
→ 创建

3. 创建事件规则

事件总线详情 → 事件规则 → 创建规则
→ 名称:high-value-order-rule
→ 事件模式:
{
  "source": ["order-service"],
  "type": ["OrderCreated"],
  "data": {
    "amount": [{
      "numeric": [">", 1000]
    }]
  }
}
→ 目标:RocketMQ Topic
→ 创建

2.2 Java SDK 使用

Maven 依赖

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>eventbridge-sdk</artifactId>
    <version>1.3.10</version>
</dependency>

发布事件

import com.aliyun.eventbridge.EventBridgeClient;
import com.aliyun.eventbridge.models.*;

public class EventPublisher {

    public static void main(String[] args) {
        // 1. 创建客户端
        Config config = new Config();
        config.setAccessKeyId("YOUR_ACCESS_KEY");
        config.setAccessKeySecret("YOUR_SECRET_KEY");
        config.setEndpoint("eventbridge.cn-hangzhou.aliyuncs.com");

        EventBridgeClient client = new EventBridgeClient(config);

        // 2. 构建事件
        CloudEvent event = new CloudEvent();
        event.setId(UUID.randomUUID().toString());
        event.setSource("order-service");
        event.setType("OrderCreated");
        event.setSpecversion("1.0");
        event.setDatacontenttype("application/json");

        // 事件数据
        Map<String, Object> data = new HashMap<>();
        data.put("orderId", "ORDER_12345");
        data.put("userId", "USER_001");
        data.put("amount", 1500.0);
        data.put("createTime", System.currentTimeMillis());
        event.setData(JSON.toJSONString(data).getBytes());

        // 3. 发布事件
        PutEventsRequest request = new PutEventsRequest();
        request.setEventBusName("order-event-bus");
        request.setEvents(Arrays.asList(event));

        PutEventsResponse response = client.putEvents(request);
        System.out.println("事件发布成功:" + response.getRequestId());
    }
}

三、事件路由规则

3.1 基于内容的路由

场景1:按事件类型路由

{
  "source": ["order-service"],
  "type": ["OrderCreated"]
}

→ 路由到:库存服务、积分服务


场景2:按金额路由

{
  "source": ["order-service"],
  "type": ["OrderCreated"],
  "data": {
    "amount": [{
      "numeric": [">", 1000]
    }]
  }
}

→ 路由到:VIP 服务通道


场景3:组合条件

{
  "source": ["order-service"],
  "type": ["OrderCreated"],
  "data": {
    "amount": [{
      "numeric": [">", 1000]
    }],
    "userLevel": ["VIP", "SVIP"]
  }
}

→ 路由到:VIP 运营团队


3.2 事件转换

场景:事件数据转换

// 原始事件
{
  "orderId": "ORDER_12345",
  "userId": "USER_001",
  "amount": 1500.0
}

// 转换后
{
  "order_no": "ORDER_12345",
  "user_id": "USER_001",
  "total_amount": 1500.0,
  "currency": "CNY"
}

配置转换规则

事件规则 → 目标配置 → 数据转换
→ 使用模板:
{
  "order_no": "$.orderId",
  "user_id": "$.userId",
  "total_amount": "$.amount",
  "currency": "CNY"
}

四、实战案例

4.1 电商订单事件驱动

架构

订单服务 ──> EventBridge
              │
              ├──> 库存服务(扣减库存)
              ├──> 积分服务(发放积分)
              ├──> 通知服务(发送通知)
              └──> 数据分析(实时报表)

1. 订单服务发布事件

@Service
public class OrderService {

    @Autowired
    private EventBridgeClient eventBridgeClient;

    public void createOrder(Order order) {
        // 1. 创建订单
        orderRepository.save(order);

        // 2. 发布订单创建事件
        CloudEvent event = new CloudEvent();
        event.setId(UUID.randomUUID().toString());
        event.setSource("order-service");
        event.setType("OrderCreated");
        event.setSpecversion("1.0");

        Map<String, Object> data = new HashMap<>();
        data.put("orderId", order.getId());
        data.put("userId", order.getUserId());
        data.put("amount", order.getAmount());
        data.put("items", order.getItems());
        event.setData(JSON.toJSONString(data).getBytes());

        PutEventsRequest request = new PutEventsRequest();
        request.setEventBusName("order-event-bus");
        request.setEvents(Arrays.asList(event));

        eventBridgeClient.putEvents(request);

        log.info("订单创建事件已发布,orderId={}", order.getId());
    }
}

2. 库存服务消费事件

@RocketMQMessageListener(
    topic = "order-created-topic",
    consumerGroup = "inventory-service-group"
)
public class InventoryEventHandler implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        CloudEvent event = JSON.parseObject(message, CloudEvent.class);
        Map<String, Object> data = JSON.parseObject(
            new String(event.getData()),
            Map.class
        );

        String orderId = (String) data.get("orderId");
        List<OrderItem> items = (List<OrderItem>) data.get("items");

        // 扣减库存
        for (OrderItem item : items) {
            inventoryService.reduceStock(item.getProductId(), item.getQuantity());
        }

        log.info("库存扣减完成,orderId={}", orderId);
    }
}

3. 积分服务消费事件

@RocketMQMessageListener(
    topic = "order-created-topic",
    consumerGroup = "points-service-group"
)
public class PointsEventHandler implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        CloudEvent event = JSON.parseObject(message, CloudEvent.class);
        Map<String, Object> data = JSON.parseObject(
            new String(event.getData()),
            Map.class
        );

        String userId = (String) data.get("userId");
        Double amount = (Double) data.get("amount");

        // 计算积分(订单金额 × 0.01)
        int points = (int) (amount * 0.01);
        pointsService.addPoints(userId, points);

        log.info("积分发放完成,userId={}, points={}", userId, points);
    }
}

4.2 多事件编排

场景:订单完整生命周期

OrderCreated      → 库存扣减、积分发放
OrderPaid         → 通知商家、物流发货
OrderShipped      → 通知用户、更新状态
OrderCompleted    → 评价提醒、优惠券发放
OrderCancelled    → 库存回滚、积分回收

事件规则配置

// 规则1:订单创建
{
  "type": ["OrderCreated"]
}  库存服务、积分服务

// 规则2:订单支付
{
  "type": ["OrderPaid"]
}  商家服务、物流服务

// 规则3:订单取消
{
  "type": ["OrderCancelled"]
}  库存服务(回滚)、积分服务(回收)

五、最佳实践

5.1 事件设计原则

1. CloudEvents 标准

{
  "specversion": "1.0",
  "id": "unique-event-id",
  "source": "order-service",
  "type": "OrderCreated",
  "datacontenttype": "application/json",
  "time": "2025-11-15T10:00:00Z",
  "data": {
    "orderId": "ORDER_12345",
    "amount": 1500.0
  }
}

2. 事件命名规范

格式:<资源>.<动作>
示例:
- OrderCreated
- OrderPaid
- OrderShipped
- OrderCancelled
- UserRegistered
- ProductUpdated

3. 事件版本管理

{
  "type": "OrderCreated",
  "dataschemaversion": "1.0",  // 事件版本
  "data": {
    "orderId": "ORDER_12345"
  }
}

5.2 幂等性保证

@Service
public class EventHandler {

    @Autowired
    private ProcessedEventRepository eventRepository;

    public void handleEvent(CloudEvent event) {
        String eventId = event.getId();

        // 1. 检查事件是否已处理
        if (eventRepository.exists(eventId)) {
            log.warn("事件已处理,跳过,eventId={}", eventId);
            return;
        }

        // 2. 处理事件
        try {
            processEvent(event);

            // 3. 记录已处理
            eventRepository.save(eventId);
        } catch (Exception e) {
            log.error("事件处理失败,eventId={}", eventId, e);
            throw e;
        }
    }
}

5.3 错误处理

死信队列

事件规则 → 目标配置 → 失败处理
→ 最大重试次数:3
→ 死信队列:event-dlq-topic

监控告警

CloudWatch / 云监控:
- 事件发布失败率
- 事件处理延迟
- 死信队列堆积量

六、EventBridge vs RocketMQ

对比项EventBridgeRocketMQ
定位事件路由消息队列
解耦程度高(发布者无需知道订阅者)中(需指定 Topic)
路由能力强(基于内容路由)
事件转换支持不支持
多目标一对多路由需多次发送
成本较高较低

选择建议

  • 需要复杂路由 → EventBridge
  • 简单消息传递 → RocketMQ
  • 混合使用 → EventBridge 路由到 RocketMQ

七、总结

EventBridge 核心价值

1. 解耦:发布者与订阅者完全解耦
2. 灵活路由:基于内容的智能路由
3. 事件转换:统一事件格式
4. 多目标:一个事件路由到多个服务

适用场景

推荐使用

  • 微服务事件驱动
  • 多系统集成
  • 复杂事件路由
  • Serverless 架构

不推荐使用

  • 简单点对点消息
  • 超高 TPS(> 10 万/秒)
  • 对成本敏感

下一篇预告:《多语言客户端对比 - Java、Go、Python、Node.js》,探索 RocketMQ 在不同语言生态中的使用。

本文关键词EventBridge 事件驱动 事件路由 微服务解耦