This article looks at how jedis returns connections to the pool.

spring-data-redis

RedisTemplate

org/springframework/data/redis/core/RedisTemplate.java

    @Nullable
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

        try {

            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
            RedisConnection connToUse = preProcessConnection(conn, existingConnection);

            boolean pipelineStatus = connToUse.isPipelined();
            if (pipeline && !pipelineStatus) {
                connToUse.openPipeline();
            }

            RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
            T result = action.doInRedis(connToExpose);

            // close pipeline
            if (pipeline && !pipelineStatus) {
                connToUse.closePipeline();
            }

            return postProcessResult(result, connToUse, existingConnection);
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
        }
    }
RedisTemplate's execute method first obtains a connection through RedisConnectionUtils.getConnection and, in the finally block, returns it through RedisConnectionUtils.releaseConnection.
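
The acquire/use/release shape of execute can be illustrated with a small JDK-only sketch. Everything here (ExecuteSketch, FakeConnection, the events list) is a hypothetical stand-in and not a spring-data-redis type; the sketch only shows that the release step runs whether the callback returns normally or throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; not spring-data-redis classes.
class ExecuteSketch {

    // Records the order in which acquire and release happen.
    static final List<String> events = new ArrayList<>();

    static class FakeConnection {
        String get(String key) {
            if (key == null) {
                // Simulates a command that fails while the connection is in use.
                throw new IllegalStateException("simulated connection failure");
            }
            return "value-of-" + key;
        }
    }

    static FakeConnection getConnection() {
        events.add("get");
        return new FakeConnection();
    }

    static void releaseConnection(FakeConnection conn) {
        events.add("release");
    }

    // Same shape as execute(): acquire, run the callback, release in finally.
    static <T> T execute(Function<FakeConnection, T> action) {
        FakeConnection conn = getConnection();
        try {
            return action.apply(conn);
        } finally {
            releaseConnection(conn);
        }
    }
}
```

Calling ExecuteSketch.execute(c -> c.get("k")) records "get" followed by "release", and the same two events are recorded when the callback throws.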

RedisConnectionUtils

org/springframework/data/redis/core/RedisConnectionUtils.java

    public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory,
            boolean transactionSupport) {
        releaseConnection(conn, factory);
    }

    public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory) {

        if (conn == null) {
            return;
        }

        RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

        if (conHolder != null) {

            if (conHolder.isTransactionActive()) {
                if (connectionEquals(conHolder, conn)) {
                    if (log.isDebugEnabled()) {
                        log.debug("RedisConnection will be closed when transaction finished.");
                    }

                    // It's the transactional Connection: Don't close it.
                    conHolder.released();
                }
                return;
            }

            // release transactional/read-only and non-transactional/non-bound connections.
            // transactional connections for read-only transactions get no synchronizer registered
            unbindConnection(factory);
            return;
        }

        doCloseConnection(conn);
    }

    private static void doCloseConnection(@Nullable RedisConnection connection) {

        if (connection == null) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Closing Redis Connection.");
        }

        try {
            connection.close();
        } catch (DataAccessException ex) {
            log.debug("Could not close Redis Connection", ex);
        } catch (Throwable ex) {
            log.debug("Unexpected exception on closing Redis Connection", ex);
        }
    }
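The releaseConnection method mostly deals with the transaction-related cases. When no connection holder is bound for the factory, it ends by calling doCloseConnection, which in turn calls connection.close().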

connection.close()

org/springframework/data/redis/connection/jedis/JedisConnection.java

    @Override
    public void close() throws DataAccessException {

        super.close();

        JedisSubscription subscription = this.subscription;
        try {
            if (subscription != null) {
                subscription.close();
            }
        } catch (Exception ex) {
            LOGGER.debug("Cannot terminate subscription", ex);
        } finally {
            this.subscription = null;
        }

        // return the connection to the pool
        if (pool != null) {
            jedis.close();
            return;
        }

        // else close the connection normally (doing the try/catch dance)
        try {
            jedis.quit();
        } catch (Exception ex) {
            LOGGER.debug("Failed to QUIT during close", ex);
        }

        try {
            jedis.disconnect();
        } catch (Exception ex) {
            LOGGER.debug("Failed to disconnect during close", ex);
        }
    }
When a connection pool is in use, the connection's close method calls jedis.close(); otherwise it calls jedis.quit() followed by jedis.disconnect().

jedis.close()

redis/clients/jedis/Jedis.java

  @Override
  public void close() {
    if (dataSource != null) {
      JedisPoolAbstract pool = this.dataSource;
      this.dataSource = null;
      if (isBroken()) {
        pool.returnBrokenResource(this);
      } else {
        pool.returnResource(this);
      }
    } else {
      super.close();
    }
  }
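When the instance came from a pool (dataSource is not null), jedis's close method first checks isBroken (which reads the redis.clients.jedis.Connection.broken field): if the connection is broken it calls returnBrokenResource, otherwise it calls returnResource.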
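
The branching in close() can be modelled without any jedis classes. The sketch below is a simplified illustration under assumptions: FakePool, FakeClient and markBroken are hypothetical names, and the counters replace what the real pool does with the object.

```java
// Hypothetical model of the close() branching; not jedis classes.
class CloseSketch {

    static class FakePool {
        int returned = 0;
        int invalidated = 0;

        void returnResource(FakeClient client) {
            returned++;      // the real pool would put the object back into the idle set
        }

        void returnBrokenResource(FakeClient client) {
            invalidated++;   // the real pool would destroy the object
        }
    }

    static class FakeClient implements AutoCloseable {
        private FakePool dataSource;   // non-null while the client belongs to a pool
        private boolean broken;        // set when an I/O error is seen
        boolean socketClosed;

        FakeClient(FakePool pool) {
            this.dataSource = pool;
        }

        void markBroken() {
            broken = true;
        }

        @Override
        public void close() {
            if (dataSource != null) {
                FakePool pool = dataSource;
                dataSource = null;     // detach first, as Jedis.close() does
                if (broken) {
                    pool.returnBrokenResource(this);
                } else {
                    pool.returnResource(this);
                }
            } else {
                socketClosed = true;   // no pool attached: really close the connection
            }
        }
    }
}
```

Because dataSource is cleared before the client is handed back, a second close() on the same instance takes the non-pooled branch and closes the underlying connection.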

pool

redis/clients/jedis/util/Pool.java

  public void returnBrokenResource(final T resource) {
    if (resource != null) {
      returnBrokenResourceObject(resource);
    }
  }

  public void returnResource(final T resource) {
    if (resource != null) {
      returnResourceObject(resource);
    }
  }

  protected void returnBrokenResourceObject(final T resource) {
    try {
      internalPool.invalidateObject(resource);
    } catch (Exception e) {
      throw new JedisException("Could not return the broken resource to the pool", e);
    }
  }

  protected void returnResourceObject(final T resource) {
    try {
      internalPool.returnObject(resource);
    } catch (RuntimeException e) {
      throw new JedisException("Could not return the resource to the pool", e);
    }
  }
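returnBrokenResource goes through returnBrokenResourceObject and ends up calling internalPool.invalidateObject(resource), while returnResource goes through returnResourceObject and ends up calling internalPool.returnObject(resource).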

invalidateObject

org/apache/commons/pool2/impl/GenericObjectPool.java

    public void invalidateObject(final T obj, final DestroyMode destroyMode) throws Exception {
        final PooledObject<T> p = getPooledObject(obj);
        if (p == null) {
            if (isAbandonedConfig()) {
                return;
            }
            throw new IllegalStateException(
                    "Invalidated object not currently part of this pool");
        }
        synchronized (p) {
            if (p.getState() != PooledObjectState.INVALID) {
                destroy(p, destroyMode);
            }
        }
        ensureIdle(1, false);
    }

    private void destroy(final PooledObject<T> toDestroy, final DestroyMode destroyMode) throws Exception {
        toDestroy.invalidate();
        idleObjects.remove(toDestroy);
        allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
        try {
            factory.destroyObject(toDestroy, destroyMode);
        } finally {
            destroyedCount.incrementAndGet();
            createCount.decrementAndGet();
        }
    }
The invalidateObject method calls destroy, which removes the object from the pool's bookkeeping and then calls back into factory.destroyObject.

destroyObject

redis/clients/jedis/JedisFactory.java

  public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.isConnected()) {
      try {
        // need a proper test, probably with mock
        if (!jedis.isBroken()) {
          jedis.quit();
        }
      } catch (RuntimeException e) {
        logger.warn("Error while QUIT", e);
      }
      try {
        jedis.close();
      } catch (RuntimeException e) {
        logger.warn("Error while close", e);
      }
    }
  }
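The destroyObject method sends QUIT when the connection is not broken and then calls jedis.close() to close the client connection. Since Jedis.close() had already set dataSource to null before handing the instance back, this second close() takes the super.close() branch instead of returning the instance to the pool again.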

returnObject

org/apache/commons/pool2/impl/GenericObjectPool.java

    public void returnObject(final T obj) {
        final PooledObject<T> p = getPooledObject(obj);

        if (p == null) {
            if (!isAbandonedConfig()) {
                throw new IllegalStateException(
                        "Returned object not currently part of this pool");
            }
            return; // Object was abandoned and removed
        }

        markReturningState(p);

        final Duration activeTime = p.getActiveDuration();

        if (getTestOnReturn() && !factory.validateObject(p)) {
            try {
                destroy(p, DestroyMode.NORMAL);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        try {
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                destroy(p, DestroyMode.NORMAL);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p, DestroyMode.NORMAL);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }
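When testOnReturn is enabled, returnObject first calls validateObject and destroys the object if validation fails. It then calls factory.passivateObject(p), destroying the object if that throws. Finally it checks maxIdle: if the pool is closed or the idle count has already reached maxIdle the object is destroyed; otherwise it is put back into the pool (idleObjects), at the head or the tail depending on the lifo setting.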
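
The order of those checks can be condensed into a single decision function. This is a simplified sketch, not the commons-pool2 implementation: ReturnDecisionSketch, Outcome and the boolean parameters are hypothetical, and side effects such as ensureIdle and the statistics update are left out.

```java
// Hypothetical condensation of the returnObject decision order; not commons-pool2 code.
class ReturnDecisionSketch {

    enum Outcome { DESTROYED, IDLE_FIRST, IDLE_LAST }

    static Outcome decide(boolean testOnReturn, boolean valid, boolean passivateOk,
                          boolean poolClosed, int maxIdle, int idleCount, boolean lifo) {
        // 1. Validation only happens when testOnReturn is enabled.
        if (testOnReturn && !valid) {
            return Outcome.DESTROYED;
        }
        // 2. A failure while passivating also destroys the object.
        if (!passivateOk) {
            return Outcome.DESTROYED;
        }
        // 3. A closed pool, or an idle set that already holds maxIdle objects,
        //    means the object is not kept. A negative maxIdle means "no limit".
        if (poolClosed || (maxIdle > -1 && maxIdle <= idleCount)) {
            return Outcome.DESTROYED;
        }
        // 4. Otherwise keep it: head of the deque for LIFO, tail for FIFO.
        return lifo ? Outcome.IDLE_FIRST : Outcome.IDLE_LAST;
    }
}
```

For example, decide(false, false, true, false, 8, 3, true) keeps the object at the head of the idle deque even though it is invalid, because validation is skipped when testOnReturn is off.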

Summary

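In spring-data-redis, returning a connection mainly comes down to calling the connection's close method, which for jedis means jedis.close(). That method first checks isBroken (which reads the redis.clients.jedis.Connection.broken field): if the connection is broken it calls returnBrokenResource, otherwise it calls returnResource.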

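  • returnBrokenResource calls internalPool.invalidateObject(resource); invalidateObject calls destroy, which calls back into factory.destroyObject, which in turn calls jedis.close() to close the client connection.
  • returnResource calls internalPool.returnObject(resource). When testOnReturn is enabled, returnObject calls validateObject, then factory.passivateObject(p), and finally checks maxIdle: if the idle count has reached the limit the object is destroyed, otherwise it is put back into the pool (idleObjects) at the head or tail depending on lifo.
  • In other words, if redis goes down after a connection has been obtained and while a command is executing, redis.clients.jedis.Connection marks broken as true and throws JedisConnectionException. Because RedisTemplate calls releaseConnection in a finally block, returning the connection triggers returnBrokenResource and closes the bad connection, which indirectly achieves the effect of testOnReturn.
  • If redis is already down when a connection is requested but the pool still holds connections, there are two cases. Without testOnBorrow, the connection is handed out and used; the command then fails (redis.clients.jedis.Connection marks broken as true and throws JedisConnectionException) and the connection is destroyed on return. With testOnBorrow, validation detects the bad connection, destroys it, and the borrow loop moves on to the next pooled connection until none are left. Once the pool has no idle connections, the create path runs, and create throws redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket.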
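
The borrow-side behaviour described in the last bullet can be simulated with a toy pool. This is a sketch under assumptions, not jedis or commons-pool2 code: BorrowSketch, FakeConn and the serverUp flag are hypothetical, and an IllegalStateException stands in for the JedisConnectionException thrown by the real create path.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy pool illustrating the borrow loop; not commons-pool2 code.
class BorrowSketch {

    static class FakeConn {
        final boolean healthy;

        FakeConn(boolean healthy) {
            this.healthy = healthy;
        }
    }

    final Deque<FakeConn> idle = new ArrayDeque<>();
    final boolean testOnBorrow;
    final boolean serverUp;
    int destroyed = 0;

    BorrowSketch(boolean testOnBorrow, boolean serverUp) {
        this.testOnBorrow = testOnBorrow;
        this.serverUp = serverUp;
    }

    FakeConn borrow() {
        while (true) {
            FakeConn conn = idle.pollFirst();
            if (conn == null) {
                // No idle connection left: fall through to the create path.
                if (!serverUp) {
                    throw new IllegalStateException("Failed to create socket (simulated)");
                }
                return new FakeConn(true);
            }
            if (testOnBorrow && !conn.healthy) {
                destroyed++;   // validation failed: destroy and try the next idle one
                continue;
            }
            return conn;       // without testOnBorrow a stale connection is handed out
        }
    }
}
```

With the server down and two stale idle connections, borrow() with testOnBorrow enabled destroys both and then fails on create, while with testOnBorrow disabled it hands out the first stale connection and the failure only shows up when that connection is used.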