规则管理架构
┌─────────────────────────────────────┐
│ Dashboard / 配置中心 │
└────────────────┬────────────────────┘
│ Push/Pull
↓
┌─────────────────────────────────────┐
│ RuleManager │
│ ├─ FlowRuleManager │
│ ├─ DegradeRuleManager │
│ ├─ SystemRuleManager │
│ ├─ AuthorityRuleManager │
│ └─ ParamFlowRuleManager │
└────────────────┬────────────────────┘
│ Property
↓
┌─────────────────────────────────────┐
│ PropertyListener │
│ (规则变更监听) │
└────────────────┬────────────────────┘
│ Notify
↓
┌─────────────────────────────────────┐
│ Slot Chain │
│ (应用规则) │
└─────────────────────────────────────┘
RuleManager核心实现
FlowRuleManager
/**
 * Central registry of flow-control rules.
 *
 * <p>Rules are stored grouped by resource name. New rule sets arrive through a
 * {@link SentinelProperty}; the registered listener rebuilds the per-resource
 * index whenever that property is updated.
 */
public class FlowRuleManager {
    // Rule storage: resource name -> rules configured for that resource.
    private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<>();

    // Listener attached to the current property; also used as the lock that guards property swaps.
    private static final FlowPropertyListener LISTENER = new FlowPropertyListener();

    // Property (data source side) currently feeding this manager.
    // volatile: written under the LISTENER lock in register2Property but read lock-free in loadRules.
    private static volatile SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<>();

    static {
        currentProperty.addListener(LISTENER);
    }

    /**
     * Loads rules by pushing them into the currently bound property.
     *
     * @param rules the complete rule list to apply
     */
    public static void loadRules(List<FlowRule> rules) {
        currentProperty.updateValue(rules);
    }

    /**
     * Rebinds this manager to another property (typically the one exposed by a data source).
     * The listener is detached from the old property and attached to the new one.
     *
     * @param property the property to listen to from now on; must not be null
     */
    public static void register2Property(SentinelProperty<List<FlowRule>> property) {
        AssertUtil.notNull(property, "property cannot be null");
        synchronized (LISTENER) {
            currentProperty.removeListener(LISTENER);
            property.addListener(LISTENER);
            currentProperty = property;
        }
    }

    /**
     * Returns all rules of all resources.
     *
     * @return a new list containing every stored rule; never null
     */
    public static List<FlowRule> getRules() {
        List<FlowRule> rules = new ArrayList<>();
        for (List<FlowRule> perResource : flowRules.values()) {
            rules.addAll(perResource);
        }
        return rules;
    }

    /**
     * Returns the rules configured for a single resource.
     *
     * @param resource the resource name
     * @return a read-only view of the resource's rules, or an empty list when the
     *         resource is null or has no rules; never null
     */
    public static List<FlowRule> getRules(String resource) {
        // ConcurrentHashMap rejects null keys, so answer "no rules" instead of throwing.
        if (resource == null) {
            return java.util.Collections.emptyList();
        }
        List<FlowRule> rules = flowRules.get(resource);
        if (rules == null) {
            return java.util.Collections.emptyList();
        }
        // Read-only view so callers cannot mutate the manager's internal list.
        return java.util.Collections.unmodifiableList(rules);
    }
}
PropertyListener
/**
 * Property listener that rebuilds the per-resource rule index of the enclosing
 * manager each time a new rule list is delivered.
 */
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    /**
     * Replaces the stored rules with the valid entries of {@code value}.
     *
     * @param value the full rule list; {@code null} is treated as "no rules"
     */
    @Override
    public synchronized void configLoad(List<FlowRule> value) {
        Map<String, List<FlowRule>> rules = new ConcurrentHashMap<>();
        if (value != null) {
            for (FlowRule rule : value) {
                // Skip rules that fail validation.
                if (!isValidRule(rule)) {
                    // String concatenation instead of rule.toString(): a null element is logged, not an NPE.
                    RecordLog.warn("Bad flow rule: " + rule);
                    continue;
                }
                // Prepare the rule's traffic controller.
                FlowRuleUtil.generateRater(rule);
                // Group by resource name.
                rules.computeIfAbsent(rule.getResource(), k -> new ArrayList<>()).add(rule);
            }
        }
        // Publish the new entries first and only then drop resources that no longer have rules.
        // A clear() followed by putAll() would expose an empty map to concurrent readers,
        // i.e. a short window in which no resource is protected.
        flowRules.putAll(rules);
        flowRules.keySet().retainAll(rules.keySet());
        RecordLog.info("[FlowRuleManager] Flow rules loaded: " + rules);
    }

    /**
     * Handles a rule update the same way as an initial load.
     *
     * @param value the full rule list
     */
    @Override
    public void configUpdate(List<FlowRule> value) {
        configLoad(value);
    }
}
数据源抽象
ReadableDataSource
/**
 * A readable configuration source that exposes its value as a {@link SentinelProperty}.
 *
 * @param <S> raw source type
 * @param <T> converted configuration type
 */
public interface ReadableDataSource<S, T> extends AutoCloseable {
/**
 * Loads the configuration.
 *
 * @return the loaded configuration
 * @throws Exception if loading fails
 */
T loadConfig() throws Exception;
/**
 * Returns the property associated with this data source.
 *
 * @return the property carrying values of type {@code T}
 */
SentinelProperty<T> getProperty();
}
AbstractDataSource
/**
 * Base class for data sources: reads a raw source value, converts it with a
 * {@link Converter} and exposes the result through a {@link SentinelProperty}.
 *
 * @param <S> raw source type returned by {@link #readSource()}
 * @param <T> converted configuration type
 */
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {
    protected final Converter<S, T> parser;
    protected final SentinelProperty<T> property;

    /**
     * @param parser converter from the raw source to the configuration type; must not be null
     * @throws IllegalArgumentException if {@code parser} is null
     */
    public AbstractDataSource(Converter<S, T> parser) {
        // Fail at construction time rather than with an NPE on the first loadConfig().
        if (parser == null) {
            throw new IllegalArgumentException("parser can't be null");
        }
        this.parser = parser;
        this.property = new DynamicSentinelProperty<>();
    }

    /**
     * Reads the raw source and converts it.
     *
     * @return the converted configuration
     * @throws Exception if reading the source fails
     */
    @Override
    public T loadConfig() throws Exception {
        return parser.convert(readSource());
    }

    /**
     * Reads the raw value from the underlying source; implemented by subclasses.
     *
     * @return the raw source value
     * @throws Exception if the source cannot be read
     */
    public abstract S readSource() throws Exception;

    @Override
    public SentinelProperty<T> getProperty() {
        return property;
    }
}
Pull模式实现
FileRefreshableDataSource
/**
 * Pull-mode data source: polls a file every 3 seconds and, when its
 * last-modified timestamp has advanced, reloads it and updates the property.
 *
 * @param <T> converted configuration type
 */
public class FileRefreshableDataSource<T> extends AbstractDataSource<String, T> {
    private final String file;

    // Single polling thread, kept in a field so close() can stop it.
    // Daemon, so an unclosed data source does not keep the JVM alive.
    private final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "file-datasource-refresh");
        t.setDaemon(true);
        return t;
    });

    // Timestamp of the last successfully loaded version; only touched by the polling thread.
    private long lastModified = 0;

    /**
     * @param file   the file to watch
     * @param parser converter from file content to the configuration type
     */
    public FileRefreshableDataSource(File file, Converter<String, T> parser) {
        super(parser);
        this.file = file.getAbsolutePath();
        // Start the periodic refresh task.
        startTimerTask();
    }

    /**
     * Reads the whole file, joining its lines with {@code "\n"}.
     *
     * @return the file content
     * @throws Exception if the file cannot be read
     */
    @Override
    public String readSource() throws Exception {
        return Files.readAllLines(Paths.get(file))
            .stream()
            .collect(Collectors.joining("\n"));
    }

    // Schedules the poll: immediately, then every 3 seconds.
    private void startTimerTask() {
        service.scheduleAtFixedRate(() -> {
            try {
                long curModified = new File(file).lastModified();
                if (curModified > lastModified) {
                    T conf = loadConfig();
                    getProperty().updateValue(conf);
                    // Record the timestamp only after a successful load, so a failed
                    // read is retried on the next tick instead of being skipped forever.
                    lastModified = curModified;
                }
            } catch (Exception e) {
                // Must not escape: an uncaught exception cancels all further scheduled runs.
                RecordLog.warn("Error when loading file", e);
            }
        }, 0, 3, TimeUnit.SECONDS);
    }

    /**
     * Stops the polling task. After this call the property is no longer refreshed.
     */
    @Override
    public void close() throws Exception {
        service.shutdownNow();
    }
}
Push模式实现
NacosDataSource
/**
 * Push-mode data source backed by Nacos: registers a config listener for
 * {@code dataId}/{@code groupId} and forwards every received value to the property.
 *
 * @param <T> converted configuration type
 */
public class NacosDataSource<T> extends AbstractDataSource<String, T> {
    private final ConfigService configService;
    private final String dataId;
    private final String groupId;
    // Kept in a field so close() can deregister exactly the listener that was added.
    private final Listener configListener;

    /**
     * @param serverAddr Nacos server address
     * @param groupId    Nacos group of the config entry
     * @param dataId     Nacos data id of the config entry
     * @param parser     converter from the config text to the configuration type
     * @throws Exception if the Nacos config service cannot be created
     */
    public NacosDataSource(String serverAddr, String groupId, String dataId, Converter<String, T> parser) throws Exception {
        super(parser);
        this.groupId = groupId;
        this.dataId = dataId;
        this.configListener = createListener();
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        this.configService = ConfigFactory.createConfigService(properties);
        initNacosListener();
        loadInitialConfig();
    }

    /**
     * Fetches the current config text from Nacos with a 3000 ms timeout.
     */
    @Override
    public String readSource() throws Exception {
        return configService.getConfig(dataId, groupId, 3000);
    }

    // Builds the listener that converts pushed config text and updates the property.
    private Listener createListener() {
        return new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                RecordLog.info("[NacosDataSource] New property value received: " + configInfo);
                T newValue = NacosDataSource.this.parser.convert(configInfo);
                getProperty().updateValue(newValue);
            }
        };
    }

    // Registers the listener with Nacos; a registration failure aborts construction.
    private void initNacosListener() {
        try {
            configService.addListener(dataId, groupId, configListener);
        } catch (Exception e) {
            throw new IllegalStateException("Error occurred when initializing Nacos listener", e);
        }
    }

    // Loads the value once at startup; failures are logged and do not abort construction.
    private void loadInitialConfig() {
        try {
            T newValue = loadConfig();
            if (newValue == null) {
                RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");
            }
            getProperty().updateValue(newValue);
        } catch (Exception e) {
            RecordLog.warn("[NacosDataSource] Error when loading initial config", e);
        }
    }

    /**
     * Deregisters the Nacos listener so no further pushes reach this data source.
     */
    @Override
    public void close() {
        configService.removeListener(dataId, groupId, configListener);
    }
}
规则校验
/**
 * Static validation helpers for rules.
 */
public class RuleValidator {

    /**
     * Checks whether a flow rule is well-formed.
     *
     * @param rule the rule to check; may be null
     * @return {@code true} if the rule has a non-blank resource, a non-negative
     *         threshold, and a known grade, strategy and control behavior
     */
    public static boolean isValidFlowRule(FlowRule rule) {
        if (rule == null) {
            return false;
        }
        // Resource name must be present.
        if (StringUtil.isBlank(rule.getResource())) {
            return false;
        }
        // Threshold must be non-negative. Written as a negated ">=" so that a NaN
        // threshold (possible if the count is floating-point) is rejected as well.
        if (!(rule.getCount() >= 0)) {
            return false;
        }
        // Grade: QPS or concurrent-thread based.
        if (rule.getGrade() != RuleConstant.FLOW_GRADE_QPS &&
            rule.getGrade() != RuleConstant.FLOW_GRADE_THREAD) {
            return false;
        }
        // Strategy: direct, related resource, or call chain.
        if (rule.getStrategy() != RuleConstant.STRATEGY_DIRECT &&
            rule.getStrategy() != RuleConstant.STRATEGY_RELATE &&
            rule.getStrategy() != RuleConstant.STRATEGY_CHAIN) {
            return false;
        }
        // Control behavior: named bounds instead of the literals 0 and 2, which
        // wrongly rejected the combined warm-up + rate-limiter behavior.
        if (rule.getControlBehavior() < RuleConstant.CONTROL_BEHAVIOR_DEFAULT ||
            rule.getControlBehavior() > RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER) {
            return false;
        }
        return true;
    }
}
规则转换器
/**
 * Converts a JSON array string into a list of flow rules.
 */
public class FlowRuleJsonConverter implements Converter<String, List<FlowRule>> {

    /**
     * Parses {@code source} as a JSON array of {@link FlowRule}.
     *
     * @param source JSON text; may be null or empty
     * @return the parsed rules, or an empty list when the input is empty, parses
     *         to nothing, or cannot be parsed; never null
     */
    @Override
    public List<FlowRule> convert(String source) {
        if (StringUtil.isEmpty(source)) {
            return new ArrayList<>();
        }
        try {
            List<FlowRule> parsed = JSON.parseArray(source, FlowRule.class);
            // The parser may hand back null (e.g. for the literal text "null");
            // keep the same "never null" contract as the other branches.
            return parsed != null ? parsed : new ArrayList<>();
        } catch (Exception e) {
            // NOTE(review): a malformed payload yields an empty list, which downstream
            // listeners will treat as "remove all rules" — confirm this is intended.
            RecordLog.warn("Error when parsing flow rules", e);
            return new ArrayList<>();
        }
    }
}
Dashboard推送
RulePublisher
/**
 * Publishes rules for an application.
 *
 * @param <T> type of the rule payload
 */
public interface DynamicRulePublisher<T> {
/**
 * Publishes {@code rules} for the given application.
 *
 * @param app   application name
 * @param rules rules to publish
 * @throws Exception if publishing fails
 */
void publish(String app, T rules) throws Exception;
}
/**
 * Publishes an application's flow rules to Nacos as a JSON config entry.
 */
@Component
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {
    @Autowired
    private ConfigService configService;

    /**
     * Serializes {@code rules} to JSON and writes them to the app's flow-rule data id.
     *
     * @param app   application name; must not be empty
     * @param rules rules to publish; {@code null} makes this call a no-op
     * @throws IllegalStateException if Nacos reports that the config was not published
     * @throws Exception             if the Nacos call itself fails
     */
    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        String dataId = app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX;
        boolean published = configService.publishConfig(
            dataId,
            NacosConfigUtil.GROUP_ID,
            JSON.toJSONString(rules),
            ConfigType.JSON.getType()
        );
        // publishConfig signals rejection through its return value; surface it
        // instead of letting a failed publish look like success.
        if (!published) {
            throw new IllegalStateException("Failed to publish flow rules to Nacos, dataId=" + dataId);
        }
    }
}
RuleProvider
/**
 * Provides the rules of an application.
 *
 * @param <T> type of the rule payload
 */
public interface DynamicRuleProvider<T> {
/**
 * Returns the rules of the given application.
 *
 * @param appName application name
 * @return the application's rules
 * @throws Exception if the rules cannot be retrieved
 */
T getRules(String appName) throws Exception;
}
/**
 * Reads an application's flow rules from Nacos.
 */
@Component
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {
    @Autowired
    private ConfigService configService;

    /**
     * Fetches and parses the JSON flow rules stored under the app's flow-rule data id.
     *
     * @param appName application name; must not be empty
     * @return the stored rules, or an empty list if nothing is stored; never null
     * @throws IllegalArgumentException if {@code appName} is null or empty
     * @throws Exception                if the Nacos call or JSON parsing fails
     */
    @Override
    public List<FlowRuleEntity> getRules(String appName) throws Exception {
        // Without this check a null name would silently query the data id "null<postfix>".
        if (appName == null || appName.isEmpty()) {
            throw new IllegalArgumentException("app name cannot be empty");
        }
        String dataId = appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX;
        String rules = configService.getConfig(
            dataId,
            NacosConfigUtil.GROUP_ID,
            3000
        );
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<FlowRuleEntity> parsed = JSON.parseArray(rules, FlowRuleEntity.class);
        // The parser may hand back null (e.g. for the literal text "null"); keep the result non-null.
        return parsed != null ? parsed : new ArrayList<>();
    }
}
总结
规则管理核心机制:
- RuleManager:管理规则,按资源分组存储
- PropertyListener:监听规则变更,实时生效
- DataSource:抽象数据源,支持Pull和Push
- Converter:规则转换,支持JSON、XML等格式
- Validator:规则校验,确保规则合法性
推送模式选择:
- Pull模式:定时拉取,实时性较差(延迟取决于轮询间隔,本文示例为3秒)
- Push模式:配置中心主动推送,实时性高(变更后近实时生效)
最佳实践:
- 生产环境使用Push模式 + Nacos
- Dashboard改造支持持久化
- 规则变更监控和告警
- 定期备份规则配置