集群流控的架构设计

架构设计 ┌────────────────────────────────┐ │ Token Server │ │ ├─ TokenService (令牌服务) │ │ ├─ FlowStatistics (统计) │ │ └─ ClusterFlowRuleChecker │ └────────┬───────────────────────┘ │ Netty通信 ┌────┴───┬────┬────┐ ↓ ↓ ↓ ↓ Client1 Client2 Client3 Client4 Token Server核心 TokenService public class TokenService { // 资源ID → FlowRule private final Map<Long, FlowRule> ruleMap = new ConcurrentHashMap<>(); // 资源ID → 统计数据 private final Map<Long, ClusterMetric> metricMap = new ConcurrentHashMap<>(); // 请求令牌 public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) { if (ruleId == null) { return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); } FlowRule rule = ruleMap.get(ruleId); if (rule == null) { return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); } // 获取统计数据 ClusterMetric metric = metricMap.computeIfAbsent(ruleId, k -> new ClusterMetric()); // 检查是否允许通过 boolean pass = canPass(rule, metric, acquireCount); if (pass) { metric.add(acquireCount); return new TokenResult(TokenResultStatus.OK); } else { return new TokenResult(TokenResultStatus.BLOCKED); } } private boolean canPass(FlowRule rule, ClusterMetric metric, int acquireCount) { double count = rule.getCount(); long currentCount = metric.getCurrentCount(); return currentCount + acquireCount <= count; } } ClusterMetric public class ClusterMetric { private final LeapArray<MetricBucket> data; public ClusterMetric() { this.data = new BucketLeapArray(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); } public long getCurrentCount() { data.currentWindow(); long pass = 0; List<MetricBucket> list = data.values(); for (MetricBucket bucket : list) { pass += bucket.pass(); } return pass; } public void add(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); } } Token Client核心 ClusterFlowChecker public class ClusterFlowChecker { public static TokenResult acquireClusterToken(FlowRule rule, int acquireCount, boolean prioritized) { // 获取集群客户端 
ClusterTokenClient client = TokenClientProvider.getClient(); if (client == null) { // 客户端未初始化,降级为本地限流 return null; } long flowId = rule.getClusterConfig().getFlowId(); // 请求令牌 TokenResult result = null; try { result = client.requestToken(flowId, acquireCount, prioritized); } catch (Exception ex) { // 请求失败,降级为本地限流 RecordLog.warn("[ClusterFlowChecker] Request cluster token failed", ex); } return result; } } TokenResult public class TokenResult { private Integer status; // 状态码 private int remaining; // 剩余令牌 private int waitInMs; // 等待时间 private Map<String, String> attachments; // 附加信息 public TokenResult(Integer status) { this.status = status; } public boolean isPass() { return status == TokenResultStatus.OK; } public boolean isBlocked() { return status == TokenResultStatus.BLOCKED; } public boolean shouldWait() { return status == TokenResultStatus.SHOULD_WAIT; } } public final class TokenResultStatus { public static final int OK = 0; public static final int BLOCKED = 1; public static final int SHOULD_WAIT = 2; public static final int NO_RULE_EXISTS = 3; public static final int BAD_REQUEST = 4; public static final int FAIL = 5; } Netty通信 Server端 public class TokenServer { private final int port; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; public void start() throws Exception { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new Decoder()) .addLast(new Encoder()) .addLast(new TokenServerHandler()); } }); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } public void stop() { if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } } } public class TokenServerHandler extends 
ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof TokenRequest) { TokenRequest request = (TokenRequest) msg; // 请求令牌 TokenResult result = TokenService.requestToken( request.getFlowId(), request.getAcquireCount(), request.isPriority() ); // 响应 TokenResponse response = new TokenResponse(); response.setStatus(result.getStatus()); response.setRemaining(result.getRemaining()); ctx.writeAndFlush(response); } } } Client端 public class TokenClient { private EventLoopGroup group; private Bootstrap bootstrap; private Channel channel; public void start(String host, int port) throws Exception { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new Encoder()) .addLast(new Decoder()) .addLast(new TokenClientHandler()); } }); channel = bootstrap.connect(host, port).sync().channel(); } public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) { TokenRequest request = new TokenRequest(); request.setFlowId(flowId); request.setAcquireCount(acquireCount); request.setPriority(prioritized); // 发送请求 channel.writeAndFlush(request); // 等待响应(简化处理) return waitForResponse(); } } 失败降级 public class ClusterFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... 
args) throws Throwable { FlowRule rule = getRuleFor(resourceWrapper); if (rule != null && rule.isClusterMode()) { // 集群模式 TokenResult result = ClusterFlowChecker.acquireClusterToken(rule, count, false); if (result == null || result.getStatus() == TokenResultStatus.FAIL) { // Token Server不可用,降级为本地限流 fallbackToLocalOrPass(context, rule, node, count, args); } else if (result.isBlocked()) { // 集群限流拒绝 throw new FlowException(resourceWrapper.getName(), rule); } } else { // 本地模式 checkLocalFlow(context, resourceWrapper, node, count, args); } fireEntry(context, resourceWrapper, node, count, args); } private void fallbackToLocalOrPass(Context context, FlowRule rule, DefaultNode node, int count, Object... args) throws BlockException { if (rule.getClusterConfig().isFallbackToLocalWhenFail()) { // 降级为本地限流 checkLocalFlow(context, null, node, count, args); } else { // 直接通过 // Do nothing } } } 动态切换 Server选举 public class ServerElection { private final NacosNamingService namingService; private volatile boolean isLeader = false; public void elect() { try { // 注册临时节点 Instance instance = new Instance(); instance.setIp(NetUtil.getLocalHost()); instance.setPort(ClusterConstants.DEFAULT_TOKEN_SERVER_PORT); instance.setHealthy(true); instance.setWeight(1.0); namingService.registerInstance( ClusterConstants.TOKEN_SERVER_SERVICE_NAME, instance ); // 监听节点变化 namingService.subscribe( ClusterConstants.TOKEN_SERVER_SERVICE_NAME, event -> { if (event instanceof NamingEvent) { List<Instance> instances = ((NamingEvent) event).getInstances(); checkAndUpdateLeader(instances); } } ); } catch (Exception e) { RecordLog.warn("Election failed", e); } } private void checkAndUpdateLeader(List<Instance> instances) { if (instances.isEmpty()) { isLeader = false; return; } // 选择第一个实例为Leader Instance leader = instances.get(0); isLeader = isCurrentInstance(leader); if (isLeader) { startTokenServer(); } else { stopTokenServer(); connectToLeader(leader); } } } 总结 集群流控架构核心: ...

2025-11-20 · maneng

集群流控:跨实例的流量管理

## 引言:当单机限流不够用时

假设你的订单服务部署了3个实例,每个实例配置了QPS限流1000:

- 实例1:QPS限流 1000
- 实例2:QPS限流 1000
- 实例3:QPS限流 1000

问题来了:整个订单服务的总QPS是多少?

答案:3000 QPS(每个实例1000)。

但如果你的需求是"整个订单服务的总QPS不超过2000"呢?

传统的单机限流无法实现!因为每个实例各自统计,无法感知其他实例的流量。这就是集群流控(Cluster Flow Control)要解决的问题。

## 单机限流的局限性

### 问题1:无法精确控制总流量

场景:3个实例,期望总QPS 2000。

单机限流配置:

```java
FlowRule rule = new FlowRule();
rule.setResource("orderCreate");
rule.setCount(667); // 2000 / 3 ≈ 667
```

问题:

- 流量不均匀时,会导致总QPS超限
- 某个实例挂了,总QPS会降低

示例:

| 场景 | 实例1 QPS | 实例2 QPS | 实例3 QPS | 总QPS | 结果 |
|------|-----------|-----------|-----------|-------|------|
| 流量均匀 | 666 | 666 | 666 | 1998 | ✅ 正常 |
| 流量不均 | 667 | 667 | 1000 | 2334 | ❌ 超限 |
| 实例3挂 | 667 | 667 | 0 | 1334 | ⚠️ 浪费 |

### 问题2:无法应对突发流量

场景:3个实例,某一秒流量全部打到实例1。

| 时刻 | 实例1 | 实例2 | 实例3 | 总QPS | 期望 |
|------|-------|-------|-------|-------|------|
| T0 | 667 ❌ | 0 | 0 | 667 | 2000 |
| T1 | 667 ❌ | 0 | 0 | 667 | 2000 |

实例1已经限流了,但实例2和实例3还有大量闲置容量。 ...

2025-11-20 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...