关于java:一致性-hash-环

6次阅读

共计 16987 个字符,预计需要花费 43 分钟才能阅读完成。

一致性 hash 环

最近做我的项目 做了一个散发器,须要 依据申请携带的参数 把申请散发到 不同的服务器下面,最终我抉择应用 一致性 hash 环 来实现,本篇 就次要解说一下 一致性 hash 环 它的基本原理

概述

一致性 hash 算法 因为 均衡性 持久性的映射特点 被广泛应用于负载平衡畛域,比方 nginx、dubbo、等等 外部都有一致性 hash 的实现,比方 dubbo,当你调用 rpc 接口的时候,如果有 2 个提供者,那么你能够通过配置 让其调用通过 一致性 hash 进行计算 而后散发到具体的 某个实例接口上。

1.hash 算法 在负载平衡中的问题

先来看看一般的 hash 算法的特点,一般的 hash 算法就是把一系列输出 打散成随机的数据,负载平衡就是利用这一点个性,对于大量申请调用,通过肯定的 hash 将它们平均的散列,从而实现压力平均化

如果下面图中 key 作为缓存的 key,node 中寸入该 key 对应的 value,就是一个 简略的分布式缓存零碎了。

问题:能够看出 当 N 节点数发生变化的时候 之前所有的 hash 映射简直全副生效,如果集群是无状态的服务 倒是没什么事件,然而如果是 分布式缓存这种,比方 映射的 key1 本来是去 node1 上查问 缓存的 value1,然而当 N 节点变动后 hash 后的 key1 可能去了 node2 这样 就产生了致命问题。。

2. 一致性 hash 算法

一致性 hash 算法就是来解决 下面的问题

2.1 特点(重要)

上面阐明 一致性 hash 算法的 2 个 重要的特点

  • 平衡性

    平衡性是指哈希的后果可能尽可能散布到所有的缓冲中去,这样能够使得所有的缓冲空间都失去利用。很多哈希算法都可能满足这一条件。

  • 枯燥性

    枯燥性是指如果曾经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲区退出到零碎中,那么哈希的后果应可能保障原有已调配的内容能够被映射到新的缓冲区中去,而不会被映射到旧的缓冲汇合中的其余缓冲区。简略的哈希算法往往不能满足枯燥性的要求

2.2 原理

一致性哈希将整个哈希值空间组织成一个虚构的圆环,如假如某哈希函数 H 的值空间为 0 -2^32-1(即哈希值是一个 32 位无符号整形),整个哈希空间环如下:

就是所有的输出值都被映射到 0-2^32-1 之间,组成一个圆环

下一步将各个服务器应用 Hash 进行一个哈希,具体能够抉择服务器的 ip 或主机名或者其余业务属性作为关键字 进行哈希,这样每台机器就能确定其在哈希环上的地位,这里假如将上文中四台服务器应用 ip 地址哈希后在环空间的地位如下:

如果服务器数量不多且个数绝对稳固,咱们也能够手动设置这些服务器的地位,如设 A 在 2^30- 1 地位,B 在 2^31- 1 地位,C 在 3 *2^30- 1 地位,D 在 2^32- 1 地位,则 hash 值为 0~2^30- 1 的数据存储在 A 中,hash 值为 2^30-1~2^31- 1 的数据存储在 B 中,以此类推。

定位数据存储的办法:将数据 key 应用雷同的函数 Hash 计算出哈希值,并确定此数据在环上的地位,从此地位沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。能够为这些服务器保护一条二分查找树,定位服务器的过程就是在二分查找树中找刚好比其大的节点。

例如咱们有 Object A、Object B、Object C、Object D 四个数据对象,通过哈希计算后,在环空间上的地位如下:

2.3 一致性 hash 长处

如上图,如果当节点 A 宕机了,那么只会影响 objectA 它会被从新映射到 NodeB 节点,其余的 ObjectB ObjectC ObjectD 都不会受到影响,大大提高了容错性和稳定性

2.4 一致性 hash 存在的问题

2.3.1 数据分布不平均

当节点 Node 很少的时候 比方 2 台机器,那么 必然造成大量数据集中在 NodeA,大量在 NodeB

2.3.2 雪崩

当某个节点宕机后,本来属于它的申请 都会被从新 hash 映射到 上游节点,会忽然造成上游节点压力过大 有可能也会造成 上游节点宕机,从而造成雪崩 这是致命的

为此 引入了 虚构节点来解决下面两个问题

3. 带虚构节点的一致性 hash

就是会在圆环上 依据 Node 节点 生成很多的虚构节点 散布在圆环上,这样当 某个 节点挂了后 本来属于它的申请,会被平衡的散布到 其余节点上 升高了产生雪崩的状况,也解决了 节点数少导致 申请散布不均的申请

即对每一个服务节点计算多个哈希(能够用原节点 key+”##xxxk” 作为每个虚构节点的 key,而后求 hashcode),每个计算结果地位都搁置一个此服务节点,称为虚构节点。具体做法能够在服务器 ip 或主机名的前面减少编号来实现。

看上图,此时如果 group3 节点挂了,那么申请会被均分到 group2 和 group1 下面,到此 一致性 hash 的正确生产的应用形式解说完了 上面来看看一个案例代码。

4. 代码测试

可供选择的有很多,memcached 官网应用了基于 md5 的 KETAMA 算法,但这里处于计算效率的思考,应用了 FNV1_32_HASH 算法,如下:

public class HashUtil {
    /**
     * 计算 Hash 值, 应用 FNV1_32_HASH 算法
     * @param str
     * @return
     */
    public static int getHash(String str) {
        final int p = 16777619;
        int hash = (int)2166136261L;
        for (int i = 0; i < str.length(); i++) {hash =( hash ^ str.charAt(i) ) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;

        if (hash < 0) {hash = Math.abs(hash);
        }
        return hash;
    }
}
package com.weareint.dispatchservice.hashloop;

import org.springframework.stereotype.Component;

import java.util.*;

/**
 *
 *
 * <pre>
 *  一致性 hash 虚构 环
 * </pre>
 *
 * @author johnny
 * @date 2021-08-26 9:22 上午
 */
@Component
public class HashVirtualNodeCircle {

    /** 实在集群列表 */
    private static List<String> instanceNodes;

    /** 虚构节点映射关系 */
    private static SortedMap<Integer, String> virtualNodes = new TreeMap<>();

    /** 虚构节点数 */
    private static final int VIRTUAL_NODE_NUM = 1000;

    /** 刷新 服务实例 */
    public void refreshVirtualHashCircle(HashCircleInstanceNodeBuild build) {
        // 当集群变动时,刷新 hash 环,其余的集群在 hash 环上的地位不会产生变动
        virtualNodes.clear();
        // 获取 最新节点
        instanceNodes = build.instanceNodes();
        // 将虚构节点映射到 Hash 环上
        for (String realInstance : instanceNodes) {for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {String virtualNodeName = getVirtualNodeName(realInstance, i);
                int hash = HashUtils.getHash(virtualNodeName);
                System.out.println("[" + virtualNodeName + "] launched @" + hash);
                virtualNodes.put(hash, virtualNodeName);
            }
        }
    }

    private static String getVirtualNodeName(String realName, int num) {return realName + "&&VN" + num;}

    private static String getRealNodeName(String virtualName) {return virtualName.split("&&")[0];
    }

    private static String getServer(String widgetKey) {int hash = HashUtils.getHash(widgetKey);
        // 只取出所有大于该 hash 值的局部而不用遍历整个 Tree
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        String virtualNodeName;
        if (subMap.isEmpty()) {
            // hash 值在最尾部,应该映射到第一个 group 上
            virtualNodeName = virtualNodes.get(virtualNodes.firstKey());
        } else {virtualNodeName = subMap.get(subMap.firstKey());
        }
        return getRealNodeName(virtualNodeName);
    }

    public static void main(String[] args) {HashVirtualNodeCircle hashVirtualNodeCircle = new HashVirtualNodeCircle();
        hashVirtualNodeCircle.refreshVirtualHashCircle(new HashCircleInstanceNodeBuild() {
                    @Override
                    public List<String> instanceNodes() {LinkedList<String> nodes = new LinkedList<>();
                        nodes.add("192.168.11.23:8090");
                        nodes.add("192.168.11.23:8093");
                        nodes.add("192.168.11.23:8094");
                        return nodes;
                    }
                });

        // 生成随机数进行测试
        Map<String, Integer> resMap = new HashMap<>();

        List<String> plcList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            String plchost = "192.168.0." + i + 1;
            for (int j = 0; j < 10; j++) {plcList.add(plchost + ":" + j + 100);
            }
        }

        for (int i = 0; i < plcList.size(); i++) {String plcwideget = plcList.get(i);
            String group = getServer(plcwideget);
            if (resMap.containsKey(group)) {resMap.put(group, resMap.get(group) + 1);
            } else {resMap.put(group, 1);
            }
        }

        resMap.forEach((k, v) -> {System.out.println("group" + k + ":" + v + "(" + v / 100.0D + "%)");
                });

        System.out.println("=========================================");

    }
}

能够看到 散布很平衡

5.Dubbo 一致性 Hash 实现

最近给公司做的散发器 应用 dubbo 调用近程服务,调研了一下 dubbo 也有本人实现的 一致性 hash 不过理论应用起来 发现有些 bug,目前通过 SPI 机制 本人扩大了一下,来看看 dubbo 的 一致性 hash 实现吧

5.1 版本 dubbo2.7.12

我用的版本是 dubbo2.7.12 曾经入了 2.7 的坑 不少坑都是本人缓缓调试解决的。

5.2 org.apache.dubbo.rpc.cluster.loadbalance

负载平衡根本就这些 一致性 hash,随机,轮训,。。没啥特地的

5.3 dubbo ConsistentHashLoadBalance 源码

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;

/**
 * ConsistentHashLoadBalance
 */
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

    /**
     * Hash nodes name
     */
    public static final String HASH_NODES = "hash.nodes";

    /**
     * Hash arguments name
     */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // using the hashcode of list to compute the hash only pay attention to the elements in the list
        // 有 bug  1.invokers 老是变动,导致 一直的 在创立 ConsistentHashSelector 
                //      
        int invokersHashCode = invokers.hashCode();
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != invokersHashCode) {selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 默认虚拟机节点数 160,能够通过
          /**
          dubbo:
            consumer:
                        parameters:
                          hash:
                            nodes: 560 #指定虚构分片结点数  最终 virtualInvokers = nodes * invokerCount
                            arguments: 0  指定的是 哪个参数作为 key
              **/
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
          // 默认取 第 0 个参数 作为 hash 的 key
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {byte[] digest = Bytes.getMD5(address + i);
                    for (int h = 0; h < 4; h++) {long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {String key = toKey(invocation.getArguments());
            byte[] digest = Bytes.getMD5(key);
            return selectForKey(hash(digest, 0));
        }

        private String toKey(Object[] args) {StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {if (i >= 0 && i < args.length) {buf.append(args[i]);
                }
            }
            return buf.toString();}

        private Invoker<T> selectForKey(long hash) {Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();}

        private long hash(byte[] digest, int number) {return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }
    }

}

6.Dubbo 的 一致性 hash 的 bug 和 一些配置参数

6.1 invokers 一直的变动

通过调试 发现 invokers 时不时变动导致 统一在 rehash,其实 很多时候只是 节点的程序变动而已

解决办法:我间接把 invokers 节点数 取出来进行排序后拼接 成一个字符串 进行 计算 hashcode 就不会总变动了

String invokeKey =
        invokers.stream()
                .filter(Node::isAvailable)
                // 依照 ip 和 port 排序
                .sorted(Comparator.comparing(invoke -> invoke.getUrl().getIp()))
                .sorted(Comparator.comparing(invoke -> invoke.getUrl().getPort()))
                .map(invoke -> invoke.getUrl().getIp() + ":" + invoke.getUrl().getPort())
                .collect(Collectors.joining(","));

6.2 留神 isAvailable

invokers 中存在 节点不可用的,如果对于节点不可用的 间接过滤 须要留神 isAvailable

6.3 设置虚构节点发片数

dubbo:
    consumer:
    parameters:
      hash:
        nodes: 560 #指定虚构分片结点数  最终 virtualInvokers = nodes * invokerCount,默认是 160
   

6.4 设置办法的哪个参数作为 hash 的 key

dubbo:
    consumer:
      parameters:
        hash:        
          arguments: 0  #默认就是 0 

7.SPI 扩大 Dubbo 一致性 hash 算法 ExtendConsistentHashLoadBalance

7.1 官网文档

官网文档很具体了

https://dubbo.apache.org/zh/d…

7.2 ExtendConsistentHashLoadBalance 扩大实现

package com.weareint.dispatchservice.extendconsistenthash;

import com.atw.mdc.entity.protocol.webRequest.PlcReadSimpleRequest;
import com.atw.mdc.entity.protocol.webRequest.PlcWriteSimpleRequest;
import com.weareint.dispatchservice.event.ConnectionRouteEvent;
import com.weareint.dispatchservice.event.NodeChangeEvent;
import com.weareint.dispatchservice.event.NodeChangeService;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.Node;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;
import org.apache.dubbo.rpc.support.RpcUtils;
import org.springframework.context.ApplicationEventPublisher;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;

/**
 *
 *
 * <pre>
 * 扩大 一致性 Hash 环  次要是把 hash 的 key 只通过 plc 的 deviceCode 进行 hash,而后后续增加 Redis 路由表 进行
 * </pre>
 *
 * @author johnny
 * @date 2021-08-26 5:32 下午
 */
@Slf4j
public class ExtendConsistentHashLoadBalance extends AbstractLoadBalance {

    public static ApplicationEventPublisher publisher;
    public static NodeChangeService nodeChangeService;

    public static final String NAME = "consistenthash";

    /** Hash nodes name */
    public static final String HASH_NODES = "hash.nodes";

    /** Hash arguments name */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    private final ConcurrentMap<String, ExtendConsistentHashLoadBalance.ConsistentHashSelector<?>>
            selectors =
                    new ConcurrentHashMap<
                            String, ExtendConsistentHashLoadBalance.ConsistentHashSelector<?>>();

    public ConcurrentMap<String, ConsistentHashSelector<?>> getSelectors() {return selectors;}

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String methodName = RpcUtils.getMethodName(invocation);
        // String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 只有是 servicekey 就好
        String key = invokers.get(0).getUrl().getServiceKey();
        // String key = invokers.get(0).getUrl().getParameter("remote.application");
        if (log.isDebugEnabled()) {log.info("【remoteApplication:{}】", key);
        }
        // using the hashcode of list to compute the hash only pay attention to the elements in the
        String invokeKey =
                invokers.stream()
                        .filter(Node::isAvailable)
                        // 依照 ip 和 port 排序
                        .sorted(Comparator.comparing(invoke -> invoke.getUrl().getIp()))
                        .sorted(Comparator.comparing(invoke -> invoke.getUrl().getPort()))
                        .map(invoke -> invoke.getUrl().getIp() + ":" + invoke.getUrl().getPort())
                        .collect(Collectors.joining(","));

        if (log.isDebugEnabled()) {log.info("【invokeKey:{}】", invokeKey);
        }
        int invokersHashCode = invokeKey.hashCode();
        ExtendConsistentHashLoadBalance.ConsistentHashSelector<T> selector = null;
        // 同时 submit 可能会有问题 加个锁 可能会有第一次提交生成 虚构节点     
        selector = (ExtendConsistentHashLoadBalance.ConsistentHashSelector<T>) selectors.get(key);
        // 判断是否 invokers 有变动  selector.identityHashCode != invokersHashCode
        if (selector == null || selector.identityHashCode != invokersHashCode) {synchronized (ExtendConsistentHashLoadBalance.class) {
                selector =
                        (ExtendConsistentHashLoadBalance.ConsistentHashSelector<T>)
                                selectors.get(key);
                if (selector == null || selector.identityHashCode != invokersHashCode) {
                    // 这个 isAvailable 要存在 否则有 bug
                    List<Invoker<T>> availableInvoker =
                            invokers.stream()
                                    .filter(Node::isAvailable)
                                    .collect(Collectors.toList());
                    ConsistentHashSelector<T> tConsistentHashSelector =
                            new ConsistentHashSelector<>(availableInvoker, methodName, invokersHashCode);
                    selectors.put(key, tConsistentHashSelector);
                    selector =
                            (ExtendConsistentHashLoadBalance.ConsistentHashSelector<T>)
                                    selectors.get(key);
                    log.info("【new selector by invokeKey : {},availableInvoker:{}】",
                            invokeKey,
                            availableInvoker.stream()
                                    .map(
                                            invoke ->
                                                    invoke.getUrl().getIp()
                                                            + ":"
                                                            + invoke.getUrl().getPort())
                                    .collect(Collectors.joining(",")));

                    NodeChangeEvent event = new NodeChangeEvent(this, tConsistentHashSelector);
                    publisher.publishEvent(event);
                }
            }
        }
        String hashKey = selector.toKey(invocation.getArguments());
        Invoker<T> select = selector.select(hashKey);
        log.info("【plcDeviceCode: {} dispatch to  ip : {}:{}",
                hashKey,
                select.getUrl().getIp(),
                select.getUrl().getPort());
        // deviceCode route table  To Redis
        ConnectionRouteEvent event = new ConnectionRouteEvent(this, hashKey, select, selector);
        publisher.publishEvent(event);
        return select;
    }

    public static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        public final List<Invoker<T>> invokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.invokers = invokers;
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            String[] index =
                    COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(String key) {byte[] digest = md5(key);
            return selectForKey(hash(digest, 0));
        }

        @SuppressWarnings("unchecked")
        public String toKey(Object[] args) {StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {if (i >= 0 && i < args.length) {
                    // 只取 PlcReadSimpleRequest 的 DeviceCode 作为 hash 的 key
                    if (args[i] instanceof ArrayList) {
                        ArrayList<PlcReadSimpleRequest> list =
                                (ArrayList<PlcReadSimpleRequest>) args[i];
                        buf.append(list.get(0).getDeviceCode());
                        // 只取 PlcWriteSimpleRequest DeviceCode 作为 hash 的 key
                    } else if (args[i] instanceof PlcWriteSimpleRequest) {PlcWriteSimpleRequest req = (PlcWriteSimpleRequest) args[i];
                        buf.append(req.getDeviceCode());
                    } else if (args[i] instanceof String) {// PlcConnectionRequest req = (PlcConnectionRequest) args[i];
                        // 敞开连贯
                        String deviceCode = (String) args[i];
                        buf.append(deviceCode);
                    } else {buf.append(args[i]);
                    }
                }
            }
            return buf.toString();}

        private Invoker<T> selectForKey(long hash) {Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();}

        private long hash(byte[] digest, int number) {return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                            | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                            | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                            | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] md5(String value) {
            MessageDigest md5;
            try {md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            md5.update(bytes);
            return md5.digest();}

        public int getIdentityHashCode() {return identityHashCode;}
    }
}

7.3 配置 spi 扩大

extendconsistenthash=com.weareint.dispatchservice.extendconsistenthash.ExtendConsistentHashLoadBalance

7.4 应用自定义的扩大 loadbalance

@DubboReference(loadbalance = "extendconsistenthash")
private IDeviceWriteService deviceWriteService;

7.5 已知扩大

总结

本篇次要解说了 什么是一致性 hash 它有哪些长处和存在的问题,以及 带虚构节点的一致性 hash,最初介绍了一些 dubbo 的一致性 hash 实现,dubbo 自带的有 bug,然而提供了 spi 扩大机制 你能够本人去实现,目前我就是这样去解决的。

参考文章:
https://www.cnblogs.com/fengy…,
https://blog.csdn.net/wudiyon…

欢送大家拜访 集体博客 Johnny 小屋

正文完
 0