Eureka Server 为了提供响应效率,提供了两层的缓存构造,将 Eureka Client 所须要的注册信息,间接存储在缓存构造中,实现原理如下图所示。

第一层缓存:readOnlyCacheMap,实质上是 ConcurrentHashMap,依赖定时从 readWriteCacheMap 同步数据,默认工夫为 30 秒。

readOnlyCacheMap : 是一个 CurrentHashMap 只读缓存,这个次要是为了供客户端获取注册信息时应用,其缓存更新,依赖于定时器的更新,通过和 readWriteCacheMap 的值做比照,如果数据不统一,则以 readWriteCacheMap 的数据为准。

第二层缓存:readWriteCacheMap,实质上是 Guava 缓存。

readWriteCacheMap:readWriteCacheMap 的数据次要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 办法去加载,加载胜利之后将数据放入缓存,同时返回数据。

readWriteCacheMap 缓存过期工夫,默认为 180 秒,当服务下线、过期、注册、状态变更,都会来革除此缓存中的数据。

Eureka Client 获取全量或者增量的数据时,会先从一级缓存中获取;如果一级缓存中不存在,再从二级缓存中获取;如果二级缓存也不存在,这时候先将存储层的数据同步到缓存中,再从缓存中获取。

通过 Eureka Server 的二层缓存机制,能够十分无效地晋升 Eureka Server 的响应工夫,通过数据存储层和缓存层的数据切割,依据应用场景来提供不同的数据反对。

多级缓存的意义

这里为什么要设计多级缓存呢?起因很简略,就是当存在大规模的服务注册和更新时,如果只是批改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。

而Eureka又是AP模型,只须要满足最终可用就行。所以它在这里用到多级缓存来实现读写拆散。注册办法写的时候间接写内存注册表,写完表之后被动生效读写缓存。

获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较简单)。并且,读写缓存会更新回写只读缓存

  • responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器工夫距离,默认为30秒
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期工夫,默认为 180 秒 。

缓存初始化

readWriteCacheMap应用的是LoadingCache对象,它是guava中提供的用来实现内存缓存的一个api。创立形式如下

LoadingCache<Long, String> cache = CacheBuilder.newBuilder()    //缓存池大小,在缓存项靠近该大小时, Guava开始回收旧的缓存项    .maximumSize(10000)    //设置工夫对象没有被读/写访问则对象从内存中删除(在另外的线程外面不定期维护)    .expireAfterAccess(10, TimeUnit.MINUTES)    //移除监听器,缓存项被移除时会触发    .removalListener(new RemovalListener <Long, String>() {        @Override        public void onRemoval(RemovalNotification<Long, String> rn) {            //执行逻辑操作        }    })    .recordStats()//开启Guava Cache的统计性能    .build(new CacheLoader<String, Object>() {        @Override        public Object load(String key) {            //从 SQL或者NoSql 获取对象        }    });//CacheLoader类 实现主动加载

其中,CacheLoader是用来实现缓存主动加载的性能,当触发readWriteCacheMap.get(key)办法时,就会回调CacheLoader.load办法,依据key去服务注册信息中去查找实例数据进行缓存

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {    this.serverConfig = serverConfig;    this.serverCodecs = serverCodecs;    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();    this.registry = registry;    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();    this.readWriteCacheMap =        CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)        .removalListener(new RemovalListener<Key, Value>() {            @Override            public void onRemoval(RemovalNotification<Key, Value> notification) {                Key removedKey = notification.getKey();                if (removedKey.hasRegions()) {                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);                }            }        })        .build(new CacheLoader<Key, Value>() {            @Override            public Value load(Key key) throws Exception {                if (key.hasRegions()) {                    Key cloneWithNoRegions = key.cloneWithoutRegions();                    regionSpecificKeys.put(cloneWithNoRegions, key);                }                Value value = generatePayload(key);  //留神这里                return value;            }        });

而缓存的加载,是基于generatePayload办法实现的,代码如下。

private Value generatePayload(Key key) {    Stopwatch tracer = null;    try {        String payload;        switch (key.getEntityType()) {            case Application:                boolean isRemoteRegionRequested = key.hasRegions();                if (ALL_APPS.equals(key.getName())) {                    if (isRemoteRegionRequested) {                        tracer = serializeAllAppsWithRemoteRegionTimer.start();                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));                    } else {                        tracer = serializeAllAppsTimer.start();                        payload = getPayLoad(key, registry.getApplications());                    }                } else if (ALL_APPS_DELTA.equals(key.getName())) {                    if (isRemoteRegionRequested) {                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();                        versionDeltaWithRegions.incrementAndGet();                        versionDeltaWithRegionsLegacy.incrementAndGet();                        payload = getPayLoad(key,                                             registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));                    } else {                        tracer = serializeDeltaAppsTimer.start();                        versionDelta.incrementAndGet();                        versionDeltaLegacy.incrementAndGet();                        payload = getPayLoad(key, registry.getApplicationDeltas());                    }                } else {                    tracer = serializeOneApptimer.start();                    payload = getPayLoad(key, registry.getApplication(key.getName()));                }                break;            case VIP:            case SVIP:                tracer = serializeViptimer.start();                payload = getPayLoad(key, getApplicationsForVip(key, registry));                break;            default:                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());                payload = "";                break;        }        return new Value(payload);    } finally {        if (tracer != null) {            tracer.stop();        }    }}

此办法承受一个 Key 类型的参数,返回一个 Value 类型。 其中 Key 中重要的字段有:

  • KeyType ,示意payload文本格式,有 JSON 和 XML 两种值。
  • EntityType ,示意缓存的类型,有 Application , VIP , SVIP 三种值。
  • entityName ,示意缓存的名称,可能是单个利用名,也可能是 ALL_APPSALL_APPS_DELTA

Value 则有一个 String 类型的payload和一个 byte 数组,示意gzip压缩后的字节。

缓存同步

ResponseCacheImpl这个类的结构实现中,初始化了一个定时工作,这个定时工作每个

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {    //省略...    if (shouldUseReadOnlyResponseCache) {        timer.schedule(getCacheUpdateTask(),                       new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)                                + responseCacheUpdateIntervalMs),                       responseCacheUpdateIntervalMs);    }}

默认每30s从readWriteCacheMap更新有差别的数据同步到readOnlyCacheMap中

private TimerTask getCacheUpdateTask() {    return new TimerTask() {        @Override        public void run() {            logger.debug("Updating the client cache from response cache");            for (Key key : readOnlyCacheMap.keySet()) { //遍历只读汇合                if (logger.isDebugEnabled()) {                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType());                }                try {                    CurrentRequestVersion.set(key.getVersion());                    Value cacheValue = readWriteCacheMap.get(key);                    Value currentCacheValue = readOnlyCacheMap.get(key);                    if (cacheValue != currentCacheValue) { //判断差别信息,如果有差别,则更新                        readOnlyCacheMap.put(key, cacheValue);                    }                } catch (Throwable th) {                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);                } finally {                    CurrentRequestVersion.remove();                }            }        }    };}

缓存生效

在AbstractInstanceRegistry.register这个办法中,当实现服务信息保留后,会调用invalidateCache生效缓存

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {    //....     invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());    //....}
最终调用ResponseCacheImpl.invalidate办法,实现缓存的生效机制
public void invalidate(Key... keys) {    for (Key key : keys) {        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",                     key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());        readWriteCacheMap.invalidate(key);        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {            for (Key keysWithRegion : keysWithRegions) {                logger.debug("Invalidating the response cache key : {} {} {} {} {}",                             key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());                readWriteCacheMap.invalidate(keysWithRegion);            }        }    }}
版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!