关于源码学习:ioredis源码阅读0

63次阅读

共计 11063 个字符,预计需要花费 28 分钟才能阅读完成。

最近因为工作须要,要去搞一个 Node.js 端的 Redis Client 组件进去,临时抉择通过 ioredis 来作为 fork 对象。
因为之前有遇到过 Redis 在应用 twemproxy 时会始终呈现无奈连贯服务器的问题,详情见 issues:https://github.com/luin/ioredis/issues/573
所以会批改源码批改这一问题,不过在批改实现之后跑单元测试发现,事件没有那么简略,并不只是 info -> ping 这样,所以只好去相熟源码,而后针对性地调整一下逻辑。

<!–more–>

ioredis 我的项目构造

从我的项目中看,源码都在 lib 文件夹下,是一个纯正的 TS 我的项目。
lib 目录下的文件次要是一些通用能力的提供,比方 commandpipeline 以及数据的传输等。

.
├── DataHandler.ts                    # 数据处理
├── ScanStream.ts
├── SubscriptionSet.ts
├── autoPipelining.ts
├── cluster                           # Redis Cluster 模式的实现
│   ├── ClusterOptions.ts
│   ├── ClusterSubscriber.ts
│   ├── ConnectionPool.ts
│   ├── DelayQueue.ts
│   ├── index.ts
│   └── util.ts
├── command.ts                        # 命令的具体实现
├── commander.ts                      # command 的调度方
├── connectors                        # 网络连接相干
│   ├── AbstractConnector.ts
│   ├── SentinelConnector
│   ├── StandaloneConnector.ts
│   └── index.ts
├── errors                            # 异样信息相干
│   ├── ClusterAllFailedError.ts
│   ├── MaxRetriesPerRequestError.ts
│   └── index.ts
├── index.ts                          # 入口文件
├── pipeline.ts                       # 管道逻辑
├── promiseContainer.ts               # Promise 的一个封装
├── Redis                             # `Redis 实例的实现 `
│   ├── RedisOptions.ts
│   ├── event_handler.ts
│   └── index.ts
├── script.ts
├── transaction.ts
├── types.ts
└── utils                             # 一些工具函数的实现
    ├── debug.ts
    ├── index.ts
    └── lodash.ts

而下分的两个文件夹,rediscluster 都是具体的 redis client 实现,cluster 是对应的 cluster 集群化实现。
所以在看 README 的时候咱们会发现有两种实例能够应用,https://www.npmjs.com/package/ioredis

new Redis
new Redis.Cluster

咱们先从最一般的 Redis 开始看,本篇笔记次要是针对 Redis,联合着 README 一步步捋逻辑。

const `Redis` = require("ioredis");
const `Redis` = `new Redis`();

redis.set("foo", "bar");
redis.get("foo", function (err, result) {if (err) {console.error(err);
  } else {console.log(result);
  }
});

最根底的应用程序,首先实例化一个 Redis 对象,而后调用 Redis 对应的命令,如果对 Redis 命令不相熟能够看一下这个网站:https://redis.io/commands#

入口代码位于 redis/index.ts,虽说 ioredis 用了 TS,然而构造函数的实现仍然应用的是很古老的 ES5 形式,别离继承了 EventEmitterCommander 两个类,第一个是 events 的,第二个则是 ioredis 本人提供的一个类,就在 commander.ts 文件中实现。

Redis 实例化

Redis 次要做的事件就是:

  • 建设并保护与 Redis Server 的网络连接
  • 健康检查
  • 保护队列在异常情况下保障申请不丢,可重试

看回到 Redis 会看到针对 this.connector 的一个赋值,抛开自定义的 ConnectorSentinels 只看最初一项最一般的 StandaloneConnector,这里就是用来建设与 Redis Server 的连贯的。
翻看 lib/connectors/StandaloneConnector.ts 的文件会发现,最终调用的是 net.createConnection,这个其实也能和咱们在上边提到的 RESP 所对应上,就是用的最根本的 Redis 通信协定来实现操作的。

各项参数初始化结束后,则会调用 connect 来与 Redis Server 建设真正的连贯。

net 模块的 createConnection 只能建设网络连接,并不能保障是咱们预期的 Redis 服务。
通过 connect 拿到的 stream 对象其实就是 socket client:https://github.com/luin/ioredis/blob/master/lib/redis/index.ts#L321
connect 办法中次要就是去建设与 Redis Server 的链接,在建设连贯当前,咱们会调用 event_handler.connectHandler 办法。
这里次要做了两件事:

  1. 去尝试 check Redis Server 的状态,也就是咱们最开始提到的遇到的那个坑了,咱们能够通过 Redis.prototype._readyCheck 办法看到具体的实现,ioredis 采纳 info 命令作为探针,然而这个在 twemproxy 集群模式下就会产生一些问题,因为该模式会禁用一些命令,其中就包含 info,那么这就会导致 Redis Client 始终认为服务是不可用的。
  2. 增加了针对 socket clientdata 事件监听,这里是用于后续承受返回数据的,次要逻辑在 DataHandler.ts,后边会提到。

readyCheck 的逻辑存在于 redis/index.ts 和 redis/event_handler.ts 文件中

Redis.prototype._readyCheck = function (callback) {
  const _this = this;
  this.info(function (err, res) {if (err) {return callback(err);
    }
    if (typeof res !== "string") {return callback(null, res);
    }

    const info: {[key: string]: any } = {};

    const lines = res.split("\r\n");
    for (let i = 0; i < lines.length; ++i) {const [fieldName, ...fieldValueParts] = lines[i].split(":");
      const fieldValue = fieldValueParts.join(":");
      if (fieldValue) {info[fieldName] = fieldValue;
      }
    }

    if (!info.loading || info.loading === "0") {callback(null, info);
    } else {const loadingEtaMs = (info.loading_eta_seconds || 1) * 1000;
      const retryTime =
        _this.options.maxLoadingRetryTime &&
        _this.options.maxLoadingRetryTime < loadingEtaMs
          ? _this.options.maxLoadingRetryTime
          : loadingEtaMs;
      debug("Redis server still loading, trying again in" + retryTime + "ms");
      setTimeout(function () {_this._readyCheck(callback);
      }, retryTime);
    }
  });
};

在检测 Redis 可用当前则会触发 callback,该 callback 还会去查看 offlineQueue 是否有值,能够了解为是 Redis 可用之前调用命令的那些记录,ioredis 并不会间接报错通知你说连贯未建设,而是暂存在本人的一个队列中,等到可用后依照程序收回去。
Redis 在实例化的过程中次要也就是做了这些事件,接下来咱们就要看 Redis 命令收回当前,具体执行的逻辑了。

Commander

Commander 的作用就是实现了各种 Redis Client 的命令,通过 https://www.npmjs.com/package/redis-commands 遍历失去的。
同时会针对 ClientReady 状态进行解决,在 Ready 之前会做一些暂存命令之类的操作。
比拟像是一个抽象类,因为 RedisRedis Cluster 都会继承并笼罩一些 API 来实现工作。

commands.forEach(function (commandName) {Commander.prototype[commandName] = generateFunction(commandName, "utf8");
  Commander.prototype[commandName + "Buffer"] = generateFunction(
    commandName,
    null
  );
});

function generateFunction(_encoding: string);
function generateFunction(_commandName: string | void, _encoding: string);
function generateFunction(_commandName?: string, _encoding?: string) {if (typeof _encoding === "undefined") {
    _encoding = _commandName;
    _commandName = null;
  }

  return function (...args) {const commandName = _commandName || args.shift();
    let callback = args[args.length - 1];

    if (typeof callback === "function") {args.pop();
    } else {callback = undefined;}

    const options = {
      errorStack: this.options.showFriendlyErrorStack
        ? new Error().stack
        : undefined,
      keyPrefix: this.options.keyPrefix,
      replyEncoding: _encoding,
    };

    if (this.options.dropBufferSupport && !_encoding) {
      return asCallback(PromiseContainer.get().reject(new Error(DROP_BUFFER_SUPPORT_ERROR)),
        callback
      );
    }

    // No auto pipeline, use regular command sending
    if (!shouldUseAutoPipelining(this, commandName)) {
      return this.sendCommand(new Command(commandName, args, options, callback)
      );
    }

    // Create a new pipeline and make sure it's scheduled
    return executeWithAutoPipelining(this, commandName, args, callback);
  };
}

在实现所有命令的同时还实现了一批 Buffer 后缀的 API,他们次要的区别咱们能够通过 generateFunction 函数的实现来看到,被传入到了 Command 实例中。
Command 对象则是具体的命令实现,所以咱们还须要先去看一下 Command。

Command

Command 负责的事件,次要是参数的解决、返回值的解决,生成命令传输的理论值以及 callback 的触发。

实例化

Command 的实例化过程中,除去一些属性的赋值,还调用了一个 initPromise 办法,在外部生成了一个 Promise 对象。
其中有两处比拟重要的解决,一个是对于参数的转换,还有一个是返回值的解决。

private initPromise() {const Promise = getPromise();
  const promise = new Promise((resolve, reject) => {if (!this.transformed) {
      this.transformed = true;
      const transformer = Command._transformer.argument[this.name];
      if (transformer) {this.args = transformer(this.args);
      }
      this.stringifyArguments();}

    this.resolve = this._convertValue(resolve);
    if (this.errorStack) {this.reject = (err) => {reject(optimizeErrorStack(err, this.errorStack, __dirname));
      };
    } else {this.reject = reject;}
  });

  this.promise = asCallback(promise, this.callback);
}

参数、返回值非凡解决

如果检索 Command.ts 文件,会发现 Command._transformer.argument 通过 setArgumentTransformer 办法进行设置。
而后再察看代码中有用到 setArgumentTransformer 的是少数几个 hset 命令,以及 mset 命令。

Command.setArgumentTransformer("hmset", function (args) {if (args.length === 2) {if (typeof Map !== "undefined" && args[1] instanceof Map) {return [args[0]].concat(convertMapToArray(args[1]));
    }
    if (typeof args[1] === "object" && args[1] !== null) {return [args[0]].concat(convertObjectToArray(args[1]));
    }
  }
  return args;
});

如果大家应用过 Redishash set 操作,应该都会晓得,操作多个键值的形式是通过追加参数实现的:

> HMSET key field value [field value ...]

这样在 JS 中应用也须要将一个数组传递进去,由用户本人保护数组的 key value,这样一个程序的操作形式,必然是没有写 JS 习惯的 Object 传参要难受的,所以 ioredis 提供一个参数转换的逻辑,用来将 Object 转换为一维数组:

export function convertObjectToArray(obj) {const result = [];
  const keys = Object.keys(obj);

  for (let i = 0, l = keys.length; i < l; i++) {result.push(keys[i], obj[keys[i]]);
  }
  return result;
}

export function convertMapToArray<K, V>(map: Map<K, V>): Array<K | V> {const result = [];
  let pos = 0;
  map.forEach(function (value, key) {result[pos] = key;
    result[pos + 1] = value;
    pos += 2;
  });
  return result;
}

如果认真看 Command._transformer 会发现还有一个 reply 属性值,这里的逻辑次要在 _convertValue 中有所体现,大抵就是在接管到返回值当前,会先调用咱们传入的自定义函数用来解决返回值。
目前翻代码用到的惟一一处是 hgetall 的解决逻辑,hmgethgetallRedis 中都是返回一个数组的数据,而 ioredis 将数组依照 kv 的格局拼接为一个 Object 不便用户操作。

Command.setReplyTransformer("hgetall", function (result) {if (Array.isArray(result)) {const obj = {};
    for (let i = 0; i < result.length; i += 2) {obj[result[i]] = result[i + 1];
    }
    return obj;
  }
  return result;
});

设置 key 前缀

如果看 Command 实例化的过程中,还会发现有 _iterateKeys 这样的一个函数调用,该函数具备两个作用:

  1. 提取参数中所有的 key
  2. 可选的将 key 增加一个前缀(prefix)

函数外部应用了 redis-commands 的两个 API,existsgetKeyIndexes,用来获取参数数组中所有的 key 的下标。
因为这个函数做了两件事,所以在第一次看到构造函数的用法时,再看函数具体的实现,会对最初返回的 this.keys 很纳闷,然而当看到 Command 还提供了一个 getKeys API 就可能明确是怎么的逻辑了。

如果设置了 keyPrefix,则会触发 _iterateKeys 用来调整 key 名,并存储到 keys 中用于返回值。
当调用 getKeys 时,如果没有设置 keyPrefix,则会用默认的空处理函数来执行同样的逻辑,就是获取所有的 key,而后返回进来;如果之前曾经设置过 keyPrefix 那么就会间接返回 this.keys 不再反复执行逻辑。

// 构造函数内逻辑
if (options.keyPrefix) {this._iterateKeys((key) => options.keyPrefix + key);
}

// 另一处调用的地位
public getKeys(): Array<string | Buffer> {return this._iterateKeys();
}

private _iterateKeys(transform: Function = (key) => key
): Array<string | Buffer> {if (typeof this.keys === "undefined") {this.keys = [];
    if (commands.exists(this.name)) {const keyIndexes = commands.getKeyIndexes(this.name, this.args);
      for (const index of keyIndexes) {this.args[index] = transform(this.args[index]);
        this.keys.push(this.args[index] as string | Buffer);
      }
    }
  }
  return this.keys;
}

发送命令数据的生成

大家应用 Redis 应该更多的是通过代码中的 Client 调用各种命令来做,偶然会通过 redis-cli 间接命令行操作。
但其实 Redis 应用了一个叫做 RESP (REdis Serialization Protocol) 的协定来进行传输。
如果本机有 Redis 的话,咱们在本地能够很简略的进行演示。

> echo -e '*1\r\n$4\r\nPING\r\n' | nc 127.0.0.1 6379
+PONG

咱们会失去一个 +PONG 字符串。这样的一个交互其实才是绝大多数 ClientRedis Server 交互时所应用的格局。

P.S. RESP 有提供人类可读的版本进行交互,然而性能绝对要低一些。

举例说明如果咱们要执行一个 set 和一个 get 应该怎么去写这个命令:

# 结尾代表正文

# SET hello world
# 参数个数
*3
# 该行命令值的长度(set 命令)$3
# 命令对应的值(set 命令)SET
# 该行命令值的长度(具体的 key: hello)$5
# 命令对应的值(具体的 key: hello)hello
# 该行命令值的长度(value 的长度)$5
# 命令对应的值(value 本体)world

# GET hello
# 参数个数
*2
# 该行命令值的长度(get 命令)$3
# 命令对应的值(get 命令)GET
# 该行命令值的长度(具体的 key: hello)$5
# 命令对应的值(具体的 key: hello)hello

set 的返回值没什么意外,就是一个 +OK,而 get 的返回值则有两行,第一行 $5 示意返回值的长度,第二行才是真正的返回值 world
所以如果去看 Command 的 toWritable 函数就是实现了这样的逻辑,因为比拟长所以就不贴了:https://github.com/luin/ioredis/blob/master/lib/command.ts#L269

Command 次要实现的就是这些逻辑,咱们在 Commander 的眼帘中能够看到所有命令调用的开端都会执行 this.sendCommand,具体的调度就是在 RedisRedis Cluster 等具体的实现中做的了。所以咱们能够回到 Redis 去看下实现逻辑。

Redis 发送命令

sendCommand 的实现中,会进行 Redis 状态的查看,如果是 wait 或者 end 之类的,会进行对应的解决。
而后咱们会去查看以后是否是一个能够发送命令的状态:

let writable =
    this.status === "ready" ||
    (!stream &&
      this.status === "connect" &&
      commands.exists(command.name) &&
      commands.hasFlag(command.name, "loading"));
  if (!this.stream) {writable = false;} else if (!this.stream.writable) {writable = false;} else if (this.stream._writableState && this.stream._writableState.ended) {writable = false;}

代码还算比拟清晰,这里也要提到一点,咱们在解决 info 命令的问题是,应用 ping 命令来代替 info,最后就卡在了这里,后续 debug 发现,ping 命令并不具备 loading 这一 flag 个性,所以 ping 命令都被放到了 offlineQueue 中,针对这一状况,咱们将 ping 增加一个额定的判断逻辑,确保 write 的值为真。

接下来如果 write 为真,那么咱们就会应用 stream 也就是前边建设的 socket 连贯来发送咱们实在的命令了,这时候就是调用的 write 并将 Command#toWritable 的返回值作为数据传进去,也就是之前提到的基于 RESP 格局的序列化。
同时会将一些信息放到 commandQueue 中,它和 offlineQueue 都是同一个类型的实例,后边会提到具体的作用。

this.commandQueue.push({
  command: command,  // Command 实例
  stream: stream,    // socket client(其实并没有中央会用到它,不晓得为什么要传过来)select: this.condition.select, // 这个也是没有被用到
});

另一个开源的模块,denque: https://www.npmjs.com/package…

如果 write 为假,那么命令就会被放到 offlineQueue 中。

完结逻辑后会把 command.promise 进行返回,咱们在 Command 实例化过程中能够看到,其实是实例化了一个 Promise 对象,并把 resolvereject 做了一次援用,后边在数据返回时会用到。
当咱们命令曾经发送结束后,那么下一步就是等数据返回了,这里就要说到前边在介绍 React 实例化后 connect 所调用的 DataHandler 实例所做的事件了。

DataHandler

DataHandler 是一个比拟另类的类的写法,因为应用时就间接 new 了但并没有接管返回值。
在构造函数中,就做了两件事,一个是实例化了一个 RedisParser 对象,另一个就是监听了 redis.stream.on('data') 事件,也就是咱们在实例化 Redis 时传递过去的 socket client,在 data 事件触发时调用 RedisParser.execute 来实现解析。
RedisParser 是另一个开源模块了,有趣味的小伙伴能够看这里:https://www.npmjs.com/package/redis-parser
目前能够认为在调用 execute 办法后会调用实例化时传入的 return Reply 就能够了,这是一个解析后的 response,咱们会拿到这个 response 之后会从 commandQueue 中顺次取出之前传入的对象。
取出的形式是依照队列的形式来取的,通过 shift,每次取出队列中的第一个元素。
而后调用元素中 command 属性的 resolve 办法,也就是咱们在调用各种 Redis 命令时传入的 callback 了。

这里须要补充一些 Redis 相干的常识,咱们从整个逻辑链路能够看到,大抵是这样的:

  1. 用户执行命令
  2. Redis 实例化 Command 并放入队列
  3. 接管到数据响应后解析数据,并获取队列中第一个元素,调用对应的 callback

同工夫可能会有很多 Redis 申请被收回去,然而再接管到数据后并不需要去判断这次响应对应的是哪一个 command,因为 Redis 自身也是一个单过程的工作模式,命令的解决也会依照接收数据的先后顺序来解决,因为自身 ioredis 用的也是同一个 socket 连贯,所以也不会存在说命令发送到远端的先后顺序会发生变化。
所以咱们就能够很释怀的通过最简略的形式,push + shift 来解决数据了。

这也是为什么一些大 key 的操作会导致整个 Redis 服务响应变慢了。(在不做分片之类的解决状况下)

小结

到此为止,一般模式下的 Redis Client 整体逻辑咱们曾经梳理完了,从创立到发送命令到接管返回值。
后边会针对 Redis Cluster 再输入一篇笔记,一起来看一下在 Cluster 模式下又会有什么不一样的解决逻辑。

参考资料

  • ioredis
  • redis commands
  • Node.js | net
  • Why Redis is single-threaded and why is Redis so fast!
  • Redis is single-threaded, then how does it do concurrent I/O?

正文完
 0