滑动窗口原理
为什么需要滑动窗口?
固定窗口的临界问题:
时间: 0s 0.5s 1s 1.5s 2s
|---窗口1---|---窗口2---|
请求: 999个 1个 999个
窗口1:999 < 1000 ✅
窗口2:999 < 1000 ✅
实际:0.5s内1998个请求 ❌
滑动窗口解决方案:
时间: 0s 1s 2s
|----窗口-----|
| 分成10个格子 |
每个格子:100ms
统计QPS:滑动统计最近1秒的请求
LeapArray实现
核心数据结构
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 每个窗口时长
protected int sampleCount; // 窗口数量
protected int intervalInMs; // 总时长
protected final AtomicReferenceArray<WindowWrap<T>> array;
public LeapArray(int sampleCount, int intervalInMs) {
this.sampleCount = sampleCount;
this.intervalInMs = intervalInMs;
this.windowLengthInMs = intervalInMs / sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
WindowWrap(窗口包装)
public class WindowWrap<T> {
private final long windowLengthInMs; // 窗口时长
private long windowStart; // 窗口开始时间
private T value; // 统计数据
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis &&
timeMillis < windowStart + windowLengthInMs;
}
}
MetricBucket(统计桶)
public class MetricBucket {
private final LongAdder[] counters; // 各项指标计数器
private volatile long minRt; // 最小RT
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public void add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
}
}
enum MetricEvent {
PASS, // 通过
BLOCK, // 阻塞
EXCEPTION, // 异常
SUCCESS, // 成功
RT, // 响应时间
OCCUPIED_PASS // 占用通过
}
窗口获取算法
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 窗口不存在,创建新窗口
WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 窗口存在且有效
return old;
} else if (windowStart > old.windowStart()) {
// 窗口过期,重置窗口
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 时钟回拨
return new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
// 计算窗口索引
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int) (timeId % array.length());
}
// 计算窗口开始时间
private long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
QPS计算
public double getQps() {
long pass = 0;
List<MetricBucket> list = listAll();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass / intervalInSec();
}
public List<MetricBucket> listAll() {
List<MetricBucket> result = new ArrayList<>();
long currentTime = TimeUtil.currentTimeMillis();
for (int i = 0; i < array.length(); i++) {
WindowWrap<MetricBucket> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(currentTime, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
实战示例
统计QPS
public class MetricDemo {
public static void main(String[] args) throws InterruptedException {
// 创建滑动窗口:2个窗口,1秒总时长
ArrayMetric metric = new ArrayMetric(2, 1000);
// 模拟请求
for (int i = 0; i < 100; i++) {
metric.addPass(1);
Thread.sleep(10);
}
// 统计QPS
System.out.println("QPS: " + metric.pass());
System.out.println("Success: " + metric.success());
System.out.println("AvgRT: " + metric.avgRt());
}
}
自定义统计
public class CustomMetric extends LeapArray<MetricBucket> {
public CustomMetric(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket(long timeMillis) {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
w.resetTo(startTime);
w.value().reset();
return w;
}
// 自定义统计方法
public void addCustomEvent(String event, long value) {
WindowWrap<MetricBucket> wrap = currentWindow();
wrap.value().add(event, value);
}
}
性能优化
1. LongAdder替代AtomicLong
// 高并发下性能更好
private final LongAdder counter = new LongAdder();
public void increment() {
counter.increment();
}
public long sum() {
return counter.sum();
}
2. 数组环形复用
// 使用固定大小数组,避免GC
private final AtomicReferenceArray<WindowWrap<T>> array;
// 环形索引
private int idx = (int) (timeId % array.length());
3. CAS无锁更新
// 使用CAS避免锁竞争
if (array.compareAndSet(idx, null, window)) {
return window;
}
时间对齐问题
// 问题:时间戳100ms对齐
long timeMillis = 12345; // 实际时间
long windowStart = timeMillis - timeMillis % 100; // 对齐到100ms
// 结果
12345 → 12300
12399 → 12300
12400 → 12400
总结
滑动窗口核心:
- LeapArray:环形数组存储窗口
- WindowWrap:包装每个时间窗口
- MetricBucket:统计各项指标
- CAS无锁:高性能并发更新
- 时间对齐:窗口开始时间对齐
性能优势:
- O(1)复杂度获取当前窗口
- LongAdder高并发统计
- 环形数组避免GC
- 无锁CAS更新