序
本文次要钻研一下cache2k这款新型缓存
示例
Cache<String,String> cache = new Cache2kBuilder<String, String>() {} .eternal(true) .expireAfterWrite(5, TimeUnit.MINUTES) // expire/refresh after 5 minutes .setupWith(UniversalResiliencePolicy::enable, b -> b // enable resilience policy .resilienceDuration(30, TimeUnit.SECONDS) // cope with at most 30 seconds // outage before propagating // exceptions ) .refreshAhead(true) // keep fresh when expiring .loader(k -> expensiveOperation(k)) // auto populating function .build();
常见问题的解决方案
空值问题
JCache标准不反对null,所以cache2k默认也不反对,不过能够通过permitNullValues(true)来开启,这样子缓存就能够存储null值
cache stampede问题
又称作cache miss storm,指的是高并发场景缓存同时生效导致大面积回源,cache2k采纳的是block的申请形式,防止对同一个key并发回源
org/cache2k/core/HeapCache.java
protected Entry<K, V> getEntryInternal(K key, int hc, int val) { if (loader == null) { return peekEntryInternal(key, hc, val); } Entry<K, V> e; for (;;) { e = lookupOrNewEntry(key, hc, val); if (e.hasFreshData(clock)) { return e; } synchronized (e) { e.waitForProcessing(); if (e.hasFreshData(clock)) { return e; } if (e.isGone()) { metrics.heapHitButNoRead(); metrics.goneSpin(); continue; } e.startProcessing(Entry.ProcessingState.LOAD, null); break; } } boolean finished = false; try { load(e); finished = true; } finally { e.ensureAbort(finished); } if (e.getValueOrException() == null && isRejectNullValues()) { return null; } return e; }
同步回源造成的接口稳定性问题
cache2k提供了refreshAhead参数,在新数据没有拉取胜利之前,过期数据依然能够拜访,防止申请到来时发现数据过期触发同步回源造成接口延时增大问题。不过具体底层还依赖prefetchExecutor,如果refresh的时候没有足够的线程能够应用则会立马过期,期待下次get登程同步回源
org/cache2k/core/HeapCache.java
public void timerEventRefresh(Entry<K, V> e, Object task) { metrics.timerEvent(); synchronized (e) { if (e.getTask() != task) { return; } try { refreshExecutor.execute(createFireAndForgetAction(e, Operations.SINGLETON.refresh)); } catch (RejectedExecutionException ex) { metrics.refreshRejected(); expireOrScheduleFinalExpireEvent(e); } } }
默认的executor如下,采纳的是SynchronousQueue队列,能够通过builder本人去设置refreshExecutor
Executor provideDefaultLoaderExecutor(int threadCount) { int corePoolThreadSize = 0; return new ThreadPoolExecutor(corePoolThreadSize, threadCount, 21, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactoryProvider.newThreadFactory(getThreadNamePrefix()), new ThreadPoolExecutor.AbortPolicy()); }
回源故障问题
针对回源的上游呈现故障的问题,cache2k提供了ResiliencePolicy策略,其实现类为UniversalResiliencePolicy
当load办法抛出异样且cache外头还有数据的时候,异样不会抛给client,用以后的数据返回,这里有个resilienceDuration工夫,如果超过这个工夫load办法还持续抛出异样则异样会抛给client。如果没有独自设置resilienceDuration,则默认取的是expiryAfterWrite工夫
org/cache2k/core/HeapCache.java
private Object loadGotException(Entry<K, V> e, long t0, long t, Throwable wrappedException) { ExceptionWrapper<K, V> exceptionWrapper = new ExceptionWrapper(keyObjFromEntry(e), wrappedException, t0, e, exceptionPropagator); long expiry = 0; long refreshTime = 0; boolean suppressException = false; RefreshAheadPolicy.Context<Object> refreshCtx; try { if (e.isValidOrExpiredAndNoException()) { expiry = timing.suppressExceptionUntil(e, exceptionWrapper); } if (expiry > t0) { suppressException = true; } else { expiry = timing.cacheExceptionUntil(e, exceptionWrapper); } refreshCtx = getContext(e, t0, t, true, true, false, expiry); refreshTime = timing.calculateRefreshTime(refreshCtx); } catch (Exception ex) { return resiliencePolicyException(e, t0, t, new ResiliencePolicyException(ex), null); } exceptionWrapper = new ExceptionWrapper<>(exceptionWrapper, Math.abs(expiry)); Object wrappedValue = exceptionWrapper; if (expiry != 0) { wrappedValue = timing.wrapLoadValueForRefresh(refreshCtx, e, exceptionWrapper); } Object loadResult; synchronized (e) { insertUpdateStats(e, (V) wrappedValue, t0, t, true, expiry, suppressException); if (suppressException) { e.setSuppressedLoadExceptionInformation(exceptionWrapper); loadResult = e.getValueOrException(); } else { if (isRecordModificationTime()) { e.setModificationTime(t0); } e.setValueOrWrapper(exceptionWrapper); loadResult = exceptionWrapper; } finishLoadOrEviction(e, expiry, refreshTime); } return loadResult; }
这里timing.suppressExceptionUntil是委托给了ResiliencePolicy#suppressExceptionUntil
cache2k-addon/src/main/java/org/cache2k/addon/UniversalResiliencePolicy.java
public long suppressExceptionUntil(K key, LoadExceptionInfo<K, V> loadExceptionInfo, CacheEntry<K, V> cachedEntry) { if (resilienceDuration == 0 || resilienceDuration == Long.MAX_VALUE) { return resilienceDuration; } long maxSuppressUntil = loadExceptionInfo.getSinceTime() + resilienceDuration; long deltaMs = calculateRetryDelta(loadExceptionInfo); return Math.min(loadExceptionInfo.getLoadTime() + deltaMs, maxSuppressUntil); }
UniversalResiliencePolicy还提供了异样重试的性能,重试距离为retryInterval,如果没有配置则为resilienceDuration的5%,采取的是指数退却的模式,factor为1.5
小结
cache2k提供了Guava Cache及Caffeine没有的ResiliencePolicy,针对C端高并发场景提供了容错的性能,值得借鉴一下。
集体公众号「码匠的流水账」(geek_luandun),欢送关注
doc
- cache2k
- cache2k User Guide
- Introduction to cache2k
- caffeine
- guava cache