序
本文主要研究一下 jedis 的 testWhileIdle
testWhileIdle
org/apache/commons/pool2/impl/GenericObjectPool.java
/**
 * Performs one eviction run over the pool's idle objects.
 * <p>
 * When {@code idleObjects} is not empty, up to {@code getNumTests()} idle
 * objects are examined while holding {@code evictionLock}. For each candidate
 * the configured eviction policy decides whether it is destroyed. Candidates
 * that are kept are, when {@code getTestWhileIdle()} returns {@code true},
 * activated, validated and passivated through the factory; a failure in any of
 * those steps destroys the object. Every destruction performed here increments
 * {@code destroyedByEvictorCount}.
 * <p>
 * After the idle scan, {@code removeAbandoned} is invoked if an abandoned
 * configuration is present and requests removal on maintenance.
 *
 * @throws Exception declared by the signature; in particular a
 *         {@code RuntimeException} or {@code Error} raised by
 *         {@code factory.validateObject} is rethrown after the object has been
 *         destroyed
 */
@Override
public void evict() throws Exception {assertOpen();
// With no idle objects there is nothing to test; fall through to the
// abandoned-object handling at the end of the method.
if (!idleObjects.isEmpty()) {
PooledObject<T> underTest = null;
final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
synchronized (evictionLock) {
final EvictionConfig evictionConfig = new EvictionConfig(getMinEvictableIdleDuration(),
getSoftMinEvictableIdleDuration(),
getMinIdle());
final boolean testWhileIdle = getTestWhileIdle();
// Examine at most getNumTests() candidates per run. The counter is rewound
// (i--) further down whenever a candidate turns out to be unavailable, so
// such candidates do not count against the limit. The iterator is
// (re)created when it is missing or has no further elements.
for (int i = 0, m = getNumTests(); i < m; i++) {if (evictionIterator == null || !evictionIterator.hasNext()) {evictionIterator = new EvictionIterator(idleObjects);
}
if (!evictionIterator.hasNext()) {
// Pool exhausted, nothing to do here
// Note: this leaves evict() entirely, so the abandoned-object handling
// after the synchronized block is skipped on this path.
return;
}
try {underTest = evictionIterator.next();
} catch (final NoSuchElementException nsee) {
// Object was borrowed in another thread
// Don't count this as an eviction test so reduce i;
i--;
evictionIterator = null;
continue;
}
if (!underTest.startEvictionTest()) {
// Object was borrowed in another thread
// Don't count this as an eviction test so reduce i;
i--;
continue;
}
// User provided eviction policy could throw all sorts of
// crazy exceptions. Protect against such an exception
// killing the eviction thread.
boolean evict;
try {
evict = evictionPolicy.evict(evictionConfig, underTest,
idleObjects.size());
} catch (final Throwable t) {
// Slightly convoluted as SwallowedExceptionListener
// uses Exception rather than Throwable
PoolUtils.checkRethrow(t);
swallowException(new Exception(t));
// Don't evict on error conditions
evict = false;
}
// Policy asked for eviction: destroy and count. Otherwise, when
// testWhileIdle is enabled, run the activate -> validate -> passivate cycle.
if (evict) {destroy(underTest, DestroyMode.NORMAL);
destroyedByEvictorCount.incrementAndGet();} else {if (testWhileIdle) {
boolean active = false;
// An object that cannot be activated is destroyed; validation is then skipped.
try {factory.activateObject(underTest);
active = true;
} catch (final Exception e) {destroy(underTest, DestroyMode.NORMAL);
destroyedByEvictorCount.incrementAndGet();}
if (active) {
boolean validate = false;
Throwable validationThrowable = null;
// Remember anything validateObject throws so the object can be destroyed
// first and the throwable rethrown afterwards.
try {validate = factory.validateObject(underTest);
} catch (final Throwable t) {PoolUtils.checkRethrow(t);
validationThrowable = t;
}
// validate stays false both when validateObject returned false and when it threw.
if (!validate) {destroy(underTest, DestroyMode.NORMAL);
destroyedByEvictorCount.incrementAndGet();
if (validationThrowable != null) {if (validationThrowable instanceof RuntimeException) {throw (RuntimeException) validationThrowable;
}
// NOTE(review): anything that is not a RuntimeException is cast to Error
// here - assumes validateObject cannot surface a checked exception; confirm.
throw (Error) validationThrowable;
}
} else {
// Valid object: return it to its idle state; a passivation failure destroys it.
try {factory.passivateObject(underTest);
} catch (final Exception e) {destroy(underTest, DestroyMode.NORMAL);
destroyedByEvictorCount.incrementAndGet();}
}
}
}
if (!underTest.endEvictionTest(idleObjects)) {
// TODO - May need to add code here once additional
// states are used
}
}
}
}
}
// Abandoned-object cleanup runs outside evictionLock and regardless of
// whether there were idle objects to test.
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {removeAbandoned(ac);
}
}
GenericObjectPool 的 evict 方法在 idleObjects 不为空的时候会执行 evict 逻辑,它先通过 getNumTests 获取每次要对多少个 idleObject 进行验证,之后循环处理,首先通过 evictionPolicy.evict 判断是否需要 evict,如果是则执行 destroy 方法,否则判断是否 testWhileIdle,若是则先执行 activateObject 方法,再执行 validateObject,如果 activateObject 或者 validateObject 失败则执行 destroy 方法,如果 validateObject 成功则执行 passivateObject 方法
JedisFactory
redis/clients/jedis/JedisFactory.java
/**
 * Prepares a pooled connection for use by making sure it points at the
 * database configured in {@code clientConfig}.
 *
 * @param pooledJedis wrapper holding the connection being activated
 * @throws Exception declared by the overridden factory method
 */
@Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis connection = pooledJedis.getObject();
    // Already on the configured database: nothing to do.
    if (connection.getDB() == clientConfig.getDatabase()) {
        return;
    }
    connection.select(clientConfig.getDatabase());
}
/**
 * Checks whether a pooled connection is still usable.
 * <p>
 * The connection is considered valid only if it targets the host and port
 * reported by {@code jedisSocketFactory}, reports itself as connected, and
 * answers {@code ping()} with {@code "PONG"}. Any exception raised while
 * checking is logged and treated as a failed validation.
 *
 * @param pooledJedis wrapper holding the connection being validated
 * @return {@code true} if all checks pass, {@code false} otherwise
 */
@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis connection = pooledJedis.getObject();
    try {
        final String expectedHost = jedisSocketFactory.getHost();
        final int expectedPort = jedisSocketFactory.getPort();
        final String actualHost = connection.getClient().getHost();
        final int actualPort = connection.getClient().getPort();

        // Checks run in the same order as a short-circuit && chain would.
        if (!expectedHost.equals(actualHost)) {
            return false;
        }
        if (expectedPort != actualPort) {
            return false;
        }
        if (!connection.isConnected()) {
            return false;
        }
        return connection.ping().equals("PONG");
    } catch (final Exception e) {
        logger.error("Error while validating pooled Jedis object.", e);
        return false;
    }
}
@Override
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {// TODO maybe should select db 0? Not sure right now.}
/**
 * Tears down a pooled connection.
 * <p>
 * Nothing happens if the connection is not connected. Otherwise a
 * {@code quit()} is attempted unless the connection is flagged as broken, and
 * the connection is then closed. Runtime failures of either step are logged
 * as warnings and do not propagate.
 *
 * @param pooledJedis wrapper holding the connection being destroyed
 * @throws Exception declared by the overridden factory method
 */
@Override
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis connection = pooledJedis.getObject();
    if (!connection.isConnected()) {
        return;
    }

    try {
        // need a proper test, probably with mock
        if (!connection.isBroken()) {
            connection.quit();
        }
    } catch (RuntimeException quitFailure) {
        logger.warn("Error while QUIT", quitFailure);
    }

    // close() is attempted even if QUIT failed above.
    try {
        connection.close();
    } catch (RuntimeException closeFailure) {
        logger.warn("Error while close", closeFailure);
    }
}
JedisFactory 的 activateObject 判断 db 是否一样,不一样则执行 select 方法;validateObject 方法会先校验 host、port 以及连接状态,再执行 ping;passivateObject 方法为空操作;destroyObject 方法在连接处于 connected 状态时会判断是否 broken,非 broken 执行 quit,最后执行 close 方法
Evictor
org/apache/commons/pool2/impl/BaseGenericObjectPool.java
/**
 * Periodic maintenance task of the enclosing pool. Each run evicts idle
 * objects and then tops the pool back up to its minimum idle count.
 *
 * @see GenericKeyedObjectPool#setTimeBetweenEvictionRunsMillis
 */
class Evictor implements Runnable {

    /** Handle of the scheduled execution; assigned through {@link #setScheduledFuture}. */
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Records the future under which this evictor has been scheduled.
     *
     * @param scheduledFuture the scheduled future.
     */
    void setScheduledFuture(final ScheduledFuture<?> scheduledFuture) {
        this.scheduledFuture = scheduledFuture;
    }

    /**
     * Cancels the scheduled future without interrupting a run in progress.
     */
    void cancel() {
        scheduledFuture.cancel(false);
    }

    /**
     * Runs one round of pool maintenance: eviction first, then re-creation of
     * idle instances up to the configured minimum.
     * <p>
     * The timer that drives evictors is shared by all pools, while pools may
     * live in different class loaders. The work is therefore carried out with
     * the factory's class loader installed as the thread context class
     * loader, and the previous loader is restored afterwards.
     */
    @Override
    public void run() {
        final Thread worker = Thread.currentThread();
        final ClassLoader previousLoader = worker.getContextClassLoader();
        try {
            if (factoryClassLoader != null) {
                final ClassLoader factoryLoader = factoryClassLoader.get();
                if (factoryLoader == null) {
                    // The pool is no longer referenced and its class loader has
                    // been collected; stop this task so the pool can be
                    // collected too.
                    cancel();
                    return;
                }
                worker.setContextClassLoader(factoryLoader);
            }

            // Step 1: evict from the pool.
            try {
                evict();
            } catch (final Exception e) {
                swallowException(e);
            } catch (final OutOfMemoryError oome) {
                // Report the problem but keep the evictor thread alive in case
                // the error is recoverable.
                oome.printStackTrace(System.err);
            }

            // Step 2: re-create idle instances.
            try {
                ensureMinIdle();
            } catch (final Exception e) {
                swallowException(e);
            }
        } finally {
            // Always put the original context class loader back.
            worker.setContextClassLoader(previousLoader);
        }
    }
}
Evictor 实现了 Runnable 接口,其 run 方法先执行 evict 方法,后执行 ensureMinIdle 方法
setTimeBetweenEvictionRuns
org/apache/commons/pool2/impl/BaseGenericObjectPool.java
/**
 * Sets the duration to sleep between runs of the idle object evictor thread
 * and (re)starts the evictor accordingly.
 * <ul>
 * <li>When positive, the idle object evictor thread starts.</li>
 * <li>When non-positive, no idle object evictor thread runs.</li>
 * </ul>
 * The argument is passed through {@code PoolImplUtils.nonNull} with
 * {@code BaseObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS} as the
 * fallback before it is stored.
 *
 * @param timeBetweenEvictionRuns
 *            duration to sleep between evictor runs
 *
 * @see #getTimeBetweenEvictionRunsMillis
 * @since 2.10.0
 */
public final void setTimeBetweenEvictionRuns(final Duration timeBetweenEvictionRuns) {
    durationBetweenEvictionRuns = PoolImplUtils.nonNull(
            timeBetweenEvictionRuns,
            BaseObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS);
    startEvictor(durationBetweenEvictionRuns);
}
setTimeBetweenEvictionRuns 方法会给 durationBetweenEvictionRuns 赋值,同时执行 startEvictor 方法
startEvictor
org/apache/commons/pool2/impl/BaseGenericObjectPool.java
/**
 * <p>Starts, restarts or stops the evictor depending on the given delay and
 * on whether an evictor already exists:</p>
 * <ul>
 * <li>positive delay, no evictor yet: a new evictor is created and scheduled;</li>
 * <li>positive delay, evictor present: the existing one is cancelled and
 * replaced by a newly scheduled evictor;</li>
 * <li>non-positive delay, evictor present: the existing one is cancelled;</li>
 * <li>non-positive delay, no evictor: nothing happens.</li>
 * </ul>
 *
 * <p>This method needs to be final, since it is called from a constructor.
 * See POOL-195.</p>
 *
 * @param delay delay before the first eviction run and between subsequent runs
 */
final void startEvictor(final Duration delay) {
    synchronized (evictionLock) {
        final boolean runPeriodically = PoolImplUtils.isPositive(delay);
        final boolean alreadyStarted = evictor != null;

        if (!runPeriodically) {
            // Stopping evictor (only if there is one to stop)
            if (alreadyStarted) {
                EvictionTimer.cancel(evictor, evictorShutdownTimeoutDuration, false);
            }
            return;
        }

        if (!alreadyStarted) {
            // Starting evictor for the first time or after a cancel
            evictor = new Evictor();
            EvictionTimer.schedule(evictor, delay, delay);
            return;
        }

        // Restart of an existing evictor. Holding the EvictionTimer class lock
        // ensures no cancel can happen between the cancel and schedule calls.
        synchronized (EvictionTimer.class) {
            EvictionTimer.cancel(evictor, evictorShutdownTimeoutDuration, true);
            evictor = null;
            evictionIterator = null;
            evictor = new Evictor();
            EvictionTimer.schedule(evictor, delay, delay);
        }
    }
}
startEvictor 方法会判断 delay 是否是正数,是的话,则执行 EvictionTimer.schedule(evictor, delay, delay),不是则在 evictor 不为 null 时执行 EvictionTimer.cancel(evictor, evictorShutdownTimeoutDuration, false);对于 delay 为正数且 evictor 不为 null 的会先执行 cancel 再执行 schedule
EvictionTimer
org/apache/commons/pool2/impl/EvictionTimer.java
/**
 * Registers the given eviction task with the shared timer.
 * <p>
 * On first use (no executor yet) a single-threaded scheduled executor is
 * created and a {@code Reaper} is scheduled on it at a fixed rate. The task
 * itself is wrapped in a weakly-referencing runner, scheduled with a fixed
 * delay, handed its scheduled future, and recorded in {@code taskMap}.
 * <p>
 * Tasks that are added with a call to this method *must* call {@link
 * #cancel(BaseGenericObjectPool.Evictor, Duration, boolean)}
 * to cancel the task to prevent memory and/or thread leaks in application
 * server environments.
 *
 * @param task      Task to be scheduled.
 * @param delay     Delay before the task is first executed.
 * @param period    Time between executions.
 */
static synchronized void schedule(final BaseGenericObjectPool<?>.Evictor task, final Duration delay, final Duration period) {
    if (executor == null) {
        executor = new ScheduledThreadPoolExecutor(1, new EvictorThreadFactory());
        executor.setRemoveOnCancelPolicy(true);
        executor.scheduleAtFixedRate(new Reaper(), delay.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS);
    }

    // The timer only holds the task weakly; the pool owns the strong reference.
    final WeakReference<Runnable> taskRef = new WeakReference<>(task);
    final WeakRunner weakRunner = new WeakRunner(taskRef);

    task.setScheduledFuture(
            executor.scheduleWithFixedDelay(weakRunner, delay.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS));
    taskMap.put(taskRef, weakRunner);
}
schedule 方法使用的是 ScheduledThreadPoolExecutor 的 scheduleWithFixedDelay 方法来执行 evictor;而在 executor 为 null 时会创建 ScheduledThreadPoolExecutor,同时触发 scheduleAtFixedRate 来执行 Reaper
Reaper
org/apache/commons/pool2/impl/EvictionTimer.java
/**
 * Task that removes references to abandoned tasks and shuts
 * down the executor if there are no live tasks left.
 */
private static class Reaper implements Runnable {

    /**
     * Drops every {@code taskMap} entry whose weakly referenced task has been
     * cleared, removing the matching runner from the executor, and shuts the
     * executor down once {@code taskMap} is empty. The whole run holds the
     * {@code EvictionTimer} class lock.
     */
    @Override
    public void run() {
        synchronized (EvictionTimer.class) {
            // Remove through the entry-set view rather than calling
            // taskMap.remove(...) inside a for-each over the same view:
            // structural modification during iteration makes fail-fast maps
            // throw ConcurrentModificationException.
            taskMap.entrySet().removeIf(entry -> {
                if (entry.getKey().get() != null) {
                    return false;
                }
                executor.remove(entry.getValue());
                return true;
            });
            if (taskMap.isEmpty() && executor != null) {
                executor.shutdown();
                executor.setCorePoolSize(0);
                executor = null;
            }
        }
    }
}
Reaper 主要是遍历 taskMap,删除弱引用已被回收(即 evictor 已不可达)的 task;当 taskMap 为空时会关闭 executor
小结
jedis 的 testWhileIdle 是依赖 Evictor 来进行的,即 Evictor 通过 evictionPolicy.evict 判断是否需要 evict,如果是则执行 evict 逻辑,即 destroy 方法,否则走 testWhileIdle 的逻辑。testWhileIdle 先执行 activateObject 方法,再执行 validateObject,如果 activateObject 或者 validateObject 失败则执行 destroy 方法,最后如果 validateObject 成功则执行 passivateObject 方法。
Evictor 实现了 Runnable 接口,其 run 方法先执行 evict 方法,后执行 ensureMinIdle 方法;BaseGenericObjectPool 的 setTimeBetweenEvictionRuns 方法会给 durationBetweenEvictionRuns 赋值,同时执行 startEvictor 方法,即触发执行 EvictionTimer.schedule(evictor, delay, delay),schedule 方法使用的是 ScheduledThreadPoolExecutor 的 scheduleWithFixedDelay 方法来执行 evictor。
doc
- GenericObjectPool 参数解析