关于javascript:ioredis源码阅读1

53次阅读

共计 4954 个字符,预计需要花费 13 分钟才能阅读完成。

上次针对 redis 的源码浏览波及一般的 client,这次针对 cluster 模式下的 client 源码进行剖析。
具体的源码门路就是在 lib/cluster 目录下了。

Cluster 实例化

单刀直入,咱们应用 Cluster 模式最开始也是要进行实例化的,这里调用的代码位于 lib/cluster/index.ts

const {Cluster} = require('ioredis')

const cluster = new Redis.Cluster([
  {
    port: 6380,
    host: "127.0.0.1",
  },
  {
    port: 6381,
    host: "127.0.0.1",
  },
])

cluster.get('someKey').then()

从源码上来看,Cluster 预期接管两个参数,第一个是启动的节点汇合 startNodes,第二个是一个可选的 options
第一个参数是比拟固定的,没有太多含意,而第二个参数能够传递很多配置,这个能够在 README 中找到(目前是 12 个参数):https://www.npmjs.com/package/ioredis#cluster
如果没有传入的话,则会有默认值来填充,但并不是所有的参数都会有默认值。

Redis Client 一样的解决逻辑,在构造函数中会 call 一下 Commander,随后还会实例化一个 ConnectionPool 对象,并将 options.redisOptions 传入进去。

代码位于 lib/cluster/ConnectionPool.ts

在实例化 ConnectionPool 的过程中并没有做什么实质性的操作,只是把 options.redisOptions 放到了一个 private 属性中。
随后在 Cluster 构造函数中注册了四个对应的事件,别离是 -node+nodedrainnodeError,别离代表了 移除节点、减少节点、节点为空、节点出错。
ConnectionPool 本人实现的一些事件,后续会看到在哪里触发。

接下来实例化了一个 ClusterSubscriber 对象,并将上边实例化的 connectionPool 实例作为参数放了进去,并把 Cluster 实例的援用也传了进去。
实例化过程中做的事件也比较简单,就是监听了上边咱们提到的 -node+node 两个事件,在移除节点时,会判断是否存在 subscriber 属性,如果不存在则跳出,如果存在的话,会判断被移除的 key 是否等于以后 subscriber
这里能够提一下 subscriber 到底是什么,这个在下边的 selectSubscriber 函数中能够看到,就是实例化了一个 Redis Client,而实例化 Redis Client 所应用的参数,都是通过调用 connectionPoolgetNodes 办法所拿到的,并随机筛选其中一个节点配置来进行实例化。
之后就会通过该 Redis Client 来调用两个命令,subscriberpsubscriber,这两个是用来实现 Pub/Sub 的,具体区别其实就是后者能够监听一个带有通配符的服务。

subscriber 与 psubscriber 的区别:https://redis.io/topics/pubsub
并在接管到数据当前,通过 emit 的形式转发给 Cluster,后续 Cluster 模式下的 Pub/Sub 将会通过这里来进行数据的传递。

Connect

最初咱们调用 connect 函数就实现了整个 Cluster 实例化的过程。

如果是开启了 lazyConnect 那么这里就间接批改实例状态为 wait,而后完结整个流程。

connect 时咱们首先会解析 startNodes,拿到对应的 IP 和 端口等信息,而后会调用 reset 重置 connectionPool 中的实例,connectionPool 中会存储多个 IP+ 端口 的 Redis 实例援用,调用 reset 会把一些应该不存在的实例给关掉,而后把一些新减少的进行创立,复用曾经存在的实例,同时在新增节点的时候会触发 ClusterSubscriber+node 事件,如果此时是第一次触发,那么这时 ClusterSubscriber 才会真正的去创立用于 Pub/Sub 的 Redis 实例。

之后会注册一个 refresh 事件,在事件外部会调用 readyCheck,在这之前,则须要先去获取 Redis 节点的一些信息,这里是通过 getInfoFromNode 办法来实现的,外部会拿到一个 Redis 实例,并调用 duplicate 创立一个额定的实例,而后调用 cluster slots 命令来获取以后 Redis 集群服务状态,这里返回的数据会蕴含所有的节点 IP + 端口,同时蕴含某个节点的起始完结返回,具体的返回值如下:

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 0
   1) (integer) 4095
   2) 1) "127.0.0.1"
      1) (integer) 7000
   3) 1) "127.0.0.1"
      1) (integer) 7004
2) 1) (integer) 12288
   1) (integer) 16383
   2) 1) "127.0.0.1"
      1) (integer) 7003
   3) 1) "127.0.0.1"
      1) (integer) 7007
3) 1) (integer) 4096
   1) (integer) 8191
   2) 1) "127.0.0.1"
      1) (integer) 7001
   3) 1) "127.0.0.1"
      1) (integer) 7005
4) 1) (integer) 8192
   1) (integer) 12287
   2) 1) "127.0.0.1"
      1) (integer) 7002
   3) 1) "127.0.0.1"
      1) (integer) 7006

转换成 JS 的数据大抵是一个这样的构造:

[
  [ // slot info
    0,    // slot range start 
    4095, // slot range end
    [
      '127.0.0.1', // IP
      7000         // port
    ]
  ],
  [  // other slot info
    12288,
    16383,
    [
      '127.0.0.1',
      7003
    ]
  ],
]

cluster slot 的形容:https://redis.io/commands/cluster-slots

在获取到这些实在的节点信息当前,会根据拿到的节点汇合,再次调用 connectionPoolreset 办法,因为上次调用其实是应用的 startNode 传入的初始值,这里则会应用以后服务正在运行的数据进行一次替换。
在实现这一动作后,则会触发 refresh 事件,也就会进入下边的 readyCheck 环节,确保服务是可用的。

readyCheck

查看 readyCheck 的实现,次要也是通过调用 cluster info 命令来获取以后服务的状态。
在前文所讲解决 twemproxy 模式下的问题时,将 Redis ClientreadyCheckinfo 批改为了 ping 命令来实现,而这里,则没有进行批改,因为要留神的是,这里并不是 info 命令,而是 cluster 命令,只不过参数是 info

Cluster 模块会应用 cluster info 命令中的 cluster_state 字段来作为检测的根据,数据会依照 k:v\nk:v\n 这种格局组合,所以咱们会在代码中看到通过匹配换行来获得对应的字段,并通过截取的形式拿到具体的值。

不过针对这里的逻辑,我集体倒是感觉间接用正则匹配反而更简略一些,因为拿到参数的值并没有做一些额定的操作,仅仅是用来验证。

private readyCheck(callback: CallbackFunction<void | "fail">): void {(this as any).cluster("info", function (err, res) {if (err) {return callback(err);
    }
    if (typeof res !== "string") {return callback();
    }

    let state;
    const lines = res.split("\r\n");
    for (let i = 0; i < lines.length; ++i) {const parts = lines[i].split(":");
      if (parts[0] === "cluster_state") {state = parts[1];
        break;
      }
    }

    if (state === "fail") {debug("cluster state not ok (%s)", state);
      callback(null, state);
    } else {callback();
    }
  });
}

当咱们发现 cluster info 中返回的数据为 fail 时,那么阐明集群中的这个节点是一个不可用的状态,那么就会调用 disconnect 断开并进行重连。
在触发 disconnect 的时候,同时会敞开 ClusterSubscriber 中的实例,因为咱们的连贯曾经要敞开了,那么也没有必要留着一个注册 Pub/Sub 的实例在这里了。
在这些操作实现之后,会进入 retry 的流程,这里其实就是依照某种逻辑从新调用了 connect 办法,再次执行前边所形容的逻辑。

针对整个流程画图示意大略是这样的:

sendCommand

在实例创立结束后,那么下一步就会波及到调用命令了。
在前边实例化过程中不可避免的也提到了一些 sendCommand 的事件,Redis 在实例化的过程中,会有一个状态的变更,而每次触发 sendCommand 实际上都会去查看那个状态,如果以后还没有建设好连贯,那么这时的命令会被放入到 offlineQueue 中暂存的。
readyCheck 通过之后会依照程序来调用这些命令。

当然,在 sendCommand 办法中也存在了对以后实例状态的判断,如果是 wait,那么能够认为实例开启了 lazyConnect 模式,这时会尝试与服务端建设连贯。

同时在 sendCommand 中也会对命令进行判断,一些 Pub/Sub 对应的命令,比方 publish,会被转发到 ClusterSubscriber 对应的实例下来执行,而其余一般的命令则会放到 connectionPool 中去执行。
通过这样的形式将公布订阅与一般的命令进行了拆分。

同样,因为是 Cluster 模式,所以还会有主从之间的拆分逻辑,这个能够通过实例化 Redis Cluster 时传入的 scaleReads 参数来决定,默认的话是 master,可选的还有 allslave 以及一个接管命令以及实例列表的自定义函数。

知识点来了

在 ioredis 中,默认状况下的配置是 master,这也就意味着所有的申请都会发送到 master 节点,这就意味着如果你为了进步读取的性能所创立的一些从库,__基本不会被拜访到__。

详情见文档:https://www.npmjs.com/package/ioredis#user-content-read-write-splitting

如果想要应用从库,那么能够把 scaleReads 批改为 slave,然而不须要放心说一些会对数据库造成批改的命令发送到从库,在 sendCommand 中会针对所发送的命令进行检测,如果不是只读的命令,且 scaleReads 设置的不是 master 会强行笼罩为 master

针对命令是否为只读的判断:https://github.com/luin/ioredis/blob/master/lib/cluster/index.ts#L599

而后对于那个自定义函数,其实就是须要本人依据 command 去评估到底应用哪个(些)实例,而后把对应的实例返回进来。
最终,咱们拿到了一个 Redis 实例,这时应用该 Redis 实例进行调用 sendCommand 即可。
而后后边的逻辑就和一般的 Redis 触发命令没有什么区别了。

总结

总的来看,在 ioredis 的实现中 Redis Cluster 是作为一个 Redis 的扩大来做的,在很多中央都会看到 Redis 的存在,并且同样都会继承自 Command 实例,这就让用户在应用的过程中并没有太多的差别,只有在实例化时传入的参数不太一样,在调用各种 Redis 命令时则没有区别,而在 Cluster 中则外部调用了 RedissendCommand 实现了逻辑上的复用。

正文完
 0