Production Best Practices Checklist

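Deployment architecture

✅ Recommended architecture

```text
┌────────────────────────────────────────────┐
│ Nginx (reverse proxy + health checks)      │
└─────────────────────┬──────────────────────┘
                      ↓
┌────────────────────────────────────────────┐
│ Spring Cloud Gateway                       │
│ (gateway-level rate limiting)              │
└─────────────────────┬──────────────────────┘
                      ↓
┌────────────────────────────────────────────┐
│ Microservice cluster                       │
│ (service-level limiting + circuit breaking)│
│ ├─ Order Service (3 instances)             │
│ ├─ Product Service (5 instances)           │
│ └─ Payment Service (2 instances)           │
└─────────────────────┬──────────────────────┘
                      ↓
┌────────────────────────────────────────────┐
│ Sentinel Dashboard (high availability)     │
│ ├─ Dashboard 1                             │
│ └─ Dashboard 2                             │
└─────────────────────┬──────────────────────┘
                      ↓
┌────────────────────────────────────────────┐
│ Nacos (config center + service registry)   │
│ (rule persistence)                         │
└────────────────────────────────────────────┘
```

Rule configuration

1. Rate limiting rules

```java
// ✅ Recommended: layered rate limiting

// Layer 1: gateway-level limit (overall entry point), loaded in the gateway application
GatewayFlowRule gatewayRule = new GatewayFlowRule("order-service")
        .setCount(10000);
GatewayRuleManager.loadRules(Collections.singleton(gatewayRule));

// Layer 2: service-level limit
FlowRule serviceRule = new FlowRule();
serviceRule.setResource("orderService");
serviceRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
serviceRule.setCount(5000);

// Layer 3: API-level limit
FlowRule apiRule = new FlowRule();
apiRule.setResource("orderCreate");
apiRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
apiRule.setCount(1000);

// Rules only take effect once they are loaded
FlowRuleManager.loadRules(Arrays.asList(serviceRule, apiRule));
```

2. Circuit breaking rules

```java
// ✅ Recommended: slow-call ratio strategy
DegradeRule rule = new DegradeRule();
rule.setResource("callProductService");
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT); // slow-call ratio mode (Sentinel 1.8+)
rule.setCount(1000);             // a call slower than 1 s counts as slow
rule.setSlowRatioThreshold(0.5); // trip when more than 50% of calls are slow
rule.setMinRequestAmount(10);    // require at least 10 calls in the window
rule.setStatIntervalMs(10000);   // 10 s statistics window
rule.setTimeWindow(10);          // stay open for 10 s
DegradeRuleManager.loadRules(Collections.singletonList(rule));
```

3. System protection

```java
// ✅ Recommended: protect on several indicators
SystemRule rule = new SystemRule();
rule.setHighestCpuUsage(0.8); // CPU usage 80%
rule.setAvgRt(500);           // average RT 500 ms
rule.setMaxThread(50);        // 50 concurrent threads
SystemRuleManager.loadRules(Collections.singletonList(rule));
```

Configuration checklist

application.yml

```yaml
spring:
  application:
    name: order-service
  cloud:
    # Nacos configuration
    nacos:
      discovery:
        server-addr: nacos.example.com:8848
        namespace: production
      config:
        server-addr: nacos.example.com:8848
        namespace: production
    # Sentinel configuration
    sentinel:
      transport:
        dashboard: sentinel.example.com:8080
        port: 8719
      eager: true
      # Rule persistence
      datasource:
        flow:
          nacos:
            server-addr: nacos.example.com:8848
            dataId: ${spring.application.name}-flow-rules
            groupId: SENTINEL_GROUP
            rule-type: flow
            namespace: production
        degrade:
          nacos:
            server-addr: nacos.example.com:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            rule-type: degrade
            namespace: production
```

Monitoring and alerting

Prometheus metrics

```yaml
management:
  endpoints:
    web:
      exposure:
        # expose only the endpoints that production actually needs
        include: 'health,info,prometheus'
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: ${spring.application.name}
      env: production
```

Alert rules

```yaml
groups:
  - name: sentinel_critical
    rules:
      # Block ratio too high
      # (the *_qps series are gauges that already hold a per-second rate,
      #  so they are divided directly instead of being wrapped in rate())
      - alert: HighBlockRate
        expr: |
          sentinel_block_qps / (sentinel_pass_qps + sentinel_block_qps) > 0.1
        for: 5m
        annotations:
          summary: "Block ratio above 10%"
      # Circuit breaker open
      - alert: CircuitBreakerOpen
        expr: sentinel_circuit_breaker_state == 1
        for: 1m
        annotations:
          summary: "Circuit breaker is open"
      # RT too high
      - alert: HighRT
        expr: sentinel_avg_rt > 1000
        for: 5m
        annotations:
          summary: "Average RT above 1 s"
```

Code conventions

1. Resource naming

```java
// ✅ Recommended: module_operation
@SentinelResource("order_create")
@SentinelResource("product_query")
@SentinelResource("payment_pay")

// ❌ Avoid: dynamic resource names
Entry entry = SphU.entry("/api/" + userId); // one resource per user: the resource count explodes
```

2. Degradation handling

```java
// ✅ Recommended: graceful degradation
@SentinelResource(
    value = "getProduct",
    blockHandler = "handleBlock",
    fallback = "handleFallback"
)
public Product getProduct(Long id) {
    return productService.getById(id);
}

// Called on BlockException (rate limited or circuit open): serve from cache
public Product handleBlock(Long id, BlockException ex) {
    return productCache.get(id);
}

// Called when getProduct throws: return a default value
public Product handleFallback(Long id, Throwable ex) {
    return Product.builder().id(id).name("Service temporarily unavailable").build();
}
```

3. Exception handling

```java
// ✅ Recommended: global exception handling
@RestControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(FlowException.class)
    public Result handleFlowException(FlowException e) {
        return Result.error(429, "Too many requests, please retry later");
    }

    @ExceptionHandler(DegradeException.class)
    public Result handleDegradeException(DegradeException e) {
        return Result.error(503, "Service temporarily unavailable, please retry later");
    }
}
```

Operations process

1. Change process

```text
Raise a change request
  ↓
Write the change plan
  ↓
Technical review
  ↓
Canary verification
  ↓
Full release
  ↓
Monitoring
  ↓
Change retrospective
```

2. Canary release

```bash
# Note: kubectl scale only changes the replica count of one Deployment; this
# sequence assumes order-service already carries the new version and rules.

# 1. Start with a single instance
kubectl scale deployment order-service --replicas=1
# Update the rules
# Observe for 5 minutes

# 2. Scale out gradually
kubectl scale deployment order-service --replicas=3
# Observe for 10 minutes

# 3. Full release
kubectl scale deployment order-service --replicas=10
```

3. Rollback plan

```bash
# 1. Roll back the rules
#    Restore the previous rules in Nacos

# 2. Roll back the application
kubectl rollout undo deployment order-service

# 3. Verify
curl http://localhost:8080/health
```

Performance tuning

1. JVM options

```bash
-Xms2g -Xmx2g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/logs/heapdump.hprof
```

2. Thread pool

```yaml
server:
  tomcat:
    threads:
      max: 200
      min-spare: 50
    accept-count: 100
```

3. Rule tuning

- Keep the number of rules under 100
- Merge similar rules
- Periodically remove unused rules

Security hardening

1. Dashboard authentication

```bash
# Dashboard startup parameters
-Dsentinel.dashboard.auth.username=admin
-Dsentinel.dashboard.auth.password=your_strong_password
```

```properties
# Enable HTTPS
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:keystore.p12
server.ssl.key-store-password=your_keystore_password
```

2. Rule permissions

```java
// Put rule changes through an approval process
// Only administrators may modify rules
@PreAuthorize("hasRole('ADMIN')")
public Result updateRule(FlowRule rule) {
    // ...
}
```

Testing and verification

1. Rate limiting test

```bash
# Load-testing tool
wrk -t10 -c100 -d60s http://localhost:8080/order/create

# Verify that rate limiting takes effect:
#   check the Dashboard monitoring view
#   check the application logs
```

2. Circuit breaking test

```bash
# Fault injection (quote the URL so the shell does not treat & as a background operator)
curl -X POST "http://localhost:8080/fault/inject?resource=productService&delay=5000"

# Observe that the breaker opens
# Check the circuit breaking logs
# Verify the fallback logic
```

3. Load test report

```text
Scenario:    order creation API
Threshold:   1000 QPS
Concurrency: 100
Duration:    60 s

Results:
- Passed QPS:   1000
- Blocked QPS:  100
- Success rate: 90.9% (1000 / 1100)
- Average RT:   50 ms
- Rate limiting effective: ✅
```

Emergency plans

1. Over-aggressive rate limiting
- Adjust the rules immediately: raise the threshold in Nacos
- Or temporarily disable rate limiting

2. False circuit-breaker trips
- Adjust the circuit breaking thresholds
- Raise the slow-call RT threshold
- Raise the slow-call ratio

3. Global degradation

```yaml
# Degradation switch (read at startup, so it applies after a restart)
spring:
  cloud:
    sentinel:
      enabled: false # emergency shutdown of Sentinel
```

Checklist

Pre-launch checks
- Dashboard deployed
- Nacos persistence configured
- Flow rules configured
- Circuit breaking rules configured
- Monitoring and alerting configured
- Rate limiting verified by load testing
- Circuit breaking verified by fault injection
- Fallback logic verified
- Emergency plans prepared
- Documentation written

Runtime checks
- Review Dashboard monitoring daily
- Review the rule configuration weekly
- Review alert history monthly
- Run load tests quarterly
- Rehearse the emergency plans regularly

Summary

Production best practices: ...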
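To make the slow-call ratio rule from "2. Circuit breaking rules" concrete, here is a minimal, self-contained sketch of the decision it describes. It is not Sentinel's implementation: the class name `SlowRatioBreaker` is hypothetical, it keeps individual calls in a deque instead of Sentinel's bucketed statistics, and it assumes the breaker trips only when the slow ratio strictly exceeds the threshold and that one fast probe closes it again.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of slow-call-ratio circuit breaking, using the same knobs as the
 * DegradeRule above. Simplified: calls are kept individually, not in Sentinel's buckets.
 */
public class SlowRatioBreaker {
    public enum State { CLOSED, OPEN, HALF_OPEN }

    private final long maxRtMs;        // like setCount: slower calls are "slow"
    private final double slowRatio;    // like setSlowRatioThreshold
    private final int minRequests;     // like setMinRequestAmount
    private final long statIntervalMs; // like setStatIntervalMs
    private final long openMs;         // like setTimeWindow, converted to milliseconds

    private final Deque<long[]> calls = new ArrayDeque<>(); // each entry: {finishTimeMs, rtMs}
    private State state = State.CLOSED;
    private long openedAt;

    public SlowRatioBreaker(long maxRtMs, double slowRatio, int minRequests,
                            long statIntervalMs, long timeWindowSeconds) {
        this.maxRtMs = maxRtMs;
        this.slowRatio = slowRatio;
        this.minRequests = minRequests;
        this.statIntervalMs = statIntervalMs;
        this.openMs = timeWindowSeconds * 1000;
    }

    /** Returns true if a call may start at nowMs. */
    public synchronized boolean tryPass(long nowMs) {
        if (state == State.OPEN && nowMs - openedAt >= openMs) {
            state = State.HALF_OPEN; // let exactly one probe call through
            return true;
        }
        return state == State.CLOSED;
    }

    /** Records a finished call and its response time (nowMs must not decrease). */
    public synchronized void onComplete(long nowMs, long rtMs) {
        if (state == State.OPEN) {
            return; // stragglers that finish while open are ignored
        }
        if (state == State.HALF_OPEN) {
            // The probe decides: a slow probe re-opens, a fast one closes
            if (rtMs > maxRtMs) {
                open(nowMs);
            } else {
                state = State.CLOSED;
            }
            return;
        }
        calls.addLast(new long[] {nowMs, rtMs});
        // Forget calls that fell out of the statistics interval (the newest one always stays)
        while (nowMs - calls.peekFirst()[0] >= statIntervalMs) {
            calls.pollFirst();
        }
        if (calls.size() < minRequests) {
            return;
        }
        long slow = calls.stream().filter(c -> c[1] > maxRtMs).count();
        if ((double) slow / calls.size() > slowRatio) {
            open(nowMs);
        }
    }

    private void open(long nowMs) {
        state = State.OPEN;
        openedAt = nowMs;
        calls.clear();
    }

    public synchronized State state() {
        return state;
    }
}
```

With the rule values above, the sketch would be built as `new SlowRatioBreaker(1000, 0.5, 10, 10_000, 10)`: nine slow calls keep it closed (below `minRequestAmount`), the tenth opens it, and ten seconds later a single probe decides whether it closes.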

2025-11-20 · maneng

Troubleshooting and Incident Handling

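Common issues

1. Rules do not take effect

Problem: a flow rule is configured but has no effect.

Troubleshooting:

```bash
# 1. Check the client command port that the Dashboard talks to
curl http://localhost:8719/api

# 2. Check whether the rules were loaded
curl "http://localhost:8719/getRules?type=flow"

# 3. Check that the resource names match
#    resource name in the Dashboard vs. resource name in code
```

Fix:

```java
// The resource name must match exactly
@SentinelResource(value = "orderCreate") // in code
// The resource name configured in the Dashboard must also be "orderCreate"

// Check which rules are loaded
List<FlowRule> rules = FlowRuleManager.getRules();
System.out.println("Rule count: " + rules.size());
```

2. The application does not appear in the Dashboard

Problem: after the application starts, the Dashboard does not show it.

Troubleshooting: check the configuration.

```yaml
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8080 # Dashboard address
        port: 8719                # client command port
      eager: true                 # initialize immediately
```

Fix: by default the Sentinel client initializes, and registers with the Dashboard, only on the first resource call. Besides `eager: true`, one explicit call at startup forces registration:

```java
// Trigger one resource call at startup
@PostConstruct
public void init() {
    try {
        Entry entry = SphU.entry("sentinel-heartbeat");
        entry.exit();
    } catch (BlockException e) {
        // ignore
    }
}
```

3. Rule push fails

Problem: the Dashboard fails to push rules. ...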
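For issue 1, the resource-name comparison can be automated once you have both lists. A minimal sketch, assuming the names used in code (for example the `@SentinelResource` values) and the names from the loaded rules (for example from `getRules`) have already been collected as strings; `ResourceNameCheck` and its methods are hypothetical helpers, not part of Sentinel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper: finds rule resources that match no resource name used in code. */
public class ResourceNameCheck {

    /** Rule resources with no exact (case-sensitive) match in code. */
    public static List<String> unmatched(Collection<String> codeResources,
                                         Collection<String> ruleResources) {
        Set<String> known = new HashSet<>(codeResources);
        List<String> missing = new ArrayList<>();
        for (String r : ruleResources) {
            if (!known.contains(r)) {
                missing.add(r);
            }
        }
        return missing;
    }

    /** A code resource that differs from ruleResource only in case or surrounding whitespace. */
    public static Optional<String> nearMatch(String ruleResource, Collection<String> codeResources) {
        String normalized = ruleResource.trim().toLowerCase(Locale.ROOT);
        return codeResources.stream()
                .filter(c -> c.trim().toLowerCase(Locale.ROOT).equals(normalized))
                .findFirst();
    }

    public static void main(String[] args) {
        List<String> code = Arrays.asList("orderCreate", "productQuery");
        List<String> rules = Arrays.asList("orderCreate", "ordercreate ");
        for (String r : unmatched(code, rules)) {
            System.out.println("No match for '" + r + "', did you mean: "
                    + nearMatch(r, code).orElse("(none)"));
        }
    }
}
```

Running `main` prints `No match for 'ordercreate ', did you mean: orderCreate`, which is exactly the kind of near-miss that makes a rule silently ineffective.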

2025-11-20 · maneng

Performance Tuning: Reducing Sentinel Overhead

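Overhead analysis

Overhead introduced by Sentinel

| Component | Overhead | Notes |
|---|---|---|
| Entry creation | < 0.1 ms | Creating the Entry object |
| Slot chain | < 0.2 ms | Chain-of-responsibility processing |
| Statistics update | < 0.1 ms | Sliding-window update |
| Total | < 0.5 ms | Under normal conditions |

Load test comparison

```text
Without Sentinel:
- QPS: 10,000
- Average RT: 10 ms

With Sentinel:
- QPS: 9,800 (-2%)
- Average RT: 10.4 ms (+0.4 ms, +4%)
```

Conclusion: the performance impact stays below 5%.

Optimization strategies

1. Reduce the number of resources

```java
// ❌ Not recommended: every URL is a resource
@SentinelResource(value = "/order/{id}")
public Order getOrder(@PathVariable Long id) { }

// ✅ Recommended: define resources per module
@SentinelResource(value = "orderQuery")
public Order getOrder(@PathVariable Long id) { }
```

2. Merge rules

```java
// ❌ Not recommended: 100 resources, 100 rules
for (String resource : resources) {
    FlowRule rule = new FlowRule();
    rule.setResource(resource);
    rule.setCount(1000);
    rules.add(rule);
}

// ✅ Recommended: define rules per group
FlowRule readRule = new FlowRule();
readRule.setResource("read_api");
readRule.setCount(10000);

FlowRule writeRule = new FlowRule();
writeRule.setResource("write_api");
writeRule.setCount(1000);
```

3. Disable unnecessary slots

```java
// Registered through SPI:
// META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
public class CustomSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new StatisticSlot());
        // Authorization is not needed, so AuthoritySlot is left out
        // chain.addLast(new AuthoritySlot());
        // SystemSlot is also absent here, so system rules would not be enforced
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}
```

4. Tune the sliding-window parameters

```java
// Default: 2 buckets over a 1-second interval, suitable for QPS limiting

// High-QPS scenarios: more buckets
SampleCountProperty.updateSampleCount(4); // 4 buckets
IntervalProperty.updateInterval(1000);    // 1-second interval

// Low-QPS scenarios: fewer buckets
SampleCountProperty.updateSampleCount(1); // 1 bucket
```

5. Use asynchronous mode

```java
// Synchronous mode
@SentinelResource("orderCreate")
public Order createOrder(OrderDTO dto) {
    return orderService.create(dto);
}

// Asynchronous mode
// The annotation guards only the call that submits the task; to guard the
// asynchronous execution itself, Sentinel provides SphU.asyncEntry(...)
@SentinelResource("orderCreate")
public CompletableFuture<Order> createOrderAsync(OrderDTO dto) {
    return CompletableFuture.supplyAsync(() -> orderService.create(dto));
}
```

JVM tuning

Heap

```bash
# Size the heap sensibly to avoid excessive GC
-Xms2g -Xmx2g
-XX:MetaspaceSize=256m
-XX:MaxMetaspaceSize=512m
```

GC tuning

```bash
# Use G1
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m

# GC logging (JDK 8 flags; on JDK 9+ use -Xlog:gc*:file=logs/gc.log:time instead)
-Xloggc:logs/gc.log
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
```

Thread pool

```properties
# Tomcat thread pool
server.tomcat.threads.max=200
server.tomcat.threads.min-spare=50
server.tomcat.accept-count=100
```

Batch operation optimization

Batch checks

```java
// ❌ Not recommended: one check per item in a loop
for (Long productId : productIds) {
    try (Entry entry = SphU.entry("queryProduct")) {
        queryProduct(productId);
    } catch (BlockException e) {
        // this item was rejected
    }
}

// ✅ Recommended: one check for the whole batch
Entry entry = null;
try {
    // batchCount = productIds.size(): the batch consumes that many permits at once
    entry = SphU.entry("queryProduct", EntryType.IN, productIds.size());
    batchQueryProducts(productIds);
} catch (BlockException e) {
    // the whole batch was rejected
} finally {
    if (entry != null) {
        entry.exit(productIds.size()); // exit with the same count used on entry
    }
}
```

Cache optimization

Rule cache

```java
@Component
public class RuleCacheManager {

    private final LoadingCache<String, List<FlowRule>> ruleCache;

    public RuleCacheManager() {
        this.ruleCache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                // cached rules can lag behind a rule push by up to 1 minute
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .build(new CacheLoader<String, List<FlowRule>>() {
                    @Override
                    public List<FlowRule> load(String resource) {
                        // Never returns null: Guava's LoadingCache rejects null values
                        return FlowRuleManager.getRules().stream()
                                .filter(r -> resource.equals(r.getResource()))
                                .collect(Collectors.toList());
                    }
                });
    }

    public List<FlowRule> getRules(String resource) {
        try {
            return ruleCache.get(resource);
        } catch (ExecutionException e) {
            return Collections.emptyList();
        }
    }
}
```

Monitoring performance metrics

Performance metrics

```java
@Aspect
@Component
public class SentinelPerformanceMonitor {

    private final MeterRegistry registry;
    private final Timer entryTimer;

    public SentinelPerformanceMonitor(MeterRegistry registry) {
        this.registry = registry;
        this.entryTimer = Timer.builder("sentinel.entry.timer")
                .description("Execution time of @SentinelResource methods")
                .register(registry);
    }

    @Around("@annotation(sentinelResource)")
    public Object monitor(ProceedingJoinPoint pjp, SentinelResource sentinelResource) throws Throwable {
        // Timer.Sample lets the original exception (e.g. BlockException) propagate unchanged
        Timer.Sample sample = Timer.start(registry);
        try {
            return pjp.proceed();
        } finally {
            sample.stop(entryTimer);
        }
    }
}
```

Load test verification

Load test script

```bash
#!/bin/bash
# Load-test the endpoint

# Without Sentinel
ab -n 100000 -c 100 http://localhost:8080/order/create

# With Sentinel
# (add @SentinelResource to the code, then rerun)
ab -n 100000 -c 100 http://localhost:8080/order/create

# Compare the results
```

Performance report

Load test comparison:

| Metric | Without Sentinel | With Sentinel | Difference |
|---|---|---|---|
| QPS | 10,245 | 10,087 | -1.5% |
| Average RT (ms) | 9.76 | 9.91 | +1.5% |
| Max RT (ms) | 45 | 48 | +6.7% |
| Error rate | 0% | 0% | 0% |

Conclusion: QPS and average RT change by less than 2%, which is acceptable; the maximum RT rises by 6.7%.

Best practices

1. Resource granularity

```java
// ✅ Recommended: module-level resources
@SentinelResource("orderModule")

// ❌ Avoid: method-level resources (too many)
@SentinelResource("orderService.createOrder")
@SentinelResource("orderService.updateOrder")
@SentinelResource("orderService.deleteOrder")
```

2. Number of rules
- ✅ Recommended: fewer than 100 rules per application
- ❌ Avoid: more than 1,000 rules; too many rules hurt performance

3. Statistics window

```java
// High QPS (> 1000): more buckets
SampleCountProperty.updateSampleCount(4);

// Low QPS (< 100): fewer buckets
SampleCountProperty.updateSampleCount(1);
```

4. Cluster mode

```java
// High-traffic scenarios: cluster mode
rule.setClusterMode(true);

// Low-traffic scenarios: local mode
rule.setClusterMode(false);
```

Performance test

Test case

```java
@Test
public void testSentinelPerformance() {
    // Warm-up
    for (int i = 0; i < 10000; i++) {
        testEntry();
    }

    // Measurement
    long start = System.nanoTime();
    for (int i = 0; i < 100000; i++) {
        testEntry();
    }
    double costMs = (System.nanoTime() - start) / 1_000_000.0;

    System.out.println("100,000 Entry operations took: " + costMs + " ms");
    System.out.println("Average per operation: " + (costMs / 100000) + " ms");
}

private void testEntry() {
    try (Entry entry = SphU.entry("test")) {
        // business logic
    } catch (BlockException e) {
        // blocked
    }
}
```

Summary

Key points for performance tuning: ...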
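The sample-count advice in "4. Tune the sliding-window parameters" is easier to reason about with a toy bucketed window in hand. A minimal sketch, assuming a single-threaded-friendly synchronized design; `SlidingWindowCounter` is hypothetical and only illustrates the idea behind Sentinel's `LeapArray`, which is lock-free and differs in detail.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of a bucketed sliding window (the idea behind sampleCount/interval).
 * Not Sentinel's LeapArray, which is lock-free; this version simply synchronizes.
 */
public class SlidingWindowCounter {
    private final int sampleCount;    // number of buckets
    private final int intervalMs;     // total window length
    private final int bucketLengthMs; // intervalMs / sampleCount
    private final long[] bucketStart; // start time of the window each bucket currently holds
    private final long[] counts;

    public SlidingWindowCounter(int sampleCount, int intervalMs) {
        if (sampleCount <= 0 || intervalMs <= 0 || intervalMs % sampleCount != 0) {
            throw new IllegalArgumentException("intervalMs must be a positive multiple of sampleCount");
        }
        this.sampleCount = sampleCount;
        this.intervalMs = intervalMs;
        this.bucketLengthMs = intervalMs / sampleCount;
        this.bucketStart = new long[sampleCount];
        this.counts = new long[sampleCount];
        Arrays.fill(bucketStart, -1); // -1 marks a bucket that was never used
    }

    /** Finds the bucket for nowMs, resetting it if it still holds data from an older cycle. */
    private int bucketFor(long nowMs) {
        int idx = (int) ((nowMs / bucketLengthMs) % sampleCount);
        long start = nowMs - nowMs % bucketLengthMs;
        if (bucketStart[idx] != start) {
            bucketStart[idx] = start;
            counts[idx] = 0;
        }
        return idx;
    }

    public synchronized void add(long nowMs, long n) {
        counts[bucketFor(nowMs)] += n;
    }

    /** Sum over the buckets that still fall inside the last intervalMs. */
    public synchronized long sum(long nowMs) {
        bucketFor(nowMs);
        long total = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < intervalMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

Feeding 4 requests every 250 ms for one second and then reading at the 1-second mark shows the trade-off: with `sampleCount = 1` the count falls from 16 straight to 0, while with 4 buckets it falls to 12, dropping one quarter at a time. More buckets give smoother limiting at the cost of more bookkeeping.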

2025-11-20 · maneng

Monitoring Metrics and Alerting

Core monitoring metrics

Traffic metrics

| Metric | Description | Unit |
|---|---|---|
| passQps | Passed QPS | requests/s |
| blockQps | Blocked QPS | requests/s |
| successQps | Successful QPS | requests/s |
| exceptionQps | Exception QPS | requests/s |
| avgRt | Average response time | ms |
| minRt | Minimum response time | ms |
| maxConcurrency | Maximum concurrency | count |

Rule metrics

| Metric | Description |
|---|---|
| flowRuleCount | Number of flow rules |
| degradeRuleCount | Number of circuit breaking rules |
| systemRuleCount | Number of system rules |
| authorityRuleCount | Number of authority rules |

System metrics

| Metric | Description |
|---|---|
| systemLoad | System load |
| cpuUsage | CPU usage |
| memoryUsage | Memory usage |
| threadCount | Thread count |

Prometheus integration

Exposing metrics

```xml
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
```

```yaml
management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    export:
      prometheus:
        enabled: true
```

Custom metrics

```java
@Component
public class SentinelMetrics {

    private final MeterRegistry meterRegistry;
    // Resources whose gauges are already registered
    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public SentinelMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // Every 10 s, register gauges for resources that appeared since the last run
        scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 10, TimeUnit.SECONDS);
    }

    private void collectMetrics() {
        Map<ResourceWrapper, ClusterNode> nodeMap = ClusterBuilderSlot.getClusterNodeMap();
        for (Map.Entry<ResourceWrapper, ClusterNode> entry : nodeMap.entrySet()) {
            String resource = entry.getKey().getName();
            ClusterNode node = entry.getValue();
            if (!registered.add(resource)) {
                continue; // already registered; the gauges read the node's live values
            }

            // Passed QPS
            Gauge.builder("sentinel.pass.qps", node, Node::passQps)
                    .tag("resource", resource)
                    .register(meterRegistry);

            // Blocked QPS
            Gauge.builder("sentinel.block.qps", node, Node::blockQps)
                    .tag("resource", resource)
                    .register(meterRegistry);

            // Average RT
            Gauge.builder("sentinel.avg.rt", node, Node::avgRt)
                    .tag("resource", resource)
                    .register(meterRegistry);
        }
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Prometheus configuration

```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'sentinel'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['localhost:8080']
```

Grafana Dashboard

Dashboard JSON (the *_qps series are gauges that already hold a per-second rate, so they are plotted and divided directly rather than wrapped in `rate()`):

```json
{
  "dashboard": {
    "title": "Sentinel Monitoring",
    "panels": [
      {
        "title": "QPS",
        "type": "graph",
        "targets": [
          { "expr": "sentinel_pass_qps{resource=\"orderCreate\"}", "legendFormat": "Passed" },
          { "expr": "sentinel_block_qps{resource=\"orderCreate\"}", "legendFormat": "Blocked" }
        ]
      },
      {
        "title": "Block ratio",
        "type": "graph",
        "targets": [
          {
            "expr": "sentinel_block_qps / (sentinel_pass_qps + sentinel_block_qps)",
            "legendFormat": "{{resource}}"
          }
        ]
      },
      {
        "title": "Average RT",
        "type": "graph",
        "targets": [
          { "expr": "sentinel_avg_rt", "legendFormat": "{{resource}}" }
        ]
      }
    ]
  }
}
```

Alert rules

Prometheus alerting rules (delivered through Alertmanager)

```yaml
# alert_rules.yml
groups:
  - name: sentinel_alerts
    rules:
      # Frequent rate limiting
      - alert: HighBlockRate
        expr: |
          sentinel_block_qps / (sentinel_pass_qps + sentinel_block_qps) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Block ratio too high"
          description: "Block ratio of resource {{ $labels.resource }} is above 10%"

      # Circuit breaker
      - alert: CircuitBreakerOpen
        expr: sentinel_circuit_breaker_state == 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Circuit breaker open"
          description: "Circuit breaker of resource {{ $labels.resource }} is open"

      # High average RT
      - alert: HighAvgRT
        expr: sentinel_avg_rt > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Average RT too high"
          description: "Average RT of resource {{ $labels.resource }} is above 1 s"

      # High system load (Micrometer exports the 1-minute load average under this name)
      - alert: HighSystemLoad
        expr: system_load_average_1m > 4.0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "System load too high"
          description: "System load is {{ $value }}"
```

Alertmanager configuration

```yaml
# alertmanager.yml
global:
  resolve_timeout: 5m

route:
  group_by: ['alertname', 'resource']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'

receivers:
  - name: 'default'
    webhook_configs:
      - url: 'http://localhost:8060/webhook'
    email_configs:
      - to: 'ops@example.com'
        from: 'alert@example.com'
        smarthost: 'smtp.example.com:587'
```

Custom alerts

DingTalk alerts

```java
@Component
public class DingTalkAlerter {

    private static final String WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=xxx";

    private final RestTemplate restTemplate = new RestTemplate();

    public void sendAlert(String title, String content) {
        Map<String, Object> message = new HashMap<>();
        message.put("msgtype", "markdown");

        Map<String, String> markdown = new HashMap<>();
        markdown.put("title", title);
        markdown.put("text", content);
        message.put("markdown", markdown);

        restTemplate.postForObject(WEBHOOK_URL, message, String.class);
    }
}

@Component
public class SentinelAlerter {

    @Autowired
    private DingTalkAlerter dingTalkAlerter;

    private final AtomicLong blockCount = new AtomicLong(0);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void init() {
        // Check the block count once a minute
        scheduler.scheduleAtFixedRate(this::checkBlockCount, 1, 1, TimeUnit.MINUTES);
    }

    private void checkBlockCount() {
        // Read and reset in one atomic step so blocks recorded in between are not lost
        long currentBlock = blockCount.getAndSet(0);
        if (currentBlock > 1000) { // more than 1000 blocks in one minute
            String content = String.format(
                    "### Sentinel alert\n" +
                    "- **Type**: frequent rate limiting\n" +
                    "- **Blocks**: %d per minute\n" +
                    "- **Time**: %s",
                    currentBlock,
                    LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            try {
                dingTalkAlerter.sendAlert("Sentinel rate limiting alert", content);
            } catch (RuntimeException e) {
                // an uncaught exception would cancel all future scheduled checks
            }
        }
    }

    // Call this from block handlers to count blocked requests
    public void recordBlock() {
        blockCount.incrementAndGet();
    }
}
```

Log monitoring

Structured logs

```java
@Aspect
@Component
public class SentinelLogAspect {

    private static final Logger log = LoggerFactory.getLogger(SentinelLogAspect.class);

    @Around("@annotation(sentinelResource)")
    public Object around(ProceedingJoinPoint pjp, SentinelResource sentinelResource) throws Throwable {
        String resource = sentinelResource.value();
        long start = System.currentTimeMillis();

        try {
            Object result = pjp.proceed();
            long cost = System.currentTimeMillis() - start;
            // Success log
            log.info("resource={}, status=success, cost={}ms", resource, cost);
            return result;
        } catch (BlockException e) {
            // Blocked log (only reached if no blockHandler/fallback consumed the exception)
            log.warn("resource={}, status=blocked, reason={}", resource, e.getRule());
            throw e;
        } catch (Exception e) {
            // Error log
            log.error("resource={}, status=error, message={}", resource, e.getMessage());
            throw e;
        }
    }
}
```

ELK log analysis (Logstash pipeline)

```conf
input {
  file {
    path => "/var/log/sentinel/*.log"
    type => "sentinel"
  }
}

filter {
  grok {
    match => { "message" => "resource=%{WORD:resource}, status=%{WORD:status}, cost=%{NUMBER:cost}ms" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "sentinel-%{+YYYY.MM.dd}"
  }
}
```
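The HighBlockRate rule above combines two ideas: a ratio, `block / (pass + block)`, and a `for:` clause that fires only after the condition has held for a while. A minimal sketch of that logic, assuming one evaluation per minute so that five consecutive samples stand in for `for: 5m`; `BlockRateAlertEvaluator` is hypothetical and approximates, rather than reproduces, Prometheus' time-based `for` handling (Prometheus yields NaN for 0/0, while the sketch uses 0).

```java
/**
 * Hypothetical sketch of the HighBlockRate alert: block ratio = block / (pass + block),
 * firing only after the condition holds for N consecutive evaluations (like `for: 5m`
 * with one evaluation per minute).
 */
public class BlockRateAlertEvaluator {
    private final double threshold;
    private final int requiredConsecutive;
    private int consecutive;

    public BlockRateAlertEvaluator(double threshold, int requiredConsecutive) {
        this.threshold = threshold;
        this.requiredConsecutive = requiredConsecutive;
    }

    /** Share of requests that were blocked; 0 when there was no traffic at all. */
    public static double blockRatio(double passQps, double blockQps) {
        double total = passQps + blockQps;
        return total == 0 ? 0.0 : blockQps / total;
    }

    /** Feeds one sample; returns true while the alert is firing. */
    public boolean evaluate(double passQps, double blockQps) {
        if (blockRatio(passQps, blockQps) > threshold) {
            consecutive++;
        } else {
            consecutive = 0; // any healthy sample resets the pending alert
        }
        return consecutive >= requiredConsecutive;
    }
}
```

Note that exactly 10% blocked (900 passed, 100 blocked) does not fire, matching the strict `> 0.1` in the rule.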
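Monitoring board

Key metrics on display

```text
┌────────────────────────────────────────────┐
│ Sentinel monitoring board                  │
├────────────────────────────────────────────┤
│ Real-time QPS: 1,234 / 10,000              │
│ Block ratio: 2.3%                          │
│ Average RT: 45ms                           │
│ Circuit breakers: normal ✅                │
├────────────────────────────────────────────┤
│ TOP 5 blocked resources:                   │
│ 1. orderCreate     123 blocks              │
│ 2. paymentPay       89 blocks              │
│ 3. productQuery     56 blocks              │
├────────────────────────────────────────────┤
│ Alert history:                             │
│ [15:30] Order service frequent blocking    │
│ [14:20] Product service circuit open       │
└────────────────────────────────────────────┘
```

Summary

Monitoring and alerting system: ...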
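The "TOP 5 blocked resources" panel implies ranking resources by their block count over a period. A minimal sketch, assuming those per-resource counts have already been collected into a map (for example by sampling the blockQps gauges); `TopBlockedResources` is a hypothetical helper, and ties are broken by name so the panel order is stable.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper behind a "TOP N blocked resources" panel. */
public class TopBlockedResources {

    /** Resources ordered by block count (descending), ties broken by name, limited to n. */
    public static List<String> top(Map<String, Long> blockCounts, int n) {
        return blockCounts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(e -> e.getKey() + " " + e.getValue())
                .collect(Collectors.toList());
    }
}
```

With the counts shown on the board plus a tied `userLogin` at 56, `top(counts, 3)` yields `orderCreate 123`, `paymentPay 89`, `productQuery 56`.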

2025-11-20 · maneng

Dashboard Production Deployment and Configuration

Dashboard deployment options

Option 1: run the JAR directly

```bash
# 1. Download the Dashboard
wget https://github.com/alibaba/Sentinel/releases/download/1.8.6/sentinel-dashboard-1.8.6.jar

# 2. Start the Dashboard
java -Dserver.port=8080 \
  -Dcsp.sentinel.dashboard.server=localhost:8080 \
  -Dproject.name=sentinel-dashboard \
  -jar sentinel-dashboard-1.8.6.jar

# 3. Open the console
# http://localhost:8080
# Default username/password: sentinel/sentinel
```

Option 2: Docker

```dockerfile
FROM openjdk:8-jre-slim

# Download the Dashboard
ADD https://github.com/alibaba/Sentinel/releases/download/1.8.6/sentinel-dashboard-1.8.6.jar /app.jar

# Expose the port
EXPOSE 8080

# Start through a shell so that JAVA_OPTS (set in the Kubernetes manifest of option 3) takes effect
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar /app.jar"]
```

```bash
# Build the image
docker build -t sentinel-dashboard:1.8.6 .

# Run the container
docker run -d \
  --name sentinel-dashboard \
  -p 8080:8080 \
  sentinel-dashboard:1.8.6
```

Option 3: Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sentinel-dashboard
spec:
  replicas: 2
  selector:
    matchLabels:
      app: sentinel-dashboard
  template:
    metadata:
      labels:
        app: sentinel-dashboard
    spec:
      containers:
        - name: dashboard
          image: sentinel-dashboard:1.8.6
          ports:
            - containerPort: 8080
          env:
            - name: JAVA_OPTS
              value: "-Xms512m -Xmx512m"
---
apiVersion: v1
kind: Service
metadata:
  name: sentinel-dashboard
spec:
  selector:
    app: sentinel-dashboard
  ports:
    - port: 8080
      targetPort: 8080
  type: LoadBalancer
```

Production configuration

application.properties

```properties
# Server port
server.port=8080

# Session timeout (30 minutes)
server.servlet.session.timeout=30m

# Authentication
auth.enabled=true
auth.username=admin
auth.password=your_password

# Data source (persist to Nacos)
nacos.server-addr=localhost:8848
nacos.namespace=production
nacos.group-id=SENTINEL_GROUP

# Rule push mode
sentinel.rule.push.mode=nacos
```

JVM options

```bash
java -Xms1g -Xmx1g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -Xloggc:logs/gc.log \
  -XX:+PrintGCDetails \
  -XX:+PrintGCDateStamps \
  -jar sentinel-dashboard.jar
```

Customizing the Dashboard

Persisting rules to Nacos

1. Modify pom.xml

```xml
<!-- The Dashboard's own pom.xml declares this dependency with <scope>test</scope>; remove the scope -->
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
```

2. Configure Nacos

```java
@Configuration
public class NacosConfig {

    @Bean
    public ConfigService nacosConfigService() throws Exception {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, "localhost:8848");
        properties.put(PropertyKeyConst.NAMESPACE, "production");
        return ConfigFactory.createConfigService(properties);
    }
}
```

3. Rule publisher and provider

```java
@Component
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    @Autowired
    private ConfigService configService;

    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        String dataId = app + "-flow-rules";
        configService.publishConfig(
                dataId,
                "SENTINEL_GROUP",
                JSON.toJSONString(rules),
                ConfigType.JSON.getType()
        );
    }
}

@Component
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    @Autowired
    private ConfigService configService;

    @Override
    public List<FlowRuleEntity> getRules(String appName) throws Exception {
        String dataId = appName + "-flow-rules";
        String rules = configService.getConfig(dataId, "SENTINEL_GROUP", 3000);
        return StringUtil.isEmpty(rules) ? new ArrayList<>() : JSON.parseArray(rules, FlowRuleEntity.class);
    }
}
```

4. Controller changes

```java
// The Dashboard's FlowControllerV2 is mapped to /v2/flow; /v1/flow already belongs to FlowControllerV1
@RestController
@RequestMapping("/v2/flow")
public class FlowControllerV2 {

    @Autowired
    @Qualifier("flowRuleNacosProvider")
    private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;

    @Autowired
    @Qualifier("flowRuleNacosPublisher")
    private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;

    @GetMapping("/rules")
    public Result<List<FlowRuleEntity>> apiQueryRules(@RequestParam String app) {
        try {
            List<FlowRuleEntity> rules = ruleProvider.getRules(app);
            return Result.ofSuccess(rules);
        } catch (Exception e) {
            return Result.ofFail(-1, e.getMessage());
        }
    }

    @PostMapping("/rule")
    public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
        try {
            // Read-modify-write; a complete version would also assign the rule ID and timestamps
            List<FlowRuleEntity> rules = ruleProvider.getRules(entity.getApp());
            rules.add(entity);
            rulePublisher.publish(entity.getApp(), rules);
            return Result.ofSuccess(entity);
        } catch (Exception e) {
            return Result.ofFail(-1, e.getMessage());
        }
    }
}
```

Access control

Custom user authentication (Spring Security 5.x API)

```java
@Configuration
@EnableGlobalMethodSecurity(prePostEnabled = true) // required for @PreAuthorize below
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            .authorizeRequests()
                .antMatchers("/").permitAll()
                .anyRequest().authenticated()
                .and()
            .formLogin()
                .loginPage("/login")
                .permitAll()
                .and()
            .logout()
                .permitAll();
    }

    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
        // {noop} stores plain-text passwords; use a real PasswordEncoder in production
        auth.inMemoryAuthentication()
            .withUser("admin").password("{noop}admin123").roles("ADMIN")
            .and()
            .withUser("user").password("{noop}user123").roles("USER");
    }
}
```

Role-based access control

```java
@RestController
@RequestMapping("/v1/flow")
public class FlowController {

    @GetMapping("/rules")
    @PreAuthorize("hasAnyRole('ADMIN', 'USER')")
    public Result apiQueryRules(@RequestParam String app) {
        // ...
    }

    @PostMapping("/rule")
    @PreAuthorize("hasRole('ADMIN')")
    public Result apiAddRule(@RequestBody FlowRuleEntity entity) {
        // ...
    }

    @DeleteMapping("/rule/{id}")
    @PreAuthorize("hasRole('ADMIN')")
    public Result apiDeleteRule(@PathVariable Long id) {
        // ...
    }
}
```

Monitoring and alerting

Prometheus integration

```java
@Configuration
public class PrometheusConfig {

    @Bean
    MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> registry.config().commonTags("application", "sentinel-dashboard");
    }
}
```

Grafana Dashboard

```json
{
  "dashboard": {
    "title": "Sentinel Dashboard Monitoring",
    "panels": [
      {
        "title": "QPS",
        "targets": [
          { "expr": "sentinel_pass_qps" }
        ]
      },
      {
        "title": "Block Rate",
        "targets": [
          { "expr": "sentinel_block_qps / (sentinel_pass_qps + sentinel_block_qps)" }
        ]
      }
    ]
  }
}
```

High-availability deployment

Nginx load balancing

```nginx
upstream sentinel_dashboard {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
}

server {
    listen 80;
    server_name dashboard.example.com;

    location / {
        proxy_pass http://sentinel_dashboard;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
```

Session sharing

```properties
# Share sessions through Redis (requires spring-session-data-redis on the classpath)
spring.session.store-type=redis
spring.redis.host=localhost
spring.redis.port=6379
```

Operations scripts

Start script

```bash
#!/bin/bash
# start.sh

JAVA_OPTS="-Xms1g -Xmx1g -XX:+UseG1GC"
APP_NAME="sentinel-dashboard"
APP_JAR="sentinel-dashboard.jar"

mkdir -p logs
nohup java $JAVA_OPTS -jar $APP_JAR > logs/dashboard.log 2>&1 &
echo $! > dashboard.pid
echo "$APP_NAME started"
```

Stop script

```bash
#!/bin/bash
# stop.sh

if [ -f dashboard.pid ]; then
    kill $(cat dashboard.pid)
    rm -f dashboard.pid
    echo "Dashboard stopped"
else
    echo "PID file not found"
fi
```

Health check

```bash
#!/bin/bash
# health_check.sh

URL="http://localhost:8080/actuator/health"

if curl -f $URL > /dev/null 2>&1; then
    echo "Dashboard is healthy"
    exit 0
else
    echo "Dashboard is down"
    exit 1
fi
```

Summary

Key points for deploying the Dashboard in production: ...
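`apiAddFlowRule` performs a read-modify-write: it reads the app's rules, appends one, and publishes the whole list. With two Dashboard replicas behind Nginx, two concurrent adds for the same app can each read the old list and the second publish silently drops the first rule. A minimal sketch of optimistic retry against an in-memory stand-in keyed by dataId (for example `order-service-flow-rules`); `InMemoryRuleStore` is hypothetical, and whether a real config center offers an equivalent compare-and-set is not assumed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

/**
 * Hypothetical in-memory stand-in for a config center keyed by dataId,
 * showing optimistic read-modify-write so concurrent rule additions are not lost.
 */
public class InMemoryRuleStore {
    private final ConcurrentHashMap<String, List<String>> store = new ConcurrentHashMap<>();

    public List<String> get(String dataId) {
        return store.getOrDefault(dataId, Collections.emptyList());
    }

    /** Applies change to the current rules; retries if another writer got in first. */
    public List<String> update(String dataId, UnaryOperator<List<String>> change) {
        while (true) {
            List<String> current = store.get(dataId);
            List<String> base = current == null ? Collections.<String>emptyList() : current;
            // change must build a new list; stored lists are unmodifiable
            List<String> next = Collections.unmodifiableList(new ArrayList<>(change.apply(base)));
            boolean written = current == null
                    ? store.putIfAbsent(dataId, next) == null
                    : store.replace(dataId, current, next); // succeeds only if nobody changed it
            if (written) {
                return next;
            }
        }
    }
}
```

Two threads that each add 1,000 rules through `update` end with all 2,000 rules stored; the same interleaving with a plain get-then-put can lose some of them.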

2025-11-20 · maneng

Architecture of Cluster Flow Control

Architecture

```text
┌────────────────────────────────┐
│ Token Server                   │
│ ├─ TokenService (token service)│
│ ├─ FlowStatistics (statistics) │
│ └─ ClusterFlowRuleChecker      │
└────────┬───────────────────────┘
         │ Netty
    ┌────┴────┬─────────┬─────────┐
    ↓         ↓         ↓         ↓
 Client1   Client2   Client3   Client4
```

Token Server core

TokenService

```java
public class TokenService {

    // flow ID -> FlowRule
    private final Map<Long, FlowRule> ruleMap = new ConcurrentHashMap<>();

    // flow ID -> statistics
    private final Map<Long, ClusterMetric> metricMap = new ConcurrentHashMap<>();

    // Request tokens
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (ruleId == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }

        FlowRule rule = ruleMap.get(ruleId);
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }

        // Statistics for this rule
        ClusterMetric metric = metricMap.computeIfAbsent(ruleId, k -> new ClusterMetric());

        // Check and record atomically; otherwise concurrent requests can overshoot the threshold
        synchronized (metric) {
            if (canPass(rule, metric, acquireCount)) {
                metric.add(acquireCount);
                return new TokenResult(TokenResultStatus.OK);
            }
        }
        return new TokenResult(TokenResultStatus.BLOCKED);
    }

    private boolean canPass(FlowRule rule, ClusterMetric metric, int acquireCount) {
        double count = rule.getCount();
        long currentCount = metric.getCurrentCount();
        return currentCount + acquireCount <= count;
    }
}
```

ClusterMetric

```java
public class ClusterMetric {

    // Sliding window; with the defaults, 2 buckets over 1 second
    private final LeapArray<MetricBucket> data;

    public ClusterMetric() {
        this.data = new BucketLeapArray(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    }

    public long getCurrentCount() {
        data.currentWindow(); // roll the window forward before reading
        long pass = 0;
        List<MetricBucket> list = data.values();
        for (MetricBucket bucket : list) {
            pass += bucket.pass();
        }
        return pass;
    }

    public void add(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
}
```

Token Client core

ClusterFlowChecker

```java
public class ClusterFlowChecker {

    public static TokenResult acquireClusterToken(FlowRule rule, int acquireCount, boolean prioritized) {
        // Get the cluster client
        ClusterTokenClient client = TokenClientProvider.getClient();
        if (client == null) {
            // Client not initialized: fall back to local limiting
            return null;
        }

        long flowId = rule.getClusterConfig().getFlowId();

        // Request tokens
        TokenResult result = null;
        try {
            result = client.requestToken(flowId, acquireCount, prioritized);
        } catch (Exception ex) {
            // Request failed: fall back to local limiting
            RecordLog.warn("[ClusterFlowChecker] Request cluster token failed", ex);
        }

        return result;
    }
}
```

TokenResult

```java
public class TokenResult {

    private Integer status;                  // status code
    private int remaining;                   // remaining tokens
    private int waitInMs;                    // wait time
    private Map<String, String> attachments; // extra information

    public TokenResult(Integer status) {
        this.status = status;
    }

    public Integer getStatus() { return status; }
    public int getRemaining() { return remaining; }
    public int getWaitInMs() { return waitInMs; }

    public boolean isPass() {
        return status == TokenResultStatus.OK;
    }

    public boolean isBlocked() {
        return status == TokenResultStatus.BLOCKED;
    }

    public boolean shouldWait() {
        return status == TokenResultStatus.SHOULD_WAIT;
    }
}

public final class TokenResultStatus {
    public static final int OK = 0;
    public static final int BLOCKED = 1;
    public static final int SHOULD_WAIT = 2;
    public static final int NO_RULE_EXISTS = 3;
    public static final int BAD_REQUEST = 4;
    public static final int FAIL = 5;
}
```

Netty communication

Server side

```java
public class TokenServer {

    private final int port;
    private final TokenService tokenService;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public TokenServer(int port, TokenService tokenService) {
        this.port = port;
        this.tokenService = tokenService;
    }

    public void start() throws Exception {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) {
                 ch.pipeline()
                   .addLast(new Decoder())
                   .addLast(new Encoder())
                   .addLast(new TokenServerHandler(tokenService));
             }
         });

        ChannelFuture f = b.bind(port).sync();
        // Blocks until the server channel is closed
        f.channel().closeFuture().sync();
    }

    public void stop() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}

public class TokenServerHandler extends ChannelInboundHandlerAdapter {

    private final TokenService tokenService;

    public TokenServerHandler(TokenService tokenService) {
        this.tokenService = tokenService;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof TokenRequest) {
            TokenRequest request = (TokenRequest) msg;

            // Request tokens
            TokenResult result = tokenService.requestToken(
                    request.getFlowId(),
                    request.getAcquireCount(),
                    request.isPriority()
            );

            // Respond
            TokenResponse response = new TokenResponse();
            response.setStatus(result.getStatus());
            response.setRemaining(result.getRemaining());
            ctx.writeAndFlush(response);
        }
    }
}
```

Client side

```java
public class TokenClient {

    private EventLoopGroup group;
    private Bootstrap bootstrap;
    private Channel channel;

    public void start(String host, int port) throws Exception {
        group = new NioEventLoopGroup();

        bootstrap = new Bootstrap();
        bootstrap.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) {
                         ch.pipeline()
                           .addLast(new Encoder())
                           .addLast(new Decoder())
                           .addLast(new TokenClientHandler());
                     }
                 });

        channel = bootstrap.connect(host, port).sync().channel();
    }

    public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        TokenRequest request = new TokenRequest();
        request.setFlowId(flowId);
        request.setAcquireCount(acquireCount);
        request.setPriority(prioritized);

        // Send the request
        channel.writeAndFlush(request);

        // Wait for the response (simplified)
        return waitForResponse();
    }
}
```

Failure fallback

```java
public class ClusterFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
                      int count, boolean prioritized, Object... args) throws Throwable {
        FlowRule rule = getRuleFor(resourceWrapper);

        if (rule != null && rule.isClusterMode()) {
            // Cluster mode
            TokenResult result = ClusterFlowChecker.acquireClusterToken(rule, count, prioritized);

            if (result == null) {
                // Token Server unavailable: fall back to local limiting
                fallbackToLocalOrPass(context, resourceWrapper, rule, node, count, args);
            } else if (result.isBlocked()) {
                // Rejected by the cluster limit
                throw new FlowException(rule.getLimitApp(), rule);
            } else if (result.shouldWait()) {
                // The server asked the caller to wait before passing
                Thread.sleep(result.getWaitInMs());
            } else if (!result.isPass()) {
                // NO_RULE_EXISTS, BAD_REQUEST, FAIL: the server cannot decide, fall back
                fallbackToLocalOrPass(context, resourceWrapper, rule, node, count, args);
            }
        } else {
            // Local mode
            checkLocalFlow(context, resourceWrapper, node, count, args);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    private void fallbackToLocalOrPass(Context context, ResourceWrapper resourceWrapper, FlowRule rule,
                                       DefaultNode node, int count, Object... args) throws BlockException {
        if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
            // Fall back to local limiting
            checkLocalFlow(context, resourceWrapper, node, count, args);
        } else {
            // Let the request pass
            // Do nothing
        }
    }
}
```

Dynamic switching

Server election

```java
public class ServerElection {

    private final NacosNamingService namingService;
    private volatile boolean isLeader = false;

    public ServerElection(NacosNamingService namingService) {
        this.namingService = namingService;
    }

    public void elect() {
        try {
            // Register an ephemeral instance
            Instance instance = new Instance();
            instance.setIp(NetUtil.getLocalHost());
            instance.setPort(ClusterConstants.DEFAULT_TOKEN_SERVER_PORT);
            instance.setHealthy(true);
            instance.setWeight(1.0);

            namingService.registerInstance(
                    ClusterConstants.TOKEN_SERVER_SERVICE_NAME,
                    instance
            );

            // Watch for instance changes
            namingService.subscribe(
                    ClusterConstants.TOKEN_SERVER_SERVICE_NAME,
                    event -> {
                        if (event instanceof NamingEvent) {
                            List<Instance> instances = ((NamingEvent) event).getInstances();
                            checkAndUpdateLeader(instances);
                        }
                    }
            );
        } catch (Exception e) {
            RecordLog.warn("Election failed", e);
        }
    }

    private void checkAndUpdateLeader(List<Instance> instances) {
        // Every node must pick the same leader, so choose deterministically (lowest ip:port)
        // instead of relying on the order in which Nacos returns instances
        List<Instance> candidates = instances.stream()
                .filter(Instance::isHealthy)
                .sorted(Comparator.comparing(Instance::getIp).thenComparingInt(Instance::getPort))
                .collect(Collectors.toList());

        if (candidates.isEmpty()) {
            isLeader = false;
            return;
        }

        Instance leader = candidates.get(0);
        isLeader = isCurrentInstance(leader);

        // Called on every change event, so start/stop must be idempotent
        if (isLeader) {
            startTokenServer();
        } else {
            stopTokenServer();
            connectToLeader(leader);
        }
    }
}
```

Summary

Core of the cluster flow control architecture: ...
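The division of labour above (the server owns the global count, the client decides what to do when the server cannot answer) can be exercised without Netty or Nacos. A minimal, in-process sketch; `ClusterTokenSketch`, `WindowLimiter` and the `available` flag are hypothetical, it uses fixed one-second windows for brevity where `ClusterMetric` uses a sliding window, and it only models the OK, BLOCKED and "cannot decide" outcomes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical, in-process sketch of cluster flow control: one token server holds the
 * global threshold per flowId; clients fall back to a local limiter when it cannot decide.
 * Fixed one-second windows for brevity (the ClusterMetric above uses a sliding window).
 */
public class ClusterTokenSketch {
    public enum Status { OK, BLOCKED, NO_RULE_EXISTS, FAIL }

    /** Fixed one-second window counter with a threshold. */
    static class WindowLimiter {
        private final double threshold;
        private long windowStart = -1;
        private long count;

        WindowLimiter(double threshold) {
            this.threshold = threshold;
        }

        synchronized boolean tryAcquire(int acquire, long nowMs) {
            long start = nowMs - nowMs % 1000;
            if (start != windowStart) { // a new second started: reset the counter
                windowStart = start;
                count = 0;
            }
            if (count + acquire > threshold) {
                return false;
            }
            count += acquire; // check and add happen under the same lock
            return true;
        }
    }

    /** Stand-in token server: one global limiter per flowId. */
    static class TokenServer {
        private final Map<Long, WindowLimiter> limiters = new ConcurrentHashMap<>();
        volatile boolean available = true; // set to false to simulate an outage

        void loadRule(long flowId, double threshold) {
            limiters.put(flowId, new WindowLimiter(threshold));
        }

        Status requestToken(long flowId, int acquire, long nowMs) {
            if (!available) {
                return Status.FAIL;
            }
            WindowLimiter limiter = limiters.get(flowId);
            if (limiter == null) {
                return Status.NO_RULE_EXISTS;
            }
            return limiter.tryAcquire(acquire, nowMs) ? Status.OK : Status.BLOCKED;
        }
    }

    /** Client-side decision, mirroring fallbackToLocalOrPass in ClusterFlowSlot above. */
    static boolean tryPass(TokenServer server, long flowId, int acquire, long nowMs,
                           boolean fallbackToLocal, WindowLimiter local) {
        switch (server.requestToken(flowId, acquire, nowMs)) {
            case OK:
                return true;
            case BLOCKED:
                return false;
            default: // NO_RULE_EXISTS or FAIL: the server cannot decide
                return !fallbackToLocal || local.tryAcquire(acquire, nowMs);
        }
    }
}
```

The design point the sketch makes visible: while the server is up, the threshold is shared by all clients; when it is down, each client enforces only its own local threshold (or none, if `fallbackToLocal` is off), so the effective global limit during an outage is the sum of the local limits.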

2025-11-20 · maneng

Rule Management and Push Mechanism

Rule management architecture

```text
┌─────────────────────────────────────┐
│ Dashboard / config center           │
└────────────────┬────────────────────┘
                 │ Push/Pull
                 ↓
┌─────────────────────────────────────┐
│ RuleManager                         │
│ ├─ FlowRuleManager                  │
│ ├─ DegradeRuleManager               │
│ ├─ SystemRuleManager                │
│ ├─ AuthorityRuleManager             │
│ └─ ParamFlowRuleManager             │
└────────────────┬────────────────────┘
                 │ Property
                 ↓
┌─────────────────────────────────────┐
│ PropertyListener                    │
│ (rule change listener)              │
└────────────────┬────────────────────┘
                 │ Notify
                 ↓
┌─────────────────────────────────────┐
│ Slot Chain                          │
│ (applies rules)                     │
└─────────────────────────────────────┘
```

RuleManager core implementation

FlowRuleManager

```java
public class FlowRuleManager {

    // Rule storage: resource name -> rules. Replaced as a whole on every update
    // (volatile swap) so readers never see a half-updated map.
    private static volatile Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<>();

    // Rule listener
    private static final FlowPropertyListener LISTENER = new FlowPropertyListener();

    // Data-source property
    private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<>();

    static {
        currentProperty.addListener(LISTENER);
    }

    // Load rules
    public static void loadRules(List<FlowRule> rules) {
        currentProperty.updateValue(rules);
    }

    // Register a data source
    public static void register2Property(SentinelProperty<List<FlowRule>> property) {
        AssertUtil.notNull(property, "property cannot be null");
        synchronized (LISTENER) {
            currentProperty.removeListener(LISTENER);
            property.addListener(LISTENER);
            currentProperty = property;
        }
    }

    // All rules
    public static List<FlowRule> getRules() {
        List<FlowRule> rules = new ArrayList<>();
        for (Map.Entry<String, List<FlowRule>> entry : flowRules.entrySet()) {
            rules.addAll(entry.getValue());
        }
        return rules;
    }

    // Rules of one resource (null if there are none)
    public static List<FlowRule> getRules(String resource) {
        return flowRules.get(resource);
    }
}
```

PropertyListener

```java
// Nested inside FlowRuleManager
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    @Override
    public synchronized void configLoad(List<FlowRule> value) {
        Map<String, List<FlowRule>> rules = new ConcurrentHashMap<>();

        if (value != null) {
            for (FlowRule rule : value) {
                // Validate the rule
                if (!isValidRule(rule)) {
                    RecordLog.warn("Bad flow rule: " + rule);
                    continue;
                }

                // Build the traffic-shaping controller
                FlowRuleUtil.generateRater(rule);

                // Group by resource
                String resourceName = rule.getResource();
                rules.computeIfAbsent(resourceName, k -> new ArrayList<>()).add(rule);
            }
        }

        // Swap in the new map in one step instead of clear() + putAll()
        flowRules = rules;

        RecordLog.info("[FlowRuleManager] Flow rules loaded: " + rules);
    }

    @Override
    public void configUpdate(List<FlowRule> value) {
        configLoad(value);
    }
}
```

Data source abstraction

ReadableDataSource

```java
public interface ReadableDataSource<S, T> extends AutoCloseable {

    // Load the configuration
    T loadConfig() throws Exception;

    // Get the property
    SentinelProperty<T> getProperty();
}
```

AbstractDataSource

```java
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {

    protected final Converter<S, T> parser;
    protected final SentinelProperty<T> property;

    public AbstractDataSource(Converter<S, T> parser) {
        this.parser = parser;
        this.property = new DynamicSentinelProperty<>();
    }

    @Override
    public T loadConfig() throws Exception {
        S source = readSource();
        return parser.convert(source);
    }

    // Implemented by subclasses: read the raw data source
    public abstract S readSource() throws Exception;

    @Override
    public SentinelProperty<T> getProperty() {
        return property;
    }
}
```

Pull mode implementation

FileDataSource

```java
public class FileRefreshableDataSource<T> extends AbstractDataSource<String, T> {

    private final String file;
    private long lastModified = 0;
    private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

    public FileRefreshableDataSource(File file, Converter<String, T> parser) {
        super(parser);
        this.file = file.getAbsolutePath();
        // Start polling
        startTimerTask();
    }

    @Override
    public String readSource() throws Exception {
        return String.join("\n", Files.readAllLines(Paths.get(file)));
    }

    private void startTimerTask() {
        // Check the modification time every 3 s and reload when the file changed
        service.scheduleAtFixedRate(() -> {
            try {
                long curModified = new File(file).lastModified();
                if (curModified > lastModified) {
                    lastModified = curModified;
                    T conf = loadConfig();
                    getProperty().updateValue(conf);
                }
            } catch (Exception e) {
                RecordLog.warn("Error when loading file", e);
            }
        }, 0, 3, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        service.shutdownNow();
    }
}
```

Push mode implementation

NacosDataSource

```java
public class NacosDataSource<T> extends AbstractDataSource<String, T> {

    private final ConfigService configService;
    private final String dataId;
    private final String groupId;
    private final Listener configListener;

    public NacosDataSource(String serverAddr, String groupId, String dataId,
                           Converter<String, T> parser) throws Exception {
        super(parser);
        this.groupId = groupId;
        this.dataId = dataId;
        this.configListener = new Listener() {
            @Override
            public Executor getExecutor() {
                return null; // run callbacks on the Nacos notification thread
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                RecordLog.info("[NacosDataSource] New property value received: " + configInfo);
                T newValue = NacosDataSource.this.parser.convert(configInfo);
                getProperty().updateValue(newValue);
            }
        };

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        this.configService = ConfigFactory.createConfigService(properties);

        initNacosListener();
        loadInitialConfig();
    }

    @Override
    public String readSource() throws Exception {
        return configService.getConfig(dataId, groupId, 3000);
    }

    private void initNacosListener() {
        try {
            configService.addListener(dataId, groupId, configListener);
        } catch (Exception e) {
            throw new IllegalStateException("Error occurred when initializing Nacos listener", e);
        }
    }

    private void loadInitialConfig() {
        try {
            T newValue = loadConfig();
            if (newValue == null) {
                RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");
            }
            getProperty().updateValue(newValue);
        } catch (Exception e) {
            RecordLog.warn("[NacosDataSource] Error when loading initial config", e);
        }
    }

    @Override
    public void close() {
        configService.removeListener(dataId, groupId, configListener);
    }
}
```

Rule validation

public class RuleValidator { public static boolean isValidFlowRule(FlowRule rule) { if (rule == null) { return false; } // Validate the resource name if (StringUtil.isBlank(rule.getResource())) { return false; } // Validate the threshold if (rule.getCount() < 0) { return false; } // Validate the limiting mode if (rule.getGrade() !=
RuleConstant.FLOW_GRADE_QPS && rule.getGrade() != RuleConstant.FLOW_GRADE_THREAD) { return false; } // 校验流控策略 if (rule.getStrategy() != RuleConstant.STRATEGY_DIRECT && rule.getStrategy() != RuleConstant.STRATEGY_RELATE && rule.getStrategy() != RuleConstant.STRATEGY_CHAIN) { return false; } // 校验流控效果 if (rule.getControlBehavior() < 0 || rule.getControlBehavior() > 2) { return false; } return true; } } 规则转换器 public class FlowRuleJsonConverter implements Converter<String, List<FlowRule>> { @Override public List<FlowRule> convert(String source) { if (StringUtil.isEmpty(source)) { return new ArrayList<>(); } try { return JSON.parseArray(source, FlowRule.class); } catch (Exception e) { RecordLog.warn("Error when parsing flow rules", e); return new ArrayList<>(); } } } Dashboard推送 RulePublisher public interface DynamicRulePublisher<T> { void publish(String app, T rules) throws Exception; } @Component public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<FlowRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } String dataId = app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX; configService.publishConfig( dataId, NacosConfigUtil.GROUP_ID, JSON.toJSONString(rules), ConfigType.JSON.getType() ); } } RuleProvider public interface DynamicRuleProvider<T> { T getRules(String appName) throws Exception; } @Component public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> { @Autowired private ConfigService configService; @Override public List<FlowRuleEntity> getRules(String appName) throws Exception { String dataId = appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX; String rules = configService.getConfig( dataId, NacosConfigUtil.GROUP_ID, 3000 ); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } return JSON.parseArray(rules, FlowRuleEntity.class); } } 
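The property/listener push path above can be reduced to a few lines of plain Java. The sketch below is a hypothetical, dependency-free analogue: SimpleProperty stands in for SentinelProperty, SimpleRuleManager for FlowRuleManager plus FlowPropertyListener, and SimpleFlowRule for FlowRule. None of these are Sentinel classes. It also shows the volatile-reference swap mentioned in the listener comments, so readers never observe a half-built rule map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for SentinelProperty: holds a value and pushes every change to its listeners.
final class SimpleProperty<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private T value;

    synchronized void addListener(Consumer<T> listener) {
        listeners.add(listener);
        listener.accept(value); // like configLoad(): a new listener receives the current value
    }

    synchronized void removeListener(Consumer<T> listener) {
        listeners.remove(listener);
    }

    // Like configUpdate(): notify only when the value actually changed. Returns true if notified.
    synchronized boolean updateValue(T newValue) {
        if (Objects.equals(value, newValue)) {
            return false;
        }
        value = newValue;
        for (Consumer<T> listener : listeners) {
            listener.accept(newValue);
        }
        return true;
    }
}

final class SimpleFlowRule {
    final String resource;
    final double count;

    SimpleFlowRule(String resource, double count) {
        this.resource = resource;
        this.count = count;
    }
}

// Hypothetical analogue of FlowRuleManager + FlowPropertyListener.
final class SimpleRuleManager {
    // Replaced as a whole on every update, so readers see either the old map or the new one.
    private static volatile Map<String, List<SimpleFlowRule>> rules = Collections.emptyMap();
    private static final Consumer<List<SimpleFlowRule>> LISTENER = SimpleRuleManager::rebuild;
    private static SimpleProperty<List<SimpleFlowRule>> currentProperty = new SimpleProperty<>();

    static {
        currentProperty.addListener(LISTENER);
    }

    // Same idea as register2Property(): detach from the old source, attach to the new one.
    static synchronized void register(SimpleProperty<List<SimpleFlowRule>> property) {
        currentProperty.removeListener(LISTENER);
        property.addListener(LISTENER);
        currentProperty = property;
    }

    static List<SimpleFlowRule> getRules(String resource) {
        return rules.getOrDefault(resource, Collections.emptyList());
    }

    private static void rebuild(List<SimpleFlowRule> value) {
        Map<String, List<SimpleFlowRule>> next = new HashMap<>();
        if (value != null) {
            for (SimpleFlowRule rule : value) {
                // Skip invalid rules instead of rejecting the whole batch
                if (rule == null || rule.resource == null || rule.resource.trim().isEmpty() || rule.count < 0) {
                    continue;
                }
                next.computeIfAbsent(rule.resource, k -> new ArrayList<>()).add(rule);
            }
        }
        rules = next; // a single volatile write publishes the new rule set
    }
}

class PushModelDemo {
    public static void main(String[] args) {
        // Plays the role of a Nacos-backed property: its owner pushes new rule lists into it.
        SimpleProperty<List<SimpleFlowRule>> nacosLike = new SimpleProperty<>();
        SimpleRuleManager.register(nacosLike);
        nacosLike.updateValue(Arrays.asList(
                new SimpleFlowRule("orderCreate", 1000),
                new SimpleFlowRule("", 5))); // blank resource: skipped
        System.out.println(SimpleRuleManager.getRules("orderCreate").size()); // 1
    }
}
```

Registering a new property immediately replaces the rule set with that property's current value (empty here until the first push), which mirrors how switching data sources hands control to the new source.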
Summary

Core mechanisms of rule management: ...
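To make the pull model from FileRefreshableDataSource concrete without Sentinel on the classpath, here is a minimal, hypothetical PollingFileSource (not a Sentinel class). It polls the file's modification time, re-reads and parses only when it changes, and pushes the result to a consumer that plays the role of property.updateValue. Two assumptions differ from the code above: a change is detected with != rather than > (so a file replaced by an older copy is still reloaded), and the timestamp is committed only after a successful push, so a failed parse is retried on the next poll.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical pull-mode source in the spirit of FileRefreshableDataSource (not Sentinel's class).
final class PollingFileSource<T> implements AutoCloseable {
    private final Path file;
    private final Function<String, T> parser;   // plays the role of Converter<String, T>
    private final Consumer<T> sink;             // plays the role of property.updateValue
    private final ScheduledExecutorService scheduler;
    private long lastModified = Long.MIN_VALUE; // "never read"

    PollingFileSource(Path file, Function<String, T> parser, Consumer<T> sink) {
        this.file = file;
        this.parser = parser;
        this.sink = sink;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "rule-file-poller");
            t.setDaemon(true); // polling alone should not keep the JVM alive
            return t;
        });
    }

    void start(long periodSeconds) {
        scheduler.scheduleAtFixedRate(this::pollQuietly, 0, periodSeconds, TimeUnit.SECONDS);
    }

    // One pull: re-read and push only if the modification time changed. Returns true if pushed.
    synchronized boolean poll() {
        try {
            long current = Files.getLastModifiedTime(file).toMillis();
            if (current == lastModified) {
                return false;
            }
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            sink.accept(parser.apply(text));
            lastModified = current; // committed only after a successful push
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void pollQuietly() {
        try {
            poll();
        } catch (RuntimeException e) {
            // An exception would cancel all later runs of a scheduled task, so log and carry on
            System.err.println("rule file poll failed: " + e);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

class PullModelDemo {
    public static void main(String[] args) throws Exception {
        Path rules = Files.createTempFile("flow-rules", ".txt");
        Files.write(rules, "orderCreate=1000".getBytes(StandardCharsets.UTF_8));
        try (PollingFileSource<String> source =
                     new PollingFileSource<>(rules, String::trim, v -> System.out.println("loaded: " + v))) {
            System.out.println(source.poll()); // prints "loaded: orderCreate=1000", then true
            System.out.println(source.poll()); // false: the file has not changed
        }
    }
}
```

In a real service you would call start(3) instead of poll() by hand. Modification-time polling can miss two writes that land within the file system's timestamp granularity, one reason push mode (Nacos) is preferred in production.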

2025-11-20 · maneng

Sliding Window Algorithm: The Secret Weapon Behind Sentinel's Statistics

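How the sliding window works

Why a sliding window?

The boundary problem of a fixed window (limit: 1000 requests per second):

Time:      0s        0.5s        1s        1.5s        2s
           |-------- Window 1 --------|-------- Window 2 --------|
Requests:                      999 ↑ 999
                   (999 just before 1s, 999 just after 1s)

Window 1: 999 < 1000 ✅
Window 2: 999 < 1000 ✅
Reality: 1998 requests within 0.5s around the boundary ❌

The sliding-window fix:

Time:      0s                        1s                        2s
           |--------- window ---------|
           split into 10 buckets of 100 ms each

QPS is the sum over the buckets covering the most recent 1 s; as time advances, the window slides forward one 100 ms bucket at a time, so a burst that straddles a boundary is counted in full.

LeapArray implementation

Core data structure

public abstract class LeapArray<T> {

    protected int windowLengthInMs;  // length of one bucket (ms)
    protected int sampleCount;       // number of buckets
    protected int intervalInMs;      // length of the whole window (ms); should be divisible by sampleCount

    // Ring of buckets, reused as time moves on
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    // Guards resetting a stale bucket (used by currentWindow)
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.intervalInMs = intervalInMs;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    // Subclasses decide how to create an empty bucket and how to reset a stale one
    public abstract T newEmptyBucket(long timeMillis);

    protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
}

WindowWrap (bucket wrapper)

public class WindowWrap<T> {

    private final long windowLengthInMs;  // bucket length
    private long windowStart;             // bucket start timestamp
    private T value;                      // statistics held by the bucket

    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }

    public long windowStart() {
        return windowStart;
    }

    public T value() {
        return value;
    }

    // Reuse this bucket for a new time slot
    public WindowWrap<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;
    }

    public boolean isTimeInWindow(long timeMillis) {
        return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
    }
}

MetricBucket (statistics bucket)

public class MetricBucket {

    private final LongAdder[] counters;  // one counter per MetricEvent
    private volatile long minRt;         // minimum RT

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public void add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    // Clear all counters so the bucket can be reused
    public MetricBucket reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = Long.MAX_VALUE; // simplified; Sentinel starts from its configured max statistic RT
    }
}

enum MetricEvent {
    PASS,          // passed
    BLOCK,         // blocked
    EXCEPTION,     // threw an exception
    SUCCESS,       // completed
    RT,            // response time
    OCCUPIED_PASS  // passed by borrowing quota from a future bucket
}

Fetching the current bucket

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);
    long windowStart = calculateWindowStart(timeMillis);

    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            // Bucket does not exist yet: create it with CAS
            WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                // Another thread won the race; retry
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            // Bucket exists and is current
            return old;
        } else if (windowStart > old.windowStart()) {
            // Bucket is stale (left over from an earlier lap of the ring): reset it in place
            if (updateLock.tryLock()) {
                try {
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Clock moved backwards: return a throwaway bucket that is not stored in the array
            return new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

// Bucket index: position of the time slot in the ring
private int calculateTimeIdx(long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    return (int) (timeId % array.length());
}

// Bucket start: round the timestamp down to a multiple of windowLengthInMs
private long calculateWindowStart(long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

QPS calculation (in a LeapArray<MetricBucket>, simplified)

public double getQps() {
    long pass = 0;
    List<MetricBucket> list = listAll();
    for (MetricBucket window : list) {
        pass += window.pass();
    }
    return pass / intervalInSec();
}

// Collect the buckets that are still inside the current interval
public List<MetricBucket> listAll() {
    List<MetricBucket> result = new ArrayList<>();
    long currentTime = TimeUtil.currentTimeMillis();
    for (int i = 0; i < array.length(); i++) {
        WindowWrap<MetricBucket> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(currentTime, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

// A bucket is deprecated once it is older than the whole interval
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}

public double intervalInSec() {
    return intervalInMs / 1000.0;
}

Hands-on examples

Counting QPS

public class MetricDemo {

    public static void main(String[] args) throws InterruptedException {
        // Sliding window: 2 buckets over 1 s in total
        ArrayMetric metric = new ArrayMetric(2, 1000);

        // Simulate 100 requests, one every ~10 ms (about 1 s in total)
        for (int i = 0; i < 100; i++) {
            metric.addPass(1);
            Thread.sleep(10);
        }

        // Passes still inside the last 1 s; buckets that slid out are dropped, so this can be below 100
        System.out.println("Pass (last 1 s): " + metric.pass());
        // Nothing recorded success or RT here, so both stay 0
        System.out.println("Success: " + metric.success());
        System.out.println("AvgRT: " + metric.avgRt());
    }
}

Custom statistics

public class CustomMetric extends LeapArray<MetricBucket> {

    public CustomMetric(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }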

    @Override
    public MetricBucket newEmptyBucket(long timeMillis) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }

    // Custom recording method: add n to one event counter in the current bucket
    public void addCustomEvent(MetricEvent event, long value) {
        WindowWrap<MetricBucket> wrap = currentWindow();
        wrap.value().add(event, value);
    }
}

Performance optimizations

1. LongAdder instead of AtomicLong

// Under heavy contention LongAdder spreads updates across cells, so it scales better than AtomicLong
private final LongAdder counter = new LongAdder();

public void increment() {
    counter.increment();
}

public long sum() {
    return counter.sum();
}

2. Reusing a fixed ring array

// A fixed-size array of buckets is reset and reused lap after lap instead of allocating new buckets (less GC)
private final AtomicReferenceArray<WindowWrap<T>> array;

// Ring index
int idx = (int) (timeId % array.length());

3. Lock-free CAS updates

// CAS installs a new bucket without taking a lock
if (array.compareAndSet(idx, null, window)) {
    return window;
}

Time alignment

// Bucket start times are aligned to 100 ms
long timeMillis = 12345;                          // actual time
long windowStart = timeMillis - timeMillis % 100; // rounded down to 100 ms

// Results
12345 → 12300
12399 → 12300
12400 → 12400

Summary

Core of the sliding window: ...
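The bucket arithmetic above (index = timeId % sampleCount, start = time - time % windowLength, expire buckets older than the interval) can be checked with a small stand-alone counter. SlidingWindowCounter below is a hypothetical simplification, not Sentinel's LeapArray: it uses one lock instead of CAS, and takes timestamps as arguments so the fixed-window boundary example can be replayed deterministically.

```java
import java.util.Arrays;

// Simplified sliding-window counter using LeapArray's bucket math, guarded by a single lock.
final class SlidingWindowCounter {
    private final int sampleCount;      // number of buckets
    private final long windowLengthMs;  // length of one bucket
    private final long intervalMs;      // length of the whole window
    private final long[] bucketStart;   // start time of the slot each bucket currently holds
    private final long[] bucketCount;   // requests counted in that slot

    SlidingWindowCounter(int sampleCount, long intervalMs) {
        if (sampleCount <= 0 || intervalMs % sampleCount != 0) {
            throw new IllegalArgumentException("intervalMs must be divisible by sampleCount");
        }
        this.sampleCount = sampleCount;
        this.intervalMs = intervalMs;
        this.windowLengthMs = intervalMs / sampleCount;
        this.bucketStart = new long[sampleCount];
        this.bucketCount = new long[sampleCount];
        Arrays.fill(bucketStart, -1L); // -1 = bucket never used
    }

    // Same formula as calculateWindowStart: round down to a bucket boundary.
    static long windowStart(long timeMs, long windowLengthMs) {
        return timeMs - timeMs % windowLengthMs;
    }

    // Same formula as calculateTimeIdx: position of the time slot in the ring.
    private int index(long timeMs) {
        return (int) ((timeMs / windowLengthMs) % sampleCount);
    }

    synchronized void add(long timeMs, long n) {
        int idx = index(timeMs);
        long start = windowStart(timeMs, windowLengthMs);
        if (start < bucketStart[idx]) {
            return; // clock went backwards: drop the sample instead of corrupting a newer bucket
        }
        if (start > bucketStart[idx]) { // stale bucket from an earlier lap: reset and reuse it
            bucketStart[idx] = start;
            bucketCount[idx] = 0;
        }
        bucketCount[idx] += n;
    }

    // Sum of the buckets that started within the last intervalMs (roughly listAll() + pass()).
    synchronized long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < intervalMs) {
                total += bucketCount[i];
            }
        }
        return total;
    }
}

class SlidingWindowDemo {
    public static void main(String[] args) {
        SlidingWindowCounter counter = new SlidingWindowCounter(10, 1000); // 10 buckets x 100 ms
        counter.add(900, 999);  // 999 requests at t = 0.9 s: fixed window [0 s, 1 s) would see 999
        counter.add(1100, 999); // 999 requests at t = 1.1 s: fixed window [1 s, 2 s) would see 999
        System.out.println("last 1 s at t = 1.1 s: " + counter.sum(1100)); // 1998, above the 1000 limit
    }
}
```

At t = 1.95 s the 0.9 s bucket has slid out (1950 - 900 ≥ 1000), so sum(1950) drops back to 999; a later add at t = 1.9 s lands in the same ring slot as 0.9 s and resets it, which is exactly the ring reuse described in optimization 2.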

2025-11-20 · maneng

Chain of Responsibility: ProcessorSlotChain in Detail

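Slot Chain architecture

          AbstractLinkedProcessorSlot (abstract class)
                          ↓
              ProcessorSlotChain (the chain)
                          ↓
   ┌──────────────┬──────────────┬──────────────┬──────────────┐
   ↓              ↓              ↓              ↓              ↓
NodeSelector    Cluster        Statistic      Flow           Degrade
Slot            Slot           Slot           Slot           Slot

Core slots in detail

1. NodeSelectorSlot (node selection)

Role: builds the invocation tree. For each context that calls the resource it creates one DefaultNode and links it under the context's current node.

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    // context name → DefaultNode; replaced copy-on-write so lock-free readers never see a HashMap mid-update
    private volatile Map<String, DefaultNode> map = new HashMap<>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
            throws Throwable {
        // Get or create the DefaultNode for this context (double-checked)
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    // Copy-on-write: publish a new map instead of mutating the one readers use
                    Map<String, DefaultNode> cacheMap = new HashMap<>(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Attach the node to the invocation tree under the context's last node
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        // Make it the current node
        context.setCurNode(node);
        // Pass on to the next slot
        fireEntry(context, resourceWrapper, node, count, args);
    }
}

2. ClusterBuilderSlot (cluster node building)

Role: creates the ClusterNode, which aggregates a resource's statistics globally (across all contexts). ...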
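The hand-off pattern behind fireEntry can be shown without Sentinel. The sketch below uses hypothetical classes (LinkedSlot, StatSlot, LimitSlot, BlockedException), not Sentinel's API: each slot does its work and then explicitly calls the next one, and a slot stops the chain by throwing. StatSlot follows the StatisticSlot idea of running the rest of the chain first and recording pass or block afterwards; LimitSlot is a toy cumulative limit, not FlowSlot's algorithm.

```java
// Hypothetical miniature of AbstractLinkedProcessorSlot; not Sentinel's API.
abstract class LinkedSlot {
    private LinkedSlot next;

    // Links the next slot and returns it, so a chain can be built as a.setNext(b).setNext(c)
    LinkedSlot setNext(LinkedSlot next) {
        this.next = next;
        return next;
    }

    abstract void entry(String resource, int count) throws BlockedException;

    // Hand off to the next slot; the last slot simply ends the chain
    protected void fireEntry(String resource, int count) throws BlockedException {
        if (next != null) {
            next.entry(resource, count);
        }
    }
}

final class BlockedException extends Exception {
    BlockedException(String message) {
        super(message);
    }
}

// StatisticSlot-style: call the rest of the chain first, then record the outcome.
final class StatSlot extends LinkedSlot {
    long pass;
    long block;

    @Override
    void entry(String resource, int count) throws BlockedException {
        try {
            fireEntry(resource, count);
            pass += count;
        } catch (BlockedException e) {
            block += count;
            throw e;
        }
    }
}

// Toy rule check: admits at most `limit` units in total (no time window).
final class LimitSlot extends LinkedSlot {
    private final int limit;
    private int used;

    LimitSlot(int limit) {
        this.limit = limit;
    }

    @Override
    void entry(String resource, int count) throws BlockedException {
        if (used + count > limit) {
            throw new BlockedException("blocked: " + resource);
        }
        used += count;
        fireEntry(resource, count);
    }
}

class SlotChainDemo {
    public static void main(String[] args) {
        StatSlot head = new StatSlot();
        head.setNext(new LimitSlot(3));
        for (int i = 0; i < 5; i++) {
            try {
                head.entry("orderCreate", 1);
            } catch (BlockedException e) {
                System.out.println(e.getMessage()); // printed for the 4th and 5th calls
            }
        }
        System.out.println("pass=" + head.pass + " block=" + head.block); // pass=3 block=2
    }
}
```

Putting the statistics slot ahead of the rule slots is what lets it count both outcomes: it sees the exception on its way back up the chain.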

2025-11-20 · maneng

Sentinel Core Architecture: The Big Picture

Overall architecture

┌────────────────────────────────────────┐
│ Sentinel Client                        │
├────────────────────────────────────────┤
│ API Layer (SphU/SphO/Entry)            │
├────────────────────────────────────────┤
│ Slot Chain (chain of responsibility)   │
│  ├─ NodeSelectorSlot                   │
│  ├─ ClusterBuilderSlot                 │
│  ├─ StatisticSlot                      │
│  ├─ AuthoritySlot                      │
│  ├─ SystemSlot                         │
│  ├─ FlowSlot                           │
│  └─ DegradeSlot                        │
├────────────────────────────────────────┤
│ Metrics (sliding-window statistics)    │
├────────────────────────────────────────┤
│ Rule Manager (rule management)         │
└────────────────────────────────────────┘

Core components

1. Entry (resource entry point)

public abstract class Entry implements AutoCloseable {

    private final long createTime;
    private Node curNode;
    private Node originNode;
    private Throwable error;

    // Simplified: in Sentinel, chain and context are fields of the CtEntry subclass
    protected final ResourceWrapper resourceWrapper;
    protected final ProcessorSlot<Object> chain;
    protected final Context context;

    protected Entry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
        this.resourceWrapper = resourceWrapper;
        this.chain = chain;
        this.context = context;
        this.createTime = TimeUtil.currentTimeMillis();
    }

    // Leaving the resource: run the exit path of every slot (count = 1, no extra args)
    @Override
    public void close() {
        if (chain != null) {
            chain.exit(context, resourceWrapper, 1);
        }
    }
}

Role: ...
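Because Entry implements AutoCloseable, try-with-resources guarantees the exit path runs even when business code throws; Sentinel's own docs show the same shape with SphU.entry(...). The sketch below is a hypothetical stand-in (GuardedEntry is not a Sentinel class): entering increments an in-flight counter, similar in spirit to a per-resource thread count, and close() decrements it exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical resource guard modelled on the Entry idea: enter() is "entry", close() is "exit".
final class GuardedEntry implements AutoCloseable {
    static final AtomicInteger IN_FLIGHT = new AtomicInteger(); // like a per-resource thread count

    private final String resource;
    private final long createTime = System.currentTimeMillis(); // creation time, the basis for RT
    private boolean exited;

    private GuardedEntry(String resource) {
        this.resource = resource;
    }

    static GuardedEntry enter(String resource, int maxConcurrent) {
        if (IN_FLIGHT.incrementAndGet() > maxConcurrent) {
            IN_FLIGHT.decrementAndGet(); // roll back: the caller never receives an entry to close
            throw new IllegalStateException("blocked: " + resource);
        }
        return new GuardedEntry(resource);
    }

    long elapsedMs() {
        return System.currentTimeMillis() - createTime;
    }

    @Override
    public void close() {
        if (!exited) { // idempotent exit
            exited = true;
            IN_FLIGHT.decrementAndGet();
        }
    }
}

class EntryDemo {
    public static void main(String[] args) {
        try (GuardedEntry entry = GuardedEntry.enter("orderCreate", 10)) {
            System.out.println("in flight: " + GuardedEntry.IN_FLIGHT.get()); // 1
            throw new IllegalArgumentException("business failure");
        } catch (IllegalArgumentException e) {
            // close() has already run before this catch block executes
            System.out.println("in flight after failure: " + GuardedEntry.IN_FLIGHT.get()); // 0
        }
    }
}
```

A blocked call throws from enter() before any entry exists, so there is nothing to close; this matches why a block exception is caught outside the try-with-resources body.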

2025-11-20 · maneng

如约数科科技工作室

