分布式数据缓存中的一致性哈希算法

49次阅读

共计 7274 个字符,预计需要花费 19 分钟才能阅读完成。

一致性哈希算法在分布式缓存领域的 MemCached,负载均衡领域的 Nginx 以及各类 RPC 框架中都有广泛的应用,它主要是为了解决传统哈希函数添加哈希表槽位数后要将关键字重新映射的问题。

本文会介绍一致性哈希算法的原理及其实现,并给出其不同哈希函数实现的性能数据对比,探讨 Redis 集群的数据分片实现等,文末会给出实现的具体 github 地址。

Memcached 与客户端分布式缓存

Memcached 是一个高性能的分布式缓存系统,然而服务端没有分布式功能,各个服务器不会相互通信。它的分布式实现依赖于客户端的程序库,这也是 Memcached 的一大特点。比如第三方的 spymemcached 客户端就基于一致性哈希算法实现了其分布式缓存的功能。

其具体步骤如下:

  • 向 Memcached 添加数据,首先客户端的算法根据 key 值计算出该 key 对应的服务器。
  • 服务器选定后,保存缓存数据。
  • 获取数据时,对于相同的 key,客户端的算法可以定位到相同的服务器,从而获取数据。

在这个过程中,客户端的算法首先要保证缓存的数据尽量均匀地分布在各个服务器上,其次是当个别服务器下线或者上线时,会出现数据迁移,应该尽量减少需要迁移的数据量。

客户端算法是客户端分布式缓存性能优劣的关键。

普通的哈希表算法一般都是计算出哈希值后,通过取余操作将 key 值映射到不同的服务器上,但是当服务器数量发生变化时,取余操作的除数发生变化,所有 key 所映射的服务器几乎都会改变,这对分布式缓存系统来说是不可以接收的。

一致性哈希算法能尽可能减少了服务器数量变化所导致的缓存迁移。

哈希算法

首先,一致性哈希算法依赖于普通的哈希算法。大多数同学对哈希算法的理解可能都停留在 JDK 的 hashCode 函数上。其实哈希算法有很多种实现,它们在不同方面都各有优劣,针对不同的场景可以使用不同的哈希算法实现。

下面,我们会介绍一下几款比较常见的哈希算法,并且了解一下它们在分布均匀程度,哈希碰撞概率和性能等方面的优劣。

MD5 算法:全称为 Message-Digest Algorithm 5,用于确保信息传输完整一致。是计算机广泛使用的杂凑算法之一,主流编程语言普遍已有 MD5 实现。MD5 的作用是把大容量信息压缩成一种保密的格式(就是把一个任意长度的字节串变换成定长的 16 进制数字串)。常见的文件完整性校验就是使用 MD5。

CRC 算法:全称为 CyclicRedundancyCheck,中文名称为循环冗余校验。它是一类重要的,编码和解码方法简单,检错和纠错能力强的哈希算法,在通信领域广泛地用于实现差错控制。

MurmurHash 算法:高运算性能,低碰撞率,由 Austin Appleby 创建于 2008 年,现已应用到 Hadoop、libstdc++、nginx、libmemcached 等开源系统。Java 界中 Redis,Memcached,Cassandra,HBase,Lucene 和 Guava 都在使用它。

FNV 算法:全称为 Fowler-Noll-Vo 算法,是以三位发明人 Glenn Fowler,Landon Curt Noll,Phong Vo 的名字来命名的,最早在 1991 年提出。FNV 能快速 hash 大量数据并保持较小的冲突率,它的高度分散使它适用于 hash 一些非常相近的字符串,比如 URL,hostname,文件名,text 和 IP 地址等。

Ketama 算法:一致性哈希算法的实现之一,其他的哈希算法有通用的一致性哈希算法实现,只不过是替换了哈希映射函数而已,但 Ketama 是一整套的流程,我们将在后面介绍。

一致性哈希算法

下面,我们以分布式缓存场景为例,分析一下一致性哈希算法环的原理。

首先将缓存服务器(ip + 端口号)进行哈希,映射成环上的一个节点,计算出缓存数据 key 值的 hash key,同样映射到环上,并顺时针选取最近的一个服务器节点作为该缓存应该存储的服务器。具体实现见后续的章节。

比如说,当存在 A,B,C,D 四个缓存服务器时,它们及其 key 值为 1 的缓存数据在一致性哈希环上的位置如下图所示,根据顺时针取最近一个服务器节点的规则,该缓存数据应该存储在服务器 B 上。

当要存储一个 key 值为 4 的缓存数据时,它在一致性哈希环上的位置如下所示,所以它应该存储在服务器 C 上。

类似的,key 值为 5,6 的数据应该存在服务 D 上,key 值为 7,8 的数据应该存储在服务 A 上。

此时,服务器 B 宕机下线,服务器 B 中存储的缓存数据要进行迁移,但由于一致性哈希环的存在,只需要迁移 key 值为 1 的数据,其他的数据的存储服务器不会发生变化。这也是一致性哈希算法比取余映射算法出色的地方。

由于服务器 B 下线,key 值为 1 的数据顺时针最近的服务器是 C,所以数据存迁移到服务器 C 上。

现实情况下,服务器在一致性哈希环上的位置不可能分布的这么均匀,导致了每个节点实际占据环上的区间大小不一。

这种情况下,可以增加虚节点来解决。通过增加虚节点,使得每个节点在环上所“管辖”的区域更加均匀。这样就既保证了在节点变化时,尽可能小的影响数据分布的变化,而同时又保证了数据分布的均匀。

具体实现

下面我们实现 Memcached 分布式缓存场景下的一致性哈希算法,并给出具体的测试性能数据。该实现借鉴了 kiritomoe 博文中的实现和 spymemcached 客户端代码。具体实现请看我的 github,地址为 https://github.com/ztelur/con…。

NodeLocator 是分布式缓存场景下一致性哈希算法的抽象,它有一个 getPrimary 函数,接收一个缓存数据的 key 值,输出存储该缓存数据的服务器实例。

public interface NodeLocator {MemcachedNode getPrimary(String k);
}

下面是通用的一致性哈希算法的实现,它使用 TreeMap 作为一致性哈希环的数据结构,其 ceilingEntry 函数可以获取环上最近的一个节点。buildConsistentHashRing 函数中包含了构建一致性哈希环的过程,默认加入了 12 个虚拟节点。

public class ConsistentHashNodeLocator implements NodeLocator {
    private final static int VIRTUAL_NODE_SIZE = 12;
    private final static String VIRTUAL_NODE_SUFFIX = "-";

    private volatile TreeMap<Long, MemcachedNode> hashRing;
    private final HashAlgorithm hashAlg;

    public ConsistentHashNodeLocator(List<MemcachedNode> nodes, HashAlgorithm hashAlg) {
        this.hashAlg = hashAlg;
        this.hashRing = buildConsistentHashRing(hashAlg, nodes);
    }


    @Override
    public MemcachedNode getPrimary(String k) {long hash = hashAlg.hash(k);
        return getNodeForKey(hashRing, hash);
    }

    private MemcachedNode getNodeForKey(TreeMap<Long, MemcachedNode> hashRing, long hash) {
        /* 向右找到第一个 key */
        Map.Entry<Long, MemcachedNode> locatedNode = hashRing.ceilingEntry(hash);
        /* 想象成为一个环,超出尾部取出第一个 */
        if (locatedNode == null) {locatedNode = hashRing.firstEntry();
        }
        return locatedNode.getValue();}

    private TreeMap<Long, MemcachedNode> buildConsistentHashRing(HashAlgorithm hashAlgorithm, List<MemcachedNode> nodes) {TreeMap<Long, MemcachedNode> virtualNodeRing = new TreeMap<>();
        for (MemcachedNode node : nodes) {for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                // 新增虚拟节点的方式如果有影响,也可以抽象出一个由物理节点扩展虚拟节点的类
                virtualNodeRing.put(hashAlgorithm.hash(node.getSocketAddress().toString() + VIRTUAL_NODE_SUFFIX + i), node);
            }
        }
        return virtualNodeRing;
    }
}

getPrimary 函数中,首先使用 HashAlgorithm 计算出 key 值对应的哈希值,然后调用 getNodeForKey 函数从 TreeMap 中获取对应的最近的服务器节点实例。

HashAlgorithm 是对哈希算法的抽象,一致性哈希算法可以使用各种普通的哈希算法,比如说 CRC,MurmurHash 和 FNV 等。下面,我们将会对比各种哈希算法给该实现带来的性能差异性。

性能测试

测试数据是评价一个算法好坏的最为真实有效的方法,量化的思维模式一定要有,这也是程序员进阶的法宝之一。我们以下面四个量化的指标对基于不同哈希函数的一致性哈希算法进行评测。

  • 统计每个服务器节点存储的缓存数量,计算方差和标准差。测量缓存分布均匀情况,我们可以模拟 50000 个缓存数据,分配到 100 个服务器,测试最后个节点存储缓存数据量的方差和标准差。
  • 随机 下线 10% 的服务器,重新分配缓存,统计缓存迁移比率。测量节点上下线的情况,我们可以模拟 50000 个缓存数据,分配到 100 个指定服务器,之后 随机 下线 10 个服务器并重新分配这 50000 个数据,统计缓存分配到不同服务器的比例,也就是迁移比率。
  • 使用 JMH 对不同哈希算法的执行效率进行对比。

具体评测算法如下。

public class NodeLocatorTest {

    /**
     * 测试分布的离散情况
     */
    @Test
    public void testDistribution() {List<MemcachedNode> servers = new ArrayList<>();
        for (String ip : ips) {servers.add(new MemcachedNode(new InetSocketAddress(ip, 8080)));
        }
        // 使用不同的 DefaultHashAlgorithm 进行测试,得出不同的数据
        NodeLocator nodeLocator = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.NATIVE_HASH);
        // 构造 50000 随机请求
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {keys.add(UUID.randomUUID().toString());
        }
        // 统计分布
        AtomicLongMap<MemcachedNode> atomicLongMap = AtomicLongMap.create();
        for (MemcachedNode server : servers) {atomicLongMap.put(server, 0);
        }
        for (String key : keys) {MemcachedNode node = nodeLocator.getPrimary(key);
            atomicLongMap.getAndIncrement(node);
        }
        System.out.println(StatisticsUtil.variance(atomicLongMap.asMap().values().toArray(new Long[]{})));
        System.out.println(StatisticsUtil.standardDeviation(atomicLongMap.asMap().values().toArray(new Long[]{})));

    }

    /**
     * 测试节点新增删除后的变化程度
     */
    @Test
    public void testNodeAddAndRemove() {List<MemcachedNode> servers = new ArrayList<>();
        for (String ip : ips) {servers.add(new MemcachedNode(new InetSocketAddress(ip, 8080)));
        }
        // 随机下线 10 个服务器, 先 shuffle,然后选择 0 到 90,简单模仿随机计算。Collections.shuffle(servers);
        List<MemcachedNode> serverChanged = servers.subList(0, 90);
        NodeLocator loadBalance = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.NATIVE_HASH);
        NodeLocator changedLoadBalance = new ConsistentHashNodeLocator(serverChanged, DefaultHashAlgorithm.NATIVE_HASH);

        // 构造 50000 随机请求
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {keys.add(UUID.randomUUID().toString());
        }
        int count = 0;
        for (String invocation : keys) {MemcachedNode origin = loadBalance.getPrimary(invocation);
            MemcachedNode changed = changedLoadBalance.getPrimary(invocation);
           // 统计发生变化的数值
            if (!origin.getSocketAddress().equals(changed.getSocketAddress())) count++;
        }
        System.out.println(count / 50000D);
    }
    static String[] ips = {...};
}

JMH 的测试脚本如下所示。

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class JMHBenchmark {

    private NodeLocator nodeLocator;
    private List<String> keys;

    @Benchmark
    public void test() {for (String key : keys) {MemcachedNode node = nodeLocator.getPrimary(key);
        }
    }

    public static void main(String[] args) throws RunnerException {Options opt = new OptionsBuilder()
                .include(JMHBenchmark.class.getSimpleName())
                .forks(1)
                .warmupIterations(5)
                .measurementIterations(5)
                .build();
        new Runner(opt).run();}

    @Setup
    public void prepare() {List<MemcachedNode> servers = new ArrayList<>();
        for (String ip : ips) {servers.add(new MemcachedNode(new InetSocketAddress(ip, 8080)));
        }
        nodeLocator = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.MURMUR_HASH);
        // 构造 50000 随机请求
        keys = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {keys.add(UUID.randomUUID().toString());
        }
    }

    @TearDown
    public void shutdown() {}
    static String[] ips = {...};
}

分别测试了 JDK 哈希算法,FNV132 算法,CRC 算法,MurmurHash 算法和 Ketama 算法,分别对应 DefaultHashAlgorithmNATIVE_HASHFNV1_32_HASHCRC_HASHMURMUR_HASHKETAMA_HASH。具体数据如下所示。

虚拟槽分区

有些文章说,Redis 集群并没有使用一致性哈希算法,而是使用虚拟槽分区算法。但是外网(地址见文末)上都说 Redis 使用的虚拟槽分区只是一致性哈希算法的变种,虚拟槽可以允许 Redis 动态扩容。

或许只有去了解一下 Redis 的源码才能对这个问题作出准确的回答。请了解的同学积极留言解答,谢谢。

github 地址:https://github.com/ztelur/con…
redis 分布式讨论的地址:https://www.reddit.com/r/redi…

参考

  • https://jistol.github.io/soft…
  • https://mp.weixin.qq.com/s/oe…

正文完
 0