乐趣区

关于java:Spring-Cloud-Eureka源码分析之三级缓存的设计原理及源码分析

Eureka Server 为了提供响应效率,提供了两层的缓存构造,将 Eureka Client 所须要的注册信息,间接存储在缓存构造中,实现原理如下图所示。

第一层缓存:readOnlyCacheMap,实质上是 ConcurrentHashMap,依赖定时从 readWriteCacheMap 同步数据,默认工夫为 30 秒。

readOnlyCacheMap:是一个 CurrentHashMap 只读缓存,这个次要是为了供客户端获取注册信息时应用,其缓存更新,依赖于定时器的更新,通过和 readWriteCacheMap 的值做比照,如果数据不统一,则以 readWriteCacheMap 的数据为准。

第二层缓存:readWriteCacheMap,实质上是 Guava 缓存。

readWriteCacheMap:readWriteCacheMap 的数据次要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 办法去加载,加载胜利之后将数据放入缓存,同时返回数据。

readWriteCacheMap 缓存过期工夫,默认为 180 秒,当服务下线、过期、注册、状态变更,都会来革除此缓存中的数据。

Eureka Client 获取全量或者增量的数据时,会先从一级缓存中获取;如果一级缓存中不存在,再从二级缓存中获取;如果二级缓存也不存在,这时候先将存储层的数据同步到缓存中,再从缓存中获取。

通过 Eureka Server 的二层缓存机制,能够十分无效地晋升 Eureka Server 的响应工夫,通过数据存储层和缓存层的数据切割,依据应用场景来提供不同的数据反对。

多级缓存的意义

这里为什么要设计多级缓存呢?起因很简略,就是当存在大规模的服务注册和更新时,如果只是批改一个 ConcurrentHashMap 数据,那么势必因为锁的存在导致竞争,影响性能。

而 Eureka 又是 AP 模型,只须要满足最终可用就行。所以它在这里用到多级缓存来实现读写拆散。注册办法写的时候间接写内存注册表,写完表之后被动生效读写缓存。

获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较简单)。并且,读写缓存会更新回写只读缓存

  • responseCacheUpdateIntervalMs:readOnlyCacheMap 缓存更新的定时器工夫距离,默认为 30 秒
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期工夫,默认为 180 秒。

缓存初始化

readWriteCacheMap 应用的是 LoadingCache 对象,它是 guava 中提供的用来实现内存缓存的一个 api。创立形式如下

LoadingCache<Long, String> cache = CacheBuilder.newBuilder()
    // 缓存池大小,在缓存项靠近该大小时,Guava 开始回收旧的缓存项
    .maximumSize(10000)
    // 设置工夫对象没有被读 / 写访问则对象从内存中删除(在另外的线程外面不定期维护)
    .expireAfterAccess(10, TimeUnit.MINUTES)
    // 移除监听器, 缓存项被移除时会触发
    .removalListener(new RemovalListener <Long, String>() {
        @Override
        public void onRemoval(RemovalNotification<Long, String> rn) {// 执行逻辑操作}
    })
    .recordStats()// 开启 Guava Cache 的统计性能
    .build(new CacheLoader<String, Object>() {
        @Override
        public Object load(String key) {// 从 SQL 或者 NoSql 获取对象}
    });//CacheLoader 类 实现主动加载

其中,CacheLoader 是用来实现缓存主动加载的性能,当触发 readWriteCacheMap.get(key) 办法时,就会回调 CacheLoader.load 办法,依据 key 去服务注册信息中去查找实例数据进行缓存

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
        .removalListener(new RemovalListener<Key, Value>() {
            @Override
            public void onRemoval(RemovalNotification<Key, Value> notification) {Key removedKey = notification.getKey();
                if (removedKey.hasRegions()) {Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                }
            }
        })
        .build(new CacheLoader<Key, Value>() {
            @Override
            public Value load(Key key) throws Exception {if (key.hasRegions()) {Key cloneWithNoRegions = key.cloneWithoutRegions();
                    regionSpecificKeys.put(cloneWithNoRegions, key);
                }
                Value value = generatePayload(key);  // 留神这里
                return value;
            }
        });

而缓存的加载,是基于 generatePayload 办法实现的,代码如下。

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {if (isRemoteRegionRequested) {tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {if (isRemoteRegionRequested) {tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                             registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {if (tracer != null) {tracer.stop();
        }
    }
}

此办法承受一个 Key 类型的参数,返回一个 Value 类型。其中 Key 中重要的字段有:

  • KeyType,示意 payload 文本格式,有 JSON 和 XML 两种值。
  • EntityType,示意缓存的类型,有 Application , VIP , SVIP 三种值。
  • entityName,示意缓存的名称,可能是单个利用名,也可能是 ALL_APPSALL_APPS_DELTA

Value 则有一个 String 类型的 payload 和一个 byte 数组,示意 gzip 压缩后的字节。

缓存同步

ResponseCacheImpl 这个类的结构实现中,初始化了一个定时工作,这个定时工作每个

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    // 省略...
    if (shouldUseReadOnlyResponseCache) {timer.schedule(getCacheUpdateTask(),
                       new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                       responseCacheUpdateIntervalMs);
    }
}

默认每 30s 从 readWriteCacheMap 更新有差别的数据同步到 readOnlyCacheMap 中

private TimerTask getCacheUpdateTask() {return new TimerTask() {
        @Override
        public void run() {logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) { // 遍历只读汇合
                if (logger.isDebugEnabled()) {logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) { // 判断差别信息,如果有差别,则更新
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {CurrentRequestVersion.remove();
                }
            }
        }
    };
}

缓存生效

在 AbstractInstanceRegistry.register 这个办法中,当实现服务信息保留后,会调用 invalidateCache 生效缓存

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    //....
     invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    //....
}

最终调用 ResponseCacheImpl.invalidate 办法,实现缓存的生效机制

public void invalidate(Key... keys) {for (Key key : keys) {logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                     key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        readWriteCacheMap.invalidate(key);
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {for (Key keysWithRegion : keysWithRegions) {logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                             key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

退出移动版