关于redis:Redis数据倾斜与JD开源hotkey源码分析揭秘

38次阅读

共计 46162 个字符,预计需要花费 116 分钟才能阅读完成。

1 前言
之前旁边的小伙伴问我热点数据相干问题,在给他粗略地解说一波 redis 数据歪斜的案例之后,本人也顺道回顾了一些对于热点数据处理的方法论,同时也想起去年所学习 JD 开源我的项目 hotkey——专门用来解决热点数据问题的框架。在这里联合两者所关联到的知识点,通过几个小图和局部粗略的解说,来让大家理解相干方法论以及 hotkey 的源码解析。

2 Redis 数据歪斜
2.1 定义与危害
先说说数据歪斜的定义,借用百度词条的解释:

对于集群零碎,个别缓存是分布式的,即不同节点负责肯定范畴的缓存数据。咱们把缓存数据分散度不够,导致大量的缓存数据集中到了一台或者几台服务节点上,称为数据歪斜。一般来说数据歪斜是因为负载平衡施行的成果不好引起的。

从下面的定义中能够得悉,数据歪斜的起因个别是因为 LB 的成果不好,导致局部节点数据量十分集中。

那这又会有什么危害呢?

如果产生了数据歪斜,那么保留了大量数据,或者是保留了热点数据的实例的解决压力就会增大,速度变慢,甚至还可能会引起这个实例的内存资源耗尽,从而解体。这是咱们在利用切片集群时要防止的。

2.2 数据歪斜的分类
2.2.1 数据量歪斜(写入歪斜)

1. 图示

如图,在某些状况下,实例上的数据分布不平衡,某个实例上的数据特地多。

2.bigkey 导致歪斜

某个实例上正好保留了 bigkey。bigkey 的 value 值很大(String 类型),或者是 bigkey 保留了大量汇合元素(汇合类型),会导致这个实例的数据量减少,内存资源耗费也相应减少。

应答办法

在业务层生成数据时,要尽量避免把过多的数据保留在同一个键值对中。
如果 bigkey 正好是汇合类型,还有一个办法,就是把 bigkey 拆分成很多个小的汇合类型数据,扩散保留在不同的实例上。
3.Slot 调配不均导致歪斜

先简略的介绍一下 slot 的概念,slot 其实全名是 Hash Slot(哈希槽),在 Redis Cluster 切片集群中一共有 16384 个 Slot,这些哈希槽相似于数据分区,每个键值对都会依据它的 key,被映射到一个哈希槽中。Redis Cluster 计划采纳哈希槽来解决数据和实例之间的映射关系。

一张图来解释,数据、哈希槽、实例这三者的映射散布状况。

这里的 CRC16(city)%16384 能够简略的了解为将 key1 依据 CRC16 算法取 hash 值而后对 slot 个数取模,失去的就是 slot 地位为 14484,他所对应的实例节点是第三个。

运维在构建切片集群时候,须要手动调配哈希槽,并且把 16384 个槽都调配完,否则 Redis 集群无奈失常工作。因为是手动调配,则可能会导致局部实例所调配的 slot 过多,导致数据歪斜。

应答办法

应用 CLUSTER SLOTS 命令来查

看 slot 分配情况,应用 CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE 这三个命令来进行 slot 数据的迁徙,具体内容不再这里细说,感兴趣的同学能够自行学习一下。

4.Hash Tag 导致歪斜

Hash Tag 定义 : 指当一个 key 蕴含 {} 的时候,就不对整个 key 做 hash,而仅对 {} 包含的字符串做 hash。
假如 hash 算法为 sha1。对 user:{user1}:ids 和 user:{user1}:tweets,其 hash 值都等同于 sha1(user1)。
Hash Tag 劣势 : 如果不同 key 的 Hash Tag 内容都是一样的,那么,这些 key 对应的数据会被映射到同一个 Slot 中,同时会被调配到同一个实例上。
Hash Tag 劣势 : 如果不合理应用,会导致大量的数据可能被集中到一个实例上产生数据歪斜,集群中的负载不平衡。
2.2.2 数据拜访歪斜(读取歪斜 - 热 key 问题)

一般来说数据拜访歪斜就是热 key 问题导致的,如何解决 redis 热 key 问题也是面试中常会问到的。所以理解相干概念及方法论也是不可或缺的一环。

1. 图示

如图,尽管每个集群实例上的数据量相差不大,然而某个实例上的数据是热点数据,被拜访得十分频繁。

然而为啥会有热点数据的产生呢?

2. 产生热 key 的起因及危害

1)用户生产的数据远大于生产的数据(热卖商品、热点新闻、热点评论、明星直播)。

在日常工作生存中一些突发的事件,例如:双十一期间某些热门商品的提价促销,当这其中的某一件商品被数万次点击浏览或者购买时,会造成一个较大的需求量,这种状况下就会造成热点问题。

同理,被大量刊发、浏览的热点新闻、热点评论、明星直播等,这些典型的读多写少的场景也会产生热点问题。

2)申请分片集中,超过单 Server 的性能极限。

在服务端读数据进行拜访时,往往会对数据进行分片切分,此过程中会在某一主机 Server 上对相应的 Key 进行拜访,当拜访超过 Server 极限时,就会导致热点 Key 问题的产生。

如果热点过于集中,热点 Key 的缓存过多,超过目前的缓存容量时,就会导致缓存分片服务被打垮景象的产生。当缓存服务解体后,此时再有申请产生,会缓存到后盾 DB 上,因为 DB 自身性能较弱,在面临大申请时很容易产生申请穿透景象,会进一步导致雪崩景象,重大影响设施的性能。

3. 罕用的热 key 问题解决办法:

解决方案一: 备份热 key

能够把热点数据复制多份,在每一个数据正本的 key 中减少一个随机后缀,让它和其它正本数据不会被映射到同一个 Slot 中。

这里相当于把一份数据复制到其余实例上,这样在拜访的时候也减少随机前缀,将对一个实例的拜访压力,均摊到其余实例上

例如: 咱们在放入缓存时就将对应业务的缓存 key 拆分成多个不同的 key。如下图所示,咱们首先在更新缓存的一侧,将 key 拆成 N 份,比方一个 key 名字叫做”good_100”,那咱们就能够把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增时都须要去改变这 N 个 key,这一步就是拆 key。

对于 service 端来讲,咱们就须要想方法尽量将本人拜访的流量足够的平均。

如何给本人行将拜访的热 key 上退出后缀?几种方法,依据本机的 ip 或 mac 地址做 hash,之后的值与拆 key 的数量做取余,最终决定拼接成什么样的 key 后缀,从而打到哪台机器上;服务启动时的一个随机数对拆 key 的数量做取余。

伪代码如下:

const M = N * 2
// 生成随机数
random = GenRandom(0, M)
// 结构备份新 key
bakHotKey = hotKey +“_”+ random
data = redis.GET(bakHotKey)
if data == NULL {data = GetFromDB()
  redis.SET(bakHotKey, expireTime + GenRandom(0,5))
}

解决方案二: 本地缓存 + 动静计算主动发现热点缓存

该计划通过被动发现热点并对其进行存储来解决热点 Key 的问题。首先 Client 也会拜访 SLB,并且通过 SLB 将各种申请散发至 Proxy 中,Proxy 会依照基于路由的形式将申请转发至后端的 Redis 中。

在热点 key 的解决上是采纳在服务端减少缓存的形式进行。具体来说就是在 Proxy 上减少本地缓存,本地缓存采纳 LRU 算法来缓存热点数据,后端节点减少热点数据计算模块来返回热点数据。

Proxy 架构的次要有以下长处:

Proxy 本地缓存热点,读能力可程度扩大
DB 节点定时计算热点数据汇合
DB 反馈 Proxy 热点数据
对客户端齐全通明,不需做任何兼容
热点数据的发现与存储

对于热点数据的发现,首先会在一个周期内对 Key 进行申请统计,在达到申请量级后会对热点 Key 进行热点定位,并将所有的热点 Key 放入一个小的 LRU 链表内,在通过 Proxy 申请进行拜访时,若 Redis 发现待访点是一个热点,就会进入一个反馈阶段,同时对该数据进行标记。

能够应用一个 etcd 或者 zk 集群来存储反馈的热点数据,而后本地所有节点监听该热点数据,进而加载到本地 JVM 缓存中。

热点数据的获取

在热点 Key 的解决上次要分为写入跟读取两种模式,在数据写入过程当 SLB 收到数据 K1 并将其通过某一个 Proxy 写入一个 Redis,实现数据的写入。

假若通过后端热点模块计算发现 K1 成为热点 key 后,Proxy 会将该热点进行缓存,当下次客户端再进行拜访 K1 时,能够不经 Redis。

最初因为 proxy 是能够程度裁减的,因而能够任意加强热点数据的拜访能力。

最佳成熟计划: JD 开源 hotKey 这是目前较为成熟的主动探测热 key、分布式一致性缓存解决方案。原理就是在 client 端做洞察,而后上报对应 hotkey,server 端检测到后,将对应 hotkey 下发到对应服务端做本地缓存,并且能保障本地缓存和近程缓存的一致性。

在这里咱们就不细谈了,这篇文章的第三局部:JD 开源 hotkey 源码解析外面会率领大家理解其整体原理。

3 JD 开源 hotkey—主动探测热 key、分布式一致性缓存解决方案
3.1 解决痛点
从下面可知,热点 key 问题在并发量比拟高的零碎中 (特地是做秒杀流动) 呈现的频率会比拟高,对系统带来的危害也很大。

那么针对此,hotkey 诞生的目标是什么?须要解决的痛点是什么?以及它的实现原理。

在这里援用我的项目上的一段话来概述: 对任意突发性的无奈事后感知的热点数据,包含并不限于热点数据(如突发大量申请同一个商品)、热用户(如歹意爬虫刷子)、热接口(突发海量申请同一个接口)等,进行毫秒级精准探测到。而后对这些热数据、热用户等,推送到所有服务端 JVM 内存中,以大幅加重对后端数据存储层的冲击,并能够由使用者决定如何调配、应用这些热 key(譬如对热商品做本地缓存、对热用户进行回绝拜访、对热接口进行熔断或返回默认值)。这些热数据在整个服务端集群内放弃一致性,并且业务隔离。

外围性能:热数据探测并推送至集群各个服务器

3.2 集成形式
集成形式在这里就不详述了,感兴趣的同学能够自行搜寻。

3.3 源码解析
3.3.1 架构简介

1. 全景图一览

流程介绍:

客户端通过援用 hotkey 的 client 包,在启动的时候上报本人的信息给 worker,同时和 worker 之间建设长连贯。定时拉取配置核心下面的规定信息和 worker 集群信息。
客户端调用 hotkey 的 ishot()的办法来首先匹配规定,而后统计是不是热 key。
通过定时工作把热 key 数据上传到 worker 节点。
worker 集群在收取到所有对于这个 key 的数据当前(因为通过 hash 来决定 key 上传到哪个 worker 的,所以同一个 key 只会在同一个 worker 节点上),在和定义的规定进行匹配后判断是不是热 key,如果是则推送给客户端,实现本地缓存。
2. 角色形成

这里间接借用作者的形容:

1)etcd 集群 etcd 作为一个高性能的配置核心,能够以极小的资源占用,提供高效的监听订阅服务。次要用于寄存规定配置,各 worker 的 ip 地址,以及探测出的热 key、手工增加的热 key 等。

2)client 端 jar 包就是在服务中增加的援用 jar,引入后,就能够以便捷的形式去判断某 key 是否热 key。同时,该 jar 实现了 key 上报、监听 etcd 里的 rule 变动、worker 信息变动、热 key 变动,对热 key 进行本地 caffeine 缓存等。

3) worker 端集群 worker 端是一个独立部署的 Java 程序,启动后会连贯 etcd,并定期上报本人的 ip 信息,供 client 端获取地址并进行长连贯。之后,次要就是对各个 client 发来的待测 key 进行累加计算,当达到 etcd 里设定的 rule 阈值后,将热 key 推送到各个 client。

4) dashboard 控制台控制台是一个带可视化界面的 Java 程序,也是连贯到 etcd,之后在控制台设置各个 APP 的 key 规定,譬如 2 秒 20 次算热。而后当 worker 探测进去热 key 后,会将 key 发往 etcd,dashboard 也会监听热 key 信息,进行入库保留记录。同时,dashboard 也能够手工增加、删除热 key,供各个 client 端监听。

3.hotkey 工程构造

3.3.2 client 端

次要从上面三个方面来解析源码:

1. 客户端启动器

1)启动形式

@PostConstruct
public void init() {ClientStarter.Builder builder = new ClientStarter.Builder();
    ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();
    starter.startPipeline();}

appName: 是这个利用的名称,个别为 ${spring.application.name}的值,后续所有的配置都以此为结尾

etcd: 是 etcd 集群的地址,用逗号分隔,配置核心。

还能够看到 ClientStarter 实现了建造者模式,使代码更为简介。

2)外围入口
com.jd.platform.hotkey.client.ClientStarter#startPipeline

/**
 * 启动监听 etcd
 */
public void startPipeline() {JdLogger.info(getClass(), "etcdServer:" + etcdServer);
    // 设置 caffeine 的最大容量
    Context.CAFFEINE_SIZE = caffeineSize;

    // 设置 etcd 地址
    EtcdConfigFactory.buildConfigCenter(etcdServer);
    // 开始定时推送
    PushSchedulerStarter.startPusher(pushPeriod);
    PushSchedulerStarter.startCountPusher(10);
    // 开启 worker 重连器
    WorkerRetryConnector.retryConnectWorkers();

    registEventBus();

    EtcdStarter starter = new EtcdStarter();
    // 与 etcd 相干的监听都开启
    starter.start();}

该办法次要有五个性能:

① 设置本地缓存 (caffeine) 的最大值,并创立 etcd 实例

// 设置 caffeine 的最大容量
Context.CAFFEINE_SIZE = caffeineSize;

// 设置 etcd 地址
EtcdConfigFactory.buildConfigCenter(etcdServer);

caffeineSize 是本地缓存的最大值,在启动的时候能够设置,不设置默认为 200000。
etcdServer 是下面说的 etcd 集群地址。

Context 能够了解为一个配置类,外面就蕴含两个字段:

public class Context {
    public static String APP_NAME;

    public static int CAFFEINE_SIZE;
}

EtcdConfigFactory 是 ectd 配置核心的工厂类

public class EtcdConfigFactory {
    private static IConfigCenter configCenter;

    private EtcdConfigFactory() {}

    public static IConfigCenter configCenter() {return configCenter;}

    public static void buildConfigCenter(String etcdServer) {
        // 连贯多个时,逗号分隔
        configCenter = JdEtcdBuilder.build(etcdServer);
    }
}

通过其 configCenter()办法获取创立 etcd 实例对象,IConfigCenter 接口封装了 etcd 实例对象的行为(包含根本的 crud、监控、续约等)

② 创立并启动定时工作:PushSchedulerStarter

// 开始定时推送
PushSchedulerStarter.startPusher(pushPeriod);// 每 0.5 秒推送一次待测 key
PushSchedulerStarter.startCountPusher(10);// 每 10 秒推送一次数量统计, 不可配置

pushPeriod 是推送的间隔时间,能够再启动的时候设置,最小为 0.05s,推送越快,探测的越密集,会越快探测进去,但对 client 资源耗费相应增大

PushSchedulerStarter 类

/**
     * 每 0.5 秒推送一次待测 key
     */
    public static void startPusher(Long period) {if (period == null || period <= 0) {period = 500L;}
        @SuppressWarnings("PMD.ThreadPoolCreationRule")
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // 热 key 的收集器
            IKeyCollector<HotKeyModel, HotKeyModel> collectHK = KeyHandlerFactory.getCollector();
            // 这里相当于每 0.5 秒,通过 netty 来给 worker 来推送收集到的热 key 的信息,次要是一些热 key 的元数据信息(热 key 起源的 app 和 key 的类型和是否是删除事件,还有该热 key 的上报次数)
            // 这外面还有就是该热 key 在每次上报的时候都会生成一个全局的惟一 id,还有该热 key 每次上报的创立工夫是在 netty 发送的时候来生成,同一批次的热 key 工夫是雷同的
            List<HotKeyModel> hotKeyModels = collectHK.lockAndGetResult();
            if(CollectionUtil.isNotEmpty(hotKeyModels)){
                // 积攒了半秒的 key 汇合,依照 hash 散发到不同的 worker
                KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);
                collectHK.finishOnce();}

        },0, period, TimeUnit.MILLISECONDS);
    }
    /**
     * 每 10 秒推送一次数量统计
     */
    public static void startCountPusher(Integer period) {if (period == null || period <= 0) {period = 10;}
        @SuppressWarnings("PMD.ThreadPoolCreationRule")
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));
        scheduledExecutorService.scheduleAtFixedRate(() -> {IKeyCollector<KeyHotModel, KeyCountModel> collectHK = KeyHandlerFactory.getCounter();
            List<KeyCountModel> keyCountModels = collectHK.lockAndGetResult();
            if(CollectionUtil.isNotEmpty(keyCountModels)){
                // 积攒了 10 秒的数量,依照 hash 散发到不同的 worker
                KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);
                collectHK.finishOnce();}
        },0, period, TimeUnit.SECONDS);
    }

从下面两个办法可知,都是通过定时线程池来实现定时工作的,都是守护线程。

咱们重点关注一下 KeyHandlerFactory 类,它是 client 端设计的一个比拟奇妙的中央,从类名上直译为 key 解决工厂。具体的实例对象是 DefaultKeyHandler:

public class DefaultKeyHandler {
    // 推送 HotKeyMsg 音讯到 Netty 的推送者
    private IKeyPusher iKeyPusher = new NettyKeyPusher();
    // 待测 key 的收集器,这外面蕴含两个 map,key 次要是热 key 的名字,value 次要是热 key 的元数据信息(比方: 热 key 起源的 app 和 key 的类型和是否是删除事件)
    private IKeyCollector<HotKeyModel, HotKeyModel> iKeyCollector = new TurnKeyCollector();
    // 数量收集器, 这外面蕴含两个 map,这外面 key 是相应的规定,HitCount 外面是这个规定的总拜访次数和热后拜访次数
    private IKeyCollector<KeyHotModel, KeyCountModel> iKeyCounter = new TurnCountCollector();

    public IKeyPusher keyPusher() {return iKeyPusher;}
    public IKeyCollector<HotKeyModel, HotKeyModel> keyCollector() {return iKeyCollector;}
    public IKeyCollector<KeyHotModel, KeyCountModel> keyCounter() {return iKeyCounter;}
}

这外面有三个成员对象,别离是封装推送音讯到 netty 的 NettyKeyPusher、待测 key 收集器 TurnKeyCollector、数量收集器 TurnCountCollector,其中后两者都实现了接口 IKeyCollector,能对 hotkey 的解决起到无效的聚合,充分体现了代码的高内聚。
先来看看封装推送音讯到 netty 的 NettyKeyPusher:

/**
 * 将 msg 推送到 netty 的 pusher
 * @author wuweifeng wrote on 2020-01-06
 * @version 1.0
 */
public class NettyKeyPusher implements IKeyPusher {
    @Override
    public void send(String appName, List<HotKeyModel> list) {
        // 积攒了半秒的 key 汇合,依照 hash 散发到不同的 worker
        long now = System.currentTimeMillis();

        Map<Channel, List<HotKeyModel>> map = new HashMap<>();
        for(HotKeyModel model : list) {model.setCreateTime(now);
            Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());
            if (channel == null) {continue;}
            List<HotKeyModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());
            newList.add(model);
        }

        for (Channel channel : map.keySet()) {
            try {List<HotKeyModel> batch = map.get(channel);
                HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);
                hotKeyMsg.setHotKeyModels(batch);
                channel.writeAndFlush(hotKeyMsg).sync();} catch (Exception e) {
                try {InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
                    JdLogger.error(getClass(),"flush error" + insocket.getAddress().getHostAddress());
                } catch (Exception ex) {JdLogger.error(getClass(),"flush error");
                }
            }
        }
    }
    @Override
    public void sendCount(String appName, List<KeyCountModel> list) {
        // 积攒了 10 秒的数量,依照 hash 散发到不同的 worker
        long now = System.currentTimeMillis();
        Map<Channel, List<KeyCountModel>> map = new HashMap<>();
        for(KeyCountModel model : list) {model.setCreateTime(now);
            Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());
            if (channel == null) {continue;}
            List<KeyCountModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());
            newList.add(model);
        }
        for (Channel channel : map.keySet()) {
            try {List<KeyCountModel> batch = map.get(channel);
                HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);
                hotKeyMsg.setKeyCountModels(batch);
                channel.writeAndFlush(hotKeyMsg).sync();} catch (Exception e) {
                try {InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
                    JdLogger.error(getClass(),"flush error" + insocket.getAddress().getHostAddress());
                } catch (Exception ex) {JdLogger.error(getClass(),"flush error");
                }
            }
        }
    }
}

send(String appName, List list)
次要是将 TurnKeyCollector 收集的待测 key 通过 netty 推送给 worker,HotKeyModel 对象次要是一些热 key 的元数据信息(热 key 起源的 app 和 key 的类型和是否是删除事件,还有该热 key 的上报次数)

sendCount(String appName, List list)
次要是将 TurnCountCollector 收集的规定所对应的 key 通过 netty 推送给 worker,KeyCountModel 对象次要是一些 key 所对应的规定信息以及拜访次数等

WorkerInfoHolder.chooseChannel(model.getRuleKey())
依据 hash 算法获取 key 对应的服务器,散发到对应服务器相应的 Channel 连贯,所以服务端能够程度有限扩容,毫无压力问题。

再来剖析一下 key 收集器:TurnKeyCollector 与 TurnCountCollector:
实现 IKeyCollector 接口:

/**
 * 对 hotkey 进行聚合
 * @author wuweifeng wrote on 2020-01-06
 * @version 1.0
 */
public interface IKeyCollector<T, V> {
    /**
     * 锁定后的返回值
     */
    List<V> lockAndGetResult();
    /**
     * 输出的参数
     */
    void collect(T t);

    void finishOnce();}

lockAndGetResult()
次要是获取返回 collect 办法收集的信息,并将本地暂存的信息清空,不便下个统计周期积攒数据。

collect(T t)
顾名思义他是收集 api 调用的时候,将收集的到 key 信息放到本地存储。

finishOnce()
该办法目前实现都是空,无需关注。

待测 key 收集器:TurnKeyCollector

public class TurnKeyCollector implements IKeyCollector<HotKeyModel, HotKeyModel> {// 这 map 外面的 key 次要是热 key 的名字,value 次要是热 key 的元数据信息(比方: 热 key 起源的 app 和 key 的类型和是否是删除事件)
    private ConcurrentHashMap<String, HotKeyModel> map0 = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, HotKeyModel> map1 = new ConcurrentHashMap<>();
    private AtomicLong atomicLong = new AtomicLong(0);

    @Override
    public List<HotKeyModel> lockAndGetResult() {
        // 自增后,对应的 map 就会进行被写入,期待被读取
        atomicLong.addAndGet(1);
        List<HotKeyModel> list;
        // 能够察看这里与 collect 办法外面的雷同地位,会发现一个是操作 map0 一个是操作 map1,这样保障在读 map 的时候,不会阻塞写 map,// 两个 map 同时提供轮流提供读写能力,设计的很奇妙,值得学习
        if (atomicLong.get() % 2 == 0) {list = get(map1);
            map1.clear();} else {list = get(map0);
            map0.clear();}
        return list;
    }
    private List<HotKeyModel> get(ConcurrentHashMap<String, HotKeyModel> map) {return CollectionUtil.list(false, map.values());
    }
    @Override
    public void collect(HotKeyModel hotKeyModel) {String key = hotKeyModel.getKey();
        if (StrUtil.isEmpty(key)) {return;}
        if (atomicLong.get() % 2 == 0) {
            // 不存在时返回 null 并将 key-value 放入,已有雷同 key 时,返回该 key 对应的 value,并且不笼罩
            HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);
            if (model != null) {
                // 减少该 hotMey 上报的次数
                model.add(hotKeyModel.getCount());
            }
        } else {HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);
            if (model != null) {model.add(hotKeyModel.getCount());
            }
        }
    }
    @Override
    public void finishOnce() {}
}

能够看到该类中有两个 ConcurrentHashMap 和一个 AtomicLong,通过对 AtomicLong 来自增,而后对 2 取模,来别离管制两个 map 的读写能力,保障每个 map 都能做读写,并且同一个 map 不能同时读写,这样能够防止并发汇合读写不阻塞,这一块无锁化的设计还是十分奇妙的,极大的进步了收集的吞吐量。

key 数量收集器:TurnCountCollector
这里的设计与 TurnKeyCollector 大同小异,咱们就不细谈了。值得一提的是它外面有个并行处理的机制,当收集的数量超过 DATA_CONVERT_SWITCH_THRESHOLD=5000 的阈值时,lockAndGetResult 解决是应用 java Stream 并行流解决,晋升解决的效率。

③ 开启 worker 重连器

// 开启 worker 重连器
WorkerRetryConnector.retryConnectWorkers();
public class WorkerRetryConnector {

    /**
     * 定时去重连没连上的 workers
     */
    public static void retryConnectWorkers() {@SuppressWarnings("PMD.ThreadPoolCreationRule")
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));
        // 开启拉取 etcd 的 worker 信息,如果拉取失败,则定时持续拉取
        scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);
    }

    private static void reConnectWorkers() {List<String> nonList = WorkerInfoHolder.getNonConnectedWorkers();
        if (nonList.size() == 0) {return;}
        JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);
        NettyClient.getInstance().connect(nonList);// 这里会触发 netty 连贯办法 channelActive
    }
}

也是通过定时线程来执行,默认工夫距离是 30s,不可设置。
通过 WorkerInfoHolder 来管制 client 的 worker 连贯信息,连贯信息是个 List,用的 CopyOnWriteArrayList,毕竟是一个读多写少的场景,相似与元数据信息。

/**
 * 保留 worker 的 ip 地址和 Channel 的映射关系,这是有序的。每次 client 发送音讯时,都会依据该 map 的 size 进行 hash
 * 如 key- 1 就发送到 workerHolder 的第 1 个 Channel 去,key- 2 就发到第 2 个 Channel 去
 */
private static final List<Server> WORKER_HOLDER = new CopyOnWriteArrayList<>();
④ 注册 EventBus 事件订阅者

private void registEventBus() {
    //netty 连接器会关注 WorkerInfoChangeEvent 事件
    EventBusCenter.register(new WorkerChangeSubscriber());
    // 热 key 探测回调关注热 key 事件
    EventBusCenter.register(new ReceiveNewKeySubscribe());
    //Rule 的变动的事件
    EventBusCenter.register(new KeyRuleHolder());
}

应用 guava 的 EventBus 事件音讯总线,利用公布 / 订阅者模式来对我的项目进行解耦。它能够利用很少的代码,来实现多组件间通信。

基本原理图如下:

监听 worker 信息变动:WorkerChangeSubscriber

/**
 * 监听 worker 信息变动
 */
@Subscribe
public void connectAll(WorkerInfoChangeEvent event) {List<String> addresses = event.getAddresses();
    if (addresses == null) {addresses = new ArrayList<>();
    }

    WorkerInfoHolder.mergeAndConnectNew(addresses);
}
/**
 * 当 client 与 worker 的连贯断开后,删除
 */
@Subscribe
public void channelInactive(ChannelInactiveEvent inactiveEvent) {
    // 获取断线的 channel
    Channel channel = inactiveEvent.getChannel();
    InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
    String address = socketAddress.getHostName() + ":" + socketAddress.getPort();
    JdLogger.warn(getClass(), "this channel is inactive :" + socketAddress + "trying to remove this connection");

    WorkerInfoHolder.dealChannelInactive(address);
}

监听热 key 回调事件:ReceiveNewKeySubscribe

private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();

@Subscribe
public void newKeyComing(ReceiveNewKeyEvent event) {HotKeyModel hotKeyModel = event.getModel();
    if (hotKeyModel == null) {return;}
    // 收到新 key 推送
    if (receiveNewKeyListener != null) {receiveNewKeyListener.newKey(hotKeyModel);
    }
}

该办法会收到新的热 key 订阅事件之后,会将其退出到 KeyHandlerFactory 的收集器外面解决。

外围解决逻辑:DefaultNewKeyListener#newKey:

@Override
public void newKey(HotKeyModel hotKeyModel) {long now = System.currentTimeMillis();
    // 如果 key 达到时曾经过来 1 秒了,记录一下。手工删除 key 时,没有 CreateTime
    if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {JdLogger.warn(getClass(), "the key comes too late :" + hotKeyModel.getKey() + "now" +
                +now + "keyCreateAt" + hotKeyModel.getCreateTime());
    }
    if (hotKeyModel.isRemove()) {
        // 如果是删除事件,就间接删除
        deleteKey(hotKeyModel.getKey());
        return;
    }
    // 曾经是热 key 了,又推过来同样的热 key,做个日志记录,并刷新一下
    if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {JdLogger.warn(getClass(), "receive repeat hot key:" + hotKeyModel.getKey() + "at" + now);
    }
    addKey(hotKeyModel.getKey());
}
private void deleteKey(String key) {CacheFactory.getNonNullCache(key).delete(key);
}
private void addKey(String key) {ValueModel valueModel = ValueModel.defaultValue(key);
  if (valueModel == null) {
      // 不合乎任何规定
      deleteKey(key);
      return;
  }

// 如果原来该 key 曾经存在了,那么 value 就被重置,过期工夫也会被重置。如果原来不存在,就新增的热 key
JdHotKeyStore.setValueDirectly(key, valueModel);
}
如果该 HotKeyModel 外面是删除事件,则获取 RULE_CACHE_MAP 外面该 key 超时工夫对应的 caffeine,而后从中删除该 key 缓存,而后返回 (这里相当于删除了本地缓存)。
如果不是删除事件,则在 RULE_CACHE_MAP 对应的 caffeine 缓存中增加该 key 的缓存。
这里有个留神点,如果不为删除事件,调用 addKey()办法在 caffeine 减少缓存的时候,value 是一个魔术值 0x12fcf76,这个值只代表加了这个缓存,然而这个缓存在查问的时候相当于为 null。
监听 Rule 的变动事件:KeyRuleHolder

能够看到外面有两个成员属性:RULE_CACHE_MAP,KEY_RULES


/**
 * 保留超时工夫和 caffeine 的映射,key 是超时工夫,value 是 caffeine[(String,Object)]
 */
private static final ConcurrentHashMap<Integer, LocalCache> RULE_CACHE_MAP = new ConcurrentHashMap<>();
/**
 * 这里 KEY_RULES 是保留 etcd 外面该 appName 所对应的所有 rule
 */
private static final List<KeyRule> KEY_RULES = new ArrayList<>();

ConcurrentHashMap RULE_CACHE_MAP:

保留超时工夫和 caffeine 的映射,key 是超时工夫,value 是 caffeine[(String,Object)]。
奇妙的设计: 这里将 key 的过期工夫作为分桶策略,这样同一个过期工夫的 key 就会在一个桶 (caffeine) 外面,这外面每一个 caffeine 都是 client 的本地缓存,也就是说 hotKey 的本地缓存的 KV 实际上是存储在这外面的。
List KEY_RULES:

这里 KEY_RULES 是保留 etcd 外面该 appName 所对应的所有 rule。
具体监听 KeyRuleInfoChangeEvent 事件办法:

@Subscribe
public void ruleChange(KeyRuleInfoChangeEvent event) {JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());
    List<KeyRule> ruleList = event.getKeyRules();
    if (ruleList == null) {return;}

    putRules(ruleList);
}

外围解决逻辑:KeyRuleHolder#putRules:

/**
 * 所有的规定,如果规定的超时工夫变动了,会重建 caffeine
 */
public static void putRules(List<KeyRule> keyRules) {synchronized (KEY_RULES) {
        // 如果规定为空,清空规定表
        if (CollectionUtil.isEmpty(keyRules)) {KEY_RULES.clear();
            RULE_CACHE_MAP.clear();
            return;
        }
        KEY_RULES.clear();
        KEY_RULES.addAll(keyRules);
        Set<Integer> durationSet = keyRules.stream().map(KeyRule::getDuration).collect(Collectors.toSet());
        for (Integer duration : RULE_CACHE_MAP.keySet()) {
            // 先革除掉那些在 RULE_CACHE_MAP 里存的,然而 rule 里已没有的
            if (!durationSet.contains(duration)) {RULE_CACHE_MAP.remove(duration);
            }
        }
        // 遍历所有的规定
        for (KeyRule keyRule : keyRules) {int duration = keyRule.getDuration();
            // 这里如果 RULE_CACHE_MAP 外面没有超时工夫为 duration 的 value,则新建一个放入到 RULE_CACHE_MAP 外面
            // 比方 RULE_CACHE_MAP 原本就是空的,则在这里来构建 RULE_CACHE_MAP 的映射关系
            //TODO 如果 keyRules 外面蕴含雷同 duration 的 keyRule,则也只会建一个 key 为 duration,value 为 caffeine,其中 caffeine 是(string,object)
            if (RULE_CACHE_MAP.get(duration) == null) {LocalCache cache = CacheFactory.build(duration);
                RULE_CACHE_MAP.put(duration, cache);
            }
        }
    }
}

应用 synchronized 关键字来保障线程平安;
如果规定为空,清空规定表 (RULE_CACHE_MAP、KEY_RULES);
应用传递进来的 keyRules 来笼罩 KEY_RULES;
革除掉 RULE_CACHE_MAP 外面在 keyRules 没有的映射关系;
遍历所有的 keyRules,如果 RULE_CACHE_MAP 外面没有相干的超时工夫 key,则在外面赋值;
⑤ 启动 EtcdStarter(etcd 连贯管理器)

EtcdStarter starter = new EtcdStarter();
// 与 etcd 相干的监听都开启
starter.start();

public void start() {fetchWorkerInfo();
fetchRule();
startWatchRule();
// 监听热 key 事件,只监听手工增加、删除的 key
startWatchHotKey();}

fetchWorkerInfo()

从 etcd 外面拉取 worker 集群地址信息 allAddress,并更新 WorkerInfoHolder 外面的 WORKER_HOLDER

/**
 * 每隔 30 秒拉取 worker 信息
 */
private void fetchWorkerInfo() {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 开启拉取 etcd 的 worker 信息,如果拉取失败,则定时持续拉取
    scheduledExecutorService.scheduleAtFixedRate(() -> {JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");
        fetch();}, 0, 30, TimeUnit.SECONDS);
}

应用定时线程池来执行,单线程。
定时从 etcd 外面获取,地址 /jd/workers/+$appName 或 default,工夫距离不可设置,默认 30 秒,这外面存储的是 worker 地址的 ip+port。
公布 WorkerInfoChangeEvent 事件。
备注: 地址有 $appName 或 default,在 worker 外面配置,如果把 worker 放到某个 appName 下,则该 worker 只会参加该 app 的计算。
fetchRule()

定时线程来执行,单线程,工夫距离不可设置,默认是 5 秒,当拉取规定配置和手动配置的 hotKey 胜利后,该线程被终止(也就是说只会胜利执行一次),执行失败继续执行

private void fetchRule() {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 开启拉取 etcd 的 worker 信息,如果拉取失败,则定时持续拉取
    scheduledExecutorService.scheduleAtFixedRate(() -> {JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");
        boolean success = fetchRuleFromEtcd();
        if (success) {
            // 拉取已存在的热 key
            fetchExistHotKey();
            // 这里如果拉取规定和拉取手动配置的 hotKey 胜利之后,则该定时执行线程进行
            scheduledExecutorService.shutdown();}
    }, 0, 5, TimeUnit.SECONDS);
}

fetchRuleFromEtcd()

从 etcd 外面获取该 appName 配置的 rule 规定,地址 /jd/rules/+$appName。
如果查出来规定 rules 为空,会通过公布 KeyRuleInfoChangeEvent 事件来清空本地的 rule 配置缓存和所有的规定 key 缓存。
公布 KeyRuleInfoChangeEvent 事件。
fetchExistHotKey()

从 etcd 外面获取该 appName 手动配置的热 key,地址 /jd/hotkeys/+$appName。
公布 ReceiveNewKeyEvent 事件,并且内容 HotKeyModel 不是删除事件。
startWatchRule()

/**
 * 异步监听 rule 规定变动
 */
private void startWatchRule() {ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.submit(() -> {JdLogger.info(getClass(), "--- begin watch rule change ----");
        try {IConfigCenter configCenter = EtcdConfigFactory.configCenter();
            KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);
            // 如果有新事件,即 rule 的变更,就从新拉取所有的信息
            while (watchIterator.hasNext()) {
                // 这句必须写,next 会让他卡住,除非真的有新 rule 变更
                WatchUpdate watchUpdate = watchIterator.next();
                List<Event> eventList = watchUpdate.getEvents();
                JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is" + eventList);

                // 全量拉取 rule 信息
                fetchRuleFromEtcd();}
        } catch (Exception e) {JdLogger.error(getClass(), "watch err");
        }
    });
}

异步监听 rule 规定变动,应用 etcd 监听地址为 /jd/rules/+$appName 的节点变动。
应用线程池,单线程,异步监听 rule 规定变动,如果有事件变动,则调用 fetchRuleFromEtcd()办法。
startWatchHotKey()
异步开始监听热 key 变动信息,应用 etcd 监听地址前缀为 /jd/hotkeys/+$appName

/**
 * 异步开始监听热 key 变动信息,该目录里只有手工增加的 key 信息
 */
private void startWatchHotKey() {ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.submit(() -> {JdLogger.info(getClass(), "--- begin watch hotKey change ----");
        IConfigCenter configCenter = EtcdConfigFactory.configCenter();
        try {KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);
            // 如果有新事件,即新 key 产生或删除
            while (watchIterator.hasNext()) {WatchUpdate watchUpdate = watchIterator.next();

                List<Event> eventList = watchUpdate.getEvents();
                KeyValue keyValue = eventList.get(0).getKv();
                Event.EventType eventType = eventList.get(0).getType();
                try {
                    // 从这个中央能够看出,etcd 给的返回是节点的全门路,而咱们须要的 key 要去掉前缀
                    String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");
                    // 如果是删除 key,就立即删除
                    if (Event.EventType.DELETE == eventType) {HotKeyModel model = new HotKeyModel();
                        model.setRemove(true);
                        model.setKey(key);
                        EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
                    } else {HotKeyModel model = new HotKeyModel();
                        model.setRemove(false);
                        String value = keyValue.getValue().toStringUtf8();
                        // 新增热 key
                        JdLogger.info(getClass(), "etcd receive new key :" + key + "--value:" + value);
                        // 如果这是一个删除指令,就什么也不干
                        //TODO 这里有个疑难,监听到 worker 主动探测收回的惰性删除指令,这里之间跳过了,然而本地缓存没有更新吧?//TODO 所以我猜想在客户端应用判断缓存是否存在的 api 外面,应该会判断相干缓存的 value 值是否为 "#[DELETE]#" 删除标记
                        // 解疑: 这里的确只监听手工配置的 hotKey,etcd 的 /jd/hotkeys/+$appName 该地址只是手动配置 hotKey,worker 主动探测的 hotKey 是间接通过 netty 通道来告知 client 的
                        if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {continue;}
                        // 手工创立的 value 是工夫戳
                        model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));
                        model.setKey(key);
                        EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
                    }
                } catch (Exception e) {JdLogger.error(getClass(), "new key err:" + keyValue);
                }

            }
        } catch (Exception e) {JdLogger.error(getClass(), "watch err");
        }
    });

}

应用线程池,单线程,异步监听热 key 变动
应用 etcd 监听前缀地址的以后节点以及子节点的所有变动值
删除节点动作
公布 ReceiveNewKeyEvent 事件,并且内容 HotKeyModel 是删除事件
新增 or 更新节点动作
事件变动的 value 值为删除标记 #[DELETE]#
如果是删除标记的话,代表是 worker 主动探测或者 client 须要删除的指令。
如果是删除标记则什么也不做,间接跳过 (这里从 HotKeyPusher#push 办法能够看到,做删除事件的操作时候,他会给 /jd/hotkeys/+$appName 的节点外面减少一个值为删除标记的节点,而后再删除雷同门路的节点,这样就能够触发下面的删除节点事件,所以这里判断如果是删除标记间接跳过)。
不为删除标记
公布 ReceiveNewKeyEvent 事件,事件内容 HotKeyModel 外面的 createTime 是 kv 对应的工夫戳
疑难: 这里代码正文外面说只监听手工增加或者删除的 hotKey,难道说 /jd/hotkeys/+$appName 地址只是手工配置的地址吗?

解疑: 这里的确只监听手工配置的 hotKey,etcd 的 /jd/hotkeys/+$appName 该地址只是手动配置 hotKey,worker 主动探测的 hotKey 是间接通过 netty 通道来告知 client 的

5.API 解析

1)流程图示
① 查问流程

② 删除流程:

从下面的流程图中,大家应该晓得该热点 key 在代码中是如何扭转的,这里再给大家解说一下外围 API 的源码解析,限于篇幅的起因,咱们不一个个贴相干源码了,只是单纯的通知你它的外部逻辑是怎么样的。

2)外围类:JdHotKeyStore

JdHotKeyStore 是封装 client 调用的 api 外围类,蕴含下面 10 个公共办法,咱们重点解析其中 6 个重要办法:

① isHotKey(String key)
判断是否在规定内,如果不在,返回 false
判断是否是热 key,如果不是或者是且过期工夫在 2s 内,则给 TurnKeyCollector#collect 收集
最初给 TurnCountCollector#collect 做统计收集

② get(String key)
从本地 caffeine 取值
如果取到的 value 是个魔术值,只代表退出到 caffeine 缓存外面了,查问的话为 null

③ smartSet(String key, Object value)
判断是否是热 key,这里不论它在不在规定内,如果是热 key,则给 value 赋值,如果不为热 key 什么也不做

④ forceSet(String key, Object value)
强制给 value 赋值
如果该 key 不在规定配置内,则传递的 value 不失效,本地缓存的赋值 value 会被变为 null

⑤ getValue(String key, KeyType keyType)
获取 value,如果 value 不存在则调用 HotKeyPusher#push 办法发往 netty
如果没有为该 key 配置规定,就不必上报 key,间接返回 null
如果取到的 value 是个魔术值,只代表退出到 caffeine 缓存外面了,查问的话为 null

⑥ remove(String key)
删除某 key(本地的 caffeine 缓存),会告诉整个集群删除(通过 etcd 来告诉集群删除)

3)client 上传热 key 入口调用类:HotKeyPusher
外围办法:

public static void push(String key, KeyType keyType, int count, boolean remove) {if (count <= 0) {count = 1;}
    if (keyType == null) {keyType = KeyType.REDIS_KEY;}
    if (key == null) {return;}
    // 这里之所以用 LongAdder 是为了保障多线程计数的线程安全性,尽管这里是在办法内调用的,然而在 TurnKeyCollector 的两个 map 外面,// 存储了 HotKeyModel 的实例对象,这样在多个线程同时批改 count 的计数属性时,会存在线程平安计数不精确问题
    LongAdder adderCnt = new LongAdder();
    adderCnt.add(count);

    HotKeyModel hotKeyModel = new HotKeyModel();
    hotKeyModel.setAppName(Context.APP_NAME);
    hotKeyModel.setKeyType(keyType);
    hotKeyModel.setCount(adderCnt);
    hotKeyModel.setRemove(remove);
    hotKeyModel.setKey(key);


    if (remove) {
        // 如果是删除 key,就间接发到 etcd 去,不必做聚合。然而有点问题当初,这个删除只能删手工增加的 key,不能删 worker 探测进去的
        // 因为各个 client 都在监听手工增加的那个 path,没监听主动探测的 path。所以如果手工的那个 path 下,没有该 key,那么是删除不了的。// 删不了,就达不到集群监听删除事件的成果,怎么办呢?能够通过新增的形式,新增一个热 key,而后删除它
        //TODO 这里为啥不间接删除该节点,难道 worker 主动探测解决的 hotKey 不会往该节点减少新增事件吗?// 释疑:worker 依据探测配置的规定,当判断出某个 key 为 hotKey 后,的确不会往 keyPath 外面退出节点,他只是单纯的往本地缓存外面退出一个空值,代表是热点 key
        EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);
        EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 这里很奇妙待补充形容
        // 也删 worker 探测的目录
        EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));
    } else {
        // 如果 key 是规定内的要被探测的 key,就积攒期待传送
        if (KeyRuleHolder.isKeyInRule(key)) {
            // 积攒起来,期待每半秒发送一次
            KeyHandlerFactory.getCollector().collect(hotKeyModel);
        }
    }
}

从下面的源码中可知:

这里之所以用 LongAdder 是为了保障多线程计数的线程安全性,尽管这里是在办法内调用的,然而在 TurnKeyCollector 的两个 map 外面,存储了 HotKeyModel 的实例对象,这样在多个线程同时批改 count 的计数属性时,会存在线程平安计数不精确问题。
如果是 remove 删除类型,在删除手动配置的热 key 配置门路的同时,还会删除 dashboard 展现热 key 的配置门路。
只有在规定配置的 key,才会被积攒探测发送到 worker 内进行计算。
3. 通信机制(与 worker 交互)

1)NettyClient:netty 连接器

public class NettyClient {private static final NettyClient nettyClient = new NettyClient();

    private Bootstrap bootstrap;

    public static NettyClient getInstance() {return nettyClient;}

    private NettyClient() {if (bootstrap == null) {bootstrap = initBootstrap();
        }
    }

    private Bootstrap initBootstrap() {
        // 少线程
        EventLoopGroup group = new NioEventLoopGroup(2);

        Bootstrap bootstrap = new Bootstrap();
        NettyClientHandler nettyClientHandler = new NettyClientHandler();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());
                        ch.pipeline()
                                .addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))// 这里就是定义 TCP 多个包之间的分隔符,为了更好的做拆包
                                .addLast(new MsgDecoder())
                                .addLast(new MsgEncoder())
                                //30 秒没音讯时,就发心跳包过来
                                .addLast(new IdleStateHandler(0, 0, 30))
                                .addLast(nettyClientHandler);
                    }
                });
        return bootstrap;
    }
}

应用 Reactor 线程模型,只有 2 个工作线程,没有独自设置主线程
长连贯,开启 TCP_NODELAY
netty 的分隔符”$()$”,相似 TCP 报文分段的规范,不便拆包
Protobuf 序列化与反序列化
30s 没有音讯发给对端的时候,发送一个心跳包判活
工作线程处理器 NettyClientHandler
JDhotkey 的 tcp 协定设计就是收发字符串,每个 tcp 音讯包应用特殊字符 $()$ 来宰割
长处:这样实现非常简单。

取得音讯包后进行 json 或者 protobuf 反序列化。

毛病:是须要,从字节流 -》反序列化成字符串 -》反序列化成音讯对象,两层序列化损耗了一部分性能。

protobuf 还好序列化很快,然而 json 序列化的速度只有几十万每秒,会损耗一部分性能。

2)NettyClientHandler: 工作线程处理器

@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler<HotKeyMsg> {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            // 这里示意如果读写都挂了
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                // 向服务端发送音讯
                ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));
            }
        }

        super.userEventTriggered(ctx, evt);
    }
    // 在 Channel 注册 EventLoop、绑定 SocketAddress 和连贯 ChannelFuture 的时候都有可能会触发 ChannelInboundHandler 的 channelActive 办法的调用
    // 相似 TCP 三次握手胜利之后触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) {JdLogger.info(getClass(), "channelActive:" + ctx.name());
        ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));
    }
    // 相似 TCP 四次挥手之后,期待 2MSL 工夫之后触发(大略 180s),比方 channel 通道敞开会触发(channel.close())

    // 客户端 channel 被动敞开连贯时,会向服务端发送一个写申请,而后服务端 channel 所在的 selector 会监听到一个 OP_READ 事件,而后
    // 执行数据读取操作,而读取时发现客户端 channel 曾经敞开了,则读取数据字节个数返回 -1,而后执行 close 操作,敞开该 channel 对应的底层 socket,// 并在 pipeline 中,从 head 开始,往下将 InboundHandler,并触发 handler 的 channelInactive 和 channelUnregistered 办法的执行,以及移除 pipeline 中的 handlers 一系列操作。@Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);
        // 断线了,可能只是 client 和 server 断了,但都和 etcd 没断。也可能是 client 本人断网了,也可能是 server 断了
        // 公布断线事件。后续 10 秒后进行重连,依据 etcd 里的 worker 信息来决定是否重连,如果 etcd 里没了,就不重连。如果 etcd 里有,就重连
        notifyWorkerChange(ctx.channel());
    }
    private void notifyWorkerChange(Channel channel) {EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {if (MessageType.PONG == msg.getMessageType()) {JdLogger.info(getClass(), "heart beat");
            return;
        }
        if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {JdLogger.info(getClass(), "receive new key :" + msg);
            if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {return;}
            for (HotKeyModel model : msg.getHotKeyModels()) {EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
            }
        }
    }
}

userEventTriggered

收到对端发来的心跳包,返回 new HotKeyMsg(MessageType.PING, Context.APP_NAME)
channelActive

在 Channel 注册 EventLoop、绑定 SocketAddress 和连贯 ChannelFuture 的时候都有可能会触发 ChannelInboundHandler 的 channelActive 办法的调用
相似 TCP 三次握手胜利之后触发,给对端发送 new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)
channelInactive

相似 TCP 四次挥手之后,期待 2MSL 工夫之后触发 (大略 180s),比方 channel 通道敞开会触发(channel.close()) 该办法,公布 ChannelInactiveEvent 事件,来 10s 后重连
channelRead0

接管 PONG 音讯类型时,打个日志返回
接管 RESPONSE_NEW_KEY 音讯类型时,公布 ReceiveNewKeyEvent 事件
3.3.3 worker 端

1. 入口启动加载:7 个 @PostConstruct

1)worker 端对 etcd 相干的解决:EtcdStarter
① 第一个 @PostConstruct:watchLog()

@PostConstruct
public void watchLog() {AsyncPool.asyncDo(() -> {
        try {
            // 取 etcd 的是否开启日志配置,地址 /jd/logOn
            String loggerOn = configCenter.get(ConfigConstant.logToggle);
            LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);
        } catch (StatusRuntimeException ex) {logger.error(ETCD_DOWN);
        }
        // 监听 etcd 地址 /jd/logOn 是否开启日志配置,并实时更改开关
        KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);
        while (watchIterator.hasNext()) {WatchUpdate watchUpdate = watchIterator.next();
            List<Event> eventList = watchUpdate.getEvents();
            KeyValue keyValue = eventList.get(0).getKv();
            logger.info("log toggle changed :" + keyValue);
            String value = keyValue.getValue().toStringUtf8();
            LOGGER_ON = "true".equals(value) || "1".equals(value);
        }
    });
}

放到线程池外面异步执行
取 etcd 的是否开启日志配置,地址 /jd/logOn,默认 true
监听 etcd 地址 /jd/logOn 是否开启日志配置,并实时更改开关
因为有 etcd 的监听,所以会始终执行,而不是执行一次完结
② 第二个 @PostConstruct:watch()

/**
 * 启动回调监听器,监听 rule 变动
 */
@PostConstruct
public void watch() {AsyncPool.asyncDo(() -> {
        KvClient.WatchIterator watchIterator;
        if (isForSingle()) {watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);
        } else {watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);
        }
        while (watchIterator.hasNext()) {WatchUpdate watchUpdate = watchIterator.next();
            List<Event> eventList = watchUpdate.getEvents();
            KeyValue keyValue = eventList.get(0).getKv();
            logger.info("rule changed :" + keyValue);
            try {ruleChange(keyValue);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    });
}
/**
     * rule 发生变化时,更新缓存的 rule
     */
    private synchronized void ruleChange(KeyValue keyValue) {String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");
        if (StrUtil.isEmpty(appName)) {return;}
        String ruleJson = keyValue.getValue().toStringUtf8();
        List<KeyRule> keyRules = FastJsonUtils.toList(ruleJson, KeyRule.class);
        KeyRuleHolder.put(appName, keyRules);
    }

通过 etcd.workerPath 配置,来判断该 worker 是否为某个 app 独自服务的,默认为”default”,如果是默认值,代表该 worker 参加在 etcd 上所有 app client 的计算,否则只为某个 app 来服务计算

应用 etcd 来监听 rule 规定变动,如果是共享的 worker,监听地址前缀为”/jd/rules/“,如果为某个 app 独享,监听地址为”/jd/rules/“+$etcd.workerPath

如果规定变动,则批改对应 app 在本地存储的 rule 缓存,同时清理该 app 在本地存储的 KV 缓存

KeyRuleHolder:rule 缓存本地存储

Map> RULE_MAP,这个 map 是 concurrentHashMap,map 的 kv 别离是 appName 和对应的 rule
绝对于 client 的 KeyRuleHolder 的区别:worker 是存储所有 app 规定,每个 app 对应一个规定桶,所以用 map
CaffeineCacheHolder:key 缓存本地存储

Map> CACHE_MAP,也是 concurrentHashMap,map 的 kv 别离是 appName 和对应的 kv 的 caffeine
绝对于 client 的 caffeine,第一是 worker 没有做缓存接口比方 LocalCache,第二是 client 的 map 的 kv 别离是超时工夫、以及雷同超时工夫所对应 key 的缓存桶
放到线程池外面异步执行,因为有 etcd 的监听,所以会始终执行,而不是执行一次完结

③ 第三个 @PostConstruct:watchWhiteList()

/**
 * 启动回调监听器,监听白名单变动,只监听本人所在的 app,白名单 key 不参加热 key 计算,间接疏忽
 */
@PostConstruct
public void watchWhiteList() {AsyncPool.asyncDo(() -> {
        // 从 etcd 配置中获取所有白名单
        fetchWhite();
        KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);
        while (watchIterator.hasNext()) {WatchUpdate watchUpdate = watchIterator.next();
            logger.info("whiteList changed");
            try {fetchWhite();
            } catch (Exception e) {e.printStackTrace();
            }
        }
    });
}

拉取并监听 etcd 白名单 key 配置,地址为 /jd/whiteList/+$etcd.workerPath
在白名单的 key,不参加热 key 计算,间接疏忽
放到线程池外面异步执行,因为有 etcd 的监听,所以会始终执行,而不是执行一次完结
④ 第四个 @PostConstruct:makeSureSelfOn()

/**
 * 每隔一会去 check 一下,本人还在不在 etcd 里
 */
@PostConstruct
public void makeSureSelfOn() {
    // 开启上传 worker 信息
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {if (canUpload) {uploadSelfInfo();
            }
        } catch (Exception e) {//do nothing}
    }, 0, 5, TimeUnit.SECONDS);
}

在线程池外面异步执行,定时执行,工夫距离为 5s
将本机 woker 的 hostName,ip+port 以 kv 的模式定时上报给 etcd,地址为 /jd/workers/+$etcd.workPath+”/“+$hostName,续期工夫为 8s
有一个 canUpload 的开关来管制 worker 是否向 etcd 来定时续期,如果这个开关敞开了,代表 worker 不向 etcd 来续期,这样当下面地址的 kv 到期之后,etcd 会删除该节点,这样 client 循环判断 worker 信息变动了
2)将热 key 推送到 dashboard 供入库:DashboardPusher

① 第五个 @PostConstruct:uploadToDashboard()

@Component
public class DashboardPusher implements IPusher {
    /**
     * 热 key 集中营
     */
    private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void uploadToDashboard() {AsyncPool.asyncDo(() -> {while (true) {
                try {
                    // 要么 key 达到 1 千个,要么达到 1 秒,就汇总上报给 etcd 一次
                    List<HotKeyModel> tempModels = new ArrayList<>();
                    Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);
                    if (CollectionUtil.isEmpty(tempModels)) {continue;}

                    // 将热 key 推到 dashboard
                    DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));
                } catch (Exception e) {e.printStackTrace();
                }
            }
        });
    }
}

当热 key 的数量达到 1000 或者每隔 1s,把热 key 的数据通过与 dashboard 的 netty 通道来发送给 dashboard,数据类型为 REQUEST_HOT_KEY
LinkedBlockingQueue hotKeyStoreQueue:worker 计算的给 dashboard 热 key 的集中营,所有给 dashboard 推送热 key 存储在外面
3)推送到各客户端服务器:AppServerPusher

① 第六个 @PostConstruct:batchPushToClient()

public class AppServerPusher implements IPusher {
    /**
     * 热 key 集中营
     */
    private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();

    /**
     * 和 dashboard 那边的推送次要区别在于,给 app 推送每 10ms 一次,dashboard 那边 1s 一次
     */
    @PostConstruct
    public void batchPushToClient() {AsyncPool.asyncDo(() -> {while (true) {
                try {List<HotKeyModel> tempModels = new ArrayList<>();
                    // 每 10ms 推送一次
                    Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
                    if (CollectionUtil.isEmpty(tempModels)) {continue;}
                    Map<String, List<HotKeyModel>> allAppHotKeyModels = new HashMap<>();
                    // 拆分出每个 app 的热 key 汇合,按 app 分堆
                    for (HotKeyModel hotKeyModel : tempModels) {List<HotKeyModel> oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());
                        oneAppModels.add(hotKeyModel);
                    }
                    // 遍历所有 app,进行推送
                    for (AppInfo appInfo : ClientInfoHolder.apps) {List<HotKeyModel> list = allAppHotKeyModels.get(appInfo.getAppName());
                        if (CollectionUtil.isEmpty(list)) {continue;}
                        HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);
                        hotKeyMsg.setHotKeyModels(list);

                        // 整个 app 全副发送
                        appInfo.groupPush(hotKeyMsg);
                    }
                    // 推送完,及时清理不应用内存
                    allAppHotKeyModels = null;
                } catch (Exception e) {e.printStackTrace();
                }
            }
        });
    }
}

会依照 key 的 appName 来进行分组,而后通过对应 app 的 channelGroup 来推送
当热 key 的数量达到 10 或者每隔 10ms,把热 key 的数据通过与 app 的 netty 通道来发送给 app,数据类型为 RESPONSE_NEW_KEY
LinkedBlockingQueue hotKeyStoreQueue:worker 计算的给 client 热 key 的集中营,所有给 client 推送热 key 存储在外面
4)client 实例节点解决:NodesServerStarter
① 第七个 @PostConstruct:start()

public class NodesServerStarter {@Value("${netty.port}")
    private int port;
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Resource
    private IClientChangeListener iClientChangeListener;
    @Resource
    private List<INettyMsgFilter> messageFilters;

    @PostConstruct
    public void start() {AsyncPool.asyncDo(() -> {logger.info("netty server is starting");
            NodesServer nodesServer = new NodesServer();
            nodesServer.setClientChangeListener(iClientChangeListener);
            nodesServer.setMessageFilters(messageFilters);
            try {nodesServer.startNettyServer(port);
            } catch (Exception e) {e.printStackTrace();
            }
        });
    }
}

线程池外面异步执行,启动 client 端的 nettyServer
iClientChangeListener 和 messageFilters 这两个依赖最终会被传递到 netty 音讯处理器外面,iClientChangeListener 会作为 channel 下线解决来删除 ClientInfoHolder 下线或者超时的通道,messageFilters 会作为 netty 收到事件音讯的解决过滤器(责任链模式)
② 依赖的 bean:IClientChangeListener iClientChangeListener

public interface IClientChangeListener {
    /**
     * 发现新连贯
     */
    void newClient(String appName, String channelId, ChannelHandlerContext ctx);
    /**
     * 客户端掉线
     */
    void loseClient(ChannelHandlerContext ctx);
}

对客户端的治理,新来 (newClient)(会触发 netty 的连贯办法 channelActive)、断线(loseClient)(会触发 netty 的断连办法 channelInactive()) 的治理
client 的连贯信息次要是在 ClientInfoHolder 外面

List apps,这外面的 AppInfo 次要是 appName 和对应的 channelGroup
对 apps 的 add 和 remove 次要是通过新来(newClient)、断线(loseClient)
③ 依赖的 bean:List messageFilters

/**
 * 对 netty 来的音讯,进行过滤解决
 * @author wuweifeng wrote on 2019-12-11
 * @version 1.0
 */
public interface INettyMsgFilter {boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);
}

对 client 发给 worker 的 netty 音讯,进行过滤解决,共有四个实现类,也就是说底下四个过滤器都是收到 client 发送的 netty 音讯来做解决

④ 各个音讯解决的类型:MessageType

APP_NAME((byte) 1),
REQUEST_NEW_KEY((byte) 2),
RESPONSE_NEW_KEY((byte) 3),
REQUEST_HIT_COUNT((byte) 7), // 命中率
REQUEST_HOT_KEY((byte) 8), // 热 key,worker->dashboard
PING((byte) 4), PONG((byte) 5),
EMPTY((byte) 6);
程序 1:HeartBeatFilter

当音讯类型为 PING,则给对应的 client 示例返回 PONG
程序 2:AppNameFilter

当音讯类型为 APP_NAME,代表 client 与 worker 建设连贯胜利,而后调用 iClientChangeListener 的 newClient 办法减少 apps 元数据信息
程序 3:HotKeyFilter

解决接管音讯类型为 REQUEST_NEW_KEY
先给 HotKeyFilter.totalReceiveKeyCount 原子类增 1,该原子类代表 worker 实例接管到的 key 的总数
publishMsg 办法,将音讯通过自建的生产者消费者模型(KeyProducer,KeyConsumer),来把音讯给发到生产者中散发生产
接管到的音讯 HotKeyMsg 外面 List
首先判断 HotKeyModel 外面的 key 是否在白名单内,如果在则跳过,否则将 HotKeyModel 通过 KeyProducer 发送
程序 4:KeyCounterFilter

解决接管类型为 REQUEST_HIT_COUNT
这个过滤器是专门给 dashboard 来汇算 key 的,所以这个 appName 间接设置为该 worker 配置的 appName
该过滤器的数据起源都是 client 的 NettyKeyPusher#sendCount(String appName, List list),这外面的数据都是默认积攒 10s 的,这个 10s 是能够配置的,这一点在 client 外面有讲
将结构的 new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞队列 LinkedBlockingQueue COUNTER_QUEUE 中,而后让 CounterConsumer 来生产解决,生产逻辑是单线程的
CounterConsumer:热 key 统计消费者
放在公共线程池中,来单线程执行
从阻塞队列 COUNTER_QUEUE 外面取数据,而后将外面的 key 的统计数据公布到 etcd 的 /jd/keyHitCount/+ appName +“/“+ IpUtils.getIp() +“-“+ System.currentTimeMillis()外面,该门路是 worker 服务的 client 集群或者 default,用来寄存客户端 hotKey 拜访次数和总拜访次数的 path,而后让 dashboard 来订阅统计展现
2. 三个定时工作:3 个 @Scheduled

1)定时工作 1:EtcdStarter#pullRules()

/**
 * 每隔 1 分钟拉取一次,所有的 app 的 rule
 */
@Scheduled(fixedRate = 60000)
public void pullRules() {
    try {if (isForSingle()) {String value = configCenter.get(ConfigConstant.rulePath + workerPath);
            if (!StrUtil.isEmpty(value)) {List<KeyRule> keyRules = FastJsonUtils.toList(value, KeyRule.class);
                KeyRuleHolder.put(workerPath, keyRules);
            }
        } else {List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.rulePath);
            for (KeyValue keyValue : keyValues) {ruleChange(keyValue);
            }
        }
    } catch (StatusRuntimeException ex) {logger.error(ETCD_DOWN);
    }
}

每隔 1 分钟拉取一次 etcd 地址为 /jd/rules/ 的规定变动,如果 worker 所服务的 app 或者 default 的 rule 有变动,则更新规定的缓存,并清空该 appName 所对应的本地 key 缓存

2)定时工作 2:EtcdStarter#uploadClientCount()

/**
     * 每隔 10 秒上传一下 client 的数量到 etcd 中
     */
    @Scheduled(fixedRate = 10000)
    public void uploadClientCount() {
        try {String ip = IpUtils.getIp();
            for (AppInfo appInfo : ClientInfoHolder.apps) {String appName = appInfo.getAppName();
                int count = appInfo.size();
                // 即使是 full gc 也不能超过 3 秒,因为这里给的过期工夫是 13s,因为该定时工作每隔 10s 执行一次,如果 full gc 或者说上报给 etcd 的工夫超过 3s,// 则在 dashboard 查问不到 client 的数量
                configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);
            }
            configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);
            // 上报每秒 QPS(接管 key 数量、解决 key 数量)String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));
            configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);
            logger.info(totalCount + "expireCount:" + expireTotalCount + "offerCount:" + totalOfferCount);
            // 如果是稳固始终有 key 发送的利用,倡议开启该监控,以防止可能产生的网络故障
            if (openMonitor) {checkReceiveKeyCount();
            }
//            configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);
        } catch (Exception ex) {logger.error(ETCD_DOWN);
        }
    }

每个 10s 将 worker 计算存储的 client 信息上报给 etcd,来不便 dashboard 来查问展现,比方 /jd/count/ 对应 client 数量,/jd/caffeineSize/ 对应 caffeine 缓存的大小,/jd/totalKeyCount/ 对应该 worker 接管的 key 总量和解决的 key 总量
能够从代码中看到,下面所有 etcd 的节点租期工夫都是 13s,而该定时工作是每 10s 执行一次,意味着如果 full gc 或者说上报给 etcd 的工夫超过 3s,则在 dashboard 查问不到 client 的相干汇算信息
长时间不收到 key,判断网络状态不好,断开 worker 给 etcd 地址为 /jd/workers/+$workerPath 节点的续租,因为 client 会循环判断该地址的节点是否变动,使得 client 从新连贯 worker 或者断开失联的 worker
3)定时工作 3:EtcdStarter#fetchDashboardIp()

/**
 * 每隔 30 秒去获取一下 dashboard 的地址
 */
@Scheduled(fixedRate = 30000)
public void fetchDashboardIp() {
    try {
        // 获取 DashboardIp
        List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.dashboardPath);
        // 是空,给个正告
        if (CollectionUtil.isEmpty(keyValues)) {logger.warn("very important warn !!! Dashboard ip is null!!!");
            return;
        }
        String dashboardIp = keyValues.get(0).getValue().toStringUtf8();
        NettyClient.getInstance().connect(dashboardIp);
    } catch (Exception e) {e.printStackTrace();
    }
}

每隔 30s 拉取一次 etcd 前缀为 /jd/dashboard/ 的 dashboard 连贯 ip 的值,并且判断 DashboardHolder.hasConnected 外面是否为未连贯状态,如果是则从新连贯 worker 与 dashboard 的 netty 通道

3. 自建的生产者消费者模型(KeyProducer,KeyConsumer)

个别生产者消费者模型蕴含三大元素:生产者、消费者、音讯存储队列
这里音讯存储队列是 DispatcherConfig 外面的 QUEUE,应用 LinkedBlockingQueue,默认大小为 200W

1)KeyProducer

@Component
public class KeyProducer {public void push(HotKeyModel model, long now) {if (model == null || model.getKey() == null) {return;}
        // 5 秒前的过期音讯就不解决了
        if (now - model.getCreateTime() > InitConstant.timeOut) {expireTotalCount.increment();
            return;
        }
        try {QUEUE.put(model);
            totalOfferCount.increment();} catch (InterruptedException e) {e.printStackTrace();
        }
    }

}

判断接管到的 HotKeyModel 是否超出”netty.timeOut”配置的工夫,如果是将 expireTotalCount 纪录过期总数给自增,而后返回

2)KeyConsumer

public class KeyConsumer {

private IKeyListener iKeyListener;
public void setKeyListener(IKeyListener iKeyListener) {this.iKeyListener = iKeyListener;}
public void beginConsume() {while (true) {
        try {
            // 从这里能够看出,这里的生产者消费者模型,实质上还是拉模式,之所以不应用 EventBus,是因为须要队列来做缓冲
            HotKeyModel model = QUEUE.take();
            if (model.isRemove()) {iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
            } else {iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
            }
            // 处理完毕,将数量加 1
            totalDealCount.increment();} catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

}


@Override
public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
   //cache 里的 key,appName+keyType+key
   String key = buildKey(hotKeyModel);
   hotCache.invalidate(key);
   CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
   // 推送所有 client 删除
   hotKeyModel.setCreateTime(SystemClock.now());
   logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());
   for (IPusher pusher : iPushers) {// 这里能够看到,删除热 key 的 netty 音讯只给 client 端发了过来,没有给 dashboard 发过来(DashboardPusher 外面的 remove 是个空办法)
       pusher.remove(hotKeyModel);
   }
}
    @Override
    public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
        //cache 里的 key
        String key = buildKey(hotKeyModel);
        // 判断是不是刚热不久
        //hotCache 对应的 caffeine 有效期为 5s,也就是说该 key 会保留 5s,在 5s 内不反复解决雷同的 hotKey。// 毕竟 hotKey 都是刹时流量,能够防止在这 5s 内反复推送给 client 和 dashboard,防止有效的网络开销
        Object o = hotCache.getIfPresent(key);
        if (o != null) {return;}

        //********** watch here ************//
        // 该办法会被 InitConstant.threadCount 个线程同时调用,存在多线程问题
        // 上面的那句 addCount 是加了锁的,代表给 Key 累加数量时是原子性的,不会产生多加、少加的状况,到了设定的阈值肯定会 hot
        // 譬如阈值是 2,如果多个线程累加,在没 hot 前,hot 的状态必定是对的,譬如 thread1 加 1,thread2 加 1,那么 thread2 会 hot 返回 true,开启推送
        // 然而极其状况下,譬如阈值是 10,以后是 9,thread1 走到这里时,加 1,返回 true,thread2 也走到这里,加 1,此时是 11,返回 true,问题来了
        // 该 key 会走上面的 else 两次,也就是 2 次推送。// 所以呈现问题的起因是 hotCache.getIfPresent(key)这一句在并发状况下,没 return 掉,放了两个 key+ 1 到 addCount 这一步时,会有问题
        // 测试代码在 TestBlockQueue 类,间接运行能够看到会同时 hot

        // 那么该问题用解决吗,NO,不须要解决,1 首先要产生的条件极其刻薄,很难触发,以京东这样高的并发量,线上我也没见过触发间断 2 次推送同一个 key 的
        //2 即使触发了,结果也是能够承受的,2 次推送而已,毫无影响,客户端无感知。然而如果非要解决,就要对 slidingWindow 实例加锁了,必然有一些开销

        // 所以只有保障 key 数量不多计算就能够,少计算了没事。因为热 key 必然频率高,漏计几次没事。但非热 key,多计算了,被干成了热 key 就不对了
        SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);// 从这里可知,每个 app 的每个 key 都会对应一个滑动窗口
        // 看看 hot 没
        boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

        if (!hot) {
            // 如果没 hot,从新 put,cache 会主动刷新过期工夫
            CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);
        } else {
            // 这里之所以放入的 value 为 1,是因为 hotCache 是用来专门存储刚生成的 hotKey
            //hotCache 对应的 caffeine 有效期为 5s,也就是说该 key 会保留 5s,在 5s 内不反复解决雷同的 hotKey。// 毕竟 hotKey 都是刹时流量,能够防止在这 5s 内反复推送给 client 和 dashboard,防止有效的网络开销
            hotCache.put(key, 1);

            // 删掉该 key
            // 这个 key 从实际上是专门针对 slidingWindow 的 key,他的组合逻辑是 appName+keyType+key,而不是给 client 和 dashboard 推送的 hotKey
            CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

            // 开启推送
            hotKeyModel.setCreateTime(SystemClock.now());

            // 当开关关上时,打印日志。大促时敞开日志,就不打印了
            if (EtcdStarter.LOGGER_ON) {logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());
            }

            // 别离推送到各 client 和 etcd
            for (IPusher pusher : iPushers) {pusher.push(hotKeyModel);
            }

        }

    }

“thread.count”配置即为消费者个数,多个消费者独特生产一个 QUEUE 队列
生产者消费者模型,实质上还是拉模式,之所以不应用 EventBus,是因为须要队列来做缓冲
依据 HotKeyModel 外面是否是删除音讯类型

删除音讯类型
依据 HotKeyModel 外面的 appName+keyType+key 的名字,来构建 caffeine 外面的 newkey,该 newkey 在 caffeine 外面次要是用来与 slidingWindow 滑动工夫窗对应
删除 hotCache 外面 newkey 的缓存,放入的缓存 kv 别离是 newKey 和 1,hotCache 作用是用来存储该生成的热 key,hotCache 对应的 caffeine 有效期为 5s,也就是说该 key 会保留 5s,在 5s 内不反复解决雷同的 hotKey。毕竟 hotKey 都是刹时流量,能够防止在这 5s 内反复推送给 client 和 dashboard,防止有效的网络开销
删除 CaffeineCacheHolder 外面对应 appName 的 caffeine 外面的 newKey,这外面存储的是 slidingWindow 滑动窗口
推送给该 HotKeyModel 对应的所有 client 实例,用来让 client 删除该 HotKeyModel
非删除音讯类型
依据 HotKeyModel 外面的 appName+keyType+key 的名字,来构建 caffeine 外面的 newkey,该 newkey 在 caffeine 外面次要是用来与 slidingWindow 滑动工夫窗对应
通过 hotCache 来判断该 newkey 是否刚热不久,如果是则返回
依据滑动工夫窗口来计算判断该 key 是否为 hotKey(这里能够学习一下滑动工夫窗口的设计),并返回或者生成该 newKey 对应的滑动窗口
如果没有达到热 key 的规范
通过 CaffeineCacheHolder 从新 put,cache 会主动刷新过期工夫
如果达到了热 key 规范
向 hotCache 外面减少 newkey 对应的缓存,value 为 1 示意刚为热 key。
删除 CaffeineCacheHolder 外面对应 newkey 的滑动窗口缓存。
向该 hotKeyModel 对应的 app 的 client 推送 netty 音讯,示意新产生 hotKey,使得 client 本地缓存,然而推送的 netty 音讯只代表为热 key,client 本地缓存不会存储 key 对应的 value 值,须要调用 JdHotKeyStore 外面的 api 来给本地缓存的 value 赋值
向 dashboard 推送 hotKeyModel,示意新产生 hotKey
3)计算热 key 滑动窗口的设计
限于篇幅的起因,这里就不细谈了,间接贴出我的项目作者对其写的阐明文章:Java 简略实现滑动窗口

3.3.4 dashboard 端

这个没啥可说的了,就是连贯 etcd、mysql,增删改查,不过京东的前端框架很不便,间接返回 list 就能够成列表。

4 总结
文章第二局部为大家解说了 redis 数据歪斜的起因以及应答计划,并对热点问题进行了深刻,从发现热 key 到解决热 key 的两个关键问题的总结。

文章第三局部是热 key 问题解决方案——JD 开源 hotkey 的源码解析,别离从 client 端、worker 端、dashboard 端来进行全方位解说,包含其设计、应用及相干原理。

心愿通过这篇文章,可能使大家不仅学习到相干方法论,也能明确其方法论具体的落地计划,一起学习,一起成长。

作者:李鹏

正文完
 0