前言:
前面我们分析了 Nacos 客户端和服务端的服务注册流程,服务注册的最终目的是为了服务间的调用的,服务间的调用,需要知道目标服务有哪些实例,也就是服务发现(订阅),本篇我们来分析一下 Nacos 的服务订阅源码。
Nacos 系列文章传送门:
Nacos 配置管理模型 – 命名空间(Namespace)、配置分组(Group)和配置集ID(Data ID)
服务发现
我们在分析 服务启动何时触发 Nacos 的注册流程时分析到 NacosServiceRegistryAutoConfiguration 源码,NacosServiceRegistryAutoConfiguration 由,Spring Boot 自动配置加载的,最终注入到 NacosAutoServiceRegistration 完成了服务注册,我们查看 spring-cloud-starter-alibaba-nacos-discovery.jar 包路径下的 spring.factories 文件,如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\ com.alibaba.cloud.nacos.NacosServiceAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
我们注意更客户端服务发现有关的有两个自动配置类 NacosDiscoveryAutoConfiguration 和 NacosDiscoveryClientConfiguration,源码如下:
package com.alibaba.cloud.nacos.discovery; import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration( proxyBeanMethods = false ) @ConditionalOnDiscoveryEnabled @ConditionalOnNacosDiscoveryEnabled public class NacosDiscoveryAutoConfiguration { public NacosDiscoveryAutoConfiguration() { } @Bean @ConditionalOnMissingBean public NacosDiscoveryProperties nacosProperties() { return new NacosDiscoveryProperties(); } //创建 NacosServiceDiscovery 对象 @Bean @ConditionalOnMissingBean public NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties, NacosServiceManager nacosServiceManager) { return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager); } } package com.alibaba.cloud.nacos.discovery; import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.client.CommonsClientAutoConfiguration; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration( proxyBeanMethods = false ) @ConditionalOnDiscoveryEnabled @ConditionalOnBlockingDiscoveryEnabled @ConditionalOnNacosDiscoveryEnabled @AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class}) //在 NacosDiscoveryAutoConfiguration 自动装配完成后才执行 @AutoConfigureAfter({NacosDiscoveryAutoConfiguration.class}) public class NacosDiscoveryClientConfiguration { public NacosDiscoveryClientConfiguration() { } //创建 DiscoveryClient 对象 @Bean public DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) { return new NacosDiscoveryClient(nacosServiceDiscovery); } @Bean @ConditionalOnMissingBean @ConditionalOnProperty( value = {"spring.cloud.nacos.discovery.watch.enabled"}, matchIfMissing = true ) public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties, ObjectProvider<ThreadPoolTaskScheduler> taskExecutorObjectProvider) { return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties, taskExecutorObjectProvider); } }
NacosDiscoveryAutoConfiguration 类自动配置类中,创建了一个 NacosServiceDiscovery 的对象,NacosDiscoveryClientConfiguration 类自动装配时,会创建一个 DiscoveryClient 对象,需要用到前面创建的 NacosServiceDiscovery 对象,因此 NacosDiscoveryClientConfiguration 类的自动装配必须要在 NacosDiscoveryAutoConfiguration 类之后。
NacosDiscoveryClient 源码分析
NacosDiscoveryClient 实现了 Spring Cloud 的 DiscoveryClient 接口,我们需要重点关注的是 getInstances() 和 getServices()方法,这两个方法都是由 NacosServiceDiscovery 类实现的。
package com.alibaba.cloud.nacos.discovery; import java.util.Collections; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; public class NacosDiscoveryClient implements DiscoveryClient { private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class); public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client"; private NacosServiceDiscovery serviceDiscovery; public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) { this.serviceDiscovery = nacosServiceDiscovery; } public String description() { return "Spring Cloud Nacos Discovery Client"; } //获取服务实例列表 public List<ServiceInstance> getInstances(String serviceId) { try { return this.serviceDiscovery.getInstances(serviceId); } catch (Exception var3) { throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3); } } //获取服务列表 public List<String> getServices() { try { return this.serviceDiscovery.getServices(); } catch (Exception var2) { log.error("get service name from nacos server fail,", var2); return Collections.emptyList(); } } }
NacosServiceDiscovery 源码分析
NacosServiceDiscovery 类的代码也非常简单,就提供了 getInstances() 和 getServices() 方法,分别去获取服务实例信息和服务信息,具体请看如下源码分析:
package com.alibaba.cloud.nacos.discovery; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceInstance; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.springframework.cloud.client.ServiceInstance; public class NacosServiceDiscovery { //Nacos 配置 属性注入的时候已经赋值了 private NacosDiscoveryProperties discoveryProperties; public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties) { this.discoveryProperties = discoveryProperties; } //获取服务实例列表 public List<ServiceInstance> getInstances(String serviceId) throws NacosException { //获取组 group String group = this.discoveryProperties.getGroup(); //根据 serviceId 和 group 获取服务实例列表 //通过 NacosFactory 反射创建 NacosNamingService List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true); //封装服务实例数据 return hostToServiceInstanceList(instances, serviceId); } //获取服务列表 public List<String> getServices() throws NacosException { //获取组 group String group = this.discoveryProperties.getGroup(); //获取服务列表 ListView<String> services = this.discoveryProperties.namingServiceInstance().getServicesOfServer(1, 2147483647, group); return services.getData(); } //封装服务实例 public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) { //迭代遍历获取到的服务实例列表 List<ServiceInstance> result = new ArrayList(instances.size()); Iterator var3 = instances.iterator(); while(var3.hasNext()) { //实例对象 Instance instance = (Instance)var3.next(); //封装实例 会对实例状态进行判断 ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId); if (serviceInstance != null) { result.add(serviceInstance); } } return result; } //封装服务实例对象 public static ServiceInstance hostToServiceInstance(Instance instance, String serviceId) { //实例为空判断 //实例是否开启 //实例是否健康 if (instance != null && instance.isEnabled() && instance.isHealthy()) { //创建 Nacos 服务实例对象 NacosServiceInstance nacosServiceInstance = new NacosServiceInstance(); //设置 ip nacosServiceInstance.setHost(instance.getIp()); //设置端口 nacosServiceInstance.setPort(instance.getPort()); //设置服务id nacosServiceInstance.setServiceId(serviceId); //元数据 Map<String, String> metadata = new HashMap(); //实例id metadata.put("nacos.instanceId", instance.getInstanceId()); //实例权重 metadata.put("nacos.weight", instance.getWeight() + ""); //实例健康状态 metadata.put("nacos.healthy", instance.isHealthy() + ""); //集群名称 metadata.put("nacos.cluster", instance.getClusterName() + ""); //实例所有元数据 metadata.putAll(instance.getMetadata()); //设置 Nacos 服务实例对象 元数据 nacosServiceInstance.setMetadata(metadata); //安全 设置 if (metadata.containsKey("secure")) { boolean secure = Boolean.parseBoolean((String)metadata.get("secure")); nacosServiceInstance.setSecure(secure); } return nacosServiceInstance; } else { return null; } } }
NacosNamingService#selectInstances 方法源码分析
NacosNamingService#selectInstances 方法主要通过重载方法 HostReactor#getServiceInfo 方法,获取服务实例列表返回。
//com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean) public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException { return this.selectInstances(serviceName, groupName, healthy, true); } //重载方法 //com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean, boolean) public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException { return this.selectInstances(serviceName, groupName, new ArrayList(), healthy, subscribe); } //com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean, boolean) public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { //服务信息 ServiceInfo serviceInfo; //是否订阅服务 subscribe 默认 true if (subscribe) { //订阅服务信息 serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { //直接从 Nacos 拉取服务信息 serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } //从服务信息中获取服务实例列表返回 return this.selectInstances(serviceInfo, healthy); }
HostReactor#getServiceInfo 方法源码分析
HostReactor#getServiceInfo 先从本地 serviceInfoMap 缓存获取服务实例列表,如果本地 serviceInfoMap 中获取不到,则创建一个空的服务信息对象,存入到本地缓存中,然后立刻去更新服务信息(实际是发送请求到 Nacos 服务端,获取服务列表),如果本地缓存有,则开启定时更新功能,并返回缓存结果, 需要注意的是不管是立即更新服务列表,还是定时更新服务列表,最终都会执行 HostReactor 中的updateService()方法。
//com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); //拼接 key 服务名@@集群名 String key = ServiceInfo.getKey(serviceName, clusters); //是否打开故障转移开关 if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //读取本地服务列表的缓存 //private final Map<String, ServiceInfo> serviceInfoMap; ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); //服务信息为空判断 if (null == serviceObj) { //为空 创建一个 ServiceInfo serviceObj = new ServiceInfo(serviceName, clusters); //存入缓存 serviceInfoMap.put(serviceObj.getKey(), serviceObj); //存入待更新的服务列表 //private final Map<String, Object> updatingMap; updatingMap.put(serviceName, new Object()); //立刻更新服务列表 updateServiceNow(serviceName, clusters); //从待更新的服务列表中移除 updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { //待更新列表中有服务对象 //保证线程安全 synchronized(serviceObj) { try { //等待 5 秒 等更新完成 serviceObj.wait(5000L); } catch (InterruptedException var8) { LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8); } } } // 开启定时更新服务列表的功能 会延迟1秒执行 后面具体分析 scheduleUpdateIfAbsent(serviceName, clusters); //返回服务信息对象 return serviceInfoMap.get(serviceObj.getKey()); } //com.alibaba.nacos.client.naming.backups.FailoverReactor#getService public ServiceInfo getService(String key) { //根据 key 从服务信息 map 总获取服务信息 ServiceInfo serviceInfo = serviceMap.get(key); if (serviceInfo == null) { //为空 创建一个 ServiceInfo 对象 设置 name 为 key serviceInfo = new ServiceInfo(); serviceInfo.setName(key); } //返回服务信息 return serviceInfo; }
HostReactor#updateServiceNow 方法源码分析
HostReactor#updateServiceNow 方法更新服务信息,本质上其实是通过 HostReactor#updateService 方法,最终通过 NamingProxy 代理对象调用服务端的接口查询服务信息。
//com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow private void updateServiceNow(String serviceName, String clusters) { try { //更新服务信息 this.updateService(serviceName, clusters); } catch (NacosException var4) { LogUtils.NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, var4); } } //com.alibaba.nacos.client.naming.core.HostReactor#updateService public void updateService(String serviceName, String clusters) throws NacosException { //根据服务名称 和 集群名称 获取之前的 服务信息 ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters); //标识 boolean var12 = false; try { //标识为true var12 = true; //通过 NamingProxy 代理对象调用服务端的接口查询服务信息 (1.x 版本是 http 调用 2.x版本是 grpc调用) String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { //调用结果不为空 处理结果 this.processServiceJson(result); //标识设置为 false var12 = false; } else { //调用结果为空 标识设置为 false 是否为空都设置为 false 那为啥有这么写 搞不懂... var12 = false; } } finally { if (var12) { //标识为 true if (oldService != null) { //oldService 不为空 synchronized(oldService) { //唤醒等待 前面有等待的线程 oldService.notifyAll(); } } } } if (oldService != null) { //oldService 不为空 synchronized(oldService) { //唤醒等待 前面有等待的线程 oldService.notifyAll(); } } }
NamingProxy#queryList 方法源码分析
NamingProxy#queryList 方法很简单,就是封装请求参数,发起 Http 调用(Nacos 1.X 是 Http 调用,2.X 使用的是 Grpc)。
//com.alibaba.nacos.client.naming.net.NamingProxy#queryList public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { Map<String, String> params = new HashMap(8); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); //服务请求地址 分析 Nacos Server 端服务发现源码时候 这个地址对应的方法就是入口 return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET"); }
NamingProxy#reqApi 方法源码分析
NamingProxy#reqApi 方法源码很简单,前面分析 Nacos 注册的时候也有类似的代码,进行一些简单判断后,通过 NamingProxy#callServer 方法向 Nacos Server 发起调用,查询服务信息。
//com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.lang.String) public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return this.reqApi(api, params, Collections.EMPTY_MAP, method); } //com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.lang.String) public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return this.reqApi(api, params, body, this.getServerList(), method); } //com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.util.List<java.lang.String>, java.lang.String) public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { //参数中加入 namespaceId params.put("namespaceId", this.getNamespaceId()); //Nacos 服务地址是否为空 //nacosDomain nacos 域名是否为空 if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) { //地址和 域名都为空 抛出异常 throw new NacosException(400, "no server available"); } else { //异常 NacosException exception = new NacosException(); nacosDomain nacos 域名是否为空 if (StringUtils.isNotBlank(this.nacosDomain)) { //不为空 配置的 nacos 域名 int i = 0; //this.maxRetry 最大重试次数 默认3 while(i < this.maxRetry) { try { //发送请求到 Nacos Server 查询服务实例列表 return this.callServer(api, params, body, this.nacosDomain, method); } catch (NacosException var12) { //异常处理 exception = var12; if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12); } //调用次数 +1 ++i; } } } else { //获取 Nacos 服务数的随机数 Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); int i = 0; while(i < servers.size()) { String server = (String)servers.get(index); try { //发送请求到 Nacos Server 查询服务实例列表 return this.callServer(api, params, body, server, method); } catch (NacosException var13) { //异常 exception = var13; if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13); } index = (index + 1) % servers.size(); //调用次数+1 ++i; } } } LogUtils.NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", new Object[]{api, servers, exception.getErrCode(), exception.getErrMsg()}); throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); } }
NamingProxy#callServer 方法源码分析
NamingProxy#callServer 方法是真正向 Nacos Server 发起 Http 请求的方法,封装签名信息、header 信息,发起调用后,处理返回结果。
//com.alibaba.nacos.client.naming.net.NamingProxy#callServer(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.lang.String, java.lang.String) public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { //记录开始时间 long start = System.currentTimeMillis(); //结束时间 long end = 0L; //签名信息 this.injectSecurityInfo(params); //header Header header = this.builderHeader(); //拼接请求 url String url; if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) { if (!IPUtil.containsPort(curServer)) { curServer = curServer + ":" + this.serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } else { url = curServer + api; } try { //发送 Http 请求 HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start)); //结果处理 if (restResult.ok()) { return (String)restResult.getData(); } else if (304 == restResult.getCode()) { return ""; } else { throw new NacosException(restResult.getCode(), restResult.getMessage()); } } catch (Exception var13) { LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13); throw new NacosException(500, var13); } } //com.alibaba.nacos.client.naming.net.NamingProxy#injectSecurityInfo private void injectSecurityInfo(Map<String, String> params) { if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) { params.put("accessToken", this.securityProxy.getAccessToken()); } String ak = this.getAccessKey(); String sk = this.getSecretKey(); params.put("app", AppNameUtils.getAppName()); if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) { try { String signData = getSignData((String)params.get("serviceName")); String signature = SignUtil.sign(signData, sk); params.put("signature", signature); params.put("data", signData); params.put("ak", ak); } catch (Exception var6) { LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6); } } }
HostReactor#scheduleUpdateIfAbsent 方法源码分析
前面分析时候提到不管是立即更新服务列表,还是定时更新服务列表,最终都会执行 HostReactor 中的updateService()方法,而执行者就是 HostReactor#scheduleUpdateIfAbsent 方法,源码如下:
//com.alibaba.nacos.client.naming.core.HostReactor#scheduleUpdateIfAbsent public void scheduleUpdateIfAbsent(String serviceName, String clusters) { //判断当前服务是否已经存在了 避免重复调用 if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) { //synchronized 线程安全 synchronized(this.futureMap) { //再次判断当前服务是否存在了避免重复调用浪费资源 if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) { //添加到调度任务中 重点关注 UpdateTask ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters)); //记录当前服务已经在更新任务中了 this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } } } //com.alibaba.nacos.client.naming.core.HostReactor#addTask public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) { return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS); } public class UpdateTask implements Runnable { long lastRefTime = 9223372036854775807L; private final String clusters; private final String serviceName; private int failCount = 0; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } private void incFailCount() { int limit = 6; if (this.failCount != limit) { ++this.failCount; } } private void resetFailCount() { this.failCount = 0; } public void run() { long delayTime = 1000L; try { //从本地缓存 serviceInfoMap 中获取服务信息 ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); if (serviceObj == null) { //为空调用 updateService 方法 最终会向 Nacos Server 发起请求获取最新的服务信息 HostReactor.this.updateService(this.serviceName, this.clusters); return; } //本地缓存中有 需要判断最后一次更新时间是否小于缓存刷新时间 if (serviceObj.getLastRefTime() <= this.lastRefTime) { //重新从注册中心获取 HostReactor.this.updateService(this.serviceName, this.clusters); //加入本地缓存 serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); } else { //刷新缓存 HostReactor.this.refreshOnly(this.serviceName, this.clusters); } //更新缓存刷新时间 this.lastRefTime = serviceObj.getLastRefTime(); //!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters): 服务实例不再订阅 //!HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters)) : futureMap 不包含 该服务 if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) { LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters); //返回 return; } //服务实例为空 if (CollectionUtils.isEmpty(serviceObj.getHosts())) { //失败次数+1 this.incFailCount(); return; } //延迟时间 delayTime = serviceObj.getCacheMillis(); //重置失败次数 this.resetFailCount(); } catch (Throwable var7) { //失败次数+1 this.incFailCount(); LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7); } finally { //定时任务更新缓存 没有异常的情况下是 6 秒一次 如果有异常 则最长为 1分钟 HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS); } } }
通过源码分析,我们知道最终开启了一个定时任务,传入了 UpdateTask 对象,而 UpdateTask 实现了 Runnable 接口,我们重点关注 UpdateTask # run 方法,如下:
public void run() { long delayTime = 1000L; try { //从本地缓存 serviceInfoMap 中获取服务信息 ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); if (serviceObj == null) { //为空调用 updateService 方法 最终会向 Nacos Server 发起请求获取最新的服务信息 HostReactor.this.updateService(this.serviceName, this.clusters); return; } //本地缓存中有 需要判断最后一次更新时间是否小于缓存刷新时间 if (serviceObj.getLastRefTime() <= this.lastRefTime) { //重新从注册中心获取 HostReactor.this.updateService(this.serviceName, this.clusters); //加入本地缓存 serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); } else { //刷新缓存 HostReactor.this.refreshOnly(this.serviceName, this.clusters); } //更新缓存刷新时间 this.lastRefTime = serviceObj.getLastRefTime(); //!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters): 服务实例不再订阅 //!HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters)) : futureMap 不包含 该服务 if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) { LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters); //返回 return; } //服务实例为空 if (CollectionUtils.isEmpty(serviceObj.getHosts())) { //失败次数+1 this.incFailCount(); return; } //延迟时间 delayTime = serviceObj.getCacheMillis(); //重置失败次数 this.resetFailCount(); } catch (Throwable var7) { //失败次数+1 this.incFailCount(); LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7); } finally { //定时任务更新缓存 没有异常的情况下是 6 秒一次 如果有异常 则最长为 1分钟 HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS); } }
整个 UpdateTask # run 方法就是服务发现(订阅)的核心,UpdateTask # run 会先判断本地缓存是否有服务信息对象,以及缓存是否过期,当不存在或者过期的时候,会去查询注册中心,获取最新实例,更新最后获取时间,处理服务信息,在最后会计算任务时间,循环执行流程,UpdateTask # run 方法的最核心点是在 finally 中,finally 中执行了定时任务,定时更新服务信息,没有异常的情况下是 6 秒执行一次,最长时间是一分钟执行一次,这个问题在 Nacos 相关面试中也是非常常见的问题,如果你没有看过源码,可能你知道是 6 秒一次,但你可能不知道有可能是一分钟执行一次,以及它是在哪里实现的。
欢迎提出建议及对错误的地方指出纠正。