Slot Chain架构
AbstractLinkedProcessorSlot (抽象类)
↓
ProcessorSlotChain (链)
↓
┌─────────┬─────────┬──────────┬──────────┐
↓ ↓ ↓ ↓ ↓
NodeSelector Cluster Statistic Flow Degrade
Slot Slot Slot Slot Slot
核心Slot详解
1. NodeSelectorSlot(节点选择)
作用:构建调用树,创建Context和DefaultNode。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
private volatile Map<String, DefaultNode> map = new HashMap<>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable {
// 获取或创建DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
map.put(context.getName(), node);
}
}
}
// 设置当前节点
context.setCurNode(node);
// 调用下一个Slot
fireEntry(context, resourceWrapper, node, count, args);
}
}
2. ClusterBuilderSlot(集群构建)
作用:创建ClusterNode,全局统计。
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
clusterNode = new ClusterNode(resourceWrapper.getName());
clusterNodeMap.put(node.getId(), clusterNode);
}
}
}
node.setClusterNode(clusterNode);
// 构建调用树
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, args);
}
}
3. StatisticSlot(统计)
作用:实时统计通过、阻塞、异常、RT等指标。
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
// 调用下一个Slot
fireEntry(context, resourceWrapper, node, count, args);
// 统计通过请求
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
} catch (BlockException e) {
// 统计阻塞请求
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode) context.getCurNode();
if (context.getCurEntry().getError() == null) {
// 统计成功和RT
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
node.addRtAndSuccess(rt, count);
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
} else {
// 统计异常
node.increaseExceptionQps(count);
}
fireExit(context, resourceWrapper, count, args);
}
}
4. AuthoritySlot(授权)
作用:黑白名单控制。
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(resource.getName(), rule);
}
}
}
}
5. SystemSlot(系统保护)
作用:系统级别保护(Load、CPU、RT)。
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, args);
}
}
public class SystemRuleManager {
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
SystemRule rule = getSystemRule();
if (rule == null) {
return;
}
// 检查QPS
if (rule.getQps() > 0) {
double qps = Constants.ENTRY_NODE.passQps();
if (qps > rule.getQps()) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
}
// 检查线程数
if (rule.getMaxThread() > 0) {
int currentThread = Constants.ENTRY_NODE.curThreadNum();
if (currentThread > rule.getMaxThread()) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
}
// 检查Load
if (rule.getHighestSystemLoad() > 0) {
double currentLoad = getCurrentSystemAvgLoad();
if (currentLoad > rule.getHighestSystemLoad()) {
if (!checkBbr(currentLoad)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
}
}
}
6. FlowSlot(流控)
作用:流量控制。
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, args);
fireEntry(context, resourceWrapper, node, count, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, Object... args) throws BlockException {
List<FlowRule> rules = FlowRuleManager.getRules(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count)) {
throw new FlowException(resource.getName(), rule);
}
}
}
}
}
7. DegradeSlot(熔断)
作用:熔断降级。
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(r.getName(), cb);
}
}
}
}
Slot Chain构建
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 按顺序添加Slot
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new SystemSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
自定义Slot
日志Slot
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
long start = System.currentTimeMillis();
try {
fireEntry(context, resourceWrapper, node, count, args);
} catch (BlockException e) {
log.warn("资源被拒绝:{}, 原因:{}", resourceWrapper.getName(), e.getRuleLimitApp());
throw e;
} finally {
long cost = System.currentTimeMillis() - start;
log.info("资源调用:{}, 耗时:{}ms", resourceWrapper.getName(), cost);
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
限流预警Slot
public class FlowAlertSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final AtomicLong blockCount = new AtomicLong(0);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, args);
} catch (FlowException e) {
long currentBlock = blockCount.incrementAndGet();
// 限流次数超过100次,发送告警
if (currentBlock % 100 == 0) {
sendAlert(resourceWrapper.getName(), currentBlock);
}
throw e;
}
}
private void sendAlert(String resource, long blockCount) {
log.warn("限流告警:资源 {} 已被限流 {} 次", resource, blockCount);
// 发送钉钉、邮件等告警
}
}
Slot执行流程图
Entry
↓
NodeSelectorSlot.entry() ← 构建调用树
↓
ClusterBuilderSlot.entry() ← 创建ClusterNode
↓
StatisticSlot.entry() ← 统计请求
↓
AuthoritySlot.entry() ← 黑白名单检查
↓
SystemSlot.entry() ← 系统保护检查
↓
FlowSlot.entry() ← 流量控制检查
↓
DegradeSlot.entry() ← 熔断降级检查
↓
业务逻辑执行
↓
StatisticSlot.exit() ← 统计成功/失败/RT
↓
Exit
总结
Slot Chain的设计优势:
- 职责单一:每个Slot只负责一种功能
- 易扩展:添加新Slot无需修改现有代码
- 解耦:Slot之间独立,互不影响
- 灵活:可以自定义Slot顺序
核心Slot职责:
- NodeSelectorSlot:构建调用树
- ClusterBuilderSlot:全局统计
- StatisticSlot:实时统计
- AuthoritySlot:黑白名单
- SystemSlot:系统保护
- FlowSlot:流量控制
- DegradeSlot:熔断降级