如第二部分(动态代理部分)所言,客户端在向服务端发起调用时,会拉取到提供服务的实例列表,并使用负载均衡算法选择其中一台机器,众多负载均衡算法可通过SPI进行扩展。
private final LoadBalance loadBalance; public ZkServiceDiscoveryImpl() { this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(LoadBalanceEnum.LOADBALANCE.getName()); } String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
在众多负载均衡算法中,一致性哈希算法是比较经典的一种,主要介绍一致性哈希算法:
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>(); @Override protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) { // 返回给定对象的哈希码,该哈希码与默认的方法hashCode()返回的值相同,但不受对象类中hashCode()方法的覆盖影响 // 存储serviceAddresses对象的身份哈希码。这个变量后续可能被用于判断serviceAddresses对象是否发生了变化,如果对象的身份哈希码与之前存储的值不同,那么可以认为对象已经被修改 // 帮助系统识别服务地址列表是否发生了变化,从而决定是否需要重新构建哈希环等操作 int identityHashCode = System.identityHashCode(serviceAddresses); // build rpc service name by rpcRequest String rpcServiceName = rpcRequest.getRpcServiceName(); // 所调用服务的哈希环,从CHM本地缓存中取出 ConsistentHashSelector selector = selectors.get(rpcServiceName); // check for updates // 判断身份标识,如果身份标识不符或者缓存中无数据,就创建新的哈希环 if (selector == null || selector.identityHashCode != identityHashCode) { // 使用新的服务列表创建哈希环,虚拟节点数为160,并设置身份标识 selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode)); selector = selectors.get(rpcServiceName); } // 取出对应的哈希环实例,并进行筛选 // 使用+操作符连接字符串和任何其他类型的对象时,Java会自动调用对象的toString()方法将其转换为字符串 // 流对象的toString()方法会被调用,返回流对象的字符串表示 return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters())); }
参数需要传入服务列表与RPC调用的请求参数,一致性哈希需要根据请求参数来确定对应机器实例。首先是根据对应服务的服务列表来获取身份标识,身份标识的作用是判断可用机器实例是否有发生变化,如果发生变化,那么本地缓存中对存储的应服务的哈希环就失效了,使用本地缓存也是因为构造哈希环是对计算资源有一定损耗的。
如果身份标识不一致或缓存中根本没有对应哈希环,那么就需要创建对应的哈希环:
private final TreeMap<Long, String> virtualInvokers; private final int identityHashCode; ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) { this.virtualInvokers = new TreeMap<>(); this.identityHashCode = identityHashCode; for (String invoker : invokers) { // 虚拟节点数量为replicaNumber for (int i = 0; i < replicaNumber / 4; i++) { // 哈希值为机器标识+循环号来确定 byte[] digest = md5(invoker + i); for (int h = 0; h < 4; h++) { // 哈希值计算,再根据索引号打散一部分,但所有虚拟节点都对应着该机器 long m = hash(digest, h); // 将虚拟节点与机器进行关联 virtualInvokers.put(m, invoker); } } } }
哈希值和机器的映射保存在TreeMap中。需要注意的是,每个机器对应着160个虚拟节点,这些节点通过拼接尾号与计算哈希值时使用不同索引来进行打散操作,通过虚拟节点进行打散可以有效避免数据倾斜的情况(除非某个参数的调用量非常的大)。
获取哈希环后,传入本次请求的参数,以此选择机器:
public String select(String rpcServiceKey) { // 传入参数哈希后的哈希结果,字节数组 byte[] digest = md5(rpcServiceKey); // 根据返回的长整型的哈希值在哈希环的位置筛选出机器 return selectForKey(hash(digest, 0)); } public String selectForKey(long hashCode) { // 找到大于或等于给定哈希值的最小的虚拟节点,在一致性哈希环中找到与给定哈希值最接近的服务地址,以实现负载均衡,tailMap是TreeMap提供的方法 Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry(); // 哈希值大于哈希环中所有节点的值 if (entry == null) { // 获取哈希环中哈希值最小的元素,即超过最后一个节点就取第一个节点 entry = virtualInvokers.firstEntry(); } return entry.getValue(); }
计算出传入参数的哈希值之后,需要在之前的treeMap上查询第一个大于当前节点哈希值的节点,并据此得到对应的机器,使用treemap也是由于其提供的tailMap方法非常便于实现这一需求。另外如果没有筛选到节点,说明其哈希值大于哈希环上哈希值最大的节点,应该取哈希环上的第一个节点。
按上述方式就完成了使用一致性哈希算法来选择机器的过程,生产场景中也是经常需要使用一致性哈希算法进行负载均衡,比如服务端机器本地缓存了一些非常耗时的操作,或者是不同的参数需要依赖不同的文件,为了防止文件的重复下载,也需要使用一致性哈希来进行负载均衡。