滑动窗口算法:统计的秘密武器
滑动窗口原理 为什么需要滑动窗口? 固定窗口的临界问题: 时间: 0s 0.5s 1s 1.5s 2s |---窗口1---|---窗口2---| 请求: 999个 1个 999个 窗口1:999 < 1000 ✅ 窗口2:999 < 1000 ✅ 实际:0.5s内1998个请求 ❌ 滑动窗口解决方案: 时间: 0s 1s 2s |----窗口-----| | 分成10个格子 | 每个格子:100ms 统计QPS:滑动统计最近1秒的请求 LeapArray实现 核心数据结构 public abstract class LeapArray<T> { protected int windowLengthInMs; // 每个窗口时长 protected int sampleCount; // 窗口数量 protected int intervalInMs; // 总时长 protected final AtomicReferenceArray<WindowWrap<T>> array; public LeapArray(int sampleCount, int intervalInMs) { this.sampleCount = sampleCount; this.intervalInMs = intervalInMs; this.windowLengthInMs = intervalInMs / sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } } WindowWrap(窗口包装) public class WindowWrap<T> { private final long windowLengthInMs; // 窗口时长 private long windowStart; // 窗口开始时间 private T value; // 统计数据 public WindowWrap(long windowLengthInMs, long windowStart, T value) { this.windowLengthInMs = windowLengthInMs; this.windowStart = windowStart; this.value = value; } public boolean isTimeInWindow(long timeMillis) { return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs; } } MetricBucket(统计桶) public class MetricBucket { private final LongAdder[] counters; // 各项指标计数器 private volatile long minRt; // 最小RT public MetricBucket() { MetricEvent[] events = MetricEvent.values(); this.counters = new LongAdder[events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder(); } initMinRt(); } public long get(MetricEvent event) { return counters[event.ordinal()].sum(); } public void add(MetricEvent event, long n) { counters[event.ordinal()].add(n); } } enum MetricEvent { PASS, // 通过 BLOCK, // 阻塞 EXCEPTION, // 异常 SUCCESS, // 成功 RT, // 响应时间 OCCUPIED_PASS // 占用通过 } 窗口获取算法 public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { // 窗口不存在,创建新窗口 WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { return window; } else { Thread.yield(); } } else if (windowStart == old.windowStart()) { // 窗口存在且有效 return old; } else if (windowStart > old.windowStart()) { // 窗口过期,重置窗口 if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { // 时钟回拨 return new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } // 计算窗口索引 private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / windowLengthInMs; return (int) (timeId % array.length()); } // 计算窗口开始时间 private long calculateWindowStart(long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; } QPS计算 public double getQps() { long pass = 0; List<MetricBucket> list = listAll(); for (MetricBucket window : list) { pass += window.pass(); } return pass / intervalInSec(); } public List<MetricBucket> listAll() { List<MetricBucket> result = new ArrayList<>(); long currentTime = TimeUtil.currentTimeMillis(); for (int i = 0; i < array.length(); i++) { WindowWrap<MetricBucket> windowWrap = array.get(i); if (windowWrap == null || isWindowDeprecated(currentTime, windowWrap)) { continue; } result.add(windowWrap.value()); } return result; } 实战示例 统计QPS public class MetricDemo { public static void main(String[] args) throws InterruptedException { // 创建滑动窗口:2个窗口,1秒总时长 ArrayMetric metric = new ArrayMetric(2, 1000); // 模拟请求 for (int i = 0; i < 100; i++) { metric.addPass(1); Thread.sleep(10); } // 统计QPS System.out.println("QPS: " + metric.pass()); System.out.println("Success: " + metric.success()); System.out.println("AvgRT: " + metric.avgRt()); } } 自定义统计 public class CustomMetric extends LeapArray<MetricBucket> { public CustomMetric(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); } @Override public MetricBucket newEmptyBucket(long timeMillis) { return new MetricBucket(); } @Override protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) { w.resetTo(startTime); w.value().reset(); return w; } // 自定义统计方法 public void addCustomEvent(String event, long value) { WindowWrap<MetricBucket> wrap = currentWindow(); wrap.value().add(event, value); } } 性能优化 1. LongAdder替代AtomicLong // 高并发下性能更好 private final LongAdder counter = new LongAdder(); public void increment() { counter.increment(); } public long sum() { return counter.sum(); } 2. 数组环形复用 // 使用固定大小数组,避免GC private final AtomicReferenceArray<WindowWrap<T>> array; // 环形索引 private int idx = (int) (timeId % array.length()); 3. CAS无锁更新 // 使用CAS避免锁竞争 if (array.compareAndSet(idx, null, window)) { return window; } 时间对齐问题 // 问题:时间戳100ms对齐 long timeMillis = 12345; // 实际时间 long windowStart = timeMillis - timeMillis % 100; // 对齐到100ms // 结果 12345 → 12300 12399 → 12300 12400 → 12400 总结 滑动窗口核心: ...