On Jedis: A Look at How Jedis Returns Connections

This article looks at what happens when a Jedis connection is returned to the pool.

spring-data-redis

RedisTemplate

org/springframework/data/redis/core/RedisTemplate.java

    @Nullable
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

        try {
            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
            RedisConnection connToUse = preProcessConnection(conn, existingConnection);

            boolean pipelineStatus = connToUse.isPipelined();
            if (pipeline && !pipelineStatus) {
                connToUse.openPipeline();
            }

            RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
            T result = action.doInRedis(connToExpose);

            // close pipeline
            if (pipeline && !pipelineStatus) {
                connToUse.closePipeline();
            }

            return postProcessResult(result, connToUse, existingConnection);
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
        }
    }

RedisTemplate's execute method first obtains a connection through RedisConnectionUtils.getConnection and, in the finally block, gives it back through RedisConnectionUtils.releaseConnection.
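The acquire, run, release-in-finally shape described above can be sketched with a toy model. Everything here (ExecuteSketch, FakeConnection, the events list) is hypothetical and uses only the JDK; it is not Spring code. It only illustrates that the release step runs whether or not the callback throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model, not Spring code: records the order of calls so the
// acquire / callback / release sequence is visible.
class ExecuteSketch {
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for a RedisConnection.
    static class FakeConnection {
        String get(String key) {
            events.add("command");
            if (key == null) {
                throw new IllegalStateException("simulated connection failure");
            }
            return "value-of-" + key;
        }
    }

    // Same shape as RedisTemplate.execute: acquire, run the callback, always release.
    static <T> T execute(Function<FakeConnection, T> action) {
        FakeConnection conn = new FakeConnection();
        events.add("getConnection");
        try {
            return action.apply(conn);
        } finally {
            // Runs on both the normal and the exceptional path.
            events.add("releaseConnection");
        }
    }
}
```

Because the release sits in finally, a failed command still reaches the return path, which is what lets a broken connection be cleaned up later.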

RedisConnectionUtils

org/springframework/data/redis/core/RedisConnectionUtils.java

    public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory,
            boolean transactionSupport) {
        releaseConnection(conn, factory);
    }

    public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory) {
        if (conn == null) {
            return;
        }

        RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
        if (conHolder != null) {
            if (conHolder.isTransactionActive()) {
                if (connectionEquals(conHolder, conn)) {
                    if (log.isDebugEnabled()) {
                        log.debug("RedisConnection will be closed when transaction finished.");
                    }

                    // It's the transactional Connection: Don't close it.
                    conHolder.released();
                }
                return;
            }

            // release transactional/read-only and non-transactional/non-bound connections.
            // transactional connections for read-only transactions get no synchronizer registered

            unbindConnection(factory);
            return;
        }

        doCloseConnection(conn);
    }

    private static void doCloseConnection(@Nullable RedisConnection connection) {
        if (connection == null) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Closing Redis Connection.");
        }

        try {
            connection.close();
        } catch (DataAccessException ex) {
            log.debug("Could not close Redis Connection", ex);
        } catch (Throwable ex) {
            log.debug("Unexpected exception on closing Redis Connection", ex);
        }
    }

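releaseConnection mostly deals with the transaction-related cases. When no connection holder is bound to the current thread, it falls through to doCloseConnection, which ends up calling connection.close().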

connection.close()

org/springframework/data/redis/connection/jedis/JedisConnection.java

    @Override
    public void close() throws DataAccessException {
        super.close();

        JedisSubscription subscription = this.subscription;
        try {
            if (subscription != null) {
                subscription.close();
            }
        } catch (Exception ex) {
            LOGGER.debug("Cannot terminate subscription", ex);
        } finally {
            this.subscription = null;
        }

        // return the connection to the pool
        if (pool != null) {
            jedis.close();
            return;
        }

        // else close the connection normally (doing the try/catch dance)

        try {
            jedis.quit();
        } catch (Exception ex) {
            LOGGER.debug("Failed to QUIT during close", ex);
        }

        try {
            jedis.disconnect();
        } catch (Exception ex) {
            LOGGER.debug("Failed to disconnect during close", ex);
        }
    }

When a connection pool is in use, the connection's close method calls jedis.close(); otherwise it calls jedis.quit() followed by jedis.disconnect().

jedis.close()

redis/clients/jedis/Jedis.java

  @Override
  public void close() {
    if (dataSource != null) {
      JedisPoolAbstract pool = this.dataSource;
      this.dataSource = null;
      if (isBroken()) {
        pool.returnBrokenResource(this);
      } else {
        pool.returnResource(this);
      }
    } else {
      super.close();
    }
  }

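For a pooled instance (dataSource is not null), Jedis.close() first checks isBroken(), which reads the broken field of redis.clients.jedis.Connection. If the connection is broken it calls returnBrokenResource; otherwise it calls returnResource. An instance without a pool simply calls super.close().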

pool

redis/clients/jedis/util/Pool.java

  public void returnBrokenResource(final T resource) {
    if (resource != null) {
      returnBrokenResourceObject(resource);
    }
  }

  public void returnResource(final T resource) {
    if (resource != null) {
      returnResourceObject(resource);
    }
  }

  protected void returnBrokenResourceObject(final T resource) {
    try {
      internalPool.invalidateObject(resource);
    } catch (Exception e) {
      throw new JedisException("Could not return the broken resource to the pool", e);
    }
  }

  protected void returnResourceObject(final T resource) {
    try {
      internalPool.returnObject(resource);
    } catch (RuntimeException e) {
      throw new JedisException("Could not return the resource to the pool", e);
    }
  }

returnBrokenResource ends up calling internalPool.invalidateObject(resource), while returnResource ends up calling internalPool.returnObject(resource).
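To make the dispatch concrete, here is a minimal JDK-only sketch of the close path. FakeClient and FakePool are hypothetical stand-ins, not Jedis or commons-pool types, and the pool's destroy step is collapsed into a counter plus a second close() call. The point it illustrates is that the client detaches from its pool before returning itself, so the second close() really closes it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: these classes are hypothetical stand-ins, not Jedis types.
class ReturnPathSketch {
    static class FakeClient {
        boolean broken;      // plays the role of Connection.broken
        boolean closed;      // set when the socket would really be closed
        FakePool dataSource; // the pool this client was borrowed from, if any

        void close() {
            if (dataSource != null) {
                FakePool pool = dataSource;
                dataSource = null; // detach first, as Jedis.close() does
                if (broken) {
                    pool.returnBrokenResource(this);
                } else {
                    pool.returnResource(this);
                }
            } else {
                closed = true;     // not pooled (or already detached): really close
            }
        }
    }

    static class FakePool {
        final Deque<FakeClient> idle = new ArrayDeque<>();
        int destroyed;

        FakeClient getResource() {
            FakeClient c = idle.isEmpty() ? new FakeClient() : idle.pollFirst();
            c.dataSource = this;
            return c;
        }

        // Healthy client: goes back to the idle queue for reuse.
        void returnResource(FakeClient c) {
            idle.addFirst(c);
        }

        // Broken client: never re-enters the idle queue.
        void returnBrokenResource(FakeClient c) {
            destroyed++;
            c.close(); // dataSource is already null, so this closes the client
        }
    }
}
```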

invalidateObject

org/apache/commons/pool2/impl/GenericObjectPool.java

    public void invalidateObject(final T obj, final DestroyMode destroyMode) throws Exception {
        final PooledObject<T> p = getPooledObject(obj);
        if (p == null) {
            if (isAbandonedConfig()) {
                return;
            }
            throw new IllegalStateException("Invalidated object not currently part of this pool");
        }
        synchronized (p) {
            if (p.getState() != PooledObjectState.INVALID) {
                destroy(p, destroyMode);
            }
        }
        ensureIdle(1, false);
    }

    private void destroy(final PooledObject<T> toDestroy, final DestroyMode destroyMode) throws Exception {
        toDestroy.invalidate();
        idleObjects.remove(toDestroy);
        allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
        try {
            factory.destroyObject(toDestroy, destroyMode);
        } finally {
            destroyedCount.incrementAndGet();
            createCount.decrementAndGet();
        }
    }

invalidateObject calls destroy, which in turn calls back into factory.destroyObject.

destroyObject

redis/clients/jedis/JedisFactory.java

  public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.isConnected()) {
      try {
        // need a proper test, probably with mock
        if (!jedis.isBroken()) {
          jedis.quit();
        }
      } catch (RuntimeException e) {
        logger.warn("Error while QUIT", e);
      }
      try {
        jedis.close();
      } catch (RuntimeException e) {
        logger.warn("Error while close", e);
      }
    }
  }

destroyObject sends QUIT when the connection is not broken, then calls jedis.close() to close the client connection.
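The branching in destroyObject is easy to misread, so here is a small JDK-only model of it. DestroySketch and FakeJedis are hypothetical names; the sketch drops the exception handling and only records which calls would be made.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the destroy step; FakeJedis is a hypothetical stand-in.
class DestroySketch {
    static class FakeJedis {
        boolean connected = true;
        boolean broken;
        final List<String> calls = new ArrayList<>();

        void quit() {
            calls.add("quit");
        }

        void close() {
            calls.add("close");
            connected = false;
        }
    }

    static void destroyObject(FakeJedis jedis) {
        if (!jedis.connected) {
            return;       // already disconnected: nothing to tear down
        }
        if (!jedis.broken) {
            jedis.quit(); // QUIT is only attempted on a healthy connection
        }
        jedis.close();    // the connection is closed either way
    }
}
```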

returnObject

org/apache/commons/pool2/impl/GenericObjectPool.java

    public void returnObject(final T obj) {
        final PooledObject<T> p = getPooledObject(obj);

        if (p == null) {
            if (!isAbandonedConfig()) {
                throw new IllegalStateException("Returned object not currently part of this pool");
            }
            return; // Object was abandoned and removed
        }

        markReturningState(p);

        final Duration activeTime = p.getActiveDuration();

        if (getTestOnReturn() && !factory.validateObject(p)) {
            try {
                destroy(p, DestroyMode.NORMAL);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        try {
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                destroy(p, DestroyMode.NORMAL);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        if (!p.deallocate()) {
            throw new IllegalStateException("Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p, DestroyMode.NORMAL);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }

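When testOnReturn is enabled, returnObject first calls validateObject and destroys the object if validation fails. It then calls factory.passivateObject(p). Finally it checks maxIdle: if the idle queue is already at the limit the object is destroyed; otherwise it is put back into the pool (idleObjects), at the head or the tail depending on the lifo setting.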
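The order of those checks can be sketched as a simplified JDK-only model. ReturnObjectSketch is a hypothetical class, not commons-pool code: it omits passivation, state tracking, the closed-pool check and ensureIdle, and keeps only the validate, maxIdle, and LIFO/FIFO decisions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Simplified model of the decision order in returnObject; not commons-pool code.
class ReturnObjectSketch<T> {
    final Deque<T> idleObjects = new ArrayDeque<>();
    final Predicate<T> validator; // stands in for factory.validateObject
    boolean testOnReturn = false;
    boolean lifo = true;
    int maxIdle = 8;              // a negative value means "no limit"
    int destroyedCount = 0;

    ReturnObjectSketch(Predicate<T> validator) {
        this.validator = validator;
    }

    // Returns true if the object went back to the idle queue, false if it was destroyed.
    boolean returnObject(T obj) {
        if (testOnReturn && !validator.test(obj)) {
            destroyedCount++;     // failed validation on return
            return false;
        }
        if (maxIdle > -1 && maxIdle <= idleObjects.size()) {
            destroyedCount++;     // idle queue is already at its limit
            return false;
        }
        if (lifo) {
            idleObjects.addFirst(obj); // most recently returned is borrowed first
        } else {
            idleObjects.addLast(obj);  // oldest idle object is borrowed first
        }
        return true;
    }
}
```

With lifo enabled, a small set of connections stays hot while the rest sit idle long enough to be evicted; with FIFO, use is spread evenly across all idle connections.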

Summary

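In spring-data-redis, returning a connection mainly means calling the connection's close method, which for Jedis is jedis.close(). That method first checks isBroken() (the broken field of redis.clients.jedis.Connection); if the connection is broken it calls returnBrokenResource, otherwise returnResource.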

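  • returnBrokenResource calls internalPool.invalidateObject(resource). invalidateObject calls destroy, which calls back into factory.destroyObject, which in turn calls jedis.close() to close the client connection.
  • returnResource calls internalPool.returnObject(resource). With testOnReturn enabled, returnObject first calls validateObject; it then calls factory.passivateObject(p), and finally checks maxIdle: an object over the limit is destroyed, otherwise it is put back into the pool (idleObjects) according to the lifo setting.
  • In other words, if Redis goes down after a connection has been borrowed and while a command is running, redis.clients.jedis.Connection sets broken to true and throws JedisConnectionException. Because RedisTemplate calls releaseConnection in a finally block, the return goes through returnBrokenResource and the bad connection is closed, which in effect gives the behavior of testOnReturn.
  • If Redis is already down at borrow time but the pool still holds idle connections, then without testOnBorrow a connection is handed out and used; the command fails, redis.clients.jedis.Connection sets broken to true and throws JedisConnectionException, and the connection is destroyed on return. With testOnBorrow, validation detects the bad connection, destroys it, and the borrow loop moves on to the next idle connection until none are left. Once the pool has no idle connections, borrowing goes to the create path, which fails immediately with redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket.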
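The last scenario, borrowing while the server is down, can be walked through with a toy model of the borrow loop. BorrowSketch and Conn are hypothetical JDK-only classes, the exception type is a placeholder for JedisConnectionException, and validation is reduced to reading a healthy flag; it is a sketch of the behavior described above, not commons-pool code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the borrow loop when the server is down; not commons-pool code.
class BorrowSketch {
    static class Conn {
        final boolean healthy;

        Conn(boolean healthy) {
            this.healthy = healthy;
        }
    }

    final Deque<Conn> idle = new ArrayDeque<>();
    boolean testOnBorrow;
    boolean serverUp;
    int destroyed;

    Conn borrow() {
        while (true) {
            Conn c = idle.pollFirst();
            if (c == null) {
                // Nothing idle: fall through to create, which needs a live server.
                if (!serverUp) {
                    throw new IllegalStateException("Failed to create socket (simulated)");
                }
                return new Conn(true);
            }
            if (testOnBorrow && !c.healthy) {
                destroyed++; // validation failed: destroy it and try the next idle one
                continue;
            }
            return c;        // handed out unvalidated, or validated successfully
        }
    }
}
```

Without testOnBorrow the caller receives a stale connection and only finds out when the first command fails; with it, the stale connections are drained at borrow time and the caller sees the create failure instead.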