1 前言
之前旁边的小伙伴问我热点数据相干问题,在给他粗略地解说一波redis数据歪斜的案例之后,本人也顺道回顾了一些对于热点数据处理的方法论,同时也想起去年所学习JD开源我的项目hotkey——专门用来解决热点数据问题的框架。在这里联合两者所关联到的知识点,通过几个小图和局部粗略的解说,来让大家理解相干方法论以及hotkey的源码解析。

2 Redis数据歪斜
2.1 定义与危害
先说说数据歪斜的定义,借用百度词条的解释:

对于集群零碎,个别缓存是分布式的,即不同节点负责肯定范畴的缓存数据。咱们把缓存数据分散度不够,导致大量的缓存数据集中到了一台或者几台服务节点上,称为数据歪斜。一般来说数据歪斜是因为负载平衡施行的成果不好引起的。

从下面的定义中能够得悉,数据歪斜的起因个别是因为LB的成果不好,导致局部节点数据量十分集中。

那这又会有什么危害呢?

如果产生了数据歪斜,那么保留了大量数据,或者是保留了热点数据的实例的解决压力就会增大,速度变慢,甚至还可能会引起这个实例的内存资源耗尽,从而解体。这是咱们在利用切片集群时要防止的。

2.2 数据歪斜的分类
2.2.1 数据量歪斜(写入歪斜)

1.图示

如图,在某些状况下,实例上的数据分布不平衡,某个实例上的数据特地多。

2.bigkey导致歪斜

某个实例上正好保留了 bigkey。bigkey 的 value 值很大(String 类型),或者是 bigkey 保留了大量汇合元素(汇合类型),会导致这个实例的数据量减少,内存资源耗费也相应减少。

应答办法

在业务层生成数据时,要尽量避免把过多的数据保留在同一个键值对中。
如果 bigkey 正好是汇合类型,还有一个办法,就是把 bigkey 拆分成很多个小的汇合类型数据,扩散保留在不同的实例上。
3.Slot调配不均导致歪斜

先简略的介绍一下slot的概念,slot其实全名是Hash Slot(哈希槽),在Redis Cluster切片集群中一共有16384 个 Slot,这些哈希槽相似于数据分区,每个键值对都会依据它的 key,被映射到一个哈希槽中。Redis Cluster 计划采纳哈希槽来解决数据和实例之间的映射关系。

一张图来解释,数据、哈希槽、实例这三者的映射散布状况。

这里的CRC16(city)%16384能够简略的了解为将key1依据CRC16算法取hash值而后对slot个数取模,失去的就是slot地位为14484,他所对应的实例节点是第三个。

运维在构建切片集群时候,须要手动调配哈希槽,并且把16384 个槽都调配完,否则 Redis 集群无奈失常工作。因为是手动调配,则可能会导致局部实例所调配的slot过多,导致数据歪斜。

应答办法

应用CLUSTER SLOTS 命令来查

看slot分配情况,应用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE这三个命令来进行slot数据的迁徙,具体内容不再这里细说,感兴趣的同学能够自行学习一下。

4.Hash Tag导致歪斜

Hash Tag 定义 :指当一个key蕴含 {} 的时候,就不对整个key做hash,而仅对 {} 包含的字符串做hash。
假如hash算法为sha1。对user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。
Hash Tag 劣势 :如果不同 key 的 Hash Tag 内容都是一样的,那么,这些 key 对应的数据会被映射到同一个 Slot 中,同时会被调配到同一个实例上。
Hash Tag 劣势 :如果不合理应用,会导致大量的数据可能被集中到一个实例上产生数据歪斜,集群中的负载不平衡。
2.2.2 数据拜访歪斜(读取歪斜-热key问题)

一般来说数据拜访歪斜就是热key问题导致的,如何解决redis热key问题也是面试中常会问到的。所以理解相干概念及方法论也是不可或缺的一环。

1.图示

如图,尽管每个集群实例上的数据量相差不大,然而某个实例上的数据是热点数据,被拜访得十分频繁。

然而为啥会有热点数据的产生呢?

2.产生热key的起因及危害

1)用户生产的数据远大于生产的数据(热卖商品、热点新闻、热点评论、明星直播)。

在日常工作生存中一些突发的事件,例如:双十一期间某些热门商品的提价促销,当这其中的某一件商品被数万次点击浏览或者购买时,会造成一个较大的需求量,这种状况下就会造成热点问题。

同理,被大量刊发、浏览的热点新闻、热点评论、明星直播等,这些典型的读多写少的场景也会产生热点问题。

2)申请分片集中,超过单 Server 的性能极限。

在服务端读数据进行拜访时,往往会对数据进行分片切分,此过程中会在某一主机 Server 上对相应的 Key 进行拜访,当拜访超过 Server 极限时,就会导致热点 Key 问题的产生。

如果热点过于集中,热点 Key 的缓存过多,超过目前的缓存容量时,就会导致缓存分片服务被打垮景象的产生。当缓存服务解体后,此时再有申请产生,会缓存到后盾 DB 上,因为DB 自身性能较弱,在面临大申请时很容易产生申请穿透景象,会进一步导致雪崩景象,重大影响设施的性能。

3.罕用的热key问题解决办法:

解决方案一: 备份热key

能够把热点数据复制多份,在每一个数据正本的 key 中减少一个随机后缀,让它和其它正本数据不会被映射到同一个 Slot 中。

这里相当于把一份数据复制到其余实例上,这样在拜访的时候也减少随机前缀,将对一个实例的拜访压力,均摊到其余实例上

例如:咱们在放入缓存时就将对应业务的缓存key拆分成多个不同的key。如下图所示,咱们首先在更新缓存的一侧,将key拆成N份,比方一个key名字叫做”good_100”,那咱们就能够把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增时都须要去改变这N个key,这一步就是拆key。

对于service端来讲,咱们就须要想方法尽量将本人拜访的流量足够的平均。

如何给本人行将拜访的热key上退出后缀?几种方法,依据本机的ip或mac地址做hash,之后的值与拆key的数量做取余,最终决定拼接成什么样的key后缀,从而打到哪台机器上;服务启动时的一个随机数对拆key的数量做取余。

伪代码如下:

const M = N * 2//生成随机数random = GenRandom(0, M)//结构备份新keybakHotKey = hotKey + “_” + randomdata = redis.GET(bakHotKey)if data == NULL {  data = GetFromDB()  redis.SET(bakHotKey, expireTime + GenRandom(0,5))}

解决方案二: 本地缓存+动静计算主动发现热点缓存

该计划通过被动发现热点并对其进行存储来解决热点 Key 的问题。首先 Client 也会拜访 SLB,并且通过 SLB 将各种申请散发至 Proxy 中,Proxy 会依照基于路由的形式将申请转发至后端的 Redis 中。

在热点 key 的解决上是采纳在服务端减少缓存的形式进行。具体来说就是在 Proxy 上减少本地缓存,本地缓存采纳 LRU 算法来缓存热点数据,后端节点减少热点数据计算模块来返回热点数据。

Proxy 架构的次要有以下长处:

Proxy 本地缓存热点,读能力可程度扩大
DB 节点定时计算热点数据汇合
DB 反馈 Proxy 热点数据
对客户端齐全通明,不需做任何兼容
热点数据的发现与存储

对于热点数据的发现,首先会在一个周期内对 Key 进行申请统计,在达到申请量级后会对热点 Key 进行热点定位,并将所有的热点 Key 放入一个小的 LRU 链表内,在通过 Proxy 申请进行拜访时,若 Redis 发现待访点是一个热点,就会进入一个反馈阶段,同时对该数据进行标记。

能够应用一个etcd或者zk集群来存储反馈的热点数据,而后本地所有节点监听该热点数据,进而加载到本地JVM缓存中。

热点数据的获取

在热点 Key 的解决上次要分为写入跟读取两种模式,在数据写入过程当 SLB 收到数据 K1 并将其通过某一个 Proxy 写入一个 Redis,实现数据的写入。

假若通过后端热点模块计算发现 K1 成为热点 key 后, Proxy 会将该热点进行缓存,当下次客户端再进行拜访 K1 时,能够不经 Redis。

最初因为 proxy 是能够程度裁减的,因而能够任意加强热点数据的拜访能力。

最佳成熟计划: JD开源hotKey这是目前较为成熟的主动探测热key、分布式一致性缓存解决方案。原理就是在client端做洞察,而后上报对应hotkey,server端检测到后,将对应hotkey下发到对应服务端做本地缓存,并且能保障本地缓存和近程缓存的一致性。

在这里咱们就不细谈了,这篇文章的第三局部:JD开源hotkey源码解析外面会率领大家理解其整体原理。

3 JD开源hotkey—主动探测热key、分布式一致性缓存解决方案
3.1 解决痛点
从下面可知,热点key问题在并发量比拟高的零碎中(特地是做秒杀流动)呈现的频率会比拟高,对系统带来的危害也很大。

那么针对此,hotkey诞生的目标是什么?须要解决的痛点是什么?以及它的实现原理。

在这里援用我的项目上的一段话来概述:对任意突发性的无奈事后感知的热点数据,包含并不限于热点数据(如突发大量申请同一个商品)、热用户(如歹意爬虫刷子)、热接口(突发海量申请同一个接口)等,进行毫秒级精准探测到。而后对这些热数据、热用户等,推送到所有服务端JVM内存中,以大幅加重对后端数据存储层的冲击,并能够由使用者决定如何调配、应用这些热key(譬如对热商品做本地缓存、对热用户进行回绝拜访、对热接口进行熔断或返回默认值)。这些热数据在整个服务端集群内放弃一致性,并且业务隔离。

外围性能:热数据探测并推送至集群各个服务器

3.2 集成形式
集成形式在这里就不详述了,感兴趣的同学能够自行搜寻。

3.3 源码解析
3.3.1 架构简介

1.全景图一览

流程介绍:

客户端通过援用hotkey的client包,在启动的时候上报本人的信息给worker,同时和worker之间建设长连贯。定时拉取配置核心下面的规定信息和worker集群信息。
客户端调用hotkey的ishot()的办法来首先匹配规定,而后统计是不是热key。
通过定时工作把热key数据上传到worker节点。
worker集群在收取到所有对于这个key的数据当前(因为通过hash来决定key 上传到哪个worker的,所以同一个key只会在同一个worker节点上),在和定义的规定进行匹配后判断是不是热key,如果是则推送给客户端,实现本地缓存。
2.角色形成

这里间接借用作者的形容:

1)etcd集群etcd作为一个高性能的配置核心,能够以极小的资源占用,提供高效的监听订阅服务。次要用于寄存规定配置,各worker的ip地址,以及探测出的热key、手工增加的热key等。

2)client端jar包就是在服务中增加的援用jar,引入后,就能够以便捷的形式去判断某key是否热key。同时,该jar实现了key上报、监听etcd里的rule变动、worker信息变动、热key变动,对热key进行本地caffeine缓存等。

3) worker端集群worker端是一个独立部署的Java程序,启动后会连贯etcd,并定期上报本人的ip信息,供client端获取地址并进行长连贯。之后,次要就是对各个client发来的待测key进行累加计算,当达到etcd里设定的rule阈值后,将热key推送到各个client。

4) dashboard控制台控制台是一个带可视化界面的Java程序,也是连贯到etcd,之后在控制台设置各个APP的key规定,譬如2秒20次算热。而后当worker探测进去热key后,会将key发往etcd,dashboard也会监听热key信息,进行入库保留记录。同时,dashboard也能够手工增加、删除热key,供各个client端监听。

3.hotkey工程构造

3.3.2 client端

次要从上面三个方面来解析源码:

1.客户端启动器

1)启动形式

@PostConstructpublic void init() {    ClientStarter.Builder builder = new ClientStarter.Builder();    ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();    starter.startPipeline();}

appName:是这个利用的名称,个别为${spring.application.name}的值,后续所有的配置都以此为结尾

etcd:是etcd集群的地址,用逗号分隔,配置核心。

还能够看到ClientStarter实现了建造者模式,使代码更为简介。

2)外围入口
com.jd.platform.hotkey.client.ClientStarter#startPipeline

/** * 启动监听etcd */public void startPipeline() {    JdLogger.info(getClass(), "etcdServer:" + etcdServer);    //设置caffeine的最大容量    Context.CAFFEINE_SIZE = caffeineSize;    //设置etcd地址    EtcdConfigFactory.buildConfigCenter(etcdServer);    //开始定时推送    PushSchedulerStarter.startPusher(pushPeriod);    PushSchedulerStarter.startCountPusher(10);    //开启worker重连器    WorkerRetryConnector.retryConnectWorkers();    registEventBus();    EtcdStarter starter = new EtcdStarter();    //与etcd相干的监听都开启    starter.start();}

该办法次要有五个性能:

① 设置本地缓存(caffeine)的最大值,并创立etcd实例

//设置caffeine的最大容量Context.CAFFEINE_SIZE = caffeineSize;//设置etcd地址EtcdConfigFactory.buildConfigCenter(etcdServer);

caffeineSize是本地缓存的最大值,在启动的时候能够设置,不设置默认为200000。
etcdServer是下面说的etcd集群地址。

Context能够了解为一个配置类,外面就蕴含两个字段:

public class Context {    public static String APP_NAME;    public static int CAFFEINE_SIZE;}

EtcdConfigFactory是ectd配置核心的工厂类

public class EtcdConfigFactory {    private static IConfigCenter configCenter;    private EtcdConfigFactory() {}    public static IConfigCenter configCenter() {        return configCenter;    }    public static void buildConfigCenter(String etcdServer) {        //连贯多个时,逗号分隔        configCenter = JdEtcdBuilder.build(etcdServer);    }}

通过其configCenter()办法获取创立etcd实例对象,IConfigCenter接口封装了etcd实例对象的行为(包含根本的crud、监控、续约等)

② 创立并启动定时工作:PushSchedulerStarter

//开始定时推送PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待测keyPushSchedulerStarter.startCountPusher(10);//每10秒推送一次数量统计,不可配置

pushPeriod是推送的间隔时间,能够再启动的时候设置,最小为0.05s,推送越快,探测的越密集,会越快探测进去,但对client资源耗费相应增大

PushSchedulerStarter类

/**     * 每0.5秒推送一次待测key     */    public static void startPusher(Long period) {        if (period == null || period <= 0) {            period = 500L;        }        @SuppressWarnings("PMD.ThreadPoolCreationRule")        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));        scheduledExecutorService.scheduleAtFixedRate(() -> {            //热key的收集器            IKeyCollector<HotKeyModel, HotKeyModel> collectHK = KeyHandlerFactory.getCollector();            //这里相当于每0.5秒,通过netty来给worker来推送收集到的热key的信息,次要是一些热key的元数据信息(热key起源的app和key的类型和是否是删除事件,还有该热key的上报次数)            //这外面还有就是该热key在每次上报的时候都会生成一个全局的惟一id,还有该热key每次上报的创立工夫是在netty发送的时候来生成,同一批次的热key工夫是雷同的            List<HotKeyModel> hotKeyModels = collectHK.lockAndGetResult();            if(CollectionUtil.isNotEmpty(hotKeyModels)){                //积攒了半秒的key汇合,依照hash散发到不同的worker                KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);                collectHK.finishOnce();            }        },0, period, TimeUnit.MILLISECONDS);    }    /**     * 每10秒推送一次数量统计     */    public static void startCountPusher(Integer period) {        if (period == null || period <= 0) {            period = 10;        }        @SuppressWarnings("PMD.ThreadPoolCreationRule")        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));        scheduledExecutorService.scheduleAtFixedRate(() -> {            IKeyCollector<KeyHotModel, KeyCountModel> collectHK = KeyHandlerFactory.getCounter();            List<KeyCountModel> keyCountModels = collectHK.lockAndGetResult();            if(CollectionUtil.isNotEmpty(keyCountModels)){                //积攒了10秒的数量,依照hash散发到不同的worker                KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);                collectHK.finishOnce();            }        },0, period, TimeUnit.SECONDS);    }

从下面两个办法可知,都是通过定时线程池来实现定时工作的,都是守护线程。

咱们重点关注一下KeyHandlerFactory类,它是client端设计的一个比拟奇妙的中央,从类名上直译为key解决工厂。具体的实例对象是DefaultKeyHandler:

public class DefaultKeyHandler {    //推送HotKeyMsg音讯到Netty的推送者    private IKeyPusher iKeyPusher = new NettyKeyPusher();    //待测key的收集器,这外面蕴含两个map,key次要是热key的名字,value次要是热key的元数据信息(比方:热key起源的app和key的类型和是否是删除事件)    private IKeyCollector<HotKeyModel, HotKeyModel> iKeyCollector = new TurnKeyCollector();    //数量收集器,这外面蕴含两个map,这外面key是相应的规定,HitCount外面是这个规定的总拜访次数和热后拜访次数    private IKeyCollector<KeyHotModel, KeyCountModel> iKeyCounter = new TurnCountCollector();    public IKeyPusher keyPusher() {        return iKeyPusher;    }    public IKeyCollector<HotKeyModel, HotKeyModel> keyCollector() {        return iKeyCollector;    }    public IKeyCollector<KeyHotModel, KeyCountModel> keyCounter() {        return iKeyCounter;    }}

这外面有三个成员对象,别离是封装推送音讯到netty的NettyKeyPusher、待测key收集器TurnKeyCollector、数量收集器TurnCountCollector,其中后两者都实现了接口IKeyCollector,能对hotkey的解决起到无效的聚合,充分体现了代码的高内聚。
先来看看封装推送音讯到netty的NettyKeyPusher:

/** * 将msg推送到netty的pusher * @author wuweifeng wrote on 2020-01-06 * @version 1.0 */public class NettyKeyPusher implements IKeyPusher {    @Override    public void send(String appName, List<HotKeyModel> list) {        //积攒了半秒的key汇合,依照hash散发到不同的worker        long now = System.currentTimeMillis();        Map<Channel, List<HotKeyModel>> map = new HashMap<>();        for(HotKeyModel model : list) {            model.setCreateTime(now);            Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());            if (channel == null) {                continue;            }            List<HotKeyModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());            newList.add(model);        }        for (Channel channel : map.keySet()) {            try {                List<HotKeyModel> batch = map.get(channel);                HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);                hotKeyMsg.setHotKeyModels(batch);                channel.writeAndFlush(hotKeyMsg).sync();            } catch (Exception e) {                try {                    InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();                    JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());                } catch (Exception ex) {                    JdLogger.error(getClass(),"flush error");                }            }        }    }    @Override    public void sendCount(String appName, List<KeyCountModel> list) {        //积攒了10秒的数量,依照hash散发到不同的worker        long now = System.currentTimeMillis();        Map<Channel, List<KeyCountModel>> map = new HashMap<>();        for(KeyCountModel model : list) {            model.setCreateTime(now);            Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());            if (channel == null) {                continue;            }            List<KeyCountModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());            newList.add(model);        }        for (Channel channel : map.keySet()) {            try {                List<KeyCountModel> batch = map.get(channel);                HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);                hotKeyMsg.setKeyCountModels(batch);                channel.writeAndFlush(hotKeyMsg).sync();            } catch (Exception e) {                try {                    InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();                    JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());                } catch (Exception ex) {                    JdLogger.error(getClass(),"flush error");                }            }        }    }}

send(String appName, List list)
次要是将TurnKeyCollector收集的待测key通过netty推送给worker,HotKeyModel对象次要是一些热key的元数据信息(热key起源的app和key的类型和是否是删除事件,还有该热key的上报次数)

sendCount(String appName, List list)
次要是将TurnCountCollector收集的规定所对应的key通过netty推送给worker,KeyCountModel对象次要是一些key所对应的规定信息以及拜访次数等

WorkerInfoHolder.chooseChannel(model.getRuleKey())
依据hash算法获取key对应的服务器,散发到对应服务器相应的Channel 连贯,所以服务端能够程度有限扩容,毫无压力问题。

再来剖析一下key收集器:TurnKeyCollector与TurnCountCollector:
实现IKeyCollector接口:

/** * 对hotkey进行聚合 * @author wuweifeng wrote on 2020-01-06 * @version 1.0 */public interface IKeyCollector<T, V> {    /**     * 锁定后的返回值     */    List<V> lockAndGetResult();    /**     * 输出的参数     */    void collect(T t);    void finishOnce();}

lockAndGetResult()
次要是获取返回collect办法收集的信息,并将本地暂存的信息清空,不便下个统计周期积攒数据。

collect(T t)
顾名思义他是收集api调用的时候,将收集的到key信息放到本地存储。

finishOnce()
该办法目前实现都是空,无需关注。

待测key收集器:TurnKeyCollector

public class TurnKeyCollector implements IKeyCollector<HotKeyModel, HotKeyModel> {    //这map外面的key次要是热key的名字,value次要是热key的元数据信息(比方:热key起源的app和key的类型和是否是删除事件)    private ConcurrentHashMap<String, HotKeyModel> map0 = new ConcurrentHashMap<>();    private ConcurrentHashMap<String, HotKeyModel> map1 = new ConcurrentHashMap<>();    private AtomicLong atomicLong = new AtomicLong(0);    @Override    public List<HotKeyModel> lockAndGetResult() {        //自增后,对应的map就会进行被写入,期待被读取        atomicLong.addAndGet(1);        List<HotKeyModel> list;        //能够察看这里与collect办法外面的雷同地位,会发现一个是操作map0一个是操作map1,这样保障在读map的时候,不会阻塞写map,        //两个map同时提供轮流提供读写能力,设计的很奇妙,值得学习        if (atomicLong.get() % 2 == 0) {            list = get(map1);            map1.clear();        } else {            list = get(map0);            map0.clear();        }        return list;    }    private List<HotKeyModel> get(ConcurrentHashMap<String, HotKeyModel> map) {        return CollectionUtil.list(false, map.values());    }    @Override    public void collect(HotKeyModel hotKeyModel) {        String key = hotKeyModel.getKey();        if (StrUtil.isEmpty(key)) {            return;        }        if (atomicLong.get() % 2 == 0) {            //不存在时返回null并将key-value放入,已有雷同key时,返回该key对应的value,并且不笼罩            HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);            if (model != null) {                //减少该hotMey上报的次数                model.add(hotKeyModel.getCount());            }        } else {            HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);            if (model != null) {                model.add(hotKeyModel.getCount());            }        }    }    @Override    public void finishOnce() {}}

能够看到该类中有两个ConcurrentHashMap和一个AtomicLong,通过对AtomicLong来自增,而后对2取模,来别离管制两个map的读写能力,保障每个map都能做读写,并且同一个map不能同时读写,这样能够防止并发汇合读写不阻塞,这一块无锁化的设计还是十分奇妙的,极大的进步了收集的吞吐量。

key数量收集器:TurnCountCollector
这里的设计与TurnKeyCollector大同小异,咱们就不细谈了。值得一提的是它外面有个并行处理的机制,当收集的数量超过DATA_CONVERT_SWITCH_THRESHOLD=5000的阈值时,lockAndGetResult解决是应用java Stream并行流解决,晋升解决的效率。

③ 开启worker重连器

//开启worker重连器WorkerRetryConnector.retryConnectWorkers();public class WorkerRetryConnector {    /**     * 定时去重连没连上的workers     */    public static void retryConnectWorkers() {        @SuppressWarnings("PMD.ThreadPoolCreationRule")        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));        //开启拉取etcd的worker信息,如果拉取失败,则定时持续拉取        scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);    }    private static void reConnectWorkers() {        List<String> nonList = WorkerInfoHolder.getNonConnectedWorkers();        if (nonList.size() == 0) {            return;        }        JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);        NettyClient.getInstance().connect(nonList);//这里会触发netty连贯办法channelActive    }}

也是通过定时线程来执行,默认工夫距离是30s,不可设置。
通过WorkerInfoHolder来管制client的worker连贯信息,连贯信息是个List,用的CopyOnWriteArrayList,毕竟是一个读多写少的场景,相似与元数据信息。

/** * 保留worker的ip地址和Channel的映射关系,这是有序的。每次client发送音讯时,都会依据该map的size进行hash * 如key-1就发送到workerHolder的第1个Channel去,key-2就发到第2个Channel去 */private static final List<Server> WORKER_HOLDER = new CopyOnWriteArrayList<>();④ 注册EventBus事件订阅者private void registEventBus() {    //netty连接器会关注WorkerInfoChangeEvent事件    EventBusCenter.register(new WorkerChangeSubscriber());    //热key探测回调关注热key事件    EventBusCenter.register(new ReceiveNewKeySubscribe());    //Rule的变动的事件    EventBusCenter.register(new KeyRuleHolder());}

应用guava的EventBus事件音讯总线,利用公布/订阅者模式来对我的项目进行解耦。它能够利用很少的代码,来实现多组件间通信。

基本原理图如下:

监听worker信息变动:WorkerChangeSubscriber

/** * 监听worker信息变动 */@Subscribepublic void connectAll(WorkerInfoChangeEvent event) {    List<String> addresses = event.getAddresses();    if (addresses == null) {        addresses = new ArrayList<>();    }    WorkerInfoHolder.mergeAndConnectNew(addresses);}/** * 当client与worker的连贯断开后,删除 */@Subscribepublic void channelInactive(ChannelInactiveEvent inactiveEvent) {    //获取断线的channel    Channel channel = inactiveEvent.getChannel();    InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();    String address = socketAddress.getHostName() + ":" + socketAddress.getPort();    JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");    WorkerInfoHolder.dealChannelInactive(address);}

监听热key回调事件:ReceiveNewKeySubscribe

private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();@Subscribepublic void newKeyComing(ReceiveNewKeyEvent event) {    HotKeyModel hotKeyModel = event.getModel();    if (hotKeyModel == null) {        return;    }    //收到新key推送    if (receiveNewKeyListener != null) {        receiveNewKeyListener.newKey(hotKeyModel);    }}

该办法会收到新的热key订阅事件之后,会将其退出到KeyHandlerFactory的收集器外面解决。

外围解决逻辑:DefaultNewKeyListener#newKey:

@Overridepublic void newKey(HotKeyModel hotKeyModel) {    long now = System.currentTimeMillis();    //如果key达到时曾经过来1秒了,记录一下。手工删除key时,没有CreateTime    if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {        JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +                +now + " keyCreateAt " + hotKeyModel.getCreateTime());    }    if (hotKeyModel.isRemove()) {        //如果是删除事件,就间接删除        deleteKey(hotKeyModel.getKey());        return;    }    //曾经是热key了,又推过来同样的热key,做个日志记录,并刷新一下    if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {        JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);    }    addKey(hotKeyModel.getKey());}private void deleteKey(String key) {        CacheFactory.getNonNullCache(key).delete(key);}private void addKey(String key) {  ValueModel valueModel = ValueModel.defaultValue(key);  if (valueModel == null) {      //不合乎任何规定      deleteKey(key);      return;  }

//如果原来该key曾经存在了,那么value就被重置,过期工夫也会被重置。如果原来不存在,就新增的热key
JdHotKeyStore.setValueDirectly(key, valueModel);
}
如果该HotKeyModel外面是删除事件,则获取RULE_CACHE_MAP外面该key超时工夫对应的caffeine,而后从中删除该key缓存,而后返回(这里相当于删除了本地缓存)。
如果不是删除事件,则在RULE_CACHE_MAP对应的caffeine缓存中增加该key的缓存。
这里有个留神点,如果不为删除事件,调用addKey()办法在caffeine减少缓存的时候,value是一个魔术值0x12fcf76,这个值只代表加了这个缓存,然而这个缓存在查问的时候相当于为null。
监听Rule的变动事件:KeyRuleHolder

能够看到外面有两个成员属性:RULE_CACHE_MAP,KEY_RULES

/** * 保留超时工夫和caffeine的映射,key是超时工夫,value是caffeine[(String,Object)] */private static final ConcurrentHashMap<Integer, LocalCache> RULE_CACHE_MAP = new ConcurrentHashMap<>();/** * 这里KEY_RULES是保留etcd外面该appName所对应的所有rule */private static final List<KeyRule> KEY_RULES = new ArrayList<>();

ConcurrentHashMap RULE_CACHE_MAP:

保留超时工夫和caffeine的映射,key是超时工夫,value是caffeine[(String,Object)]。
奇妙的设计:这里将key的过期工夫作为分桶策略,这样同一个过期工夫的key就会在一个桶(caffeine)外面,这外面每一个caffeine都是client的本地缓存,也就是说hotKey的本地缓存的KV实际上是存储在这外面的。
List KEY_RULES:

这里KEY_RULES是保留etcd外面该appName所对应的所有rule。
具体监听KeyRuleInfoChangeEvent事件办法:

@Subscribepublic void ruleChange(KeyRuleInfoChangeEvent event) {    JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());    List<KeyRule> ruleList = event.getKeyRules();    if (ruleList == null) {        return;    }    putRules(ruleList);}

外围解决逻辑:KeyRuleHolder#putRules:

/** * 所有的规定,如果规定的超时工夫变动了,会重建caffeine */public static void putRules(List<KeyRule> keyRules) {    synchronized (KEY_RULES) {        //如果规定为空,清空规定表        if (CollectionUtil.isEmpty(keyRules)) {            KEY_RULES.clear();            RULE_CACHE_MAP.clear();            return;        }        KEY_RULES.clear();        KEY_RULES.addAll(keyRules);        Set<Integer> durationSet = keyRules.stream().map(KeyRule::getDuration).collect(Collectors.toSet());        for (Integer duration : RULE_CACHE_MAP.keySet()) {            //先革除掉那些在RULE_CACHE_MAP里存的,然而rule里已没有的            if (!durationSet.contains(duration)) {                RULE_CACHE_MAP.remove(duration);            }        }        //遍历所有的规定        for (KeyRule keyRule : keyRules) {            int duration = keyRule.getDuration();            //这里如果RULE_CACHE_MAP外面没有超时工夫为duration的value,则新建一个放入到RULE_CACHE_MAP外面            //比方RULE_CACHE_MAP原本就是空的,则在这里来构建RULE_CACHE_MAP的映射关系            //TODO 如果keyRules外面蕴含雷同duration的keyRule,则也只会建一个key为duration,value为caffeine,其中caffeine是(string,object)            if (RULE_CACHE_MAP.get(duration) == null) {                LocalCache cache = CacheFactory.build(duration);                RULE_CACHE_MAP.put(duration, cache);            }        }    }}

应用synchronized关键字来保障线程平安;
如果规定为空,清空规定表(RULE_CACHE_MAP、KEY_RULES);
应用传递进来的keyRules来笼罩KEY_RULES;
革除掉RULE_CACHE_MAP外面在keyRules没有的映射关系;
遍历所有的keyRules,如果RULE_CACHE_MAP外面没有相干的超时工夫key,则在外面赋值;
⑤ 启动EtcdStarter(etcd连贯管理器)

EtcdStarter starter = new EtcdStarter();//与etcd相干的监听都开启starter.start();public void start() {fetchWorkerInfo();fetchRule();startWatchRule();//监听热key事件,只监听手工增加、删除的keystartWatchHotKey();}

fetchWorkerInfo()

从etcd外面拉取worker集群地址信息allAddress,并更新WorkerInfoHolder外面的WORKER_HOLDER

/** * 每隔30秒拉取worker信息 */private void fetchWorkerInfo() {    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();    //开启拉取etcd的worker信息,如果拉取失败,则定时持续拉取    scheduledExecutorService.scheduleAtFixedRate(() -> {        JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");        fetch();    }, 0, 30, TimeUnit.SECONDS);}

应用定时线程池来执行,单线程。
定时从etcd外面获取,地址/jd/workers/+$appName或default,工夫距离不可设置,默认30秒,这外面存储的是worker地址的ip+port。
公布WorkerInfoChangeEvent事件。
备注:地址有$appName或default,在worker外面配置,如果把worker放到某个appName下,则该worker只会参加该app的计算。
fetchRule()

定时线程来执行,单线程,工夫距离不可设置,默认是5秒,当拉取规定配置和手动配置的hotKey胜利后,该线程被终止(也就是说只会胜利执行一次),执行失败继续执行

private void fetchRule() {    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();    //开启拉取etcd的worker信息,如果拉取失败,则定时持续拉取    scheduledExecutorService.scheduleAtFixedRate(() -> {        JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");        boolean success = fetchRuleFromEtcd();        if (success) {            //拉取已存在的热key            fetchExistHotKey();            //这里如果拉取规定和拉取手动配置的hotKey胜利之后,则该定时执行线程进行            scheduledExecutorService.shutdown();        }    }, 0, 5, TimeUnit.SECONDS);}

fetchRuleFromEtcd()

从etcd外面获取该appName配置的rule规定,地址/jd/rules/+$appName。
如果查出来规定rules为空,会通过公布KeyRuleInfoChangeEvent事件来清空本地的rule配置缓存和所有的规定key缓存。
公布KeyRuleInfoChangeEvent事件。
fetchExistHotKey()

从etcd外面获取该appName手动配置的热key,地址/jd/hotkeys/+$appName。
公布ReceiveNewKeyEvent事件,并且内容HotKeyModel不是删除事件。
startWatchRule()

/** * 异步监听rule规定变动 */private void startWatchRule() {    ExecutorService executorService = Executors.newSingleThreadExecutor();    executorService.submit(() -> {        JdLogger.info(getClass(), "--- begin watch rule change ----");        try {            IConfigCenter configCenter = EtcdConfigFactory.configCenter();            KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);            //如果有新事件,即rule的变更,就从新拉取所有的信息            while (watchIterator.hasNext()) {                //这句必须写,next会让他卡住,除非真的有新rule变更                WatchUpdate watchUpdate = watchIterator.next();                List<Event> eventList = watchUpdate.getEvents();                JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);                //全量拉取rule信息                fetchRuleFromEtcd();            }        } catch (Exception e) {            JdLogger.error(getClass(), "watch err");        }    });}

异步监听rule规定变动,应用etcd监听地址为/jd/rules/+$appName的节点变动。
应用线程池,单线程,异步监听rule规定变动,如果有事件变动,则调用fetchRuleFromEtcd()办法。
startWatchHotKey()
异步开始监听热key变动信息,应用etcd监听地址前缀为/jd/hotkeys/+$appName

/** * 异步开始监听热key变动信息,该目录里只有手工增加的key信息 */private void startWatchHotKey() {    ExecutorService executorService = Executors.newSingleThreadExecutor();    executorService.submit(() -> {        JdLogger.info(getClass(), "--- begin watch hotKey change ----");        IConfigCenter configCenter = EtcdConfigFactory.configCenter();        try {            KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);            //如果有新事件,即新key产生或删除            while (watchIterator.hasNext()) {                WatchUpdate watchUpdate = watchIterator.next();                List<Event> eventList = watchUpdate.getEvents();                KeyValue keyValue = eventList.get(0).getKv();                Event.EventType eventType = eventList.get(0).getType();                try {                    //从这个中央能够看出,etcd给的返回是节点的全门路,而咱们须要的key要去掉前缀                    String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");                    //如果是删除key,就立即删除                    if (Event.EventType.DELETE == eventType) {                        HotKeyModel model = new HotKeyModel();                        model.setRemove(true);                        model.setKey(key);                        EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));                    } else {                        HotKeyModel model = new HotKeyModel();                        model.setRemove(false);                        String value = keyValue.getValue().toStringUtf8();                        //新增热key                        JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);                        //如果这是一个删除指令,就什么也不干                        //TODO 这里有个疑难,监听到worker主动探测收回的惰性删除指令,这里之间跳过了,然而本地缓存没有更新吧?                        //TODO 所以我猜想在客户端应用判断缓存是否存在的api外面,应该会判断相干缓存的value值是否为"#[DELETE]#"删除标记                        //解疑:这里的确只监听手工配置的hotKey,etcd的/jd/hotkeys/+$appName该地址只是手动配置hotKey,worker主动探测的hotKey是间接通过netty通道来告知client的                        if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {                            continue;                        }                        //手工创立的value是工夫戳                        model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));                        model.setKey(key);                        EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));                    }                } catch (Exception e) {                    JdLogger.error(getClass(), "new key err :" + keyValue);                }            }        } catch (Exception e) {            JdLogger.error(getClass(), "watch err");        }    });}

应用线程池,单线程,异步监听热key变动
应用etcd监听前缀地址的以后节点以及子节点的所有变动值
删除节点动作
公布ReceiveNewKeyEvent事件,并且内容HotKeyModel是删除事件
新增or更新节点动作
事件变动的value值为删除标记#[DELETE]#
如果是删除标记的话,代表是worker主动探测或者client须要删除的指令。
如果是删除标记则什么也不做,间接跳过(这里从HotKeyPusher#push办法能够看到,做删除事件的操作时候,他会给/jd/hotkeys/+$appName的节点外面减少一个值为删除标记的节点,而后再删除雷同门路的节点,这样就能够触发下面的删除节点事件,所以这里判断如果是删除标记间接跳过)。
不为删除标记
公布ReceiveNewKeyEvent事件,事件内容HotKeyModel外面的createTime是kv对应的工夫戳
疑难: 这里代码正文外面说只监听手工增加或者删除的hotKey,难道说/jd/hotkeys/+$appName地址只是手工配置的地址吗?

解疑: 这里的确只监听手工配置的hotKey,etcd的/jd/hotkeys/+$appName该地址只是手动配置hotKey,worker主动探测的hotKey是间接通过netty通道来告知client的

5.API解析

1)流程图示
① 查问流程

② 删除流程:

从下面的流程图中,大家应该晓得该热点key在代码中是如何扭转的,这里再给大家解说一下外围API的源码解析,限于篇幅的起因,咱们不一个个贴相干源码了,只是单纯的通知你它的外部逻辑是怎么样的。

2)外围类:JdHotKeyStore

JdHotKeyStore是封装client调用的api外围类,蕴含下面10个公共办法,咱们重点解析其中6个重要办法:

① isHotKey(String key)
判断是否在规定内,如果不在,返回false
判断是否是热key,如果不是或者是且过期工夫在2s内,则给TurnKeyCollector#collect收集
最初给TurnCountCollector#collect做统计收集

② get(String key)
从本地caffeine取值
如果取到的value是个魔术值,只代表退出到caffeine缓存外面了,查问的话为null

③ smartSet(String key, Object value)
判断是否是热key,这里不论它在不在规定内,如果是热key,则给value赋值,如果不为热key什么也不做

④ forceSet(String key, Object value)
强制给value赋值
如果该key不在规定配置内,则传递的value不失效,本地缓存的赋值value会被变为null

⑤ getValue(String key, KeyType keyType)
获取value,如果value不存在则调用HotKeyPusher#push办法发往netty
如果没有为该key配置规定,就不必上报key,间接返回null
如果取到的value是个魔术值,只代表退出到caffeine缓存外面了,查问的话为null

⑥ remove(String key)
删除某key(本地的caffeine缓存),会告诉整个集群删除(通过etcd来告诉集群删除)

3)client上传热key入口调用类:HotKeyPusher
外围办法:

public static void push(String key, KeyType keyType, int count, boolean remove) {    if (count <= 0) {        count = 1;    }    if (keyType == null) {        keyType = KeyType.REDIS_KEY;    }    if (key == null) {        return;    }    //这里之所以用LongAdder是为了保障多线程计数的线程安全性,尽管这里是在办法内调用的,然而在TurnKeyCollector的两个map外面,    //存储了HotKeyModel的实例对象,这样在多个线程同时批改count的计数属性时,会存在线程平安计数不精确问题    LongAdder adderCnt = new LongAdder();    adderCnt.add(count);    HotKeyModel hotKeyModel = new HotKeyModel();    hotKeyModel.setAppName(Context.APP_NAME);    hotKeyModel.setKeyType(keyType);    hotKeyModel.setCount(adderCnt);    hotKeyModel.setRemove(remove);    hotKeyModel.setKey(key);    if (remove) {        //如果是删除key,就间接发到etcd去,不必做聚合。然而有点问题当初,这个删除只能删手工增加的key,不能删worker探测进去的        //因为各个client都在监听手工增加的那个path,没监听主动探测的path。所以如果手工的那个path下,没有该key,那么是删除不了的。        //删不了,就达不到集群监听删除事件的成果,怎么办呢?能够通过新增的形式,新增一个热key,而后删除它        //TODO 这里为啥不间接删除该节点,难道worker主动探测解决的hotKey不会往该节点减少新增事件吗?        //释疑:worker依据探测配置的规定,当判断出某个key为hotKey后,的确不会往keyPath外面退出节点,他只是单纯的往本地缓存外面退出一个空值,代表是热点key        EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);        EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 这里很奇妙待补充形容        //也删worker探测的目录        EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));    } else {        //如果key是规定内的要被探测的key,就积攒期待传送        if (KeyRuleHolder.isKeyInRule(key)) {            //积攒起来,期待每半秒发送一次            KeyHandlerFactory.getCollector().collect(hotKeyModel);        }    }}

从下面的源码中可知:

这里之所以用LongAdder是为了保障多线程计数的线程安全性,尽管这里是在办法内调用的,然而在TurnKeyCollector的两个map外面,存储了HotKeyModel的实例对象,这样在多个线程同时批改count的计数属性时,会存在线程平安计数不精确问题。
如果是remove删除类型,在删除手动配置的热key配置门路的同时,还会删除dashboard展现热key的配置门路。
只有在规定配置的key,才会被积攒探测发送到worker内进行计算。
3.通信机制(与worker交互)

1)NettyClient:netty连接器

public class NettyClient {    private static final NettyClient nettyClient = new NettyClient();    private Bootstrap bootstrap;    public static NettyClient getInstance() {        return nettyClient;    }    private NettyClient() {        if (bootstrap == null) {            bootstrap = initBootstrap();        }    }    private Bootstrap initBootstrap() {        //少线程        EventLoopGroup group = new NioEventLoopGroup(2);        Bootstrap bootstrap = new Bootstrap();        NettyClientHandler nettyClientHandler = new NettyClientHandler();        bootstrap.group(group).channel(NioSocketChannel.class)                .option(ChannelOption.SO_KEEPALIVE, true)                .option(ChannelOption.TCP_NODELAY, true)                .handler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel ch) {                        ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());                        ch.pipeline()                                .addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//这里就是定义TCP多个包之间的分隔符,为了更好的做拆包                                .addLast(new MsgDecoder())                                .addLast(new MsgEncoder())                                //30秒没音讯时,就发心跳包过来                                .addLast(new IdleStateHandler(0, 0, 30))                                .addLast(nettyClientHandler);                    }                });        return bootstrap;    }}

应用Reactor线程模型,只有2个工作线程,没有独自设置主线程
长连贯,开启TCP_NODELAY
netty的分隔符”$( )$”,相似TCP报文分段的规范,不便拆包
Protobuf序列化与反序列化
30s没有音讯发给对端的时候,发送一个心跳包判活
工作线程处理器NettyClientHandler
JDhotkey的tcp协定设计就是收发字符串,每个tcp音讯包应用特殊字符$( )$来宰割
长处:这样实现非常简单。

取得音讯包后进行json或者protobuf反序列化。

毛病:是须要,从字节流-》反序列化成字符串-》反序列化成音讯对象,两层序列化损耗了一部分性能。

protobuf还好序列化很快,然而json序列化的速度只有几十万每秒,会损耗一部分性能。

2)NettyClientHandler:工作线程处理器

@ChannelHandler.Sharablepublic class NettyClientHandler extends SimpleChannelInboundHandler<HotKeyMsg> {    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof IdleStateEvent) {            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;            //这里示意如果读写都挂了            if (idleStateEvent.state() == IdleState.ALL_IDLE) {                //向服务端发送音讯                ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));            }        }        super.userEventTriggered(ctx, evt);    }    //在Channel注册EventLoop、绑定SocketAddress和连贯ChannelFuture的时候都有可能会触发ChannelInboundHandler的channelActive办法的调用    //相似TCP三次握手胜利之后触发    @Override    public void channelActive(ChannelHandlerContext ctx) {        JdLogger.info(getClass(), "channelActive:" + ctx.name());        ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));    }    //相似TCP四次挥手之后,期待2MSL工夫之后触发(大略180s),比方channel通道敞开会触发(channel.close())    //客户端channel被动敞开连贯时,会向服务端发送一个写申请,而后服务端channel所在的selector会监听到一个OP_READ事件,而后    //执行数据读取操作,而读取时发现客户端channel曾经敞开了,则读取数据字节个数返回-1,而后执行close操作,敞开该channel对应的底层socket,    //并在pipeline中,从head开始,往下将InboundHandler,并触发handler的channelInactive和channelUnregistered办法的执行,以及移除pipeline中的handlers一系列操作。    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        super.channelInactive(ctx);        //断线了,可能只是client和server断了,但都和etcd没断。也可能是client本人断网了,也可能是server断了        //公布断线事件。后续10秒后进行重连,依据etcd里的worker信息来决定是否重连,如果etcd里没了,就不重连。如果etcd里有,就重连        notifyWorkerChange(ctx.channel());    }    private void notifyWorkerChange(Channel channel) {        EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));    }    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {        if (MessageType.PONG == msg.getMessageType()) {            JdLogger.info(getClass(), "heart beat");            return;        }        if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {            JdLogger.info(getClass(), "receive new key : " + msg);            if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {                return;            }            for (HotKeyModel model : msg.getHotKeyModels()) {                EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));            }        }    }}

userEventTriggered

收到对端发来的心跳包,返回new HotKeyMsg(MessageType.PING, Context.APP_NAME)
channelActive

在Channel注册EventLoop、绑定SocketAddress和连贯ChannelFuture的时候都有可能会触发ChannelInboundHandler的channelActive办法的调用
相似TCP三次握手胜利之后触发,给对端发送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)
channelInactive

相似TCP四次挥手之后,期待2MSL工夫之后触发(大略180s),比方channel通道敞开会触发(channel.close())该办法,公布ChannelInactiveEvent事件,来10s后重连
channelRead0

接管PONG音讯类型时,打个日志返回
接管RESPONSE_NEW_KEY音讯类型时,公布ReceiveNewKeyEvent事件
3.3.3 worker端

1.入口启动加载:7个@PostConstruct

1)worker端对etcd相干的解决:EtcdStarter
① 第一个@PostConstruct:watchLog()

@PostConstructpublic void watchLog() {    AsyncPool.asyncDo(() -> {        try {            //取etcd的是否开启日志配置,地址/jd/logOn            String loggerOn = configCenter.get(ConfigConstant.logToggle);            LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);        } catch (StatusRuntimeException ex) {            logger.error(ETCD_DOWN);        }        //监听etcd地址/jd/logOn是否开启日志配置,并实时更改开关        KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);        while (watchIterator.hasNext()) {            WatchUpdate watchUpdate = watchIterator.next();            List<Event> eventList = watchUpdate.getEvents();            KeyValue keyValue = eventList.get(0).getKv();            logger.info("log toggle changed : " + keyValue);            String value = keyValue.getValue().toStringUtf8();            LOGGER_ON = "true".equals(value) || "1".equals(value);        }    });}

放到线程池外面异步执行
取etcd的是否开启日志配置,地址/jd/logOn,默认true
监听etcd地址/jd/logOn是否开启日志配置,并实时更改开关
因为有etcd的监听,所以会始终执行,而不是执行一次完结
② 第二个@PostConstruct:watch()

/** * 启动回调监听器,监听rule变动 */@PostConstructpublic void watch() {    AsyncPool.asyncDo(() -> {        KvClient.WatchIterator watchIterator;        if (isForSingle()) {            watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);        } else {                       watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);        }        while (watchIterator.hasNext()) {            WatchUpdate watchUpdate = watchIterator.next();            List<Event> eventList = watchUpdate.getEvents();            KeyValue keyValue = eventList.get(0).getKv();            logger.info("rule changed : " + keyValue);            try {                ruleChange(keyValue);            } catch (Exception e) {                e.printStackTrace();            }        }    });}/**     * rule发生变化时,更新缓存的rule     */    private synchronized void ruleChange(KeyValue keyValue) {        String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");        if (StrUtil.isEmpty(appName)) {            return;        }        String ruleJson = keyValue.getValue().toStringUtf8();        List<KeyRule> keyRules = FastJsonUtils.toList(ruleJson, KeyRule.class);        KeyRuleHolder.put(appName, keyRules);    }

通过etcd.workerPath配置,来判断该worker是否为某个app独自服务的,默认为”default”,如果是默认值,代表该worker参加在etcd上所有app client的计算,否则只为某个app来服务计算

应用etcd来监听rule规定变动,如果是共享的worker,监听地址前缀为”/jd/rules/“,如果为某个app独享,监听地址为”/jd/rules/“+$etcd.workerPath

如果规定变动,则批改对应app在本地存储的rule缓存,同时清理该app在本地存储的KV缓存

KeyRuleHolder:rule缓存本地存储

Map> RULE_MAP,这个map是concurrentHashMap,map的kv别离是appName和对应的rule
绝对于client的KeyRuleHolder的区别:worker是存储所有app规定,每个app对应一个规定桶,所以用map
CaffeineCacheHolder:key缓存本地存储

Map> CACHE_MAP,也是concurrentHashMap,map的kv别离是appName和对应的kv的caffeine
绝对于client的caffeine,第一是worker没有做缓存接口比方LocalCache,第二是client的map的kv别离是超时工夫、以及雷同超时工夫所对应key的缓存桶
放到线程池外面异步执行,因为有etcd的监听,所以会始终执行,而不是执行一次完结

③ 第三个@PostConstruct:watchWhiteList()

/** * 启动回调监听器,监听白名单变动,只监听本人所在的app,白名单key不参加热key计算,间接疏忽 */@PostConstructpublic void watchWhiteList() {    AsyncPool.asyncDo(() -> {        //从etcd配置中获取所有白名单        fetchWhite();        KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);        while (watchIterator.hasNext()) {            WatchUpdate watchUpdate = watchIterator.next();            logger.info("whiteList changed ");            try {                fetchWhite();            } catch (Exception e) {                e.printStackTrace();            }        }    });}

拉取并监听etcd白名单key配置,地址为/jd/whiteList/+$etcd.workerPath
在白名单的key,不参加热key计算,间接疏忽
放到线程池外面异步执行,因为有etcd的监听,所以会始终执行,而不是执行一次完结
④ 第四个@PostConstruct:makeSureSelfOn()

/** * 每隔一会去check一下,本人还在不在etcd里 */@PostConstructpublic void makeSureSelfOn() {    //开启上传worker信息    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();    scheduledExecutorService.scheduleAtFixedRate(() -> {        try {            if (canUpload) {                uploadSelfInfo();            }        } catch (Exception e) {            //do nothing        }    }, 0, 5, TimeUnit.SECONDS);}

在线程池外面异步执行,定时执行,工夫距离为5s
将本机woker的hostName,ip+port以kv的模式定时上报给etcd,地址为/jd/workers/+$etcd.workPath+”/“+$hostName,续期工夫为8s
有一个canUpload的开关来管制worker是否向etcd来定时续期,如果这个开关敞开了,代表worker不向etcd来续期,这样当下面地址的kv到期之后,etcd会删除该节点,这样client循环判断worker信息变动了
2)将热key推送到dashboard供入库:DashboardPusher

① 第五个@PostConstruct:uploadToDashboard()

@Componentpublic class DashboardPusher implements IPusher {    /**     * 热key集中营     */    private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();    @PostConstruct    public void uploadToDashboard() {        AsyncPool.asyncDo(() -> {            while (true) {                try {                    //要么key达到1千个,要么达到1秒,就汇总上报给etcd一次                    List<HotKeyModel> tempModels = new ArrayList<>();                    Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);                    if (CollectionUtil.isEmpty(tempModels)) {                        continue;                    }                    //将热key推到dashboard                    DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));                } catch (Exception e) {                    e.printStackTrace();                }            }        });    }}

当热key的数量达到1000或者每隔1s,把热key的数据通过与dashboard的netty通道来发送给dashboard,数据类型为REQUEST_HOT_KEY
LinkedBlockingQueue hotKeyStoreQueue:worker计算的给dashboard热key的集中营,所有给dashboard推送热key存储在外面
3)推送到各客户端服务器:AppServerPusher

① 第六个@PostConstruct:batchPushToClient()

public class AppServerPusher implements IPusher {    /**     * 热key集中营     */    private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();    /**     * 和dashboard那边的推送次要区别在于,给app推送每10ms一次,dashboard那边1s一次     */    @PostConstruct    public void batchPushToClient() {        AsyncPool.asyncDo(() -> {            while (true) {                try {                    List<HotKeyModel> tempModels = new ArrayList<>();                    //每10ms推送一次                    Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);                    if (CollectionUtil.isEmpty(tempModels)) {                        continue;                    }                    Map<String, List<HotKeyModel>> allAppHotKeyModels = new HashMap<>();                    //拆分出每个app的热key汇合,按app分堆                    for (HotKeyModel hotKeyModel : tempModels) {                        List<HotKeyModel> oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());                        oneAppModels.add(hotKeyModel);                    }                    //遍历所有app,进行推送                    for (AppInfo appInfo : ClientInfoHolder.apps) {                        List<HotKeyModel> list = allAppHotKeyModels.get(appInfo.getAppName());                        if (CollectionUtil.isEmpty(list)) {                            continue;                        }                        HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);                        hotKeyMsg.setHotKeyModels(list);                        //整个app全副发送                        appInfo.groupPush(hotKeyMsg);                    }                    //推送完,及时清理不应用内存                    allAppHotKeyModels = null;                } catch (Exception e) {                    e.printStackTrace();                }            }        });    }}

会依照key的appName来进行分组,而后通过对应app的channelGroup来推送
当热key的数量达到10或者每隔10ms,把热key的数据通过与app的netty通道来发送给app,数据类型为RESPONSE_NEW_KEY
LinkedBlockingQueue hotKeyStoreQueue:worker计算的给client热key的集中营,所有给client推送热key存储在外面
4)client实例节点解决:NodesServerStarter
① 第七个@PostConstruct:start()

public class NodesServerStarter {    @Value("${netty.port}")    private int port;    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private IClientChangeListener iClientChangeListener;    @Resource    private List<INettyMsgFilter> messageFilters;    @PostConstruct    public void start() {        AsyncPool.asyncDo(() -> {            logger.info("netty server is starting");            NodesServer nodesServer = new NodesServer();            nodesServer.setClientChangeListener(iClientChangeListener);            nodesServer.setMessageFilters(messageFilters);            try {                nodesServer.startNettyServer(port);            } catch (Exception e) {                e.printStackTrace();            }        });    }}

线程池外面异步执行,启动client端的nettyServer
iClientChangeListener和messageFilters这两个依赖最终会被传递到netty音讯处理器外面,iClientChangeListener会作为channel下线解决来删除ClientInfoHolder下线或者超时的通道,messageFilters会作为netty收到事件音讯的解决过滤器(责任链模式)
② 依赖的bean:IClientChangeListener iClientChangeListener

public interface IClientChangeListener {    /**     * 发现新连贯     */    void newClient(String appName, String channelId, ChannelHandlerContext ctx);    /**     * 客户端掉线     */    void loseClient(ChannelHandlerContext ctx);}

对客户端的治理,新来(newClient)(会触发netty的连贯办法channelActive)、断线(loseClient)(会触发netty的断连办法channelInactive())的治理
client的连贯信息次要是在ClientInfoHolder外面

List apps,这外面的AppInfo次要是appName和对应的channelGroup
对apps的add和remove次要是通过新来(newClient)、断线(loseClient)
③ 依赖的bean:List messageFilters

/** * 对netty来的音讯,进行过滤解决 * @author wuweifeng wrote on 2019-12-11 * @version 1.0 */public interface INettyMsgFilter {    boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);}

对client发给worker的netty音讯,进行过滤解决,共有四个实现类,也就是说底下四个过滤器都是收到client发送的netty音讯来做解决

④ 各个音讯解决的类型:MessageType

APP_NAME((byte) 1),
REQUEST_NEW_KEY((byte) 2),
RESPONSE_NEW_KEY((byte) 3),
REQUEST_HIT_COUNT((byte) 7), //命中率
REQUEST_HOT_KEY((byte) 8), //热key,worker->dashboard
PING((byte) 4), PONG((byte) 5),
EMPTY((byte) 6);
程序1:HeartBeatFilter

当音讯类型为PING,则给对应的client示例返回PONG
程序2:AppNameFilter

当音讯类型为APP_NAME,代表client与worker建设连贯胜利,而后调用iClientChangeListener的newClient办法减少apps元数据信息
程序3:HotKeyFilter

解决接管音讯类型为REQUEST_NEW_KEY
先给HotKeyFilter.totalReceiveKeyCount原子类增1,该原子类代表worker实例接管到的key的总数
publishMsg办法,将音讯通过自建的生产者消费者模型(KeyProducer,KeyConsumer),来把音讯给发到生产者中散发生产
接管到的音讯HotKeyMsg外面List
首先判断HotKeyModel外面的key是否在白名单内,如果在则跳过,否则将HotKeyModel通过KeyProducer发送
程序4:KeyCounterFilter

解决接管类型为REQUEST_HIT_COUNT
这个过滤器是专门给dashboard来汇算key的,所以这个appName间接设置为该worker配置的appName
该过滤器的数据起源都是client的NettyKeyPusher#sendCount(String appName, List list),这外面的数据都是默认积攒10s的,这个10s是能够配置的,这一点在client外面有讲
将结构的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞队列LinkedBlockingQueue COUNTER_QUEUE中,而后让CounterConsumer来生产解决,生产逻辑是单线程的
CounterConsumer:热key统计消费者
放在公共线程池中,来单线程执行
从阻塞队列COUNTER_QUEUE外面取数据,而后将外面的key的统计数据公布到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()外面,该门路是worker服务的client集群或者default,用来寄存客户端hotKey拜访次数和总拜访次数的path,而后让dashboard来订阅统计展现
2.三个定时工作:3个@Scheduled

1)定时工作1:EtcdStarter#pullRules()

/** * 每隔1分钟拉取一次,所有的app的rule */@Scheduled(fixedRate = 60000)public void pullRules() {    try {        if (isForSingle()) {            String value = configCenter.get(ConfigConstant.rulePath + workerPath);            if (!StrUtil.isEmpty(value)) {                List<KeyRule> keyRules = FastJsonUtils.toList(value, KeyRule.class);                KeyRuleHolder.put(workerPath, keyRules);            }        } else {            List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.rulePath);            for (KeyValue keyValue : keyValues) {                ruleChange(keyValue);            }        }    } catch (StatusRuntimeException ex) {        logger.error(ETCD_DOWN);    }}

每隔1分钟拉取一次etcd地址为/jd/rules/的规定变动,如果worker所服务的app或者default的rule有变动,则更新规定的缓存,并清空该appName所对应的本地key缓存

2)定时工作2:EtcdStarter#uploadClientCount()

/**     * 每隔10秒上传一下client的数量到etcd中     */    @Scheduled(fixedRate = 10000)    public void uploadClientCount() {        try {            String ip = IpUtils.getIp();            for (AppInfo appInfo : ClientInfoHolder.apps) {                String appName = appInfo.getAppName();                int count = appInfo.size();                //即使是full gc也不能超过3秒,因为这里给的过期工夫是13s,因为该定时工作每隔10s执行一次,如果full gc或者说上报给etcd的工夫超过3s,                //则在dashboard查问不到client的数量                configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);            }            configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);            //上报每秒QPS(接管key数量、解决key数量)            String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));            configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);            logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);            //如果是稳固始终有key发送的利用,倡议开启该监控,以防止可能产生的网络故障            if (openMonitor) {                checkReceiveKeyCount();            }//            configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);        } catch (Exception ex) {            logger.error(ETCD_DOWN);        }    }

每个10s将worker计算存储的client信息上报给etcd,来不便dashboard来查问展现,比方/jd/count/对应client数量,/jd/caffeineSize/对应caffeine缓存的大小,/jd/totalKeyCount/对应该worker接管的key总量和解决的key总量
能够从代码中看到,下面所有etcd的节点租期工夫都是13s,而该定时工作是每10s执行一次,意味着如果full gc或者说上报给etcd的工夫超过3s,则在dashboard查问不到client的相干汇算信息
长时间不收到key,判断网络状态不好,断开worker给etcd地址为/jd/workers/+$workerPath节点的续租,因为client会循环判断该地址的节点是否变动,使得client从新连贯worker或者断开失联的worker
3)定时工作3:EtcdStarter#fetchDashboardIp()

/** * 每隔30秒去获取一下dashboard的地址 */@Scheduled(fixedRate = 30000)public void fetchDashboardIp() {    try {        //获取DashboardIp        List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.dashboardPath);        //是空,给个正告        if (CollectionUtil.isEmpty(keyValues)) {            logger.warn("very important warn !!! Dashboard ip is null!!!");            return;        }        String dashboardIp = keyValues.get(0).getValue().toStringUtf8();        NettyClient.getInstance().connect(dashboardIp);    } catch (Exception e) {        e.printStackTrace();    }}

每隔30s拉取一次etcd前缀为/jd/dashboard/的dashboard连贯ip的值,并且判断DashboardHolder.hasConnected外面是否为未连贯状态,如果是则从新连贯worker与dashboard的netty通道

3.自建的生产者消费者模型(KeyProducer,KeyConsumer)

个别生产者消费者模型蕴含三大元素:生产者、消费者、音讯存储队列
这里音讯存储队列是DispatcherConfig外面的QUEUE,应用LinkedBlockingQueue,默认大小为200W

1)KeyProducer

@Componentpublic class KeyProducer {    public void push(HotKeyModel model, long now) {        if (model == null || model.getKey() == null) {            return;        }        //5秒前的过期音讯就不解决了        if (now - model.getCreateTime() > InitConstant.timeOut) {            expireTotalCount.increment();            return;        }        try {            QUEUE.put(model);            totalOfferCount.increment();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

判断接管到的HotKeyModel是否超出”netty.timeOut”配置的工夫,如果是将expireTotalCount纪录过期总数给自增,而后返回

2)KeyConsumer

public class KeyConsumer {

private IKeyListener iKeyListener;public void setKeyListener(IKeyListener iKeyListener) {    this.iKeyListener = iKeyListener;}public void beginConsume() {    while (true) {        try {            //从这里能够看出,这里的生产者消费者模型,实质上还是拉模式,之所以不应用EventBus,是因为须要队列来做缓冲            HotKeyModel model = QUEUE.take();            if (model.isRemove()) {                iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);            } else {                iKeyListener.newKey(model, KeyEventOriginal.CLIENT);            }            //处理完毕,将数量加1            totalDealCount.increment();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

}

@Overridepublic void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {   //cache里的key,appName+keyType+key   String key = buildKey(hotKeyModel);   hotCache.invalidate(key);   CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);   //推送所有client删除   hotKeyModel.setCreateTime(SystemClock.now());   logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());   for (IPusher pusher : iPushers) {       //这里能够看到,删除热key的netty音讯只给client端发了过来,没有给dashboard发过来(DashboardPusher外面的remove是个空办法)       pusher.remove(hotKeyModel);   }}    @Override    public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {        //cache里的key        String key = buildKey(hotKeyModel);        //判断是不是刚热不久        //hotCache对应的caffeine有效期为5s,也就是说该key会保留5s,在5s内不反复解决雷同的hotKey。        //毕竟hotKey都是刹时流量,能够防止在这5s内反复推送给client和dashboard,防止有效的网络开销        Object o = hotCache.getIfPresent(key);        if (o != null) {            return;        }        //********** watch here ************//        //该办法会被InitConstant.threadCount个线程同时调用,存在多线程问题        //上面的那句addCount是加了锁的,代表给Key累加数量时是原子性的,不会产生多加、少加的状况,到了设定的阈值肯定会hot        //譬如阈值是2,如果多个线程累加,在没hot前,hot的状态必定是对的,譬如thread1 加1,thread2加1,那么thread2会hot返回true,开启推送        //然而极其状况下,譬如阈值是10,以后是9,thread1走到这里时,加1,返回true,thread2也走到这里,加1,此时是11,返回true,问题来了        //该key会走上面的else两次,也就是2次推送。        //所以呈现问题的起因是hotCache.getIfPresent(key)这一句在并发状况下,没return掉,放了两个key+1到addCount这一步时,会有问题        //测试代码在TestBlockQueue类,间接运行能够看到会同时hot        //那么该问题用解决吗,NO,不须要解决,1 首先要产生的条件极其刻薄,很难触发,以京东这样高的并发量,线上我也没见过触发间断2次推送同一个key的        //2 即使触发了,结果也是能够承受的,2次推送而已,毫无影响,客户端无感知。然而如果非要解决,就要对slidingWindow实例加锁了,必然有一些开销        //所以只有保障key数量不多计算就能够,少计算了没事。因为热key必然频率高,漏计几次没事。但非热key,多计算了,被干成了热key就不对了        SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//从这里可知,每个app的每个key都会对应一个滑动窗口        //看看hot没        boolean hot = slidingWindow.addCount(hotKeyModel.getCount());        if (!hot) {            //如果没hot,从新put,cache会主动刷新过期工夫            CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);        } else {            //这里之所以放入的value为1,是因为hotCache是用来专门存储刚生成的hotKey            //hotCache对应的caffeine有效期为5s,也就是说该key会保留5s,在5s内不反复解决雷同的hotKey。            //毕竟hotKey都是刹时流量,能够防止在这5s内反复推送给client和dashboard,防止有效的网络开销            hotCache.put(key, 1);            //删掉该key            //这个key从实际上是专门针对slidingWindow的key,他的组合逻辑是appName+keyType+key,而不是给client和dashboard推送的hotKey            CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);            //开启推送            hotKeyModel.setCreateTime(SystemClock.now());            //当开关关上时,打印日志。大促时敞开日志,就不打印了            if (EtcdStarter.LOGGER_ON) {                logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());            }            //别离推送到各client和etcd            for (IPusher pusher : iPushers) {                pusher.push(hotKeyModel);            }        }    }

“thread.count”配置即为消费者个数,多个消费者独特生产一个QUEUE队列
生产者消费者模型,实质上还是拉模式,之所以不应用EventBus,是因为须要队列来做缓冲
依据HotKeyModel外面是否是删除音讯类型

删除音讯类型
依据HotKeyModel外面的appName+keyType+key的名字,来构建caffeine外面的newkey,该newkey在caffeine外面次要是用来与slidingWindow滑动工夫窗对应
删除hotCache外面newkey的缓存,放入的缓存kv别离是newKey和1,hotCache作用是用来存储该生成的热key,hotCache对应的caffeine有效期为5s,也就是说该key会保留5s,在5s内不反复解决雷同的hotKey。毕竟hotKey都是刹时流量,能够防止在这5s内反复推送给client和dashboard,防止有效的网络开销
删除CaffeineCacheHolder外面对应appName的caffeine外面的newKey,这外面存储的是slidingWindow滑动窗口
推送给该HotKeyModel对应的所有client实例,用来让client删除该HotKeyModel
非删除音讯类型
依据HotKeyModel外面的appName+keyType+key的名字,来构建caffeine外面的newkey,该newkey在caffeine外面次要是用来与slidingWindow滑动工夫窗对应
通过hotCache来判断该newkey是否刚热不久,如果是则返回
依据滑动工夫窗口来计算判断该key是否为hotKey(这里能够学习一下滑动工夫窗口的设计),并返回或者生成该newKey对应的滑动窗口
如果没有达到热key的规范
通过CaffeineCacheHolder从新put,cache会主动刷新过期工夫
如果达到了热key规范
向hotCache外面减少newkey对应的缓存,value为1示意刚为热key。
删除CaffeineCacheHolder外面对应newkey的滑动窗口缓存。
向该hotKeyModel对应的app的client推送netty音讯,示意新产生hotKey,使得client本地缓存,然而推送的netty音讯只代表为热key,client本地缓存不会存储key对应的value值,须要调用JdHotKeyStore外面的api来给本地缓存的value赋值
向dashboard推送hotKeyModel,示意新产生hotKey
3)计算热key滑动窗口的设计
限于篇幅的起因,这里就不细谈了,间接贴出我的项目作者对其写的阐明文章:Java简略实现滑动窗口

3.3.4 dashboard端

这个没啥可说的了,就是连贯etcd、mysql,增删改查,不过京东的前端框架很不便,间接返回list就能够成列表。

4 总结
文章第二局部为大家解说了redis数据歪斜的起因以及应答计划,并对热点问题进行了深刻,从发现热key到解决热key的两个关键问题的总结。

文章第三局部是热key问题解决方案——JD开源hotkey的源码解析,别离从client端、worker端、dashboard端来进行全方位解说,包含其设计、应用及相干原理。

心愿通过这篇文章,可能使大家不仅学习到相干方法论,也能明确其方法论具体的落地计划,一起学习,一起成长。

作者:李鹏