关于redis:深入剖析Redis客户端Jedis的特性和原理

7次阅读

共计 21458 个字符,预计需要花费 54 分钟才能阅读完成。

一、开篇

Redis 作为目前通用的缓存选型,因其高性能而倍受欢迎。Redis 的 2.x 版本仅反对单机模式,从 3.0 版本开始引入集群模式。

Redis 的 Java 生态的客户端当中蕴含 Jedis、Redisson、Lettuce,不同的客户端具备不同的能力是应用形式,本文次要剖析 Jedis 客户端。

Jedis 客户端同时反对单机模式、分片模式、集群模式的拜访模式,通过构建 Jedis 类对象实现单机模式下的数据拜访,通过构建 ShardedJedis 类对象实现分片模式的数据拜访,通过构建 JedisCluster 类对象实现集群模式下的数据拜访。

Jedis 客户端反对单命令和 Pipeline 形式拜访 Redis 集群,通过 Pipeline 的形式可能进步集群拜访的效率。

本文的整体剖析基于 Jedis 的 3.5.0 版本进行剖析,相干源码均参考此版本。

二、Jedis 拜访模式比照

Jedis 客户端操作 Redis 次要分为三种模式,分表是单机模式、分片模式、集群模式。

  • 单机模式次要是创立 Jedis 对象来操作单节点的 Redis,只实用于拜访单个 Redis 节点。
  • 分片模式(ShardedJedis)次要是通过创立 ShardedJedisPool 对象来拜访分片模式的多个 Redis 节点,是 Redis 没有集群性能之前客户端实现的一个数据分布式计划,实质上是客户端通过一致性哈希来实现数据分布式存储。
  • 集群模式(JedisCluster)次要是通过创立 JedisCluster 对象来拜访集群模式下的多个 Redis 节点,是 Redis3.0 引入集群模式后客户端实现的集群拜访拜访,实质上是通过引入槽(slot)概念以及通过 CRC16 哈希槽算法来实现数据分布式存储。

单机模式不波及任何分片的思维,所以咱们着重剖析分片模式和集群模式的理念。

2.1 分片模式

  • 分片模式实质属于基于客户端的分片,在客户端实现如何依据一个 key 找到 Redis 集群中对应的节点的计划。
  • Jedis 的客户端分片模式采纳一致性 Hash 来实现,一致性 Hash 算法的益处是当 Redis 节点进行增减时只会影响新增或删除节点前后的小局部数据,绝对于取模等算法来说对数据的影响范畴较小。
  • Redis 在大部分场景下作为缓存进行应用,所以不必思考数据失落以致缓存穿透造成的影响,在 Redis 节点增减时能够不必思考局部数据无奈命中的问题。

分片模式的整体利用如下图所示,外围在于客户端的一致性 Hash 策略。

(援用自:www.cnblogs.com)

2.2 集群模式

集群模式实质属于服务器分片技术,由 Redis 集群自身提供分片性能,从 Redis 3.0 版本开始正式提供。

集群的原理是:一个 Redis 集群蕴含 16384 个哈希槽(Hash slot),Redis 保留的每个键都属于这 16384 个哈希槽的其中一个,集群应用公式 CRC16(key)%16384 来计算键 key 属于哪个槽,其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和。

集群中的每个节点负责解决一部分哈希槽。举个例子,一个集群能够有三个哈希槽,其中:

  • 节点 A 负责解决 0 号至 5500 号哈希槽。
  • 节点 B 负责解决 5501 号至 11000 号哈希槽。
  • 节点 C 负责解决 11001 号至 16383 号哈希槽。

Redis 在集群模式下对于 key 的读写过程首先将对应的 key 值进行 CRC16 计算失去对应的哈希值,将哈希值对槽位总数取模映射到对应的槽位,最终映射到对应的节点进行读写。以命令 set(“key”, “value”) 为例子,它会应用 CRC16 算法对 key 进行计算失去哈希值 28989,而后对 16384 进行取模失去 12605,最初找到 12605 对应的 Redis 节点,最终跳转到该节点执行 set 命令。

集群模式的整体利用如下图所示,外围在于集群哈希槽的设计以及重定向命令。

(援用自:www.jianshu.com)

三、Jedis 的根底用法

// Jedis 单机模式的拜访
public void main(String[] args) {
    // 创立 Jedis 对象
    jedis = new Jedis("localhost", 6379);
    // 执行 hmget 操作
    jedis.hmget("foobar", "foo");
    // 敞开 Jedis 对象
    jedis.close();}
 
// Jedis 分片模式的拜访
public void main(String[] args) {HostAndPort redis1 = HostAndPortUtil.getRedisServers().get(0);
    HostAndPort redis2 = HostAndPortUtil.getRedisServers().get(1);
    List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(2);
    JedisShardInfo shard1 = new JedisShardInfo(redis1);
    JedisShardInfo shard2 = new JedisShardInfo(redis2);
    // 创立 ShardedJedis 对象
    ShardedJedis shardedJedis = new ShardedJedis(shards);
    // 通过 ShardedJedis 对象执行 set 操作
    shardedJedis.set("a", "bar");
}
 
// Jedis 集群模式的拜访
public void main(String[] args) {
    // 构建 redis 的集群池
    Set<HostAndPort> nodes = new HashSet<>();
    nodes.add(new HostAndPort("127.0.0.1", 7001));
    nodes.add(new HostAndPort("127.0.0.1", 7002));
    nodes.add(new HostAndPort("127.0.0.1", 7003));
 
    // 创立 JedisCluster
    JedisCluster cluster = new JedisCluster(nodes);
 
    // 执行 JedisCluster 对象中的办法
    cluster.set("cluster-test", "my jedis cluster test");
    String result = cluster.get("cluster-test");
}

Jedis 通过创立 Jedis 的类对象来实现单机模式下的数据拜访,通过构建 JedisCluster 类对象来实现集群模式下的数据拜访。

要了解 Jedis 的拜访 Redis 的整个过程,能够通过先了解单机模式下的拜访流程,在这个根底上再剖析集群模式的拜访流程会比拟适合。

四、Jedis 单机模式的拜访

Jedis 拜访单机模式 Redis 的整体流程图如下所示,从图中能够看出外围的流程蕴含 Jedis 对象的创立以及通过 Jedis 对象实现 Redis 的拜访。

相熟 Jedis 拜访单机 Redis 的过程,自身就是须要理解 Jedis 的创立过程以及执行 Redis 命令的过程。

  • Jedis 的创立过程外围在于创立 Jedis 对象以及 Jedis 外部变量 Client 对象。
  • Jedis 拜访 Redis 的过程在于通过 Jedis 外部的 Client 对象拜访 Redis。

4.1 创立过程

Jedis 自身的类关系图如下图所示,从图中咱们可能看到 Jedis 继承自 BinaryJedis 类。

在 BinaryJedis 类中存在和 Redis 对接的 Client 类对象,Jedis 通过父类的 BinaryJedis 的 Client 对象实现 Redis 的读写。

Jedis 类在创立过程中通过父类 BinaryJedis 创立了 Client 对象,而理解 Client 对象是进一步了解拜访过程的要害。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
 
  protected JedisPoolAbstract dataSource = null;
 
  public Jedis(final String host, final int port) {
    // 创立父类 BinaryJedis 对象
    super(host, port);
  }
}
 
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
 
  // 拜访 redis 的 Client 对象
  protected Client client = null;
 
  public BinaryJedis(final String host, final int port) {
    // 创立 Client 对象拜访 redis
    client = new Client(host, port);
  }
}

Client 类的类关系图如下图所示,Client 对象继承自 BinaryClient 和 Connection 类。在 BinaryClient 类中存在 Redis 拜访明码等相干参数,在 Connection 类在存在拜访 Redis 的 socket 对象以及对应的输入输出流。实质上 Connection 是和 Redis 进行通信的外围类。

Client 类在创立过程中初始化外围父类 Connection 对象,而 Connection 是负责和 Redis 间接进行通信。

public class Client extends BinaryClient implements Commands {public Client(final String host, final int port) {super(host, port);
  }
}
 
public class BinaryClient extends Connection {
  // 存储和 Redis 连贯的相干信息
  private boolean isInMulti;
  private String user;
  private String password;
  private int db;
  private boolean isInWatch;
 
  public BinaryClient(final String host, final int port) {super(host, port);
  }
}
 
public class Connection implements Closeable {
  // 治理和 Redis 连贯的 socket 信息及对应的输入输出流
  private JedisSocketFactory jedisSocketFactory;
  private Socket socket;
  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;
  private int infiniteSoTimeout = 0;
  private boolean broken = false;
 
  public Connection(final String host, final int port, final boolean ssl,
      SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier) {
    // 构建 DefaultJedisSocketFactory 来创立和 Redis 连贯的 Socket 对象
    this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT,
        Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
  }
}

4.2 拜访过程

以 Jedis 执行 set 命令为例,整个过程如下:

  • Jedis 的 set 操作是通过 Client 的 set 操作来实现的。
  • Client 的 set 操作是通过父类 Connection 的 sendCommand 来实现。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
  @Override
  public String set(final String key, final String value) {checkIsInMultiOrPipeline();
    // client 执行 set 操作
    client.set(key, value);
    return client.getStatusCodeReply();}
}
 
public class Client extends BinaryClient implements Commands {
  @Override
  public void set(final String key, final String value) {
    // 执行 set 命令
    set(SafeEncoder.encode(key), SafeEncoder.encode(value));
  }
}
 
public class BinaryClient extends Connection {public void set(final byte[] key, final byte[] value) {
    // 发送 set 指令
    sendCommand(SET, key, value);
  }
}
 
public class Connection implements Closeable {public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      // socket 连贯 redis
      connect();
      // 依照 redis 的协定发送命令
      Protocol.sendCommand(outputStream, cmd, args);
    } catch (JedisConnectionException ex) {}}
}

五、Jedis 分片模式的拜访

基于后面曾经介绍的 Redis 分片模式的一致性 Hash 的原理来了解 Jedis 的分片模式的拜访。

对于 Redis 分片模式的概念:Redis 在 3.0 版本之前没有集群模式的概念,这导致单节点可能存储的数据无限,通过 Redis 的客户端如 Jedis 在客户端通过一致性 Hash 算法来实现数据的分片存储。

实质上 Redis 的分片模式跟 Redis 自身没有任何关系,只是通过客户端来解决单节点数据无限存储的问题。

ShardedJedis 拜访 Redis 的外围在于构建对象的时候初始化一致性 Hash 对象,构建一致性 Hash 经典的 Hash 值和 node 的映射关系。构建完映射关系后执行 set 等操作就是 Hash 值到 node 的寻址过程,寻址实现后间接进行单节点的操作。

5.1 创立过程

ShardedJedis 的创立过程在于父类的 Sharded 中对于一致性 Hash 相干的初始化过程,外围在于构建一致性的虚构节点以及虚构节点和 Redis 节点的映射关系。

源码中最外围的局部代码在于依据依据权重映射成未 160 个虚构节点,通过虚构节点来定位到具体的 Redis 节点。

public class Sharded<R, S extends ShardInfo<R>> {
 
  public static final int DEFAULT_WEIGHT = 1;
  // 保留虚构节点和 redis 的 node 节点的映射关系
  private TreeMap<Long, S> nodes;
  // hash 算法
  private final Hashing algo;
  // 保留 redis 节点和拜访该节点的 Jedis 的连贯信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
 
  public Sharded(List<S> shards, Hashing algo) {
    this.algo = algo;
    initialize(shards);
  }
 
  private void initialize(List<S> shards) {nodes = new TreeMap<>();
    // 遍历每个 redis 的节点并设置 hash 值到节点的映射关系
    for (int i = 0; i != shards.size(); ++i) {final S shardInfo = shards.get(i);
      // 依据权重映射成未 160 个虚构节点
      int N =  160 * shardInfo.getWeight();
      if (shardInfo.getName() == null) for (int n = 0; n < N; n++) {
        // 构建 hash 值和节点映射关系
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < N; n++) {nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
      }
      // 保留每个节点的拜访对象
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
}

5.2 拜访过程

ShardedJedis 的拜访过程就是一致性 Hash 的计算过程,外围的逻辑就是:通过 Hash 算法对拜访的 key 进行 Hash 计算生成 Hash 值,依据 Hash 值获取对应 Redis 节点,依据对应的 Redis 节点获取对应的拜访对象 Jedis。

获取拜访对象 Jedis 之后就能够间接进行命令操作。

public class Sharded<R, S extends ShardInfo<R>> {
 
  public static final int DEFAULT_WEIGHT = 1;
  private TreeMap<Long, S> nodes;
  private final Hashing algo;
  // 保留 redis 节点和拜访该节点的 Jedis 的连贯信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
 
  public R getShard(String key) {
    // 依据 redis 节点找到对应的拜访对象 Jedis
    return resources.get(getShardInfo(key));
  }
 
  public S getShardInfo(String key) {return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
  }
 
  public S getShardInfo(byte[] key) {
    // 针对拜访的 key 生成对应的 hash 值
    // 依据 hash 值找到对应的 redis 节点
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.isEmpty()) {return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
  }
}

六、Jedis 集群模式的拜访

基于后面介绍的 Redis 的集群原理来了解 Jedis 的集群模式的拜访。

Jedis 可能实现 key 和哈希槽的定位的外围机制在于哈希槽和 Redis 节点的映射,而这个发现过程基于 Redis 的 cluster slot 命令。

对于 Redis 集群操作的命令:Redis 通过 cluster slots 会返回 Redis 集群的整体情况。返回每一个 Redis 节点的信息蕴含:

  • 哈希槽起始编号
  • 哈希槽完结编号
  • 哈希槽对应 master 节点,节点应用 IP/Port 示意
  • master 节点的第一个正本
  • master 节点的第二个正本
127.0.0.1:30001> cluster slots
1) 1) (integer) 0 // 开始槽位
   2) (integer) 5460 // 完结槽位
   3) 1) "127.0.0.1" // master 节点的 host
      2) (integer) 30001 // master 节点的 port
      3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" // 节点的编码
   4) 1) "127.0.0.1" // slave 节点的 host
      2) (integer) 30004 // slave 节点的 port
      3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // 节点的编码
2) 1) (integer) 5461
   2) (integer) 10922
   3) 1) "127.0.0.1"
      2) (integer) 30002
      3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013"
   4) 1) "127.0.0.1"
      2) (integer) 30005
      3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"
3) 1) (integer) 10923
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 30003
      3) "044ec91f325b7595e76dbcb18cc688b6a5b434a1"
   4) 1) "127.0.0.1"
      2) (integer) 30006
      3) "58e6e48d41228013e5d9c1c37c5060693925e97e"

Jedis 拜访集群模式 Redis 的整体流程图如下所示,从图中能够看出外围的流程蕴含 JedisCluster 对象的创立以及通过 JedisCluster 对象实现 Redis 的拜访。

JedisCluster 对象的创立外围在于创立 JedisClusterInfoCache 对象并通过集群发现来建设 slot 和集群节点的映射关系。

JedisCluster 对 Redis 集群的拜访在于获取 key 所在的 Redis 节点并通过 Jedis 对象进行拜访。

6.1 创立过程

JedisCluster 的类关系如下图所示,在图中能够看到外围变量 JedisSlotBasedConnectionHandler 对象。

JedisCluster 的父类 BinaryJedisCluster 创立了 JedisSlotBasedConnectionHandler 对象,该对象负责和 Redis 的集群进行通信。

public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
  public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
 
    // 拜访父类 BinaryJedisCluster
    super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
  }
}
 
public class BinaryJedisCluster implements BinaryJedisClusterCommands,
    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
  public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String user, String password, String clientName, GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
 
    // 创立 JedisSlotBasedConnectionHandler 对象
    this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
        connectionTimeout, soTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
 
    this.maxAttempts = maxAttempts;
  }
}

JedisSlotBasedConnectionHandler 的外围在于创立并初始化 JedisClusterInfoCache 对象,该对象缓存了 Redis 集群的信息。

JedisClusterInfoCache 对象的初始化过程通过 initializeSlotsCache 来实现,次要目标用于实现集群节点和槽位发现。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
  public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
 
    super(nodes, poolConfig, connectionTimeout, soTimeout, user, password, clientName,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
  }
}
 
public abstract class JedisClusterConnectionHandler implements Closeable {
  public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
 
    // 创立 JedisClusterInfoCache 对象
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
 
    // 初始化 jedis 的 Slot 信息
    initializeSlotsCache(nodes, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
  }
 
 
  private void initializeSlotsCache(Set<HostAndPort> startNodes,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {for (HostAndPort hostAndPort : startNodes) {try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
          soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) {
 
        // 通过 discoverClusterNodesAndSlots 进行集群发现
        cache.discoverClusterNodesAndSlots(jedis);
        return;
      } catch (JedisConnectionException e) {}}
  }
}

JedisClusterInfoCache 的 nodes 用来保留 Redis 集群的节点信息,slots 用来保留槽位和集群节点的信息。

nodes 和 slots 维持的对象都是 JedisPool 对象,该对象维持了和 Redis 的连贯信息。集群的发现过程由 discoverClusterNodesAndSlots 来实现,实质是执行 Redis 的集群发现命令 cluster slots 实现的。

public class JedisClusterInfoCache {
  // 负责保留 redis 集群的节点信息
  private final Map<String, JedisPool> nodes = new HashMap<>();
  // 负责保留 redis 的槽位和 redis 节点的映射关系
  private final Map<Integer, JedisPool> slots = new HashMap<>();
 
  // 负责集群的发现逻辑
  public void discoverClusterNodesAndSlots(Jedis jedis) {w.lock();
 
    try {reset();
      List<Object> slots = jedis.clusterSlots();
 
      for (Object slotInfoObj : slots) {List<Object> slotInfo = (List<Object>) slotInfoObj;
 
        if (slotInfo.size() <= MASTER_NODE_INDEX) {continue;}
        // 获取 redis 节点对应的槽位信息
        List<Integer> slotNums = getAssignedSlotArray(slotInfo);
 
        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.isEmpty()) {continue;}
 
          HostAndPort targetNode = generateHostAndPort(hostInfos);
          // 负责保留 redis 节点信息
          setupNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            // 负责保留槽位和 redis 节点的映射关系
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {w.unlock();
    }
  }
 
  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {w.lock();
    try {JedisPool targetPool = setupNodeIfNotExist(targetNode);
      // 保留槽位和对应的 JedisPool 对象
      for (Integer slot : targetSlots) {slots.put(slot, targetPool);
      }
    } finally {w.unlock();
    }
  }
 
  public JedisPool setupNodeIfNotExist(HostAndPort node) {w.lock();
    try {
      // 生产 redis 节点对应的 nodeKey
      String nodeKey = getNodeKey(node);
      JedisPool existingPool = nodes.get(nodeKey);
      if (existingPool != null) return existingPool;
      // 生产 redis 节点对应的 JedisPool
      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
          connectionTimeout, soTimeout, infiniteSoTimeout, user, password, 0, clientName,
          ssl, sslSocketFactory, sslParameters, hostnameVerifier);
      // 保留 redis 节点的 key 和对应的 JedisPool 对象
      nodes.put(nodeKey, nodePool);
      return nodePool;
    } finally {w.unlock();
    }
  }
}

JedisPool 的类关系如下图所示,其中外部 internalPool 是通过 apache common pool 来实现的池化。

JedisPool 外部的 internalPool 通过 JedisFactory 的 makeObject 来创立 Jedis 对象。

每个 Redis 节点都会对应一个 JedisPool 对象,通过 JedisPool 来治理 Jedis 的申请开释复用等。

public class JedisPool extends JedisPoolAbstract {public JedisPool() {this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
  }
}
 
public class JedisPoolAbstract extends Pool<Jedis> {public JedisPoolAbstract() {super();
  }
}
 
public abstract class Pool<T> implements Closeable {
  protected GenericObjectPool<T> internalPool;
 
  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {if (this.internalPool != null) {
      try {closeInternalPool();
      } catch (Exception e) {}}
    this.internalPool = new GenericObjectPool<>(factory, poolConfig);
  }
}
 
class JedisFactory implements PooledObjectFactory<Jedis> {
   
  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    // 创立 Jedis 对象
    final HostAndPort hp = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hp.getHost(), hp.getPort(), connectionTimeout, soTimeout,
        infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
 
    try {
      // Jedis 对象连接
      jedis.connect();
      if (user != null) {jedis.auth(user, password);
      } else if (password != null) {jedis.auth(password);
      }
      if (database != 0) {jedis.select(database);
      }
      if (clientName != null) {jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {jedis.close();
      throw je;
    }
    // 将 Jedis 对象包装成 DefaultPooledObject 进行返回
    return new DefaultPooledObject<>(jedis);
  }
}

6.2 拜访过程

JedisCluster 拜访 Redis 的过程通过 JedisClusterCommand 来实现重试机制,最终通过 Jedis 对象来实现拜访。从实现的角度来说 JedisCluster 是在 Jedis 之上封装了一层,进行集群节点定位以及重试机制等。

以 set 命令为例,整个拜访通过 JedisClusterCommand 实现如下:

  • 计算 key 所在的 Redis 节点。
  • 获取 Redis 节点对应的 Jedis 对象。
  • 通过 Jedis 对象进行 set 操作。
public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
 
  @Override
  public String set(final String key, final String value, final SetParams params) {return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {return connection.set(key, value, params);
      }
    }.run(key);
  }
}

JedisClusterCommand 的 run 办法外围次要定位 Redis 的 key 所在的 Redis 节点,而后获取与该节点对应的 Jedis 对象进行拜访。

在 Jedis 对象拜访异样后,JedisClusterCommand 会进行重试操作并依照肯定策略执行 renewSlotCache 办法进行重集群节点重发现动作。

public abstract class JedisClusterCommand<T> {public T run(String key) {
    // 针对 key 进行槽位的计算
    return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
  }
   
  private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
 
    Jedis connection = null;
    try {if (redirect != null) {connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
        if (redirect instanceof JedisAskDataException) {connection.asking();
        }
      } else {if (tryRandomNode) {connection = connectionHandler.getConnection();
        } else {
          // 依据 slot 去获取 Jedis 对象
          connection = connectionHandler.getConnectionFromSlot(slot);
        }
      }
      // 执行真正的 Redis 的命令
      return execute(connection);
    } catch (JedisNoReachableClusterNodeException jnrcne) {throw jnrcne;} catch (JedisConnectionException jce) {releaseConnection(connection);
      connection = null;
 
      if (attempts <= 1) {
        // 保障最初两次机会去从新刷新槽位和节点的对应的信息
        this.connectionHandler.renewSlotCache();}
      // 依照重试次数进行重试操作
      return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    } catch (JedisRedirectionException jre) {
      // 针对返回 Move 命令立刻触发从新刷新槽位和节点的对应信息
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }
 
      releaseConnection(connection);
      connection = null;
 
      return runWithRetries(slot, attempts - 1, false, jre);
    } finally {releaseConnection(connection);
    }
  }
}

JedisSlotBasedConnectionHandler 的 cache 对象维持了 slot 和 node 的映射关系,通过 getConnectionFromSlot 办法来获取该 slot 对应的 Jedis 对象。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
 
  protected final JedisClusterInfoCache cache;
 
  @Override
  public Jedis getConnectionFromSlot(int slot) {
    // 获取槽位对应的 JedisPool 对象
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // 从 JedisPool 对象中获取 Jedis 对象
      return connectionPool.getResource();} else {
      // 获取失败就从新刷新槽位信息
      renewSlotCache();
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();}
    }
  }
}

七、Jedis 的 Pipeline 实现

Pipeline 的技术核心思想是将多个命令发送到服务器而不必期待回复,最初在一个步骤中读取该回答。这种模式的益处在于节俭了申请响应这种模式的网络开销。

Redis 的一般命令如 set 和 Pipeline 批量操作的外围的差异在于 set 命令的操作会间接发送申请到 Redis 并同步期待后果返回,而 Pipeline 的操作会发送申请但不立刻同步期待后果返回,具体的实现能够从 Jedis 的源码一探到底。

原生的 Pipeline 在集群模式下相干的 key 必须 Hash 到同一个节点能力失效,起因在于 Pipeline 下的 Client 对象只能其中的一个节点建设了连贯。

在集群模式下归属于不同节点的 key 可能应用 Pipeline 就须要针对每个 key 保留对应的节点的 client 对象,在最初执行获取数据的时候一并获取。实质上能够认为在单节点的 Pipeline 的根底上封装成一个集群式的 Pipeline。

7.1 Pipeline 用法剖析

Pipeline 拜访单节点的 Redis 的时候,通过 Jedis 对象的 Pipeline 办法返回 Pipeline 对象,其余的命令操作通过该 Pipeline 对象进行拜访。

Pipeline 从应用角度来剖析,会批量发送多个命令并最初对立应用 syncAndReturnAll 来一次性返回后果。

public void pipeline() {jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500);
    Pipeline p = jedis.pipelined();
    // 批量发送命令到 redis
    p.set("foo", "bar");
    p.get("foo");
    // 同步期待响应后果
    List<Object> results = p.syncAndReturnAll();
 
    assertEquals(2, results.size());
    assertEquals("OK", results.get(0));
    assertEquals("bar", results.get(1));
 }
 
 
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
 
  @Override
  public Response<String> set(final String key, final String value) {
    // 发送命令
    getClient(key).set(key, value);
    // pipeline 的 getResponse 只是把待响应的申请聚合到 pipelinedResponses 对象当中
    return getResponse(BuilderFactory.STRING);
  }
}
 
 
public class Queable {private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  protected <T> Response<T> getResponse(Builder<T> builder) {Response<T> lr = new Response<>(builder);
    // 对立保留到响应队列当中
    pipelinedResponses.add(lr);
    return lr;
  }
}
 
 
public class Pipeline extends MultiKeyPipelineBase implements Closeable {public List<Object> syncAndReturnAll() {if (getPipelinedResponseLength() > 0) {
      // 依据批量发送命令的个数即须要批量返回命令的个数,通过 client 对象进行批量读取
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      List<Object> formatted = new ArrayList<>();
      for (Object o : unformatted) {
        try {
          // 格式化每个返回的后果并最终保留在列表中进行返回
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {formatted.add(e);
        }
      }
      return formatted;
    } else {return java.util.Collections.<Object> emptyList();
    }
  }
}

一般 set 命令发送申请给 Redis 后立刻通过 getStatusCodeReply 来获取响应后果,所以这是一种申请响应的模式。

getStatusCodeReply 在获取响应后果的时候会通过 flush() 命令强制发送报文到 Redis 服务端而后通过读取响应后果。

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
 
  @Override
  public String set(final byte[] key, final byte[] value) {checkIsInMultiOrPipeline();
    // 发送命令
    client.set(key, value);
    // 期待申请响应
    return client.getStatusCodeReply();}
}
 
 
public class Connection implements Closeable {public String getStatusCodeReply() {
    // 通过 flush 立刻发送申请
    flush();
    // 解决响应申请
    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
    if (null == resp) {return null;} else {return SafeEncoder.encode(resp);
    }
  }
}
 
 
public class Connection implements Closeable {protected void flush() {
    try {
      // 针对输入流进行 flush 操作保障报文的收回
      outputStream.flush();} catch (IOException ex) {
      broken = true;
      throw new JedisConnectionException(ex);
    }
  }
}

八、结束语

Jedis 作为 Redis 官网首选的 Java 客户端开发包,反对绝大部分的 Redis 的命令,也是日常中应用较多的 Redis 客户端。

理解了 Jedis 的实现原理,除了可能反对 Redis 的日常操作外,还能更好的应答 Redis 的额定操作诸如扩容时的技术选型。

通过介绍 Jedis 针对单机模式、分配模式、集群模式三种场景拜访形式,让大家有个从宏观到宏观的了解过程,把握 Jedis 的核心思想并更好的利用到实际当中。

作者:vivo 互联网服务器团队 -Wang Zhi

正文完
 0