Nacos 服务发现(订阅)源码分析(客户端)

avatar
作者
筋斗云
阅读量:6

前言:

前面我们分析了 Nacos 客户端和服务端的服务注册流程,服务注册的最终目的是为了服务间的调用的,服务间的调用,需要知道目标服务有哪些实例,也就是服务发现(订阅),本篇我们来分析一下 Nacos 的服务订阅源码。

Nacos 系列文章传送门:

Nacos 初步认识和 Nacos 部署细节

Nacos 配置管理模型 – 命名空间(Namespace)、配置分组(Group)和配置集ID(Data ID)

Nacos 注册中心和配置中心【实战】

服务启动何时触发 Nacos 的注册流程?

Nacos Client 端服务注册流程源码分析

Nacos Server 端服务注册流程源码分析

服务发现

我们在分析 服务启动何时触发 Nacos 的注册流程时分析到 NacosServiceRegistryAutoConfiguration 源码,NacosServiceRegistryAutoConfiguration 由,Spring Boot 自动配置加载的,最终注入到 NacosAutoServiceRegistration 完成了服务注册,我们查看 spring-cloud-starter-alibaba-nacos-discovery.jar 包路径下的 spring.factories 文件,如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\   com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\   com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\   com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\   com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\   com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\   com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\   com.alibaba.cloud.nacos.NacosServiceAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\   com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration 

我们注意更客户端服务发现有关的有两个自动配置类 NacosDiscoveryAutoConfiguration 和 NacosDiscoveryClientConfiguration,源码如下:

package com.alibaba.cloud.nacos.discovery;  import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  @Configuration(     proxyBeanMethods = false ) @ConditionalOnDiscoveryEnabled @ConditionalOnNacosDiscoveryEnabled public class NacosDiscoveryAutoConfiguration {     public NacosDiscoveryAutoConfiguration() {     } 	     @Bean     @ConditionalOnMissingBean     public NacosDiscoveryProperties nacosProperties() {         return new NacosDiscoveryProperties();     }  	//创建 NacosServiceDiscovery 对象     @Bean     @ConditionalOnMissingBean     public NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties, NacosServiceManager nacosServiceManager) {         return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);     } }     package com.alibaba.cloud.nacos.discovery;  import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.client.CommonsClientAutoConfiguration; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;  @Configuration(     proxyBeanMethods = false ) @ConditionalOnDiscoveryEnabled @ConditionalOnBlockingDiscoveryEnabled @ConditionalOnNacosDiscoveryEnabled @AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class}) //在 NacosDiscoveryAutoConfiguration 自动装配完成后才执行 @AutoConfigureAfter({NacosDiscoveryAutoConfiguration.class}) public class NacosDiscoveryClientConfiguration {     public NacosDiscoveryClientConfiguration() {     } 	 	//创建 DiscoveryClient 对象     @Bean     public DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {         return new NacosDiscoveryClient(nacosServiceDiscovery);     }      @Bean     @ConditionalOnMissingBean     @ConditionalOnProperty(         value = {"spring.cloud.nacos.discovery.watch.enabled"},         matchIfMissing = true     )     public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties, ObjectProvider<ThreadPoolTaskScheduler> taskExecutorObjectProvider) {         return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties, taskExecutorObjectProvider);     } } 

NacosDiscoveryAutoConfiguration 类自动配置类中,创建了一个 NacosServiceDiscovery 的对象,NacosDiscoveryClientConfiguration 类自动装配时,会创建一个 DiscoveryClient 对象,需要用到前面创建的 NacosServiceDiscovery 对象,因此 NacosDiscoveryClientConfiguration 类的自动装配必须要在 NacosDiscoveryAutoConfiguration 类之后。

NacosDiscoveryClient 源码分析

NacosDiscoveryClient 实现了 Spring Cloud 的 DiscoveryClient 接口,我们需要重点关注的是 getInstances() 和 getServices()方法,这两个方法都是由 NacosServiceDiscovery 类实现的。

package com.alibaba.cloud.nacos.discovery;  import java.util.Collections; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient;  public class NacosDiscoveryClient implements DiscoveryClient {     private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);     public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";     private NacosServiceDiscovery serviceDiscovery;      public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {         this.serviceDiscovery = nacosServiceDiscovery;     }      public String description() {         return "Spring Cloud Nacos Discovery Client";     } 	 	//获取服务实例列表     public List<ServiceInstance> getInstances(String serviceId) {         try {             return this.serviceDiscovery.getInstances(serviceId);         } catch (Exception var3) {             throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);         }     }  	//获取服务列表     public List<String> getServices() {         try {             return this.serviceDiscovery.getServices();         } catch (Exception var2) {             log.error("get service name from nacos server fail,", var2);             return Collections.emptyList();         }     } } 

NacosServiceDiscovery 源码分析

NacosServiceDiscovery 类的代码也非常简单,就提供了 getInstances() 和 getServices() 方法,分别去获取服务实例信息和服务信息,具体请看如下源码分析:

package com.alibaba.cloud.nacos.discovery;  import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceInstance; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.springframework.cloud.client.ServiceInstance;  public class NacosServiceDiscovery { 	 	//Nacos 配置 属性注入的时候已经赋值了     private NacosDiscoveryProperties discoveryProperties;      public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties) {         this.discoveryProperties = discoveryProperties;     }  	//获取服务实例列表     public List<ServiceInstance> getInstances(String serviceId) throws NacosException { 		//获取组 group         String group = this.discoveryProperties.getGroup(); 		//根据 serviceId 和 group 获取服务实例列表 		//通过 NacosFactory 反射创建 NacosNamingService         List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true); 		//封装服务实例数据         return hostToServiceInstanceList(instances, serviceId);     } 	 	//获取服务列表     public List<String> getServices() throws NacosException { 		//获取组 group         String group = this.discoveryProperties.getGroup(); 		//获取服务列表         ListView<String> services = this.discoveryProperties.namingServiceInstance().getServicesOfServer(1, 2147483647, group);         return services.getData();     } 	 	//封装服务实例     public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) { 		//迭代遍历获取到的服务实例列表         List<ServiceInstance> result = new ArrayList(instances.size());         Iterator var3 = instances.iterator(); 		         while(var3.hasNext()) { 			//实例对象             Instance instance = (Instance)var3.next(); 			//封装实例 会对实例状态进行判断             ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);             if (serviceInstance != null) {                 result.add(serviceInstance);             }         }          return result;     }  	//封装服务实例对象     public static ServiceInstance hostToServiceInstance(Instance instance, String serviceId) { 		//实例为空判断 		//实例是否开启 		//实例是否健康         if (instance != null && instance.isEnabled() && instance.isHealthy()) { 			//创建 Nacos 服务实例对象             NacosServiceInstance nacosServiceInstance = new NacosServiceInstance(); 			//设置 ip             nacosServiceInstance.setHost(instance.getIp()); 			//设置端口             nacosServiceInstance.setPort(instance.getPort()); 			//设置服务id             nacosServiceInstance.setServiceId(serviceId); 			//元数据             Map<String, String> metadata = new HashMap(); 			//实例id             metadata.put("nacos.instanceId", instance.getInstanceId()); 			//实例权重             metadata.put("nacos.weight", instance.getWeight() + ""); 			//实例健康状态             metadata.put("nacos.healthy", instance.isHealthy() + ""); 			//集群名称             metadata.put("nacos.cluster", instance.getClusterName() + ""); 			//实例所有元数据             metadata.putAll(instance.getMetadata()); 			//设置 Nacos 服务实例对象 元数据             nacosServiceInstance.setMetadata(metadata); 			//安全 设置             if (metadata.containsKey("secure")) {                 boolean secure = Boolean.parseBoolean((String)metadata.get("secure"));                 nacosServiceInstance.setSecure(secure);             }              return nacosServiceInstance;         } else {             return null;         }     } } 

NacosNamingService#selectInstances 方法源码分析

NacosNamingService#selectInstances 方法主要通过重载方法 HostReactor#getServiceInfo 方法,获取服务实例列表返回。

//com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean) public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException { 	return this.selectInstances(serviceName, groupName, healthy, true); }  //重载方法 //com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean, boolean) public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException { 	return this.selectInstances(serviceName, groupName, new ArrayList(), healthy, subscribe); }  //com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean, boolean) public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { 	//服务信息 	ServiceInfo serviceInfo; 	//是否订阅服务 subscribe 默认 true 	if (subscribe) { 		//订阅服务信息 		serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); 	} else { 		//直接从 Nacos 拉取服务信息 		serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); 	} 	//从服务信息中获取服务实例列表返回 	return this.selectInstances(serviceInfo, healthy); } 

HostReactor#getServiceInfo 方法源码分析

HostReactor#getServiceInfo 先从本地 serviceInfoMap 缓存获取服务实例列表,如果本地 serviceInfoMap 中获取不到,则创建一个空的服务信息对象,存入到本地缓存中,然后立刻去更新服务信息(实际是发送请求到 Nacos 服务端,获取服务列表),如果本地缓存有,则开启定时更新功能,并返回缓存结果, 需要注意的是不管是立即更新服务列表,还是定时更新服务列表,最终都会执行 HostReactor 中的updateService()方法。

//com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { 	 	NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); 	//拼接 key 服务名@@集群名 	String key = ServiceInfo.getKey(serviceName, clusters); 	//是否打开故障转移开关 	if (failoverReactor.isFailoverSwitch()) { 		return failoverReactor.getService(key); 	} 	//读取本地服务列表的缓存 	//private final Map<String, ServiceInfo> serviceInfoMap; 	ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); 	//服务信息为空判断 	if (null == serviceObj) { 		//为空 创建一个 ServiceInfo 		serviceObj = new ServiceInfo(serviceName, clusters); 		//存入缓存 		serviceInfoMap.put(serviceObj.getKey(), serviceObj); 		//存入待更新的服务列表 		//private final Map<String, Object> updatingMap; 		updatingMap.put(serviceName, new Object()); 		//立刻更新服务列表 		updateServiceNow(serviceName, clusters); 		//从待更新的服务列表中移除 		updatingMap.remove(serviceName); 		 	} else if (updatingMap.containsKey(serviceName)) { 		//待更新列表中有服务对象 		//保证线程安全 		synchronized(serviceObj) { 				try { 					//等待 5 秒 等更新完成 					serviceObj.wait(5000L); 				} catch (InterruptedException var8) { 					LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8); 				} 			} 	} 	     // 开启定时更新服务列表的功能 会延迟1秒执行  后面具体分析 	scheduleUpdateIfAbsent(serviceName, clusters); 	//返回服务信息对象 	return serviceInfoMap.get(serviceObj.getKey()); }  //com.alibaba.nacos.client.naming.backups.FailoverReactor#getService public ServiceInfo getService(String key) { 	//根据 key 从服务信息 map 总获取服务信息 	ServiceInfo serviceInfo = serviceMap.get(key); 	 	if (serviceInfo == null) { 		//为空 创建一个 ServiceInfo 对象 设置 name 为 key 		serviceInfo = new ServiceInfo(); 		serviceInfo.setName(key); 	} 	//返回服务信息 	return serviceInfo; }  

HostReactor#updateServiceNow 方法源码分析

HostReactor#updateServiceNow 方法更新服务信息,本质上其实是通过 HostReactor#updateService 方法,最终通过 NamingProxy 代理对象调用服务端的接口查询服务信息。

//com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow private void updateServiceNow(String serviceName, String clusters) { 	try { 		//更新服务信息 		this.updateService(serviceName, clusters); 	} catch (NacosException var4) { 		LogUtils.NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, var4); 	}  }  //com.alibaba.nacos.client.naming.core.HostReactor#updateService public void updateService(String serviceName, String clusters) throws NacosException { 	//根据服务名称 和 集群名称 获取之前的 服务信息 	ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters); 	//标识 	boolean var12 = false;  	try { 		//标识为true 		var12 = true; 		//通过 NamingProxy  代理对象调用服务端的接口查询服务信息 (1.x 版本是 http 调用 2.x版本是 grpc调用) 		String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false); 		if (StringUtils.isNotEmpty(result)) { 			//调用结果不为空 处理结果 			this.processServiceJson(result); 			//标识设置为 false  			var12 = false; 		} else { 			//调用结果为空  标识设置为 false 是否为空都设置为 false  那为啥有这么写  搞不懂... 			var12 = false; 		} 	} finally { 		if (var12) { 			//标识为 true 			if (oldService != null) { 				//oldService 不为空 				synchronized(oldService) { 					//唤醒等待 前面有等待的线程 					oldService.notifyAll(); 				} 			}  		} 	} 	 	if (oldService != null) { 		//oldService 不为空 		synchronized(oldService) { 			//唤醒等待 前面有等待的线程 			oldService.notifyAll(); 		} 	}  } 

NamingProxy#queryList 方法源码分析

NamingProxy#queryList 方法很简单,就是封装请求参数,发起 Http 调用(Nacos 1.X 是 Http 调用,2.X 使用的是 Grpc)。

//com.alibaba.nacos.client.naming.net.NamingProxy#queryList public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { 	Map<String, String> params = new HashMap(8); 	params.put("namespaceId", this.namespaceId); 	params.put("serviceName", serviceName); 	params.put("clusters", clusters); 	params.put("udpPort", String.valueOf(udpPort)); 	params.put("clientIP", NetUtils.localIP()); 	params.put("healthyOnly", String.valueOf(healthyOnly)); 	//服务请求地址 分析 Nacos Server 端服务发现源码时候 这个地址对应的方法就是入口 	return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET"); } 

NamingProxy#reqApi 方法源码分析

NamingProxy#reqApi 方法源码很简单,前面分析 Nacos 注册的时候也有类似的代码,进行一些简单判断后,通过 NamingProxy#callServer 方法向 Nacos Server 发起调用,查询服务信息。

//com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.lang.String) public String reqApi(String api, Map<String, String> params, String method) throws NacosException { 	return this.reqApi(api, params, Collections.EMPTY_MAP, method); }  //com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.lang.String) public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { 	return this.reqApi(api, params, body, this.getServerList(), method); }   //com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.util.List<java.lang.String>, java.lang.String) public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { 	//参数中加入 namespaceId 	params.put("namespaceId", this.getNamespaceId()); 	//Nacos 服务地址是否为空 	//nacosDomain nacos 域名是否为空 	if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) { 		//地址和 域名都为空 抛出异常 		throw new NacosException(400, "no server available"); 	} else { 		//异常 		NacosException exception = new NacosException(); 		nacosDomain nacos 域名是否为空 		if (StringUtils.isNotBlank(this.nacosDomain)) { 			//不为空 配置的 nacos 域名 			int i = 0; 			//this.maxRetry 最大重试次数 默认3 			while(i < this.maxRetry) { 				try { 					//发送请求到 Nacos Server 查询服务实例列表 					return this.callServer(api, params, body, this.nacosDomain, method); 				} catch (NacosException var12) { 					//异常处理 					exception = var12; 					if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { 						LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12); 					} 					//调用次数 +1 					++i; 				} 			} 		} else { 			//获取 Nacos 服务数的随机数 			Random random = new Random(System.currentTimeMillis()); 			int index = random.nextInt(servers.size()); 			int i = 0;  			while(i < servers.size()) { 				String server = (String)servers.get(index);  				try { 					//发送请求到 Nacos Server 查询服务实例列表 					return this.callServer(api, params, body, server, method); 				} catch (NacosException var13) { 					//异常 					exception = var13; 					if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { 						LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13); 					}  					index = (index + 1) % servers.size(); 					//调用次数+1 					++i; 				} 			} 		}  		LogUtils.NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", new Object[]{api, servers, exception.getErrCode(), exception.getErrMsg()}); 		throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); 	} } 

NamingProxy#callServer 方法源码分析

NamingProxy#callServer 方法是真正向 Nacos Server 发起 Http 请求的方法,封装签名信息、header 信息,发起调用后,处理返回结果。

//com.alibaba.nacos.client.naming.net.NamingProxy#callServer(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.lang.String, java.lang.String) public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { 	//记录开始时间 	long start = System.currentTimeMillis(); 	//结束时间 	long end = 0L; 	//签名信息 	this.injectSecurityInfo(params); 	//header 	Header header = this.builderHeader(); 	//拼接请求 url 	String url; 	if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) { 		if (!IPUtil.containsPort(curServer)) { 			curServer = curServer + ":" + this.serverPort; 		}  		url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; 	} else { 		url = curServer + api; 	}  	try { 		//发送 Http 请求 		HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); 		end = System.currentTimeMillis(); 		MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start)); 		//结果处理 		if (restResult.ok()) { 			return (String)restResult.getData(); 		} else if (304 == restResult.getCode()) { 			return ""; 		} else { 			throw new NacosException(restResult.getCode(), restResult.getMessage()); 		} 	} catch (Exception var13) { 		LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13); 		throw new NacosException(500, var13); 	} }  //com.alibaba.nacos.client.naming.net.NamingProxy#injectSecurityInfo private void injectSecurityInfo(Map<String, String> params) { 	if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) { 		params.put("accessToken", this.securityProxy.getAccessToken()); 	}  	String ak = this.getAccessKey(); 	String sk = this.getSecretKey(); 	params.put("app", AppNameUtils.getAppName()); 	if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) { 		try { 			String signData = getSignData((String)params.get("serviceName")); 			String signature = SignUtil.sign(signData, sk); 			params.put("signature", signature); 			params.put("data", signData); 			params.put("ak", ak); 		} catch (Exception var6) { 			LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6); 		} 	}  } 

HostReactor#scheduleUpdateIfAbsent 方法源码分析

前面分析时候提到不管是立即更新服务列表,还是定时更新服务列表,最终都会执行 HostReactor 中的updateService()方法,而执行者就是 HostReactor#scheduleUpdateIfAbsent 方法,源码如下:

//com.alibaba.nacos.client.naming.core.HostReactor#scheduleUpdateIfAbsent public void scheduleUpdateIfAbsent(String serviceName, String clusters) { 	//判断当前服务是否已经存在了 避免重复调用 	if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) { 		//synchronized 线程安全 		synchronized(this.futureMap) { 			//再次判断当前服务是否存在了避免重复调用浪费资源 			if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) { 				//添加到调度任务中 重点关注 UpdateTask 				ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters)); 				//记录当前服务已经在更新任务中了 				this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); 			} 		} 	} }  //com.alibaba.nacos.client.naming.core.HostReactor#addTask public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) { 	return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS); }  public class UpdateTask implements Runnable { 	long lastRefTime = 9223372036854775807L; 	private final String clusters; 	private final String serviceName; 	private int failCount = 0;  	public UpdateTask(String serviceName, String clusters) { 		this.serviceName = serviceName; 		this.clusters = clusters; 	}  	private void incFailCount() { 		int limit = 6; 		if (this.failCount != limit) { 			++this.failCount; 		} 	}  	private void resetFailCount() { 		this.failCount = 0; 	}  	public void run() { 		long delayTime = 1000L;  		try { 			//从本地缓存 serviceInfoMap 中获取服务信息 			ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); 			if (serviceObj == null) { 				//为空调用 updateService 方法 最终会向 Nacos Server 发起请求获取最新的服务信息 				HostReactor.this.updateService(this.serviceName, this.clusters); 				return; 			} 			//本地缓存中有 需要判断最后一次更新时间是否小于缓存刷新时间 			if (serviceObj.getLastRefTime() <= this.lastRefTime) { 				//重新从注册中心获取 				HostReactor.this.updateService(this.serviceName, this.clusters); 				//加入本地缓存 				serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); 			} else { 				//刷新缓存 				HostReactor.this.refreshOnly(this.serviceName, this.clusters); 			} 			//更新缓存刷新时间 			this.lastRefTime = serviceObj.getLastRefTime(); 			//!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters): 服务实例不再订阅 			//!HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters)) : futureMap 不包含 该服务 			if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) { 				LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters); 				//返回 				return; 			} 			//服务实例为空  			if (CollectionUtils.isEmpty(serviceObj.getHosts())) { 				//失败次数+1 				this.incFailCount(); 				return; 			} 			//延迟时间  			delayTime = serviceObj.getCacheMillis(); 			//重置失败次数 			this.resetFailCount(); 		} catch (Throwable var7) { 			//失败次数+1 			this.incFailCount(); 			LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7); 		} finally { 			//定时任务更新缓存 没有异常的情况下是 6 秒一次 如果有异常 则最长为 1分钟 			HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS); 		}  	} } 

通过源码分析,我们知道最终开启了一个定时任务,传入了 UpdateTask 对象,而 UpdateTask 实现了 Runnable 接口,我们重点关注 UpdateTask # run 方法,如下:

public void run() { 	long delayTime = 1000L;  	try { 		//从本地缓存 serviceInfoMap 中获取服务信息 		ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); 		if (serviceObj == null) { 			//为空调用 updateService 方法 最终会向 Nacos Server 发起请求获取最新的服务信息 			HostReactor.this.updateService(this.serviceName, this.clusters); 			return; 		} 		//本地缓存中有 需要判断最后一次更新时间是否小于缓存刷新时间 		if (serviceObj.getLastRefTime() <= this.lastRefTime) { 			//重新从注册中心获取 			HostReactor.this.updateService(this.serviceName, this.clusters); 			//加入本地缓存 			serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); 		} else { 			//刷新缓存 			HostReactor.this.refreshOnly(this.serviceName, this.clusters); 		} 		//更新缓存刷新时间 		this.lastRefTime = serviceObj.getLastRefTime(); 		//!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters): 服务实例不再订阅 		//!HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters)) : futureMap 不包含 该服务 		if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) { 			LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters); 			//返回 			return; 		} 		//服务实例为空  		if (CollectionUtils.isEmpty(serviceObj.getHosts())) { 			//失败次数+1 			this.incFailCount(); 			return; 		} 		//延迟时间  		delayTime = serviceObj.getCacheMillis(); 		//重置失败次数 		this.resetFailCount(); 	} catch (Throwable var7) { 		//失败次数+1 		this.incFailCount(); 		LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7); 	} finally { 		//定时任务更新缓存 没有异常的情况下是 6 秒一次 如果有异常 则最长为 1分钟 		HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS); 	}  } 

整个 UpdateTask # run 方法就是服务发现(订阅)的核心,UpdateTask # run 会先判断本地缓存是否有服务信息对象,以及缓存是否过期,当不存在或者过期的时候,会去查询注册中心,获取最新实例,更新最后获取时间,处理服务信息,在最后会计算任务时间,循环执行流程,UpdateTask # run 方法的最核心点是在 finally 中,finally 中执行了定时任务,定时更新服务信息,没有异常的情况下是 6 秒执行一次,最长时间是一分钟执行一次,这个问题在 Nacos 相关面试中也是非常常见的问题,如果你没有看过源码,可能你知道是 6 秒一次,但你可能不知道有可能是一分钟执行一次,以及它是在哪里实现的。

欢迎提出建议及对错误的地方指出纠正。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!