RocketMQ云原生06:多语言客户端对比 - Java、Go、Python、Node.js

引言:多语言生态 Java 一统天下的时代已过,微服务团队使用多种语言: Java:企业级应用 Go:高性能服务 Python:数据分析、AI Node.js:前端 BFF、实时服务 本文目标: 对比各语言客户端特性 掌握不同语言的使用方法 了解性能差异 选择合适的客户端 一、客户端对比 1.1 官方支持度 语言 官方支持 成熟度 社区活跃度 推荐度 Java ✅ 官方 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ Go ✅ 官方 ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐ Python ✅ 官方 ⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐ Node.js ⚠️ 社区 ⭐⭐⭐ ⭐⭐ ⭐⭐⭐ C++ ✅ 官方 ⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐ .NET ⚠️ 社区 ⭐⭐ ⭐ ⭐⭐ 1.2 功能对比 | 功能 | Java | Go | Python | Node.js | |——|——|—-|—– —|———| | 同步发送 | ✅ | ✅ | ✅ | ✅ | | 异步发送 | ✅ | ✅ | ✅ | ✅ | | 顺序消息 | ✅ | ✅ | ✅ | ⚠️ | | 事务消息 | ✅ | ✅ | ❌ | ❌ | | 延迟消息 | ✅ | ✅ | ✅ | ✅ | | 批量消息 | ✅ | ✅ | ✅ | ✅ | | Pull 消费 | ✅ | ✅ | ✅ | ✅ | | Push 消费 | ✅ | ✅ | ✅ | ✅ | ...

2025-11-15 · maneng

RocketMQ云原生05:EventBridge 事件总线 - 事件驱动架构实践

引言:事件驱动架构的演进 传统点对点模式: 订单服务 ──────> 库存服务 订单服务 ──────> 积分服务 订单服务 ──────> 通知服务 订单服务 ──────> 物流服务 问题:紧耦合,订单服务需要知道所有下游服务 EventBridge 事件总线模式: 订单服务 ──> EventBridge ──> 库存服务 │ ──> 积分服务 │ ──> 通知服务 └──────────> 物流服务 优势:解耦,订单服务只需发布事件,无需关心谁消费 本文目标: 理解事件驱动架构核心理念 掌握 EventBridge 使用方法 实现事件路由和过滤 构建完整的事件驱动系统 一、EventBridge 核心概念 1.1 什么是 EventBridge? 定义:事件总线(Event Bus),用于接收、路由、分发事件的中心化服务。 核心组件: ┌─────────────────────────────────────────────────┐ │ EventBridge │ │ │ │ ┌────────┐ ┌─────────┐ ┌──────────┐ │ │ │ Event │───>│ Event │───>│ Rule │ │ │ │ Source │ │ Bus │ │ (路由规则)│ │ │ └────────┘ └─────────┘ └────┬─────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ Target │ │ │ │ (目标服务) │ │ │ └─────────────┘ │ └─────────────────────────────────────────────────┘ 1.2 核心概念 概念 说明 示例 Event 事件 订单创建事件 Event Source 事件源 订单服务 Event Bus 事件总线 中心化的事件分发器 Rule 路由规则 订单金额 > 1000 的事件 Target 目标 库存服务、积分服务 二、阿里云 EventBridge 2.1 快速开始 1. 创建事件总线 ...

2025-11-15 · maneng

RocketMQ云原生04:RocketMQ Streams - 轻量级流处理框架

引言:流处理的简化之道 Flink/Spark Streaming:功能强大,但部署复杂,资源消耗大。 RocketMQ Streams:轻量级,与 RocketMQ 深度集成,开箱即用。 本文目标: 理解 RocketMQ Streams 核心概念 掌握流处理基本操作 实现实时数据分析 对比其他流处理框架 一、RocketMQ Streams 简介 1.1 什么是 RocketMQ Streams? 定义:基于 RocketMQ 的轻量级流处理框架,提供类似 Kafka Streams 的 API。 架构: ┌─────────────────────────────────────────┐ │ RocketMQ Streams Application │ │ │ │ ┌──────────┐ ┌──────────┐ ┌───────┐ │ │ │ Source │─>│ Process │─>│ Sink │ │ │ │(消费消息)│ │(转换处理)│ │(输出) │ │ │ └────┬─────┘ └──────────┘ └───────┘ │ └───────┼─────────────────────────────────┘ │ ┌───────▼─────────────────────────────────┐ │ RocketMQ Cluster │ │ ┌──────────┐ ┌──────────┐ │ │ │ Topic A │ │ Topic B │ │ │ └──────────┘ └──────────┘ │ └─────────────────────────────────────────┘ 1.2 核心特性 特性 RocketMQ Streams Kafka Streams Flink 轻量级 ✅ 极轻量 ✅ 轻量 ❌ 重量级 部署复杂度 低(无需额外组件) 低 高 学习成本 低 中 高 状态管理 简单 中等 复杂 适用场景 简单流处理 中等复杂度 复杂流处理 二、快速开始 2.1 Maven 依赖 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.0.0</version> </dependency> 2.2 第一个 Streams 应用 import org.apache.rocketmq.streams.core.RocketMQStream; import org.apache.rocketmq.streams.core.topology.TopologyBuilder; public class WordCountExample { public static void main(String[] args) { // 1. 配置 RocketMQ 连接 Properties properties = new Properties(); properties.put("nameserver.address", "127.0.0.1:9876"); properties.put("group.id", "stream_app_group"); // 2. 创建 TopologyBuilder TopologyBuilder builder = new TopologyBuilder(); // 3. 定义流处理逻辑 builder // 从 Topic 读取消息 .source("input_topic") // 分词 .flatMap((key, value) -> { List<KeyValue<String, String>> result = new ArrayList<>(); for (String word : value.toString().split(" ")) { result.add(new KeyValue<>(word, "1")); } return result; }) // 按单词分组计数 .groupBy(kv -> kv.getKey()) .count() // 输出到 Topic .to("output_topic"); // 4. 启动 Streams 应用 RocketMQStream stream = new RocketMQStream(builder.build(), properties); stream.start(); System.out.println("Streams 应用已启动"); } } 三、核心操作 3.1 Source(数据源) // 从单个 Topic 读取 builder.source("input_topic") // 从多个 Topic 读取 builder.source("topic1", "topic2", "topic3") // 指定起始位置 builder.source("input_topic", ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET) 3.2 转换操作 3.2.1 Map(一对一映射) builder .source("user_topic") .map((key, value) -> { User user = JSON.parseObject(value, User.class); return new KeyValue<>(user.getId(), user.getName().toUpperCase()); }); 3.2.2 FlatMap(一对多映射) builder .source("sentence_topic") .flatMap((key, value) -> { List<KeyValue<String, Integer>> words = new ArrayList<>(); for (String word : value.split(" ")) { words.add(new KeyValue<>(word, 1)); } return words; }); 3.2.3 Filter(过滤) builder .source("order_topic") .filter((key, value) -> { Order order = JSON.parseObject(value, Order.class); return order.getAmount() > 100; // 只保留金额 > 100 的订单 }); 3.3 聚合操作 3.3.1 GroupBy + Count builder .source("click_topic") .map((key, value) -> { ClickEvent event = JSON.parseObject(value, ClickEvent.class); return new KeyValue<>(event.getUserId(), 1); }) .groupBy(kv -> kv.getKey()) .count() // 统计每个用户的点击次数 .to("click_count_topic"); 3.3.2 GroupBy + Sum builder .source("order_topic") .map((key, value) -> { Order order = JSON.parseObject(value, Order.class); return new KeyValue<>(order.getUserId(), order.getAmount()); }) .groupBy(kv -> kv.getKey()) .sum() // 统计每个用户的订单总金额 .to("user_amount_topic"); 3.4 窗口操作 3.4.1 滚动窗口(Tumbling Window) builder .source("metric_topic") .groupBy(kv -> kv.getKey()) .window(TumblingWindow.of(Duration.ofMinutes(5))) // 5 分钟窗口 .count() .to("metric_count_topic"); 3.4.2 滑动窗口(Sliding Window) builder .source("traffic_topic") .groupBy(kv -> kv.getKey()) .window(SlidingWindow.of( Duration.ofMinutes(10), // 窗口大小 Duration.ofMinutes(1) // 滑动步长 )) .sum() .to("traffic_sum_topic"); 3.5 Join 操作 // Stream-Stream Join RocketMQStream stream1 = builder.source("order_topic"); RocketMQStream stream2 = builder.source("payment_topic"); stream1 .join(stream2, (order, payment) -> { // 关联订单和支付信息 return new OrderPayment(order, payment); }, JoinWindows.of(Duration.ofMinutes(5)) // 5 分钟时间窗口内关联 ) .to("order_payment_topic"); 四、实战案例 4.1 实时订单金额统计 /** * 需求:实时统计每分钟的订单总金额 */ public class OrderAmountAggregation { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder // 1. 从订单 Topic 读取 .source("order_topic") // 2. 解析订单,提取金额 .map((key, value) -> { Order order = JSON.parseObject(value, Order.class); return new KeyValue<>("total", order.getAmount()); }) // 3. 按 key 分组(这里都是 "total") .groupBy(kv -> kv.getKey()) // 4. 1 分钟滚动窗口聚合 .window(TumblingWindow.of(Duration.ofMinutes(1))) .reduce((v1, v2) -> v1 + v2) // 5. 输出结果 .map((key, value) -> { Map<String, Object> result = new HashMap<>(); result.put("minute", System.currentTimeMillis() / 60000); result.put("totalAmount", value); return new KeyValue<>(key, JSON.toJSONString(result)); }) .to("order_amount_stat_topic"); // 启动 Properties props = new Properties(); props.put("nameserver.address", "127.0.0.1:9876"); props.put("group.id", "order_amount_app"); RocketMQStream stream = new RocketMQStream(builder.build(), props); stream.start(); } } 4.2 实时用户行为分析 /** * 需求:识别 5 分钟内连续点击超过 100 次的用户(异常行为) */ public class AbnormalUserDetection { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder .source("click_topic") // 解析点击事件 .map((key, value) -> { ClickEvent event = JSON.parseObject(value, ClickEvent.class); return new KeyValue<>(event.getUserId(), 1); }) // 按用户分组 .groupBy(kv -> kv.getKey()) // 5 分钟滚动窗口计数 .window(TumblingWindow.of(Duration.ofMinutes(5))) .count() // 过滤出异常用户 .filter((userId, count) -> count > 100) // 告警 .foreach((userId, count) -> { System.out.println("告警:用户 " + userId + " 5分钟内点击 " + count + " 次"); // 发送告警消息 alertService.sendAlert(userId, count); }); RocketMQStream stream = new RocketMQStream(builder.build(), properties); stream.start(); } } 五、状态管理 5.1 本地状态存储 // 配置本地状态存储 properties.put("state.dir", "/tmp/rocketmq-streams-state"); // 使用状态 builder .source("event_topic") .groupBy(kv -> kv.getKey()) .aggregate( () -> 0, // 初始值 (key, newValue, aggValue) -> aggValue + newValue, // 累加 Materialized.as("event-count-store") // 状态存储名称 ); 5.2 查询状态 // 查询状态存储 ReadOnlyKeyValueStore<String, Long> store = stream.store("event-count-store", QueryableStoreTypes.keyValueStore()); Long count = store.get("user123"); System.out.println("用户 user123 的事件数:" + count); 六、最佳实践 6.1 性能优化 // 1. 批量处理 properties.put("batch.size", 100); // 2. 异步处理 properties.put("async.enabled", true); // 3. 并行度配置 properties.put("parallelism", 4); 6.2 异常处理 builder .source("input_topic") .map((key, value) -> { try { return process(value); } catch (Exception e) { // 记录错误日志 logger.error("处理失败", e); // 发送到错误 Topic sendToErrorTopic(value, e); return null; } }) .filter(Objects::nonNull); // 过滤失败的消息 6.3 监控指标 // 配置监控 properties.put("metrics.enabled", true); properties.put("metrics.reporters", "org.apache.rocketmq.streams.metrics.PrometheusReporter"); // 暴露指标 // - streams_records_consumed_total // - streams_records_produced_total // - streams_process_latency_ms 七、与其他框架对比 对比项 RocketMQ Streams Kafka Streams Flink 部署 无需额外组件 无需额外组件 需独立集群 状态后端 本地 RocksDB 本地 RocksDB 多种选择 Exactly-Once 支持 支持 支持 窗口类型 滚动/滑动 滚动/滑动/会话 全面支持 CEP 支持 ❌ ❌ ✅ SQL 支持 ❌ ✅ ✅ 学习成本 低 中 高 选择建议: ...

2025-11-15 · maneng

RocketMQ云原生03:Serverless 消息队列 - 按需使用的新模式

引言:Serverless 的魅力 传统模式:搭建集群、调优参数、监控运维…还要为闲置资源付费。 Serverless 模式:直接使用,按消息数量付费,无需关心底层基础设施。 Serverless 的核心理念: ✅ 零运维(NoOps) ✅ 按量付费(Pay-as-you-go) ✅ 自动弹性伸缩 ✅ 专注业务逻辑 本文目标: 理解 Serverless MQ 架构 对比云厂商 Serverless 方案 实战阿里云/AWS 的 Serverless MQ 掌握最佳实践 一、Serverless MQ 架构 1.1 传统 vs Serverless 传统模式: ┌─────────────────────────────────────┐ │ 用户负责 │ │ ├─ 集群规划 │ │ ├─ 容量评估 │ │ ├─ 部署运维 │ │ ├─ 监控告警 │ │ ├─ 故障处理 │ │ └─ 性能调优 │ └─────────────────────────────────────┘ Serverless 模式: ┌─────────────────────────────────────┐ │ 云厂商负责 │ │ ├─ 自动扩缩容 │ │ ├─ 高可用保障 │ │ ├─ 监控运维 │ │ └─ 性能优化 │ └─────────────────────────────────────┘ ┌─────────────────────────────────────┐ │ 用户只需 │ │ └─ 发送/接收消息 │ └─────────────────────────────────────┘ 1.2 Serverless MQ 特点 特性 传统模式 Serverless 模式 运维复杂度 高 零 成本模式 按实例付费 按消息数量付费 扩容速度 分钟级 秒级 最小成本 固定成本 0(无消息时无费用) 技术门槛 高 低 二、主流云厂商方案 2.1 阿里云 - 消息队列 RocketMQ 版(Serverless) 产品介绍: ...

2025-11-15 · maneng

RocketMQ云原生02:Operator 运维管理 - 自动化的终极形态

引言:声明式运维的魅力 传统部署:编写 10+ 个 YAML 文件,手动管理 StatefulSet、Service、ConfigMap… Operator 部署:一个 CRD 搞定所有配置,自动化管理生命周期。 # 传统方式:10+ 个 YAML 文件 kubectl apply -f namespace.yaml kubectl apply -f configmap.yaml kubectl apply -f nameserver-statefulset.yaml kubectl apply -f nameserver-service.yaml kubectl apply -f broker-statefulset.yaml ... # Operator 方式:1 个 CRD kubectl apply -f rocketmq-cluster.yaml # Done! 本文目标: 理解 Operator 工作原理 安装 RocketMQ Operator 使用 CRD 部署集群 实现自动化运维 一、Operator 基础 1.1 什么是 Operator? Operator = CRD + Controller ┌──────────────────────────────────────────┐ │ Operator │ │ │ │ ┌────────────┐ ┌───────────────┐ │ │ │ CRD │ │ Controller │ │ │ │ (定义) │─────>│ (执行逻辑) │ │ │ └────────────┘ └───────┬───────┘ │ │ │ │ └──────────────────────────────┼──────────┘ │ ┌──────────▼──────────┐ │ K8s API Server │ └──────────┬──────────┘ │ ┌──────────────────────┼──────────────────────┐ │ │ │ ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ │StatefulSet│ │ Service │ │ ConfigMap │ └───────────┘ └───────────┘ └───────────┘ Operator 功能: ...

2025-11-15 · maneng

RocketMQ云原生01:Kubernetes 部署实践 - 拥抱容器化时代

引言:云原生时代的 RocketMQ 传统物理机部署,扩容一台 Broker 需要 2 天:申请机器 → 装系统 → 装软件 → 配置 → 测试… Kubernetes 部署,扩容只需 30 秒:kubectl scale statefulset broker --replicas=5 云原生的优势: ✅ 秒级弹性伸缩 ✅ 自动故障恢复 ✅ 统一资源管理 ✅ 声明式配置 本文目标: 理解 RocketMQ 在 K8s 上的架构 掌握 StatefulSet 部署方法 实现持久化存储 配置服务发现和负载均衡 一、架构设计 1.1 K8s 部署架构 ┌─────────────────────────────────────────────────────┐ │ Kubernetes 集群 │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Namespace: rocketmq │ │ │ │ │ │ │ │ ┌────────────────────────────────────┐ │ │ │ │ │ StatefulSet: rocketmq-nameserver │ │ │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ │ │ ns-0 │ │ ns-1 │ │ │ │ │ │ │ └─────────┘ └─────────┘ │ │ │ │ │ └────────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌────────────────────────────────────┐ │ │ │ │ │ StatefulSet: rocketmq-broker │ │ │ │ │ │ ┌──────────┐ ┌──────────┐ │ │ │ │ │ │ │ broker-0 │ │ broker-1 │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌─PVC─┐ │ │ ┌─PVC─┐ │ │ │ │ │ │ │ │ │ 50Gi│ │ │ │ 50Gi│ │ │ │ │ │ │ │ └──┴─────┴─┘ └──┴─────┴─┘ │ │ │ │ │ └────────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌──────────────────────────┐ │ │ │ │ │ Service: nameserver-svc │ │ │ │ │ │ ClusterIP / Headless │ │ │ │ │ └──────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 二、部署准备 2.1 创建命名空间 # namespace.yaml apiVersion: v1 kind: Namespace metadata: name: rocketmq kubectl apply -f namespace.yaml 2.2 创建 ConfigMap # configmap.yaml apiVersion: v1 kind: ConfigMap metadata: name: rocketmq-broker-config namespace: rocketmq data: broker.conf: | brokerClusterName=DefaultCluster brokerName=broker-{{ .Ordinal }} brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH # 存储路径 storePathRootDir=/home/rocketmq/store storePathCommitLog=/home/rocketmq/store/commitlog # 网络配置 brokerIP1={{ .PodIP }} listenPort=10911 # NameServer 地址 namesrvAddr=rocketmq-nameserver-0.rocketmq-nameserver.rocketmq.svc.cluster.local:9876;rocketmq-nameserver-1.rocketmq-nameserver.rocketmq.svc.cluster.local:9876 kubectl apply -f configmap.yaml 三、NameServer 部署 3.1 StatefulSet 配置 # nameserver-statefulset.yaml apiVersion: apps/v1 kind: StatefulSet metadata: name: rocketmq-nameserver namespace: rocketmq spec: serviceName: rocketmq-nameserver replicas: 2 selector: matchLabels: app: rocketmq-nameserver template: metadata: labels: app: rocketmq-nameserver spec: containers: - name: nameserver image: apache/rocketmq:5.0.0 imagePullPolicy: IfNotPresent command: - sh - mqnamesrv ports: - containerPort: 9876 name: nameserver env: - name: JAVA_OPT_EXT value: "-Xms512m -Xmx512m -Xmn256m" resources: requests: memory: "512Mi" cpu: "250m" limits: memory: "1Gi" cpu: "500m" livenessProbe: tcpSocket: port: 9876 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: tcpSocket: port: 9876 initialDelaySeconds: 10 periodSeconds: 5 3.2 Headless Service # nameserver-service.yaml apiVersion: v1 kind: Service metadata: name: rocketmq-nameserver namespace: rocketmq spec: clusterIP: None # Headless Service selector: app: rocketmq-nameserver ports: - port: 9876 targetPort: 9876 name: nameserver 3.3 部署 NameServer # 部署 StatefulSet kubectl apply -f nameserver-statefulset.yaml # 部署 Service kubectl apply -f nameserver-service.yaml # 查看 Pod 状态 kubectl get pods -n rocketmq -l app=rocketmq-nameserver # 输出: # NAME READY STATUS RESTARTS AGE # rocketmq-nameserver-0 1/1 Running 0 1m # rocketmq-nameserver-1 1/1 Running 0 1m # 查看 Service kubectl get svc -n rocketmq # 查看 DNS 记录 kubectl run -it --rm debug --image=busybox --restart=Never -- nslookup rocketmq-nameserver.rocketmq.svc.cluster.local 四、Broker 部署 4.1 StorageClass 配置 # storageclass.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: rocketmq-storage provisioner: kubernetes.io/aws-ebs # 根据云厂商修改 parameters: type: gp3 fsType: ext4 volumeBindingMode: WaitForFirstConsumer allowVolumeExpansion: true kubectl apply -f storageclass.yaml 4.2 Broker StatefulSet # broker-statefulset.yaml apiVersion: apps/v1 kind: StatefulSet metadata: name: rocketmq-broker namespace: rocketmq spec: serviceName: rocketmq-broker replicas: 2 selector: matchLabels: app: rocketmq-broker template: metadata: labels: app: rocketmq-broker spec: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: app operator: In values: - rocketmq-broker topologyKey: "kubernetes.io/hostname" containers: - name: broker image: apache/rocketmq:5.0.0 imagePullPolicy: IfNotPresent command: - sh - mqbroker - -c - /etc/rocketmq/broker.conf ports: - containerPort: 10909 name: vip - containerPort: 10911 name: main - containerPort: 10912 name: ha env: - name: NAMESRV_ADDR value: "rocketmq-nameserver-0.rocketmq-nameserver.rocketmq.svc.cluster.local:9876;rocketmq-nameserver-1.rocketmq-nameserver.rocketmq.svc.cluster.local:9876" - name: JAVA_OPT_EXT value: "-Xms2g -Xmx2g -Xmn1g" volumeMounts: - name: broker-storage mountPath: /home/rocketmq/store - name: broker-config mountPath: /etc/rocketmq/broker.conf subPath: broker.conf resources: requests: memory: "2Gi" cpu: "500m" limits: memory: "4Gi" cpu: "2000m" livenessProbe: tcpSocket: port: 10911 initialDelaySeconds: 60 periodSeconds: 10 readinessProbe: tcpSocket: port: 10911 initialDelaySeconds: 20 periodSeconds: 5 volumes: - name: broker-config configMap: name: rocketmq-broker-config volumeClaimTemplates: - metadata: name: broker-storage spec: accessModes: [ "ReadWriteOnce" ] storageClassName: rocketmq-storage resources: requests: storage: 50Gi 4.3 Broker Service # broker-service.yaml apiVersion: v1 kind: Service metadata: name: rocketmq-broker namespace: rocketmq spec: clusterIP: None selector: app: rocketmq-broker ports: - port: 10909 targetPort: 10909 name: vip - port: 10911 targetPort: 10911 name: main - port: 10912 targetPort: 10912 name: ha 4.4 部署 Broker # 部署 StatefulSet kubectl apply -f broker-statefulset.yaml # 部署 Service kubectl apply -f broker-service.yaml # 查看 Pod kubectl get pods -n rocketmq -l app=rocketmq-broker -w # 查看 PVC kubectl get pvc -n rocketmq # 查看日志 kubectl logs -f rocketmq-broker-0 -n rocketmq # 进入 Pod 验证 kubectl exec -it rocketmq-broker-0 -n rocketmq -- sh > sh mqadmin clusterList -n rocketmq-nameserver-0.rocketmq-nameserver.rocketmq.svc.cluster.local:9876 五、客户端连接 5.1 内部访问 在 K8s 集群内部: ...

2025-11-15 · maneng

RocketMQ生产08:版本升级策略 - 平滑迁移与回滚方案

引言:升级的挑战 生产环境 RocketMQ 4.5 升级到 5.0,结果消息格式不兼容,业务大面积报错,紧急回滚… 升级风险: ❌ 新旧版本不兼容 ❌ 升级过程业务中断 ❌ 数据格式变更导致丢消息 ❌ 回滚困难,恢复时间长 本文目标: ✅ 理解版本升级的风险点 ✅ 掌握平滑升级方法 ✅ 实现零停机升级 ✅ 建立快速回滚机制 一、版本兼容性分析 1.1 RocketMQ 版本演进 版本 发布时间 重大变更 兼容性 4.5.x 2019 事务消息优化 ✅ 向下兼容 4.7.x 2020 Dledger 主从切换 ✅ 向下兼容 4.9.x 2021 性能优化、ACL增强 ✅ 向下兼容 5.0.x 2022 Pop消费模式、轻量级客户端 ⚠️ 部分不兼容 5.1.x 2023 Proxy模式、gRPC协议 ⚠️ 新架构 1.2 兼容性检查清单 升级前必查: 查看官方 Release Notes 检查 API 是否有破坏性变更 验证客户端版本兼容性 确认配置文件格式是否变化 检查数据存储格式是否兼容 官方文档: ...

2025-11-15 · maneng

RocketMQ生产07:安全机制配置 - ACL 权限控制实战

引言:安全的重要性 某天,开发环境的测试账号误连到生产环境,执行了 deleteTopic 命令,核心业务 Topic 被删除,服务全面瘫痪… 安全风险: ❌ 任何人都能创建/删除 Topic ❌ 未授权用户读取敏感消息 ❌ 恶意攻击导致集群瘫痪 ❌ 内部人员误操作 本文目标: ✅ 理解 RocketMQ ACL 机制 ✅ 配置细粒度权限控制 ✅ 实现用户/应用隔离 ✅ 建立安全最佳实践 一、ACL 基础概念 1.1 什么是 ACL? ACL(Access Control List):访问控制列表,用于定义"谁可以对哪些资源执行什么操作"。 ACL 三要素: 1. Subject(主体):谁在访问?(用户、应用) 2. Resource(资源):访问什么?(Topic、Group) 3. Permission(权限):做什么操作?(PUB、SUB、DENY) 1.2 RocketMQ ACL 权限类型 权限 含义 适用对象 PUB 发布权限(Producer) Topic SUB 订阅权限(Consumer) Topic + Group DENY 禁止访问 Topic + Group ANY 所有权限 Topic + Group 1.3 ACL 工作流程 ┌─────────┐ ┌─────────────────┐ ┌──────────┐ │ Client │ │ ACL Validator │ │ Broker │ └────┬────┘ └────────┬────────┘ └────┬─────┘ │ │ │ │ 1. 发送请求 │ │ │ (AccessKey) │ │ ├───────────────────>│ │ │ │ │ │ │ 2. 校验签名 │ │ │ 校验权限 │ │ │ │ │ │ 3. 权限通过 │ │ ├───────────────────>│ │ │ │ │ 4. 返回结果 │ │ │<──────────────────────────────────────────┤ │ │ │ 二、启用 ACL 2.1 Broker 配置 # broker.conf # 启用 ACL aclEnable=true # ACL 配置文件路径(相对于 ROCKETMQ_HOME) accessKey=conf/plain_acl.yml 2.2 创建 ACL 配置文件 # 创建配置文件 vim /opt/rocketmq/conf/plain_acl.yml # plain_acl.yml # 全局白名单(IP) globalWhiteRemoteAddresses: - 10.10.103.* # 内网IP段 - 192.168.0.* # 用户配置 accounts: # 管理员账号 - accessKey: RocketMQ_Admin secretKey: 12345678 whiteRemoteAddress: # 白名单IP(可选) admin: true # 管理员权限 defaultTopicPerm: DENY # 默认禁止所有Topic defaultGroupPerm: DENY # 默认禁止所有Group topicPerms: - "*=PUB|SUB" # 所有Topic的发布和订阅权限 groupPerms: - "*=PUB|SUB" # 订单服务账号 - accessKey: order_service secretKey: order_pwd_2024 whiteRemoteAddress: 192.168.1.* # 只允许订单服务器IP admin: false defaultTopicPerm: DENY defaultGroupPerm: DENY topicPerms: - "order_topic=PUB" # 只能发送订单消息 - "order_result_topic=SUB" # 只能消费结果消息 groupPerms: - "order_consumer_group=SUB" # 库存服务账号 - accessKey: inventory_service secretKey: inventory_pwd_2024 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: DENY topicPerms: - "order_topic=SUB" # 只能消费订单消息 - "inventory_topic=PUB" # 只能发送库存消息 groupPerms: - "inventory_consumer_group=SUB" # 只读账号(监控用) - accessKey: monitor_user secretKey: monitor_pwd_2024 admin: false defaultTopicPerm: SUB # 默认只读 defaultGroupPerm: SUB topicPerms: - "*=SUB" 2.3 重启 Broker # 停止 Broker sh /opt/rocketmq/bin/mqshutdown broker # 启动 Broker(应用 ACL 配置) sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf & # 验证 ACL 是否生效 tail -f /opt/rocketmq/logs/rocketmqlogs/broker.log | grep ACL # 输出:ACL is enabled 三、客户端配置 3.1 Producer 配置 import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; public class AclProducerExample { public static void main(String[] args) throws Exception { // 1. 创建 ACL 凭证 String accessKey = "order_service"; String secretKey = "order_pwd_2024"; SessionCredentials credentials = new SessionCredentials(accessKey, secretKey); AclClientRPCHook aclHook = new AclClientRPCHook(credentials); // 2. 创建 Producer(传入 ACL Hook) DefaultMQProducer producer = new DefaultMQProducer( "order_producer_group", aclHook, // ACL Hook true, // 启用消息轨迹 null // 自定义轨迹Topic(可选) ); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); // 3. 发送消息 Message msg = new Message("order_topic", "Hello RocketMQ with ACL".getBytes()); SendResult result = producer.send(msg); System.out.println("发送结果:" + result.getSendStatus()); producer.shutdown(); } } 3.2 Consumer 配置 import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class AclConsumerExample { public static void main(String[] args) throws Exception { // 1. 创建 ACL 凭证 String accessKey = "inventory_service"; String secretKey = "inventory_pwd_2024"; SessionCredentials credentials = new SessionCredentials(accessKey, secretKey); AclClientRPCHook aclHook = new AclClientRPCHook(credentials); // 2. 创建 Consumer(传入 ACL Hook) DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "inventory_consumer_group", aclHook, // ACL Hook null // 消息轨迹相关配置 ); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("order_topic", "*"); // 3. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("收到消息:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer 启动成功"); } } 3.3 Spring Boot 配置 # application.yml rocketmq: name-server: 127.0.0.1:9876 producer: group: order_producer_group access-key: order_service # ACL AccessKey secret-key: order_pwd_2024 # ACL SecretKey send-message-timeout: 3000 retry-times-when-send-failed: 2 consumer: group: inventory_consumer_group access-key: inventory_service secret-key: inventory_pwd_2024 consume-thread-min: 5 consume-thread-max: 10 Producer 代码: ...

2025-11-15 · maneng

RocketMQ生产06:高可用保障 - 多机房部署与故障演练

引言:高可用的重要性 凌晨 3 点,机房断电,整个 RocketMQ 集群瘫痪,所有业务停摆,损失数百万… 高可用(HA)的目标: ✅ 任意单点故障不影响服务 ✅ 机房级故障快速切换 ✅ 数据零丢失或最小丢失 ✅ RTO(恢复时间)< 5 分钟 本文目标: 掌握主从架构和 Dledger 架构 实现多机房部署方案 学会故障演练方法 建立完整的容灾体系 一、高可用架构 1.1 单机房主从架构 ┌─────────────────────────────────────┐ │ 机房A(生产机房) │ │ │ │ ┌────────────┐ ┌────────────┐ │ │ │ NameServer │ │ NameServer │ │ │ │ (n1) │ │ (n2) │ │ │ └────────────┘ └────────────┘ │ │ │ │ ┌────────────┐ ┌────────────┐ │ │ │ Broker-A │ │ Broker-B │ │ │ │ (Master) │ │ (Master) │ │ │ └──────┬─────┘ └──────┬─────┘ │ │ │ │ │ │ ┌──────▼─────┐ ┌──────▼─────┐ │ │ │ Broker-A │ │ Broker-B │ │ │ │ (Slave) │ │ (Slave) │ │ │ └────────────┘ └────────────┘ │ └─────────────────────────────────────┘ 特点: ...

2025-11-15 · maneng

RocketMQ生产05:消息重复处理 - 幂等性设计的最佳实践

引言:重复消费的困扰 数据库里同一笔订单被创建了 3 次,库存被重复扣减,用户投诉"明明只买了 1 件,为什么扣了 3 件库存?" 重复消费的常见表现: ❌ 订单重复创建 ❌ 积分重复发放 ❌ 库存重复扣减 ❌ 消息重复推送 本文目标: ✅ 理解消息重复的根本原因 ✅ 掌握幂等性设计的核心方法 ✅ 学习 5 种幂等性实现方案 ✅ 构建完整的防重体系 一、消息重复的根本原因 1.1 RocketMQ 的 At Least Once 语义 RocketMQ 保证消息至少投递一次(At Least Once),但不保证恰好一次(Exactly Once)。 为什么会重复? ┌─────────┐ ┌─────────┐ ┌──────────┐ │Producer │ │ Broker │ │ Consumer │ └────┬────┘ └────┬────┘ └────┬─────┘ │ │ │ │ 1. 发送消息 │ │ ├──────────────────>│ │ │ │ │ │ 2. 存储成功 │ │ │<──────────────────┤ │ │ │ │ │ │ 3. 推送消息 │ │ ├──────────────────>│ │ │ │ │ │ 4. 消费成功 │ │ │<──────────────────┤ │ │ │ │ │ 5. ACK确认 │ │ │<──X─────────────X─┤ 网络故障,ACK 丢失 │ │ │ │ │ 6. 超时重推 │ │ ├──────────────────>│ 重复消费! │ │ │ 重复的三大场景: ...

2025-11-15 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...