关于缓存:cache2kGuava-Cache及Caffeine之外的新选择

43次阅读

共计 4488 个字符,预计需要花费 12 分钟才能阅读完成。

本文次要钻研一下 cache2k 这款新型缓存

示例

Cache<String,String> cache = new Cache2kBuilder<String, String>() {}
                .eternal(true)
                .expireAfterWrite(5, TimeUnit.MINUTES)    // expire/refresh after 5 minutes
                .setupWith(UniversalResiliencePolicy::enable, b -> b // enable resilience policy
                                .resilienceDuration(30, TimeUnit.SECONDS)          // cope with at most 30 seconds
                        // outage before propagating
                        // exceptions
                )
                .refreshAhead(true)                       // keep fresh when expiring
                .loader(k -> expensiveOperation(k))         // auto populating function
                .build();

常见问题的解决方案

空值问题

JCache 标准不反对 null,所以 cache2k 默认也不反对,不过能够通过 permitNullValues(true) 来开启,这样子缓存就能够存储 null 值

cache stampede 问题

又称作 cache miss storm,指的是高并发场景缓存同时生效导致大面积回源,cache2k 采纳的是 block 的申请形式,防止对同一个 key 并发回源

org/cache2k/core/HeapCache.java

protected Entry<K, V> getEntryInternal(K key, int hc, int val) {if (loader == null) {return peekEntryInternal(key, hc, val);
    }
    Entry<K, V> e;
    for (;;) {e = lookupOrNewEntry(key, hc, val);
      if (e.hasFreshData(clock)) {return e;}
      synchronized (e) {e.waitForProcessing();
        if (e.hasFreshData(clock)) {return e;}
        if (e.isGone()) {metrics.heapHitButNoRead();
          metrics.goneSpin();
          continue;
        }
        e.startProcessing(Entry.ProcessingState.LOAD, null);
        break;
      }
    }
    boolean finished = false;
    try {load(e);
      finished = true;
    } finally {e.ensureAbort(finished);
    }
    if (e.getValueOrException() == null && isRejectNullValues()) {return null;}
    return e;
  }

同步回源造成的接口稳定性问题

cache2k 提供了 refreshAhead 参数,在新数据没有拉取胜利之前,过期数据依然能够拜访,防止申请到来时发现数据过期触发同步回源造成接口延时增大问题。不过具体底层还依赖 prefetchExecutor,如果 refresh 的时候没有足够的线程能够应用则会立马过期,期待下次 get 登程同步回源

org/cache2k/core/HeapCache.java

public void timerEventRefresh(Entry<K, V> e, Object task) {metrics.timerEvent();
    synchronized (e) {if (e.getTask() != task) {return;}
      try {refreshExecutor.execute(createFireAndForgetAction(e, Operations.SINGLETON.refresh));
      } catch (RejectedExecutionException ex) {metrics.refreshRejected();
        expireOrScheduleFinalExpireEvent(e);
      }
    }
  }

默认的 executor 如下,采纳的是 SynchronousQueue 队列,能够通过 builder 本人去设置 refreshExecutor

  Executor provideDefaultLoaderExecutor(int threadCount) {
    int corePoolThreadSize = 0;
    return new ThreadPoolExecutor(corePoolThreadSize, threadCount,
      21, TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      threadFactoryProvider.newThreadFactory(getThreadNamePrefix()),
      new ThreadPoolExecutor.AbortPolicy());
  }

回源故障问题

针对回源的上游呈现故障的问题,cache2k 提供了 ResiliencePolicy 策略,其实现类为 UniversalResiliencePolicy
当 load 办法抛出异样且 cache 外头还有数据的时候,异样不会抛给 client,用以后的数据返回,这里有个 resilienceDuration 工夫,如果超过这个工夫 load 办法还持续抛出异样则异样会抛给 client。如果没有独自设置 resilienceDuration,则默认取的是 expiryAfterWrite 工夫

org/cache2k/core/HeapCache.java

private Object loadGotException(Entry<K, V> e, long t0, long t, Throwable wrappedException) {
    ExceptionWrapper<K, V> exceptionWrapper =
      new ExceptionWrapper(keyObjFromEntry(e), wrappedException, t0, e, exceptionPropagator);
    long expiry = 0;
    long refreshTime = 0;
    boolean suppressException = false;
    RefreshAheadPolicy.Context<Object> refreshCtx;
    try {if (e.isValidOrExpiredAndNoException()) {expiry = timing.suppressExceptionUntil(e, exceptionWrapper);
      }
      if (expiry > t0) {suppressException = true;} else {expiry = timing.cacheExceptionUntil(e, exceptionWrapper);
      }
      refreshCtx = getContext(e, t0, t, true, true, false, expiry);
      refreshTime = timing.calculateRefreshTime(refreshCtx);
    } catch (Exception ex) {return resiliencePolicyException(e, t0, t, new ResiliencePolicyException(ex), null);
    }
    exceptionWrapper = new ExceptionWrapper<>(exceptionWrapper, Math.abs(expiry));
    Object wrappedValue = exceptionWrapper;
    if (expiry != 0) {wrappedValue = timing.wrapLoadValueForRefresh(refreshCtx, e, exceptionWrapper);
    }
    Object loadResult;
    synchronized (e) {insertUpdateStats(e, (V) wrappedValue, t0, t, true, expiry, suppressException);
      if (suppressException) {e.setSuppressedLoadExceptionInformation(exceptionWrapper);
        loadResult = e.getValueOrException();} else {if (isRecordModificationTime()) {e.setModificationTime(t0);
        }
        e.setValueOrWrapper(exceptionWrapper);
        loadResult = exceptionWrapper;
      }
      finishLoadOrEviction(e, expiry, refreshTime);
    }
    return loadResult;
  }

这里 timing.suppressExceptionUntil 是委托给了 ResiliencePolicy#suppressExceptionUntil

cache2k-addon/src/main/java/org/cache2k/addon/UniversalResiliencePolicy.java

public long suppressExceptionUntil(K key,
                                     LoadExceptionInfo<K, V> loadExceptionInfo,
                                     CacheEntry<K, V> cachedEntry) {if (resilienceDuration == 0 || resilienceDuration == Long.MAX_VALUE) {return resilienceDuration;}
    long maxSuppressUntil = loadExceptionInfo.getSinceTime() + resilienceDuration;
    long deltaMs = calculateRetryDelta(loadExceptionInfo);
    return Math.min(loadExceptionInfo.getLoadTime() + deltaMs, maxSuppressUntil);
  }

UniversalResiliencePolicy 还提供了异样重试的性能,重试距离为 retryInterval,如果没有配置则为 resilienceDuration 的 5%,采取的是指数退却的模式,factor 为 1.5

小结

cache2k 提供了 Guava Cache 及 Caffeine 没有的 ResiliencePolicy,针对 C 端高并发场景提供了容错的性能,值得借鉴一下。

集体公众号「码匠的流水账」(geek_luandun),欢送关注

doc

  • cache2k
  • cache2k User Guide
  • Introduction to cache2k
  • caffeine
  • guava cache

正文完
 0