文章目录
sentinel持久化push推模式
pull拉模式的缺点,以保存本地文件举例:
- 定时任务是每隔3s执行一次,去判断规则持久化文件的最后修改时间。这里有一定时间的延迟,但如果时间设置的太短,有影响服务器的性能
- 我们的微服务是集群部署的,其他服务实例可读取不到我这台服务器的本地文件
所以还有一种push推送模式。我们一般会引入第三方中间件来实现,以Nacos为例。我们修改了nacos中的配置,它就会将更新后的数据推送给微服务。
push模式有两种实现方式:
在微服务端添加读数据源,为dataId添加监听器,当规则配置文件更改之后我就获取到更改后的规则内存并更新内存中的数据;再添加一个写数据源,每当dashboard中更新了规则,我除了更新内存中的数据之外,我通过
ConfigService.publishConfig()
方法还往Nacos端进行写入在dashboard源码中进行更改,在获取规则内容、更新规则内容的接口中,不要和微服务端进行交互,直接去和Nacos通信,通过
ConfigService.publishConfig()
和ConfigService.getConfig()
来实现。这种方式主要注意dashboard端的规则实体对象和微服务端的规则实体对象不一致问题,需要经过转换相关的操作。sentinel默认情况下就直接把规则实体转换为json字符串推送给Nacos,Nacos配置文件更改了,又推送给微服务,微服务这边再把json字符串转换为规则实体对象这一步就会发现,转换失败了,某些属性对应不上。进而就导致了dashboard端设置的规则在微服务这边未生效。
微服务端的实现
具体实现
引入读数据源的依赖
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> </dependency>
配置文件中添加规则持久化的dataId
server: port: 8806 spring: application: name: mall-user-sentinel-rule-push #微服务名称 #配置nacos注册中心地址 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: # 添加sentinel的控制台地址 dashboard: 127.0.0.1:8080 datasource: # 名称自定义,可以随便定义字符串 flow-rules: nacos: server-addr: 127.0.0.1:8848 # dataId取了微服务名字,后面再拼接字符串 dataId: ${spring.application.name}-flow-rules # 我这里在Nacos配置中心,单独使用了一个组 groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: flow degrade-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: degrade param-flow-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-param-flow-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: param-flow authority-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-authority-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: authority system-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-system-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: system
在Nacos配置中心中创建对应的配置文件
编写java类,定义写数据源
import com.alibaba.cloud.sentinel.SentinelProperties; import com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 添加往Nacos的写数据源,只不过未使用InitFunc * 如果要使用就需要放开注解 */ @Configuration(proxyBeanMethods = false) @AutoConfigureAfter(SentinelAutoConfiguration.class) public class SentinelNacosDataSourceConfiguration { @Bean @ConditionalOnMissingBean public SentinelNacosDataSourceHandler sentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) { return new SentinelNacosDataSourceHandler(sentinelProperties); } }
import com.alibaba.cloud.sentinel.SentinelProperties; import com.alibaba.cloud.sentinel.datasource.RuleType; import com.alibaba.cloud.sentinel.datasource.config.DataSourcePropertiesConfiguration; import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties; import com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler; import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; import com.alibaba.csp.sentinel.slots.system.SystemRule; import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.SmartInitializingSingleton; import java.util.List; /** * sentinel 规则持久化到 nacos配置中心 */ public class SentinelNacosDataSourceHandler implements SmartInitializingSingleton { private final SentinelProperties sentinelProperties; public SentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) { this.sentinelProperties = sentinelProperties; } @Override public void afterSingletonsInstantiated() { // 遍历我们配置文件中指定的多个spring.cloud.sentinel.datasource的多个配置 sentinelProperties.getDatasource().values().forEach(this::registryWriter); } private void registryWriter(DataSourcePropertiesConfiguration dataSourceProperties) { // 只获取application.yml文件中 nacos配置的数据源 final NacosDataSourceProperties nacosDataSourceProperties = dataSourceProperties.getNacos(); if (nacosDataSourceProperties == null) { return; } // 获取规则类型,然后根据各个类型创建相应的写数据源 final RuleType ruleType = nacosDataSourceProperties.getRuleType(); switch (ruleType) { case FLOW: WritableDataSource<List<FlowRule>> flowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerFlowDataSource(flowRuleWriter); break; case DEGRADE: WritableDataSource<List<DegradeRule>> degradeRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWriter); break; case PARAM_FLOW: WritableDataSource<List<ParamFlowRule>> paramFlowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString); ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWriter); break; case SYSTEM: WritableDataSource<List<SystemRule>> systemRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerSystemDataSource(systemRuleWriter); break; case AUTHORITY: WritableDataSource<List<AuthorityRule>> authRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerAuthorityDataSource(authRuleWriter); break; default: break; } } }
import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties; import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; import java.util.Properties; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 将sentinel规则写入到nacos配置中心 * @param <T> */ @Slf4j public class NacosWritableDataSource<T> implements WritableDataSource<T> { private final Converter<T, String> configEncoder; private final NacosDataSourceProperties nacosDataSourceProperties; private final Lock lock = new ReentrantLock(true); private ConfigService configService = null; public NacosWritableDataSource(NacosDataSourceProperties nacosDataSourceProperties, Converter<T, String> configEncoder) { if (configEncoder == null) { throw new IllegalArgumentException("Config encoder cannot be null"); } if (nacosDataSourceProperties == null) { throw new IllegalArgumentException("Config nacosDataSourceProperties cannot be null"); } this.configEncoder = configEncoder; this.nacosDataSourceProperties = nacosDataSourceProperties; final Properties properties = buildProperties(nacosDataSourceProperties); try { // 也可以直接注入NacosDataSource,然后反射获取其configService属性 this.configService = NacosFactory.createConfigService(properties); } catch (NacosException e) { log.error("create configService failed.", e); } } private Properties buildProperties(NacosDataSourceProperties nacosDataSourceProperties) { Properties properties = new Properties(); if (!StringUtils.isEmpty(nacosDataSourceProperties.getServerAddr())) { properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDataSourceProperties.getServerAddr()); } else { properties.setProperty(PropertyKeyConst.ACCESS_KEY, nacosDataSourceProperties.getAccessKey()); properties.setProperty(PropertyKeyConst.SECRET_KEY, nacosDataSourceProperties.getSecretKey()); properties.setProperty(PropertyKeyConst.ENDPOINT, nacosDataSourceProperties.getEndpoint()); } if (!StringUtils.isEmpty(nacosDataSourceProperties.getNamespace())) { properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDataSourceProperties.getNamespace()); } if (!StringUtils.isEmpty(nacosDataSourceProperties.getUsername())) { properties.setProperty(PropertyKeyConst.USERNAME, nacosDataSourceProperties.getUsername()); } if (!StringUtils.isEmpty(nacosDataSourceProperties.getPassword())) { properties.setProperty(PropertyKeyConst.PASSWORD, nacosDataSourceProperties.getPassword()); } return properties; } @Override public void write(T value) throws Exception { lock.lock(); // todo handle cluster concurrent problem try { String convertResult = configEncoder.convert(value); if (configService == null) { log.error("configServer is null, can not continue."); return; } // 规则配置数据推送到nacos配置中心 final boolean published = configService.publishConfig(nacosDataSourceProperties.getDataId(), nacosDataSourceProperties.getGroupId(), convertResult); if (!published) { log.error("sentinel {} publish to nacos failed.", nacosDataSourceProperties.getRuleType()); } } finally { lock.unlock(); } } @Override public void close() throws Exception { } }
启动微服务进行测试。
dashboard中为某个接口定义一个流控规则
调用接口测试,发送三次请求
查看Nacos中的配置文件,就会发现也成功写入了
源码分析
读数据源
引入读数据源的依赖,我们来看看具体是怎么实现的
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> </dependency>
实现思路:
- 和文件的读数据源一样,继承了
AbstractDataSource
类,这样就不需要我们再去写一遍加载配置、更新内存中的配置
在源码中的这个扩展包下面,就有nacos读数据源的实现
我们先看看NacosDataSource
类的父类的代码
- 创建一个DynamicSentinelProperty对象,主要作用是更新内存中的规则配置
- 加载配置、解析配置
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> { protected final Converter<S, T> parser; protected final SentinelProperty<T> property; public AbstractDataSource(Converter<S, T> parser) { if (parser == null) { throw new IllegalArgumentException("parser can't be null"); } // 子类传过来的解析器 this.parser = parser; // 更新内存中的配置 // 我们会经常看见 getProperty().updateValue(newValue); 这样的代码 this.property = new DynamicSentinelProperty<T>(); } @Override public T loadConfig() throws Exception { // 调用子类的readSource()方法,一般会得到一个String, // 在通过解析器Converter 并解析配置转换成对应的对象 return loadConfig(readSource()); } public T loadConfig(S conf) throws Exception { // 解析配置 T value = parser.convert(conf); return value; } @Override public SentinelProperty<T> getProperty() { return property; } }
读配置源的具体实现:
- 通过Nacos的serverAddr构建一个Properties对象,该对象会用于初始化ConfigService接口的对象
- 利用线程池中唯一一个线程,创建一个监听器,监听dataId,当配置中心的配置更改后就会调用微服务客户端,微服务客户端这边有一个while+阻塞队列实现的轮询机制,它调用监听器的方法,监听器里面会更新内存中的规则配置
- 初始化configService对象,并通过configService.addListener(…)为指定的dataId添加监听器
- 微服务刚启动会调用父类的loadConfig()方法,父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析;再更新内存中的规则配置
public class NacosDataSource<T> extends AbstractDataSource<String, T> { private static final int DEFAULT_TIMEOUT = 3000; // 创建一个只有一个线程的线程池,用来执行dataId的监听器 private final ExecutorService pool = new ThreadPoolExecutor(...); private final Listener configListener; private final String groupId; private final String dataId; private final Properties properties; private ConfigService configService = null; public NacosDataSource(final String serverAddr, final String groupId, final String dataId,Converter<String, T> parser) { this(NacosDataSource.buildProperties(serverAddr), groupId, dataId, parser); } public NacosDataSource(final Properties properties, final String groupId, final String dataId,Converter<String, T> parser) { super(parser); if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { throw new IllegalArgumentException(...); } AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst"); this.groupId = groupId; this.dataId = dataId; this.properties = properties; // 创建一个监听器 this.configListener = new Listener() { @Override public Executor getExecutor() { return pool; } @Override public void receiveConfigInfo(final String configInfo) { RecordLog.info(...); // 通过转换器进行转换 T newValue = NacosDataSource.this.parser.convert(configInfo); // 调用父类的SentinelProperty对象,更新内存中的规则配置 getProperty().updateValue(newValue); } }; // 初始化configService对象,并通过configService.addListener(..)为指定的dataId添加监听器 initNacosListener(); // 微服务刚启动,会从Nacos配置中心加载一次配置 loadInitialConfig(); } private void loadInitialConfig() { try { // 调用父类的loadConfig() 父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析 T newValue = loadConfig(); if (newValue == null) { RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source"); } // 调用父类的SentinelProperty对象,更新内存中的规则配置 getProperty().updateValue(newValue); } catch (Exception ex) { RecordLog.warn("[NacosDataSource] Error when loading initial config", ex); } } private void initNacosListener() { try { // 初始化configService对象 this.configService = NacosFactory.createConfigService(this.properties); // Add config listener. // 通过configService.addListener(..)为指定的dataId添加监听器 configService.addListener(dataId, groupId, configListener); } catch (Exception e) { RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e); e.printStackTrace(); } } @Override public String readSource() throws Exception { if (configService == null) { throw new IllegalStateException("Nacos config service has not been initialized or error occurred"); } // 通过ConfigService接口中的getConfig()方法,从Nacos配置中心获取配置 return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT); } @Override public void close() { if (configService != null) { configService.removeListener(dataId, groupId, configListener); } pool.shutdownNow(); } private static Properties buildProperties(String serverAddr) { // 构建一个Properties对象,该对象会在初始化ConfigService时会用上 Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr); return properties; } }
写数据源的实现
写数据源源码实现流程相对简单。我们知道dashboard更新配置后调用微服务端,微服务这边的ModifyRulesCommandHandler
类会处理规则更改的请求。这里会有一个写数据源相关的操作
// 注意name = "setRules",这就是控制台请求服务端的url路径 @CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}") public class ModifyRulesCommandHandler implements CommandHandler<String> { public CommandResponse<String> handle(CommandRequest request) { //...... // 处理流控规则 if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) { List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class); FlowRuleManager.loadRules(flowRules); // 关键一步,这里会有一个写数据源的操作。默认情况下是没有WritableDataSource,我们可以在这里进行扩展 if (!writeToDataSource(getFlowDataSource(), flowRules)) { result = WRITE_DS_FAILURE_MSG; } return CommandResponse.ofSuccess(result); // 处理权限规则 } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) { ... // 处理熔断规则 } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) { ... // 处理系统规则 } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) { ... } return CommandResponse.ofFailure(new IllegalArgumentException("invalid type")); } }
所以我们要做的事情就是创建一个写数据源,并进行注册写数据源WritableDataSourceRegistry
。我们先来看看源码中的Demo,通过读写文件的方式实现的读写数据源。
public void init() throws Exception { // 文件保存路径 String flowRuleDir = System.getProperty("user.home") + File.separator + "sentinel" + File.separator + "rules"; String flowRuleFile = "flowRule.json"; String flowRulePath = flowRuleDir + File.separator + flowRuleFile; // 添加读数据源 ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>( flowRulePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}) ); FlowRuleManager.register2Property(ds.getProperty()); // 添加写数据源 WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(flowRulePath, this::encodeJson); WritableDataSourceRegistry.registerFlowDataSource(wds); }
我在定义一个往Nacos的写数据源,一个简单的实现,具体项目中能用的请参考上面 <具体实现>一章 。这里只是用更少的代码来理解nacos的写数据源
import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.csp.sentinel.init.InitFunc; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry; import com.alibaba.fastjson.JSON; import java.util.List; public class NacosDataSourceInitFunc implements InitFunc { @Override public void init() throws Exception { //流控规则 WritableDataSource<List<FlowRule>> writableDataSource = new NacosWritableDataSource<>( "127.0.0.1:8848", "DEFAULT_GROUP", "mall-user-sentinel-rule-push-demo-flow", JSON::toJSONString); WritableDataSourceRegistry.registerFlowDataSource(writableDataSource); } }
import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.ConfigType; import com.alibaba.nacos.api.exception.NacosException; import java.util.Properties; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class NacosWritableDataSource<T> implements WritableDataSource<T> { private final String serverAddr; private final String groupId; private final String dataId; private final Properties properties; private ConfigService configService; private final Converter<T, String> configEncoder; private final Lock lock = new ReentrantLock(true); public NacosWritableDataSource(String serverAddr, String groupId, String dataId, Converter<T, String> configEncoder) { this.serverAddr = serverAddr; this.groupId = groupId; this.dataId = dataId; // 通过serverAddr构建一个properties对象 this.properties = NacosWritableDataSource.buildProperties(serverAddr); this.configEncoder = configEncoder; initConfigService(); } private void initConfigService() { try { // 通过properties对象初始化ConfigService this.configService = NacosFactory.createConfigService(properties); } catch (NacosException e) { e.printStackTrace(); } } private static Properties buildProperties(String serverAddr) { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr); return properties; } @Override public void write(T t) throws Exception { lock.lock(); try { // 通过ConfigService往Nacos配置中心写入数据 configService.publishConfig(dataId, groupId, this.configEncoder.convert(t), ConfigType.JSON.getType()); } finally { lock.unlock(); } } @Override public void close() throws Exception { } }
微服务端解析读数据源流程
我们引入了下面的依赖
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> </dependency>
并在配置文件中指定了多个读数据源。这些数据源是如何创建的嘞?
server: port: 8806 spring: application: name: mall-user-sentinel-rule-push #微服务名称 #配置nacos注册中心地址 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: # 添加sentinel的控制台地址 dashboard: 127.0.0.1:8080 datasource: # 名称自定义,可以随便定义字符串 # 每一个都是一个读数据源 flow-rules: nacos: server-addr: 127.0.0.1:8848 # dataId取了微服务名字,后面再拼接字符串 dataId: ${spring.application.name}-flow-rules # 我这里在Nacos配置中心,单独使用了一个组 groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: flow # 读数据源 degrade-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: degrade param-flow-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-param-flow-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: param-flow authority-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-authority-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: authority system-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-system-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: system
源码的入口是SentinelDataSourceHandler
类,它实现了SmartInitializingSingleton
接口,这是Spring中的接口,所有非懒加载单例bean创建完成之后会调用这个接口的实现类:
- 在构造函数中依赖注入SentinelProperties对象,该对象中保存了我们配置文件中所有读数据源的配置
- 遍历SentinelProperties对象中的读数据源,并为每一个读数据源生成一个beanName
- 为每一个读数据源对象 + beanName 创建一个BeanDefinition
- 将BeanDefinition添加进BeanFactory中
- BeanFactory.getBean(beanName) 创建读数据源对象。该对象其实是FactoryBean类型的
- 上方的getBean()方法最终会调用至NacosDataSourceFactoryBean.getObject()方法,在这里创建NacosDataSource对象。该对象就是上方引入maven依赖中的读数据源对象。
public class SentinelDataSourceHandler implements SmartInitializingSingleton { //...... // SentinelProperties中保存着Map<String, DataSourcePropertiesConfiguration> datasource // 也就是我们上方yml文件中定义的多个数据源,我们自定义的名字就是String private final SentinelProperties sentinelProperties; // 构造方法中进行依赖注入 sentinelProperties对象 public SentinelDataSourceHandler(DefaultListableBeanFactory beanFactory,SentinelProperties sentinelProperties,...) { //... this.sentinelProperties = sentinelProperties; } // 遍历Map<String, DataSourcePropertiesConfiguration>集合,最终取出我们的每一个配置的数据源 @Override public void afterSingletonsInstantiated() { sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties) -> { try { List<String> validFields = dataSourceProperties.getValidField(); // ... // AbstractDataSourceProperties就是我们在配置文件中具体的每一个配置对象的公共父类 AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties .getValidDataSourceProperties(); abstractDataSourceProperties.setEnv(env); abstractDataSourceProperties.preCheck(dataSourceName); // 把我们配置的每一个数据源,还有这里字符串凭借的一个beanName。调用下面的registerBean()方法 // beanName为 flow-rules + "-sentinel-" + nacos + "-datasource" // flow-rules是我们在yml文件中自定义的名字,nacos就是下面的validFields.get(0)值 registerBean(abstractDataSourceProperties, dataSourceName+ "-sentinel-" + validFields.get(0) + "-datasource"); } catch (Exception e) { log.error(...); } }); } private void registerBean(final AbstractDataSourceProperties dataSourceProperties,String dataSourceName) { // 对我们的数据源生成一个BeanDefinition BeanDefinitionBuilder builder = parseBeanDefinition(dataSourceProperties,dataSourceName); // 将BeanDefinition添加进BeanFactory中 this.beanFactory.registerBeanDefinition(dataSourceName,builder.getBeanDefinition()); // 通过beanFactory.getBean(dataSourceName)方法,创建bean对象 // 我们配置文件中定义的每一个读数据源就变为了一个一个的bean // 注意,我们的读数据源它是一个FactoryBean,这里的getBean()方法最终会去到NacosDataSourceFactoryBean.getObject() AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory.getBean(dataSourceName); // 将读数据源添加进对应的规则管理器中 dataSourceProperties.postRegister(newDataSource); }
public class NacosDataSourceFactoryBean implements FactoryBean<NacosDataSource> { //...... @Override public NacosDataSource getObject() throws Exception { // 为properties对象赋值 Properties properties = new Properties(); if (!StringUtils.isEmpty(this.serverAddr)) { properties.setProperty(PropertyKeyConst.SERVER_ADDR, this.serverAddr); } else { properties.setProperty(PropertyKeyConst.ENDPOINT, this.endpoint); } if (!StringUtils.isEmpty(this.contextPath)) { properties.setProperty(PropertyKeyConst.CONTEXT_PATH, this.contextPath); } if (!StringUtils.isEmpty(this.accessKey)) { properties.setProperty(PropertyKeyConst.ACCESS_KEY, this.accessKey); } if (!StringUtils.isEmpty(this.secretKey)) { properties.setProperty(PropertyKeyConst.SECRET_KEY, this.secretKey); } if (!StringUtils.isEmpty(this.namespace)) { properties.setProperty(PropertyKeyConst.NAMESPACE, this.namespace); } if (!StringUtils.isEmpty(this.username)) { properties.setProperty(PropertyKeyConst.USERNAME, this.username); } if (!StringUtils.isEmpty(this.password)) { properties.setProperty(PropertyKeyConst.PASSWORD, this.password); } // 创建一个Nacos读数据源对象,这里也就是上方:<源码分析> —— <读数据源> 的那一个对象 return new NacosDataSource(properties, groupId, dataId, converter); } // ...... }
修改源码的实现
我们需要在Sentinel源码中进行修改,将dashboard和微服务之间的通信,改为dashboard和nacos的通信。在通过Nacos配置中心的推送机制去更新微服务内存中的规则配置。
从 Sentinel 1.4.0 开始,Sentinel 控制台提供 DynamicRulePublisher
和 DynamicRuleProvider
接口用于实现应用维度的规则推送和拉取:
DynamicRuleProvider: 拉取规则
DynamicRulePublisher: 推送规则
在dashboard工程下的com.alibaba.csp.sentinel.dashboard.rule包下创建nacos包,然后把各种场景的配置规则拉取和推送的实现类写到此包下
可以参考Sentinel Dashboard test包下的流控规则拉取和推送的实现逻辑
官方demo
我们看看官方的demo是如何实现的
首先创建一个NacosConfigUtil
类,用来定义常量
public final class NacosConfigUtil { // 其实demo中也就用到了上面两个常量 // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置 public static final String GROUP_ID = "SENTINEL_GROUP"; // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样 // 避免你写一个dataId,微服务从另一个dataId去读 public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules"; public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-rules"; public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map"; public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config"; public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config"; public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config"; public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set"; private NacosConfigUtil() {} }
创建一个NacosConfig
配置类,这里就定义了流控规则相关的转换器
@Configuration public class NacosConfig { // 流控规则相关 定义 List<FlowRuleEntity> 到 String的转换器 @Bean public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() { return JSON::toJSONString; } // 流控规则相关 定义 String 到 List<FlowRuleEntity>的转换器 @Bean public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() { return s -> JSON.parseArray(s, FlowRuleEntity.class); } // 根据一个Nacos的serverAddr,创建ConfigService对象。推送配置/拉取配置都是通过该对象来完成的 @Bean public ConfigService nacosConfigService() throws Exception { return ConfigFactory.createConfigService("localhost"); } }
接下来我们来看看dashboard推送规则配置的实现代码,它实现了DynamicRulePublisher
接口
@Component("flowRuleNacosPublisher") public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> { // 注入上面配置类的中定义的ConfigService和Converter转换器 @Autowired private ConfigService configService; @Autowired private Converter<List<FlowRuleEntity>, String> converter; @Override public void publish(String app, List<FlowRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } // 调用Nacos的configService.publishConfig(..)方法 推送配置 // dataId为 appName + 最上方的常量文件后缀-flow-rules , 分组为最上方定义的常量SENTINEL_GROUP , 并对规则配置集合转换为json字符串 configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, converter.convert(rules)); } }
接下来我们来看看dashboard拉取规则配置的实现代码,它实现了DynamicRuleProvider
接口
@Component("flowRuleNacosProvider") public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> { // 注入上面配置类的中定义的ConfigService和Converter转换器 @Autowired private ConfigService configService; @Autowired private Converter<String, List<FlowRuleEntity>> converter; @Override public List<FlowRuleEntity> getRules(String appName) throws Exception { // 调用Nacos的configService.getConfig(dataId, group, timeoutMs)方法 拉取配置 String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, 3000); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } // 将json字符串转换为 List<FlowRuleEntity> 规则实体对象集合 return converter.convert(rules); } }
官方Demo这种方式功能上的确是实现了与Nacos通信,对Nacos配置中心进行读写。但存在一个小问题。那就是dashboard这边规则实体对象是FlowRuleEntity
,但是微服务端规则实体对象是FlowRule
。Nacos把配置推送给微服务端时,微服务端把json字符串转换为实体对象时可能就会出现不匹配的情况 —> 微服务规则实体对象没有相应的值 ----> 内存中的规则也就不完善 ----> 出现了dashboard端更新的规则微服务端未生效情况。
当然,流控规则都还好,如下图所示,这两个之间的实体对象成员属性基本上都能对应上
但热点规则这边的实体就不行了,他们之间的层级关系就不同了
public class ParamFlowRuleEntity extends AbstractRuleEntity<ParamFlowRule> { public ParamFlowRuleEntity() { } // ParamFlowRule为客户端的规则实体,但是这里将一整个实体对象变为了ParamFlowRuleEntity的其中一个属性 // 所以这里转json之后的层级关系就发生了改变 public ParamFlowRuleEntity(ParamFlowRule rule) { AssertUtil.notNull(rule, "Authority rule should not be null"); // 父类中的属性 this.rule = rule; } ... } // 父类 public abstract class AbstractRuleEntity<T extends AbstractRule> implements RuleEntity { protected Long id; protected String app; protected String ip; protected Integer port; // ParamFlowRule为客户端的规则实体,成为了ParamFlowRuleEntity实体的一个成员属性 protected T rule; private Date gmtCreate; private Date gmtModified; ... }
为了解决这种情况,那么就需要定义一个规范,存入Nacos配置中心的数据只能是微服务那边的规则实体对象,不能是dashboard这边的规则实体对象
修改源码实现
naocs配置中心保存的是微服务端的规则实体对象
各个规则都先在dashboard端将规则实体转换为微服务能用的规则实体在推送至Nacos配置中心
从Nacos配置中心获取配置后,都先将json字符串转换为dashboard端的规则实体对象
项目结构如下
配置类
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity; import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity; import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.RuleEntity; import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.fastjson.JSON; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public final class NacosConfigUtil { // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置 public static final String GROUP_ID = "SENTINEL_GROUP"; // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样 // 避免你写一个dataId,微服务从另一个dataId去读 public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules"; public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-flow-rules"; public static final String DEGRADE_DATA_ID_POSTFIX = "-degrade-rules"; public static final String SYSTEM_DATA_ID_POSTFIX = "-system-rules"; public static final String AUTHORITY_DATA_ID_POSTFIX = "-authority-rules"; public static final String GATEWAY_FLOW_DATA_ID_POSTFIX = "-gateway-flow-rules"; public static final String GATEWAY_API_DATA_ID_POSTFIX = "-gateway-api-rules"; public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map"; /** * cc for `cluster-client` */ public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config"; /** * cs for `cluster-server` */ public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config"; public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config"; public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set"; //超时时间 public static final int READ_TIMEOUT = 3000; private NacosConfigUtil() {} /** * RuleEntity----->Rule * 控制台这边的规则实体都是RuleEntity类型的,这里就调用各个规则实体对象中的toRule()方法,转换为微服务端的规则实体对象 * 例如 FlowRuleEntity#toRule ----> FlowRule ParamFlowRuleEntity#toRule ----> ParamFlowRule * @param entities * @return */ public static String convertToRule(List<? extends RuleEntity> entities){ return JSON.toJSONString( entities.stream().map(r -> r.toRule()) .collect(Collectors.toList())); } /** * ApiDefinitionEntity----->ApiDefinition * @param entities * @return */ public static String convertToApiDefinition(List<? extends ApiDefinitionEntity> entities){ return JSON.toJSONString( entities.stream().map(r -> r.toApiDefinition()) .collect(Collectors.toList())); } /** * GatewayFlowRuleEntity----->GatewayFlowRule * @param entities * @return */ public static String convertToGatewayFlowRule(List<? extends GatewayFlowRuleEntity> entities){ return JSON.toJSONString( entities.stream().map(r -> r.toGatewayFlowRule()) .collect(Collectors.toList())); } }
通过Nacos配置中心的地址,创建对应的ConfigService对象,并存入Spring容器中
@Configuration public class NacosConfig { @Value("${sentinel.nacos.config.serverAddr}") private String serverAddr="localhost:8848"; @Bean public ConfigService nacosConfigService() throws Exception { return ConfigFactory.createConfigService(serverAddr); } /* 对于Nacos开启了认证,那么就需要添加Naocs的用户名和密码了 @Bean public ConfigService nacosConfigService() throws Exception { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); properties.put(PropertyKeyConst.USERNAME, "nacos"); properties.put(PropertyKeyConst.PASSWORD, "nacos"); return ConfigFactory.createConfigService(properties); }*/ }
flow
拉取配置,实现DynamicRuleProvider
接口
@Component("flowRuleNacosProvider") public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> { // 注入我们上面创建的ConfigService对象 @Autowired private ConfigService configService; @Override public List<FlowRuleEntity> getRules(String appName,String ip,Integer port) throws NacosException { // 从Nacos配置中心拉取配置 String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } // 解析json获取到 List<FlowRule> List<FlowRule> list = JSON.parseArray(rules, FlowRule.class); // 通过FlowRuleEntity.fromFlowRule(..) 方法实现 FlowRule------->FlowRuleEntity return list.stream().map(rule -> FlowRuleEntity.fromFlowRule(appName,ip,port,rule)) .collect(Collectors.toList()); } } /* FlowRuleEntity.fromFlowRule(..) 方法如下所示,Sentinel的dashboard端的规则实体对象内其实都自己写了对应的fromFlowRule()方法 public static FlowRuleEntity fromFlowRule(String app, String ip, Integer port, FlowRule rule) { FlowRuleEntity entity = new FlowRuleEntity(); entity.setApp(app); entity.setIp(ip); entity.setPort(port); entity.setLimitApp(rule.getLimitApp()); entity.setResource(rule.getResource()); entity.setGrade(rule.getGrade()); entity.setCount(rule.getCount()); entity.setStrategy(rule.getStrategy()); entity.setRefResource(rule.getRefResource()); entity.setControlBehavior(rule.getControlBehavior()); entity.setWarmUpPeriodSec(rule.getWarmUpPeriodSec()); entity.setMaxQueueingTimeMs(rule.getMaxQueueingTimeMs()); entity.setClusterMode(rule.isClusterMode()); entity.setClusterConfig(rule.getClusterConfig()); return entity; } */
推送配置,实现DynamicRulePublisher
接口
@Component("flowRuleNacosPublisher") public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> { // 注入我们上面创建的ConfigService对象 @Autowired private ConfigService configService; @Override public void publish(String app, List<FlowRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } //发布配置到Nacos配置中心,这里会调用我们工具类中编写的方法NacosConfigUtil.convertToRule(rules) configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules)); } }
authority
@Component("authorityRuleNacosProvider") public class AuthorityRuleNacosProvider implements DynamicRuleProvider<List<AuthorityRuleEntity>> { @Autowired private ConfigService configService; @Override public List<AuthorityRuleEntity> getRules(String appName,String ip,Integer port) throws Exception { String rules = configService.getConfig(appName + NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } List<AuthorityRule> list = JSON.parseArray(rules, AuthorityRule.class); return list.stream().map(rule -> AuthorityRuleEntity.fromAuthorityRule(appName, ip, port, rule)) .collect(Collectors.toList()); } }
@Component("authorityRuleNacosPublisher") public class AuthorityRuleNacosPublisher implements DynamicRulePublisher<List<AuthorityRuleEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<AuthorityRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } configService.publishConfig(app + NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules)); } }
degread
@Component("degradeRuleNacosProvider") public class DegradeRuleNacosProvider implements DynamicRuleProvider<List<DegradeRuleEntity>> { @Autowired private ConfigService configService; @Override public List<DegradeRuleEntity> getRules(String appName,String ip,Integer port) throws Exception { String rules = configService.getConfig(appName + NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } List<DegradeRule> list = JSON.parseArray(rules, DegradeRule.class); return list.stream().map(rule -> DegradeRuleEntity.fromDegradeRule(appName, ip, port, rule)) .collect(Collectors.toList()); } }
@Component("degradeRuleNacosPublisher") public class DegradeRuleNacosPublisher implements DynamicRulePublisher<List<DegradeRuleEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<DegradeRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } configService.publishConfig(app + NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules)); } }
param
@Component("paramFlowRuleNacosProvider") public class ParamFlowRuleNacosProvider implements DynamicRuleProvider<List<ParamFlowRuleEntity>> { @Autowired private ConfigService configService; @Override public List<ParamFlowRuleEntity> getRules(String appName,String ip,Integer port) throws Exception { String rules = configService.getConfig(appName + NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } List<ParamFlowRule> list = JSON.parseArray(rules, ParamFlowRule.class); return list.stream().map(rule -> ParamFlowRuleEntity.fromParamFlowRule(appName, ip, port, rule)) .collect(Collectors.toList()); } }
@Component("paramFlowRuleNacosPublisher") public class ParamFlowRuleNacosPublisher implements DynamicRulePublisher<List<ParamFlowRuleEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<ParamFlowRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } configService.publishConfig(app + NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules)); } }
system
@Component("systemRuleNacosProvider") public class SystemRuleNacosProvider implements DynamicRuleProvider<List<SystemRuleEntity>> { @Autowired private ConfigService configService; @Override public List<SystemRuleEntity> getRules(String appName,String ip,Integer port) throws Exception { String rules = configService.getConfig(appName + NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } List<SystemRule> list = JSON.parseArray(rules, SystemRule.class); return list.stream().map(rule -> SystemRuleEntity.fromSystemRule(appName, ip, port, rule)) .collect(Collectors.toList()); } }
@Component("systemRuleNacosPublisher") public class SystemRuleNacosPublisher implements DynamicRulePublisher<List<SystemRuleEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<SystemRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } configService.publishConfig(app + NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules)); } }
gateway
public class ApiDefinition2 { private String apiName; private Set<ApiPathPredicateItem> predicateItems; public ApiDefinition2() { } public String getApiName() { return apiName; } public void setApiName(String apiName) { this.apiName = apiName; } public Set<ApiPathPredicateItem> getPredicateItems() { return predicateItems; } public void setPredicateItems(Set<ApiPathPredicateItem> predicateItems) { this.predicateItems = predicateItems; } @Override public String toString() { return "ApiDefinition2{" + "apiName='" + apiName + '\'' + ", predicateItems=" + predicateItems + '}'; } public ApiDefinition toApiDefinition() { ApiDefinition apiDefinition = new ApiDefinition(); apiDefinition.setApiName(apiName); Set<ApiPredicateItem> apiPredicateItems = new LinkedHashSet<>(); apiDefinition.setPredicateItems(apiPredicateItems); if (predicateItems != null) { for (ApiPathPredicateItem predicateItem : predicateItems) { apiPredicateItems.add(predicateItem); } } return apiDefinition; } }
@Component("gatewayApiRuleNacosProvider") public class GatewayApiRuleNacosProvider implements DynamicRuleProvider<List<ApiDefinitionEntity>> { @Autowired private ConfigService configService; @Override public List<ApiDefinitionEntity> getRules(String appName,String ip,Integer port) throws Exception { String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } // 注意 ApiDefinition的属性Set<ApiPredicateItem> predicateItems中元素 是接口类型,JSON解析丢失数据 // 重写实体类ApiDefinition2,再转换为ApiDefinition List<ApiDefinition2> list = JSON.parseArray(rules, ApiDefinition2.class); return list.stream().map(rule -> ApiDefinitionEntity.fromApiDefinition(appName, ip, port, rule.toApiDefinition())) .collect(Collectors.toList()); } public static void main(String[] args) { String rules = "[{\"apiName\":\"/pms/productInfo/${id}\",\"predicateItems\":[{\"matchStrategy\":1,\"pattern\":\"/pms/productInfo/\"}]}]"; List<ApiDefinition> list = JSON.parseArray(rules, ApiDefinition.class); System.out.println(list); List<ApiDefinition2> list2 = JSON.parseArray(rules, ApiDefinition2.class); System.out.println(list2); System.out.println(list2.get(0).toApiDefinition()); } }
@Component("gatewayApiRuleNacosPublisher") public class GatewayApiRuleNacosPublisher implements DynamicRulePublisher<List<ApiDefinitionEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<ApiDefinitionEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } configService.publishConfig(app + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToApiDefinition(rules)); } }
@Component("gatewayFlowRuleNacosProvider") public class GatewayFlowRuleNacosProvider implements DynamicRuleProvider<List<GatewayFlowRuleEntity>> { @Autowired private ConfigService configService; @Override public List<GatewayFlowRuleEntity> getRules(String appName,String ip,Integer port) throws Exception { String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT); if (StringUtil.isEmpty(rules)) { return new ArrayList<>(); } List<GatewayFlowRule> list = JSON.parseArray(rules, GatewayFlowRule.class); return list.stream().map(rule -> GatewayFlowRuleEntity.fromGatewayFlowRule(appName, ip, port, rule)) .collect(Collectors.toList()); } }
@Component("gatewayFlowRuleNacosPublisher") public class GatewayFlowRuleNacosPublisher implements DynamicRulePublisher<List<GatewayFlowRuleEntity>> { @Autowired private ConfigService configService; @Override public void publish(String app, List<GatewayFlowRuleEntity> rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } configService.publishConfig(app + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToGatewayFlowRule(rules)); } }
修改源码
我们现在需要在controller层,将原本dashboard从微服务获取规则配置、dashboard更新规则后调用微服务,这一过程改为Nacos。
以流控规则举例,在FlowControllerV1
层中注入我们写的类
/** 从远程配置中心拉取规则*/ @Autowired @Qualifier("flowRuleNacosProvider") private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider; /** 推送规则到远程配置中心*/ @Autowired @Qualifier("flowRuleNacosPublisher") private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;
原本dashboard从微服务获取规则配置改为通过flowRuleNacosProvider
从Nacos拉取配置
@GetMapping("/rules") @AuthAction(PrivilegeType.READ_RULE) public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app, @RequestParam String ip, @RequestParam Integer port) { if (StringUtil.isEmpty(app)) { return Result.ofFail(-1, "app can't be null or empty"); } if (StringUtil.isEmpty(ip)) { return Result.ofFail(-1, "ip can't be null or empty"); } if (port == null) { return Result.ofFail(-1, "port can't be null"); } try { //从客户端内存获取规则配置 //List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port); //从远程配置中心获取规则配置 List<FlowRuleEntity> rules = ruleProvider.getRules(app,ip,port); if (rules != null && !rules.isEmpty()) { for (FlowRuleEntity entity : rules) { entity.setApp(app); if (entity.getClusterConfig() != null && entity.getClusterConfig().getFlowId() != null) { entity.setId(entity.getClusterConfig().getFlowId()); } } } rules = repository.saveAll(rules); return Result.ofSuccess(rules); } catch (Throwable throwable) { logger.error("Error when querying flow rules", throwable); return Result.ofThrowable(-1, throwable); } }
规则更改后推送至Nacos
@PostMapping("/rule") @AuthAction(PrivilegeType.WRITE_RULE) public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) { Result<FlowRuleEntity> checkResult = checkEntityInternal(entity); if (checkResult != null) { return checkResult; } entity.setId(null); Date date = new Date(); entity.setGmtCreate(date); entity.setGmtModified(date); entity.setLimitApp(entity.getLimitApp().trim()); entity.setResource(entity.getResource().trim()); try { // 规则写入dashboard的内存中,会写入三个map中 entity = repository.save(entity); //发布规则到客户端内存中 //publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS); //发布规则到远程配置中心 publishRules(entity.getApp()); return Result.ofSuccess(entity); } catch (Throwable t) { Throwable e = t instanceof ExecutionException ? t.getCause() : t; logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e); return Result.ofFail(-1, e.getMessage()); } } /** * 发布规则到远程配置中心 * @param app * @throws Exception */ private void publishRules(/*@NonNull*/ String app) throws Exception { // 从三个Map中的其中一个获取规则实体集合 List<FlowRuleEntity> rules = repository.findAllByApp(app); // 推送Nacos rulePublisher.publish(app, rules); }
在配置文件中指定NacosConfig的地址,因为在最上方的配置类中使用到了该配置项
#接入nacos配置中心用于规则数据持久化 sentinel.nacos.config.serverAddr=localhost:8848
测试
微服务端引入Nacos的读数据源
还是需要它监听dataId的更改,并更新内存中的规则数据
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> </dependency>
配置文件中添加相应的配置
server: port: 8806 spring: application: name: mall-user-sentinel-rule-push #微服务名称 #配置nacos注册中心地址 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: # 添加sentinel的控制台地址 dashboard: 127.0.0.1:8080 datasource: # 名称自定义,可以随便定义字符串 flow-rules: nacos: server-addr: 127.0.0.1:8848 # dataId取了微服务名字,后面再拼接字符串 # 注意需要和配置类中常量定义的一致 dataId: ${spring.application.name}-flow-rules # 这里的组名需要和配置类中常量定义的一致 groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: flow degrade-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: degrade param-flow-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-param-flow-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: param-flow authority-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-authority-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: authority system-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-system-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: system
在Sentinel中进行了两个限流规则的配置
Naocs的配置中心也有相应的更改
微服务中也会生效
补充
如果在工作中sentinel的持久化这一块已经被其他项目组的人完成了,但他们是直接把dashboard端的规则实体转json,存入了Nacos配置中心。进而导致了热点参数规则不生效,并且不允许我们修改源码。
当出现了上面这种情况,那我们应该怎么处理嘞?
解决方案:
自定义一个解析热点规则配置的解析器FlowParamJsonConverter,继承JsonConverter,重写convert方法。
利用BeanPostProcessor机制替换beanName为param-flow-rules-sentinel-nacos-datasource
的converter
属性,注入FlowParamJsonConverter
。
@Configuration public class ConverterConfig { @Bean("sentinel-json-param-flow-converter2") @Primary public JsonConverter jsonParamFlowConverter() { return new FlowParamJsonConverter(new ObjectMapper(), ParamFlowRule.class); } } @Component public class FlowParamConverterBeanPostProcessor implements BeanPostProcessor { @Autowired private JsonConverter jsonParamFlowConverter; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (beanName.equals("param-flow-rules-sentinel-nacos-datasource")) { NacosDataSourceFactoryBean nacosDataSourceFactoryBean = (NacosDataSourceFactoryBean) bean; nacosDataSourceFactoryBean.setConverter(jsonParamFlowConverter); return bean; } return bean; } } public class FlowParamJsonConverter extends JsonConverter { Class ruleClass; public FlowParamJsonConverter(ObjectMapper objectMapper, Class ruleClass) { super(objectMapper, ruleClass); this.ruleClass = ruleClass; } @Override public Collection<Object> convert(String source) { List<Object> list = new ArrayList<>(); JSONArray jsonArray = JSON.parseArray(source); for (int i = 0; i < jsonArray.size(); i++) { //解析rule属性 JSONObject jsonObject = (JSONObject) jsonArray.getJSONObject(i).get("rule"); Object object = JSON.toJavaObject(jsonObject, ruleClass); list.add(object); } return list; } }