RocketMQ生产04:消息丢失排查 - 全链路追踪与问题定位

引言:消息丢了怎么办? 凌晨接到告警:“订单支付成功,但库存未扣减!“排查半天,发现是 RocketMQ 消息丢了。消息到底在哪个环节丢的?如何快速定位?如何避免再次发生? 消息丢失的常见表现: ❌ Producer 发送成功,Consumer 未收到 ❌ Broker 重启后,部分消息消失 ❌ Consumer 消费后,业务未执行 ❌ 高峰期消息神秘失踪 本文目标: ✅ 理解消息丢失的所有可能环节 ✅ 掌握全链路排查方法 ✅ 建立消息可靠性保障机制 ✅ 实现端到端消息追踪 一、消息丢失的三个环节 1.1 全链路示意图 ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Producer │ (1) │ Broker │ (2) │ Consumer │ │ ├──────>│ ├──────>│ │ └──────────┘ └──────────┘ └──────────┘ ⚠️ ⚠️ ⚠️ 发送端丢失 存储端丢失 消费端丢失 三个风险点: Producer 发送丢失:网络故障、未等待响应 Broker 存储丢失:异步刷盘、磁盘故障 Consumer 消费丢失:自动ACK、业务异常未重试 二、发送端丢失排查 2.1 同步发送 vs 异步发送 2.1.1 同步发送(推荐) // ✅ 正确:等待发送结果 try { SendResult result = producer.send(message); if (result.getSendStatus() == SendStatus.SEND_OK) { log.info("发送成功,msgId={}", result.getMsgId()); } else { log.error("发送失败,status={}", result.getSendStatus()); // 重试或告警 } } catch (Exception e) { log.error("发送异常", e); // 异常处理:重试、记录数据库、告警 } 关键点: ...

2025-11-15 · maneng

RocketMQ生产03:性能调优指南 - JVM、OS、Broker 参数优化

引言:性能优化的本质 上线前压测:TPS 轻松 5 万,延迟 5ms 以内。生产环境一跑:TPS 破千就开始 Full GC,延迟飙到 500ms,系统濒临崩溃… 性能问题的常见表现: ❌ TPS 上不去,CPU 使用率却不高 ❌ 频繁 Full GC,Stop-The-World 时间长 ❌ 消息发送延迟高,P99 延迟上秒 ❌ 磁盘 I/O 成瓶颈,写入速度慢 本文目标: ✅ 掌握 JVM 参数调优方法 ✅ 理解操作系统层面优化 ✅ 优化 Broker 核心参数 ✅ 建立性能调优思维模型 一、性能调优方法论 1.1 调优三板斧 1. 测量(Measure) → 建立性能基线 2. 分析(Analyze) → 定位性能瓶颈 3. 优化(Optimize) → 针对性调优 切忌盲目调优: ❌ 看到别人的配置就直接copy ❌ 不测量就调参,凭感觉优化 ❌ 多个参数同时修改,无法定位效果 正确姿势: ✅ 建立性能基线(压测记录初始状态) ✅ 单一变量法(一次只改一个参数) ✅ 量化效果(对比优化前后的指标) 1.2 性能指标体系 维度 核心指标 目标值 采集方式 吞吐量 TPS(消息/秒) 根据业务需求 Prometheus 延迟 P99 延迟(ms) < 50ms 客户端埋点 资源 CPU 使用率 < 70% top 资源 内存使用率 < 80% free -h 资源 磁盘 I/O 等待 < 10% iostat 稳定性 GC 频率 YGC < 1次/秒 jstat 稳定性 Full GC 间隔 > 1小时 jstat 二、JVM 参数调优 2.1 内存配置 2.1.1 堆内存大小 推荐配置: ...

2025-11-15 · maneng

RocketMQ生产02:监控体系搭建 - Prometheus + Grafana 实战

引言:没有监控的生产环境是裸奔 凌晨 3 点,手机疯狂震动,告警短信轰炸:“RocketMQ 消息堆积 100 万条!“你迅速爬起来,打开电脑,却不知道从哪里开始排查… 这就是没有监控的痛: ❌ 故障发现滞后,用户先于运维发现问题 ❌ 排查无从下手,缺少历史数据 ❌ 性能瓶颈不可见,容量规划靠猜 ❌ 告警不及时,小问题变大故障 本文目标: ✅ 搭建完整的 RocketMQ 监控体系 ✅ 实现核心指标的可视化 ✅ 配置智能告警规则 ✅ 掌握故障排查方法 一、监控体系架构 1.1 整体架构图 ┌─────────────────────────────────────────────────────────┐ │ RocketMQ 集群 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Broker-A │ │ Broker-B │ │ Broker-C │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ └─────────────┼─────────────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ Exporter │ ← 暴露 Prometheus 指标 │ │ └──────┬──────┘ │ └─────────────────────┼────────────────────────────────────┘ │ ┌──────▼──────┐ │ Prometheus │ ← 采集、存储、查询 └──────┬──────┘ │ ┌─────────────┼─────────────┐ │ │ │ ┌──────▼──────┐ ┌───▼────┐ ┌──────▼──────┐ │ Grafana │ │AlertMgr│ │ 其他工具 │ │ (可视化) │ │(告警) │ │ (日志等) │ └─────────────┘ └────────┘ └─────────────┘ 1.2 核心组件 组件 作用 端口 RocketMQ Exporter 采集 RocketMQ 指标,暴露给 Prometheus 5557 Prometheus 时序数据库,存储监控指标 9090 Grafana 可视化平台,展示监控大盘 3000 AlertManager 告警管理,通知到钉钉/邮件/微信 9093 二、安装部署 2.1 安装 Prometheus # 1. 下载 Prometheus cd /opt wget https://github.com/prometheus/prometheus/releases/download/v2.45.0/prometheus-2.45.0.linux-amd64.tar.gz tar -xzf prometheus-2.45.0.linux-amd64.tar.gz mv prometheus-2.45.0.linux-amd64 prometheus # 2. 配置 Prometheus cd prometheus cat > prometheus.yml <<EOF global: scrape_interval: 15s # 采集间隔 evaluation_interval: 15s # 告警规则评估间隔 # 采集任务配置 scrape_configs: # Prometheus 自身监控 - job_name: 'prometheus' static_configs: - targets: ['localhost:9090'] # RocketMQ 监控 - job_name: 'rocketmq' static_configs: - targets: ['192.168.1.100:5557'] # Exporter 地址 labels: cluster: 'rocketmq-prod' env: 'production' EOF # 3. 启动 Prometheus nohup ./prometheus --config.file=prometheus.yml \ --storage.tsdb.path=./data \ --web.listen-address=:9090 > prometheus.log 2>&1 & # 4. 验证 curl http://localhost:9090/-/healthy # 浏览器访问:http://your-ip:9090 2.2 安装 RocketMQ Exporter # 1. 下载 Exporter cd /opt wget https://github.com/apache/rocketmq-exporter/releases/download/rocketmq-exporter-0.0.2/rocketmq-exporter-0.0.2-bin.tar.gz tar -xzf rocketmq-exporter-0.0.2-bin.tar.gz cd rocketmq-exporter-0.0.2 # 2. 配置 application.properties cat > application.properties <<EOF # RocketMQ NameServer 地址 rocketmq.config.namesrvAddr=192.168.1.100:9876;192.168.1.101:9876 # Exporter 监听端口 server.port=5557 # 采集指标的消费组(用于监控消费进度) rocketmq.config.consumerGroups=consumer_group_1,consumer_group_2 # 是否启用 ACL(如果 RocketMQ 开启了 ACL) rocketmq.config.enableACL=false EOF # 3. 启动 Exporter nohup java -jar rocketmq-exporter-0.0.2.jar \ --spring.config.location=application.properties > exporter.log 2>&1 & # 4. 验证指标暴露 curl http://localhost:5557/metrics 成功输出示例: ...

2025-11-15 · maneng

RocketMQ生产01:生产部署方案 - 集群规划与容量评估

引言:生产环境的挑战 开发环境跑得欢,生产一上就翻车?RocketMQ 的生产部署是个系统工程,涉及集群规划、容量评估、高可用设计等多个维度。本文将从第一性原理出发,带你构建一个稳定可靠的生产集群。 为什么需要认真规划? 血泪教训: ❌ 磁盘空间不足,消息写入失败,业务中断 ❌ 单点故障,Broker 宕机,服务不可用 ❌ 容量估算错误,高峰期消息堆积,处理延迟 ❌ 网络带宽不足,吞吐量上不去,性能瓶颈 核心目标: ✅ 高可用:任意节点故障不影响服务 ✅ 高性能:满足业务峰值吞吐量需求 ✅ 可扩展:支持业务增长,弹性伸缩 ✅ 易运维:监控完善,故障快速定位 一、部署架构模式 1.1 单 Master 模式 架构图: ┌─────────────┐ │ Producer │ └─────┬───────┘ │ ┌─────▼───────┐ ┌─────────────┐ │ NameServer │◄─────┤ Consumer │ └─────┬───────┘ └─────────────┘ │ ┌─────▼───────┐ │ Broker │ │ (Master) │ └─────────────┘ 特点: ✅ 部署简单,成本低 ❌ 单点故障风险 ❌ 不支持高可用 适用场景: 开发测试环境 低重要性业务 POC 验证 配置示例: # broker.conf brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH # 存储路径 storePathRootDir=/data/rocketmq/store storePathCommitLog=/data/rocketmq/store/commitlog 1.2 多 Master 模式(推荐生产) 架构图: ...

2025-11-15 · maneng

RocketMQ进阶10:消费进度管理 - Offset存储与重置策略

引言:什么是Offset? Offset(偏移量)记录Consumer的消费进度: Queue中的消息位置:0, 1, 2, 3, 4, 5... Consumer已消费到:3 下次从4开始消费 Offset存储方式 集群模式(Broker存储) // 自动提交(默认) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // Offset存储在Broker端,多个Consumer共享进度 广播模式(本地存储) consumer.setMessageModel(MessageModel.BROADCASTING); // Offset存储在本地文件,每个Consumer独立进度 Offset提交机制 // 1. 自动提交(默认) // 消费成功后自动提交 // 2. 手动提交 consumer.setAutoCommit(false); // 业务处理完成后手动提交 context.commitOffset(); Offset重置 按时间重置 # 重置到1小时前 sh mqadmin resetOffsetByTime \ -g consumerGroup \ -t topic \ -s -3600000 按位置重置 # 重置到最早 sh mqadmin resetOffsetByTime \ -g consumerGroup \ -t topic \ -s -1 # 重置到最新 sh mqadmin resetOffsetByTime \ -g consumerGroup \ -t topic \ -s 0 最佳实践 1. 集群模式用于分布式消费 2. 广播模式用于全量通知 3. 生产环境定期备份Offset 4. 重置Offset前务必评估影响 本文关键词:Offset 消费进度 进度管理 重置策略 ...

2025-11-14 · maneng

RocketMQ进阶09:消息堆积处理 - 生产故障的应急方案

引言:消息堆积的危害 堆积影响: 消费延迟增加 内存占用上升 磁盘空间不足 系统性能下降 原因分析 1. Consumer消费速度慢 - 业务逻辑耗时 - 数据库慢查询 - 外部接口超时 2. Consumer数量不足 - 单个Consumer处理能力有限 3. Consumer宕机 - 无Consumer消费 解决方案 方案1:增加Consumer # 快速扩容Consumer实例 docker run -d rocketmq-consumer 方案2:提升消费性能 // 增加消费线程 consumer.setConsumeThreadMin(50); consumer.setConsumeThreadMax(100); // 批量消费 consumer.setConsumeMessageBatchMaxSize(16); // 优化业务逻辑 // - 异步处理 // - 批量操作数据库 // - 使用缓存 方案3:临时限流 // 限制Producer发送速率 Thread.sleep(10); // 每条消息延迟10ms 方案4:跳过堆积消息 // 紧急情况:重置offset跳过堆积 consumer.resetOffsetByTimestamp(topic, timestamp); 监控告警 监控指标: - 消费延迟(Consumer Lag) - 堆积消息数量 - 消费TPS 告警阈值: - 延迟 > 5分钟 - 堆积 > 10万条 本文关键词:消息堆积 故障处理 性能调优 应急方案

2025-11-14 · maneng

RocketMQ进阶08:流量控制机制 - 保护系统的最后防线

引言:为什么需要流量控制? 防止系统过载: Consumer处理速度:100条/秒 消息生产速度:1000条/秒 结果:消息堆积 → 内存溢出 → 系统崩溃 Producer端限流 // 设置发送超时 producer.setSendMsgTimeout(3000); // 异步发送队列大小限制 producer.setMaxMessageSize(4 * 1024 * 1024); // 4MB Consumer端限流 // 1. 限制拉取数量 consumer.setPullBatchSize(32); // 每次最多拉32条 // 2. 限制并发消费线程 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(20); // 3. 限制消费速率 consumer.setPullInterval(100); // 拉取间隔100ms Broker端流控 触发条件: 1. 内存使用超过85% 2. 消息堆积超过阈值 3. PageCache繁忙 流控动作: - 拒绝Producer发送 - 降低Consumer拉取频率 本文关键词:流量控制 限流 背压 系统保护

2025-11-14 · maneng

RocketMQ进阶07:消息重试与死信队列 - 失败处理的最佳实践

引言:消费失败怎么办? Consumer消费失败时,RocketMQ自动重试: 消费失败 → 延迟重试 → 多次重试 → 死信队列 重试机制 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { try { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 返回RECONSUME_LATER触发重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 重试间隔:10s, 30s, 1m, 2m, 3m...最多16次 死信队列 16次重试失败 → 进入死信队列(%DLQ%消费组名) 死信队列处理 // 监听死信队列 @RocketMQMessageListener( topic = "%DLQ%my_consumer_group", consumerGroup = "dlq_handler_group" ) public class DLQHandler implements RocketMQListener<String> { @Override public void onMessage(String msg) { // 人工处理或记录日志 log.error("死信消息:{}", msg); } } 本文关键词:消息重试 死信队列 DLQ 失败处理

2025-11-14 · maneng

RocketMQ进阶06:批量消息优化 - 提升吞吐量的利器

引言:批量的力量 单条发送 vs 批量发送: 单条:1万TPS 批量:10万TPS(提升10倍) 批量发送 List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { Message msg = new Message("topic", ("Msg" + i).getBytes()); messages.add(msg); } // 批量发送(一次网络调用) SendResult result = producer.send(messages); 批量大小限制 单批最大:4MB 建议:每批100-1000条消息 性能对比 方式 TPS 延迟 单条发送 1万 5ms 批量发送 10万 10ms 本文关键词:批量消息 性能优化 高吞吐量

2025-11-14 · maneng

RocketMQ进阶05:延迟消息机制 - 定时任务的优雅实现

引言:延迟消息的应用场景 典型场景: 订单超时自动取消(30分钟未支付) 定时推送消息(生日祝福) 延迟重试(失败后延迟重试) 延迟级别 RocketMQ支持18个延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ##实现原理 // 发送延迟消息 Message msg = new Message("topic", "body".getBytes()); msg.setDelayTimeLevel(3); // 延迟10秒 producer.send(msg); // 原理: // 1. 消息先发送到SCHEDULE_TOPIC_XXXX // 2. 定时任务扫描到期消息 // 3. 投递到目标Topic 实战案例 // 订单超时取消 public void createOrder(Order order) { // 1. 创建订单 orderService.save(order); // 2. 发送延迟消息(30分钟) Message msg = new Message("order_timeout_topic", order.getId().toString().getBytes()); msg.setDelayTimeLevel(16); // 30分钟 producer.send(msg); } // Consumer处理超时订单 @Override public void onMessage(String orderId) { Order order = orderService.getById(orderId); if ("CREATED".equals(order.getStatus())) { // 仍未支付,取消订单 orderService.cancel(orderId); } } 本文关键词:延迟消息 定时任务 订单超时 延迟队列

2025-11-14 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...