本文次要钻研一下cache2k这款新型缓存

示例

Cache<String,String> cache = new Cache2kBuilder<String, String>() {}                .eternal(true)                .expireAfterWrite(5, TimeUnit.MINUTES)    // expire/refresh after 5 minutes                .setupWith(UniversalResiliencePolicy::enable, b -> b // enable resilience policy                                .resilienceDuration(30, TimeUnit.SECONDS)          // cope with at most 30 seconds                        // outage before propagating                        // exceptions                )                .refreshAhead(true)                       // keep fresh when expiring                .loader(k -> expensiveOperation(k))         // auto populating function                .build();

常见问题的解决方案

空值问题

JCache标准不反对null,所以cache2k默认也不反对,不过能够通过permitNullValues(true)来开启,这样子缓存就能够存储null值

cache stampede问题

又称作cache miss storm,指的是高并发场景缓存同时生效导致大面积回源,cache2k采纳的是block的申请形式,防止对同一个key并发回源

org/cache2k/core/HeapCache.java

protected Entry<K, V> getEntryInternal(K key, int hc, int val) {    if (loader == null) {      return peekEntryInternal(key, hc, val);    }    Entry<K, V> e;    for (;;) {      e = lookupOrNewEntry(key, hc, val);      if (e.hasFreshData(clock)) {        return e;      }      synchronized (e) {        e.waitForProcessing();        if (e.hasFreshData(clock)) {          return e;        }        if (e.isGone()) {          metrics.heapHitButNoRead();          metrics.goneSpin();          continue;        }        e.startProcessing(Entry.ProcessingState.LOAD, null);        break;      }    }    boolean finished = false;    try {      load(e);      finished = true;    } finally {      e.ensureAbort(finished);    }    if (e.getValueOrException() == null && isRejectNullValues()) {      return null;    }    return e;  }

同步回源造成的接口稳定性问题

cache2k提供了refreshAhead参数,在新数据没有拉取胜利之前,过期数据依然能够拜访,防止申请到来时发现数据过期触发同步回源造成接口延时增大问题。不过具体底层还依赖prefetchExecutor,如果refresh的时候没有足够的线程能够应用则会立马过期,期待下次get登程同步回源

org/cache2k/core/HeapCache.java

public void timerEventRefresh(Entry<K, V> e, Object task) {    metrics.timerEvent();    synchronized (e) {      if (e.getTask() != task) { return; }      try {        refreshExecutor.execute(createFireAndForgetAction(e, Operations.SINGLETON.refresh));      } catch (RejectedExecutionException ex) {        metrics.refreshRejected();        expireOrScheduleFinalExpireEvent(e);      }    }  }

默认的executor如下,采纳的是SynchronousQueue队列,能够通过builder本人去设置refreshExecutor

  Executor provideDefaultLoaderExecutor(int threadCount) {    int corePoolThreadSize = 0;    return new ThreadPoolExecutor(corePoolThreadSize, threadCount,      21, TimeUnit.SECONDS,      new SynchronousQueue<>(),      threadFactoryProvider.newThreadFactory(getThreadNamePrefix()),      new ThreadPoolExecutor.AbortPolicy());  }

回源故障问题

针对回源的上游呈现故障的问题,cache2k提供了ResiliencePolicy策略,其实现类为UniversalResiliencePolicy
当load办法抛出异样且cache外头还有数据的时候,异样不会抛给client,用以后的数据返回,这里有个resilienceDuration工夫,如果超过这个工夫load办法还持续抛出异样则异样会抛给client。如果没有独自设置resilienceDuration,则默认取的是expiryAfterWrite工夫

org/cache2k/core/HeapCache.java

private Object loadGotException(Entry<K, V> e, long t0, long t, Throwable wrappedException) {    ExceptionWrapper<K, V> exceptionWrapper =      new ExceptionWrapper(keyObjFromEntry(e), wrappedException, t0, e, exceptionPropagator);    long expiry = 0;    long refreshTime = 0;    boolean suppressException = false;    RefreshAheadPolicy.Context<Object> refreshCtx;    try {      if (e.isValidOrExpiredAndNoException()) {        expiry = timing.suppressExceptionUntil(e, exceptionWrapper);      }      if (expiry > t0) {        suppressException = true;      } else {        expiry = timing.cacheExceptionUntil(e, exceptionWrapper);      }      refreshCtx = getContext(e, t0, t, true, true, false, expiry);      refreshTime = timing.calculateRefreshTime(refreshCtx);    } catch (Exception ex) {      return resiliencePolicyException(e, t0, t, new ResiliencePolicyException(ex), null);    }    exceptionWrapper = new ExceptionWrapper<>(exceptionWrapper, Math.abs(expiry));    Object wrappedValue = exceptionWrapper;    if (expiry != 0) {      wrappedValue = timing.wrapLoadValueForRefresh(refreshCtx, e, exceptionWrapper);    }    Object loadResult;    synchronized (e) {      insertUpdateStats(e, (V) wrappedValue, t0, t, true, expiry, suppressException);      if (suppressException) {        e.setSuppressedLoadExceptionInformation(exceptionWrapper);        loadResult = e.getValueOrException();      } else {        if (isRecordModificationTime()) {          e.setModificationTime(t0);        }        e.setValueOrWrapper(exceptionWrapper);        loadResult = exceptionWrapper;      }      finishLoadOrEviction(e, expiry, refreshTime);    }    return loadResult;  }
这里timing.suppressExceptionUntil是委托给了ResiliencePolicy#suppressExceptionUntil

cache2k-addon/src/main/java/org/cache2k/addon/UniversalResiliencePolicy.java

public long suppressExceptionUntil(K key,                                     LoadExceptionInfo<K, V> loadExceptionInfo,                                     CacheEntry<K, V> cachedEntry) {    if (resilienceDuration == 0 || resilienceDuration == Long.MAX_VALUE) {      return resilienceDuration;    }    long maxSuppressUntil = loadExceptionInfo.getSinceTime() + resilienceDuration;    long deltaMs = calculateRetryDelta(loadExceptionInfo);    return Math.min(loadExceptionInfo.getLoadTime() + deltaMs, maxSuppressUntil);  }

UniversalResiliencePolicy还提供了异样重试的性能,重试距离为retryInterval,如果没有配置则为resilienceDuration的5%,采取的是指数退却的模式,factor为1.5

小结

cache2k提供了Guava Cache及Caffeine没有的ResiliencePolicy,针对C端高并发场景提供了容错的性能,值得借鉴一下。

集体公众号「码匠的流水账」(geek_luandun),欢送关注

doc

  • cache2k
  • cache2k User Guide
  • Introduction to cache2k
  • caffeine
  • guava cache