最近因为工作须要,要去搞一个 Node.js 端的
Redis Client
组件进去,临时抉择通过ioredis
来作为 fork 对象。
因为之前有遇到过Redis
在应用 twemproxy 时会始终呈现无奈连贯服务器的问题,详情见 issues:https://github.com/luin/ioredis/issues/573
所以会批改源码批改这一问题,不过在批改实现之后跑单元测试发现,事件没有那么简略,并不只是 info -> ping 这样,所以只好去相熟源码,而后针对性地调整一下逻辑。
<!–more–>
ioredis 我的项目构造
从我的项目中看,源码都在 lib
文件夹下,是一个纯正的 TS 我的项目。lib
目录下的文件次要是一些通用能力的提供,比方 command
、pipeline
以及数据的传输等。
.
├── DataHandler.ts # 数据处理
├── ScanStream.ts
├── SubscriptionSet.ts
├── autoPipelining.ts
├── cluster # Redis Cluster 模式的实现
│ ├── ClusterOptions.ts
│ ├── ClusterSubscriber.ts
│ ├── ConnectionPool.ts
│ ├── DelayQueue.ts
│ ├── index.ts
│ └── util.ts
├── command.ts # 命令的具体实现
├── commander.ts # command 的调度方
├── connectors # 网络连接相干
│ ├── AbstractConnector.ts
│ ├── SentinelConnector
│ ├── StandaloneConnector.ts
│ └── index.ts
├── errors # 异样信息相干
│ ├── ClusterAllFailedError.ts
│ ├── MaxRetriesPerRequestError.ts
│ └── index.ts
├── index.ts # 入口文件
├── pipeline.ts # 管道逻辑
├── promiseContainer.ts # Promise 的一个封装
├── Redis # `Redis 实例的实现 `
│ ├── RedisOptions.ts
│ ├── event_handler.ts
│ └── index.ts
├── script.ts
├── transaction.ts
├── types.ts
└── utils # 一些工具函数的实现
├── debug.ts
├── index.ts
└── lodash.ts
而下分的两个文件夹,redis
与 cluster
都是具体的 redis
client
实现,cluster
是对应的 cluster
集群化实现。
所以在看 README
的时候咱们会发现有两种实例能够应用,https://www.npmjs.com/package/ioredis
new Redis
new Redis.Cluster
咱们先从最一般的 Redis
开始看,本篇笔记次要是针对 Redis
,联合着 README 一步步捋逻辑。
const `Redis` = require("ioredis");
const `Redis` = `new Redis`();
redis.set("foo", "bar");
redis.get("foo", function (err, result) {if (err) {console.error(err);
} else {console.log(result);
}
});
最根底的应用程序,首先实例化一个 Redis
对象,而后调用 Redis
对应的命令,如果对 Redis
命令不相熟能够看一下这个网站:https://redis.io/commands#
入口代码位于 redis/index.ts,虽说 ioredis
用了 TS,然而构造函数的实现仍然应用的是很古老的 ES5 形式,别离继承了 EventEmitter
和 Commander
两个类,第一个是 events
的,第二个则是 ioredis
本人提供的一个类,就在 commander.ts
文件中实现。
Redis 实例化
Redis
次要做的事件就是:
- 建设并保护与
Redis Server
的网络连接 - 健康检查
- 保护队列在异常情况下保障申请不丢,可重试
看回到 Redis
会看到针对 this.connector 的一个赋值,抛开自定义的 Connector
与 Sentinels
只看最初一项最一般的 StandaloneConnector
,这里就是用来建设与 Redis Server
的连贯的。
翻看 lib/connectors/StandaloneConnector.ts
的文件会发现,最终调用的是 net.createConnection
,这个其实也能和咱们在上边提到的 RESP
所对应上,就是用的最根本的 Redis
通信协定来实现操作的。
各项参数初始化结束后,则会调用 connect
来与 Redis Server
建设真正的连贯。
net
模块的 createConnection
只能建设网络连接,并不能保障是咱们预期的 Redis
服务。
通过 connect
拿到的 stream
对象其实就是 socket client
:https://github.com/luin/ioredis/blob/master/lib/redis/index.ts#L321
在 connect
办法中次要就是去建设与 Redis Server
的链接,在建设连贯当前,咱们会调用 event_handler.connectHandler
办法。
这里次要做了两件事:
- 去尝试 check
Redis Server
的状态,也就是咱们最开始提到的遇到的那个坑了,咱们能够通过Redis.prototype._readyCheck
办法看到具体的实现,ioredis
采纳info
命令作为探针,然而这个在twemproxy
集群模式下就会产生一些问题,因为该模式会禁用一些命令,其中就包含info
,那么这就会导致Redis Client
始终认为服务是不可用的。 - 增加了针对
socket client
的data
事件监听,这里是用于后续承受返回数据的,次要逻辑在DataHandler.ts
,后边会提到。
readyCheck 的逻辑存在于 redis/index.ts 和 redis/event_handler.ts 文件中
Redis.prototype._readyCheck = function (callback) {
const _this = this;
this.info(function (err, res) {if (err) {return callback(err);
}
if (typeof res !== "string") {return callback(null, res);
}
const info: {[key: string]: any } = {};
const lines = res.split("\r\n");
for (let i = 0; i < lines.length; ++i) {const [fieldName, ...fieldValueParts] = lines[i].split(":");
const fieldValue = fieldValueParts.join(":");
if (fieldValue) {info[fieldName] = fieldValue;
}
}
if (!info.loading || info.loading === "0") {callback(null, info);
} else {const loadingEtaMs = (info.loading_eta_seconds || 1) * 1000;
const retryTime =
_this.options.maxLoadingRetryTime &&
_this.options.maxLoadingRetryTime < loadingEtaMs
? _this.options.maxLoadingRetryTime
: loadingEtaMs;
debug("Redis server still loading, trying again in" + retryTime + "ms");
setTimeout(function () {_this._readyCheck(callback);
}, retryTime);
}
});
};
在检测 Redis
可用当前则会触发 callback
,该 callback
还会去查看 offlineQueue
是否有值,能够了解为是 Redis
可用之前调用命令的那些记录,ioredis
并不会间接报错通知你说连贯未建设,而是暂存在本人的一个队列中,等到可用后依照程序收回去。Redis
在实例化的过程中次要也就是做了这些事件,接下来咱们就要看 Redis
命令收回当前,具体执行的逻辑了。
Commander
Commander 的作用就是实现了各种 Redis Client
的命令,通过 https://www.npmjs.com/package/redis-commands 遍历失去的。
同时会针对 Client
的 Ready
状态进行解决,在 Ready
之前会做一些暂存命令之类的操作。
比拟像是一个抽象类,因为 Redis
和 Redis Cluster
都会继承并笼罩一些 API 来实现工作。
commands.forEach(function (commandName) {Commander.prototype[commandName] = generateFunction(commandName, "utf8");
Commander.prototype[commandName + "Buffer"] = generateFunction(
commandName,
null
);
});
function generateFunction(_encoding: string);
function generateFunction(_commandName: string | void, _encoding: string);
function generateFunction(_commandName?: string, _encoding?: string) {if (typeof _encoding === "undefined") {
_encoding = _commandName;
_commandName = null;
}
return function (...args) {const commandName = _commandName || args.shift();
let callback = args[args.length - 1];
if (typeof callback === "function") {args.pop();
} else {callback = undefined;}
const options = {
errorStack: this.options.showFriendlyErrorStack
? new Error().stack
: undefined,
keyPrefix: this.options.keyPrefix,
replyEncoding: _encoding,
};
if (this.options.dropBufferSupport && !_encoding) {
return asCallback(PromiseContainer.get().reject(new Error(DROP_BUFFER_SUPPORT_ERROR)),
callback
);
}
// No auto pipeline, use regular command sending
if (!shouldUseAutoPipelining(this, commandName)) {
return this.sendCommand(new Command(commandName, args, options, callback)
);
}
// Create a new pipeline and make sure it's scheduled
return executeWithAutoPipelining(this, commandName, args, callback);
};
}
在实现所有命令的同时还实现了一批 Buffer
后缀的 API,他们次要的区别咱们能够通过 generateFunction
函数的实现来看到,被传入到了 Command
实例中。
而 Command
对象则是具体的命令实现,所以咱们还须要先去看一下 Command。
Command
Command
负责的事件,次要是参数的解决、返回值的解决,生成命令传输的理论值以及 callback
的触发。
实例化
在 Command
的实例化过程中,除去一些属性的赋值,还调用了一个 initPromise
办法,在外部生成了一个 Promise
对象。
其中有两处比拟重要的解决,一个是对于参数的转换,还有一个是返回值的解决。
private initPromise() {const Promise = getPromise();
const promise = new Promise((resolve, reject) => {if (!this.transformed) {
this.transformed = true;
const transformer = Command._transformer.argument[this.name];
if (transformer) {this.args = transformer(this.args);
}
this.stringifyArguments();}
this.resolve = this._convertValue(resolve);
if (this.errorStack) {this.reject = (err) => {reject(optimizeErrorStack(err, this.errorStack, __dirname));
};
} else {this.reject = reject;}
});
this.promise = asCallback(promise, this.callback);
}
参数、返回值非凡解决
如果检索 Command.ts
文件,会发现 Command._transformer.argument
通过 setArgumentTransformer
办法进行设置。
而后再察看代码中有用到 setArgumentTransformer
的是少数几个 hset
命令,以及 mset
命令。
Command.setArgumentTransformer("hmset", function (args) {if (args.length === 2) {if (typeof Map !== "undefined" && args[1] instanceof Map) {return [args[0]].concat(convertMapToArray(args[1]));
}
if (typeof args[1] === "object" && args[1] !== null) {return [args[0]].concat(convertObjectToArray(args[1]));
}
}
return args;
});
如果大家应用过 Redis
的 hash set
操作,应该都会晓得,操作多个键值的形式是通过追加参数实现的:
> HMSET key field value [field value ...]
这样在 JS 中应用也须要将一个数组传递进去,由用户本人保护数组的 key value,这样一个程序的操作形式,必然是没有写 JS 习惯的 Object
传参要难受的,所以 ioredis
提供一个参数转换的逻辑,用来将 Object
转换为一维数组:
export function convertObjectToArray(obj) {const result = [];
const keys = Object.keys(obj);
for (let i = 0, l = keys.length; i < l; i++) {result.push(keys[i], obj[keys[i]]);
}
return result;
}
export function convertMapToArray<K, V>(map: Map<K, V>): Array<K | V> {const result = [];
let pos = 0;
map.forEach(function (value, key) {result[pos] = key;
result[pos + 1] = value;
pos += 2;
});
return result;
}
如果认真看 Command._transformer
会发现还有一个 reply
属性值,这里的逻辑次要在 _convertValue
中有所体现,大抵就是在接管到返回值当前,会先调用咱们传入的自定义函数用来解决返回值。
目前翻代码用到的惟一一处是 hgetall
的解决逻辑,hmget
与 hgetall
在 Redis
中都是返回一个数组的数据,而 ioredis
将数组依照 kv 的格局拼接为一个 Object
不便用户操作。
Command.setReplyTransformer("hgetall", function (result) {if (Array.isArray(result)) {const obj = {};
for (let i = 0; i < result.length; i += 2) {obj[result[i]] = result[i + 1];
}
return obj;
}
return result;
});
设置 key
前缀
如果看 Command
实例化的过程中,还会发现有 _iterateKeys
这样的一个函数调用,该函数具备两个作用:
- 提取参数中所有的 key
- 可选的将 key 增加一个前缀(prefix)
函数外部应用了 redis-commands
的两个 API,exists
和 getKeyIndexes
,用来获取参数数组中所有的 key 的下标。
因为这个函数做了两件事,所以在第一次看到构造函数的用法时,再看函数具体的实现,会对最初返回的 this.keys
很纳闷,然而当看到 Command
还提供了一个 getKeys
API 就可能明确是怎么的逻辑了。
如果设置了 keyPrefix
,则会触发 _iterateKeys
用来调整 key 名,并存储到 keys 中用于返回值。
当调用 getKeys
时,如果没有设置 keyPrefix
,则会用默认的空处理函数来执行同样的逻辑,就是获取所有的 key,而后返回进来;如果之前曾经设置过 keyPrefix 那么就会间接返回 this.keys 不再反复执行逻辑。
// 构造函数内逻辑
if (options.keyPrefix) {this._iterateKeys((key) => options.keyPrefix + key);
}
// 另一处调用的地位
public getKeys(): Array<string | Buffer> {return this._iterateKeys();
}
private _iterateKeys(transform: Function = (key) => key
): Array<string | Buffer> {if (typeof this.keys === "undefined") {this.keys = [];
if (commands.exists(this.name)) {const keyIndexes = commands.getKeyIndexes(this.name, this.args);
for (const index of keyIndexes) {this.args[index] = transform(this.args[index]);
this.keys.push(this.args[index] as string | Buffer);
}
}
}
return this.keys;
}
发送命令数据的生成
大家应用 Redis
应该更多的是通过代码中的 Client
调用各种命令来做,偶然会通过 redis-cli 间接命令行操作。
但其实 Redis
应用了一个叫做 RESP
(REdis Serialization Protocol) 的协定来进行传输。
如果本机有 Redis
的话,咱们在本地能够很简略的进行演示。
> echo -e '*1\r\n$4\r\nPING\r\n' | nc 127.0.0.1 6379
+PONG
咱们会失去一个 +PONG
字符串。这样的一个交互其实才是绝大多数 Client
与 Redis Server
交互时所应用的格局。
P.S.
RESP
有提供人类可读的版本进行交互,然而性能绝对要低一些。
举例说明如果咱们要执行一个 set 和一个 get 应该怎么去写这个命令:
# 结尾代表正文
# SET hello world
# 参数个数
*3
# 该行命令值的长度(set 命令)$3
# 命令对应的值(set 命令)SET
# 该行命令值的长度(具体的 key: hello)$5
# 命令对应的值(具体的 key: hello)hello
# 该行命令值的长度(value 的长度)$5
# 命令对应的值(value 本体)world
# GET hello
# 参数个数
*2
# 该行命令值的长度(get 命令)$3
# 命令对应的值(get 命令)GET
# 该行命令值的长度(具体的 key: hello)$5
# 命令对应的值(具体的 key: hello)hello
set
的返回值没什么意外,就是一个 +OK
,而 get
的返回值则有两行,第一行 $5
示意返回值的长度,第二行才是真正的返回值 world
。
所以如果去看 Command
的 toWritable 函数就是实现了这样的逻辑,因为比拟长所以就不贴了:https://github.com/luin/ioredis/blob/master/lib/command.ts#L269
Command
次要实现的就是这些逻辑,咱们在 Commander
的眼帘中能够看到所有命令调用的开端都会执行 this.sendCommand
,具体的调度就是在 Redis
、Redis Cluster
等具体的实现中做的了。所以咱们能够回到 Redis
去看下实现逻辑。
Redis 发送命令
sendCommand
的实现中,会进行 Redis
状态的查看,如果是 wait
或者 end
之类的,会进行对应的解决。
而后咱们会去查看以后是否是一个能够发送命令的状态:
let writable =
this.status === "ready" ||
(!stream &&
this.status === "connect" &&
commands.exists(command.name) &&
commands.hasFlag(command.name, "loading"));
if (!this.stream) {writable = false;} else if (!this.stream.writable) {writable = false;} else if (this.stream._writableState && this.stream._writableState.ended) {writable = false;}
代码还算比拟清晰,这里也要提到一点,咱们在解决 info
命令的问题是,应用 ping
命令来代替 info
,最后就卡在了这里,后续 debug 发现,ping
命令并不具备 loading
这一 flag
个性,所以 ping
命令都被放到了 offlineQueue
中,针对这一状况,咱们将 ping
增加一个额定的判断逻辑,确保 write
的值为真。
接下来如果 write
为真,那么咱们就会应用 stream
也就是前边建设的 socket 连贯来发送咱们实在的命令了,这时候就是调用的 write
并将 Command#toWritable
的返回值作为数据传进去,也就是之前提到的基于 RESP
格局的序列化。
同时会将一些信息放到 commandQueue
中,它和 offlineQueue
都是同一个类型的实例,后边会提到具体的作用。
this.commandQueue.push({
command: command, // Command 实例
stream: stream, // socket client(其实并没有中央会用到它,不晓得为什么要传过来)select: this.condition.select, // 这个也是没有被用到
});
另一个开源的模块,denque: https://www.npmjs.com/package…
如果 write
为假,那么命令就会被放到 offlineQueue
中。
完结逻辑后会把 command.promise
进行返回,咱们在 Command
实例化过程中能够看到,其实是实例化了一个 Promise
对象,并把 resolve
与 reject
做了一次援用,后边在数据返回时会用到。
当咱们命令曾经发送结束后,那么下一步就是等数据返回了,这里就要说到前边在介绍 React
实例化后 connect
所调用的 DataHandler
实例所做的事件了。
DataHandler
DataHandler
是一个比拟另类的类的写法,因为应用时就间接 new
了但并没有接管返回值。
在构造函数中,就做了两件事,一个是实例化了一个 RedisParser
对象,另一个就是监听了 redis.stream.on('data')
事件,也就是咱们在实例化 Redis
时传递过去的 socket client
,在 data
事件触发时调用 RedisParser.execute
来实现解析。RedisParser
是另一个开源模块了,有趣味的小伙伴能够看这里:https://www.npmjs.com/package/redis-parser
目前能够认为在调用 execute
办法后会调用实例化时传入的 return Reply
就能够了,这是一个解析后的 response
,咱们会拿到这个 response
之后会从 commandQueue
中顺次取出之前传入的对象。
取出的形式是依照队列的形式来取的,通过 shift
,每次取出队列中的第一个元素。
而后调用元素中 command
属性的 resolve
办法,也就是咱们在调用各种 Redis 命令时传入的 callback
了。
这里须要补充一些 Redis
相干的常识,咱们从整个逻辑链路能够看到,大抵是这样的:
- 用户执行命令
Redis
实例化Command
并放入队列- 接管到数据响应后解析数据,并获取队列中第一个元素,调用对应的
callback
同工夫可能会有很多 Redis
申请被收回去,然而再接管到数据后并不需要去判断这次响应对应的是哪一个 command
,因为 Redis
自身也是一个单过程的工作模式,命令的解决也会依照接收数据的先后顺序来解决,因为自身 ioredis
用的也是同一个 socket 连贯,所以也不会存在说命令发送到远端的先后顺序会发生变化。
所以咱们就能够很释怀的通过最简略的形式,push
+ shift
来解决数据了。
这也是为什么一些大 key 的操作会导致整个 Redis 服务响应变慢了。(在不做分片之类的解决状况下)
小结
到此为止,一般模式下的 Redis Client
整体逻辑咱们曾经梳理完了,从创立到发送命令到接管返回值。
后边会针对 Redis Cluster
再输入一篇笔记,一起来看一下在 Cluster
模式下又会有什么不一样的解决逻辑。
参考资料
- ioredis
- redis commands
- Node.js | net
- Why Redis is single-threaded and why is Redis so fast!
- Redis is single-threaded, then how does it do concurrent I/O?