共计 8652 个字符,预计需要花费 22 分钟才能阅读完成。
理解一下 Redis 命令处理过程 及 Redis 分布式锁
(1) Redis 实现分布式锁
通过 Redis SET key value NX
能够简略地实现分布式锁。
127.0.0.1:6379> SET key_lock value1 NX | |
OK | |
127.0.0.1:6379> | |
127.0.0.1:6379> SET key_lock value1 NX | |
(nil) | |
127.0.0.1:6379> |
在解锁时能够先判断 key 是否存在,而后对比值是否相等,相等后再删除 key,开释锁。
有没有想过一个问题,Redis SET key value NX
是怎么保障分布式锁的原子性的?
(2) Redis 命令的处理过程
Redis Server 和客户端建设连贯后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令申请。
而对于整个命令解决的过程来说,我认为次要能够分成五个阶段,它们别离对应了 Redis 源码中的不同函数。
- 接管申请,对应 acceptTcpHandler 函数
- 命令读取,对应 readQueryFromClient 函数;
- 命令解析,对应 processInputBuffer 函数;
- 命令执行,对应 processCommand 函数;
- 后果返回,对应 addReply 函数;
(2.1) 接管申请阶段 -acceptTcpHandler()
// file: src/networking.c | |
/* | |
* 接管 tcp 处理器 | |
* | |
* @param *el | |
* @param fd | |
* @param *privdata | |
* @param mask | |
*/ | |
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { | |
// ... | |
// 接管 tcp 申请 | |
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); | |
// ... | |
// 接管通用解决 | |
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); | |
} |
acceptTcpHandler -> anetTcpAccept -> acceptCommonHandler -> createClient -> readQueryFromClient
(2.2) 命令读取阶段 -readQueryFromClient()
readQueryFromClient
函数会从客户端连贯的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在 server.h 文件中定义的,默认值为 16KB。
// file: src/networking.c | |
/** | |
* @param *conn | |
*/ | |
void readQueryFromClient(connection *conn) { | |
// 从连贯的公有数据获取 client | |
// 在创立连贯时把 client 放到了 connection 的 private_data 字段 | |
client *c = connGetPrivateData(conn); | |
// 1024*16 = 16KB | |
readlen = PROTO_IOBUF_LEN; | |
// | |
qblen = sdslen(c->querybuf); | |
// 为查问缓冲区调配空间 | |
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); | |
// 调用 read 从描述符为 fd 的客户端 socket 中读取数据 | |
nread = connRead(c->conn, c->querybuf+qblen, readlen); | |
// 客户端输出缓冲区中有更多的数据,请持续解析它,以防查看是否有要执行的残缺命令。processInputBuffer(c); | |
} |
(2.3) 命令解析 -processInputBuffer()
// file: src/networking.c | |
/* | |
* 这个函数每次都被调用,在客户端构造 'c' 中,有更多的查问缓冲区须要解决,* 因为咱们从 socket 中读取了更多的数据,或者因为客户端被阻塞并稍后从新激活,* 所以可能有待解决的查问缓冲区,曾经 示意要解决的残缺命令。* | |
* @param *c 客户端 | |
*/ | |
void processInputBuffer(client *c) { | |
// 输出缓冲区中有内容时持续解决 | |
while(c->qb_pos < sdslen(c->querybuf)) { | |
// 省略局部代码 | |
// 判断申请类型 | |
if (!c->reqtype) { | |
// 依据客户端输出缓冲区的命令结尾字符判断命令类型 | |
if (c->querybuf[c->qb_pos] == '*') { | |
// 合乎 RESP 协定的命令 | |
c->reqtype = PROTO_REQ_MULTIBULK; | |
} else { | |
// 管道类型命令 | |
c->reqtype = PROTO_REQ_INLINE; | |
} | |
} | |
if (c->reqtype == PROTO_REQ_INLINE) { // 管道类型命令 | |
// 调用 processInlineBuffer 函数解析 | |
if (processInlineBuffer(c) != C_OK) break; | |
// 如果 Gopher 模式并且咱们失去零个或一个参数,则以 Gopher 模式解决申请。// 为防止数据竞争,如果启用 io 线程读取查问,Redis 将不反对 Gopher。if (server.gopher_enabled && !server.io_threads_do_reads && | |
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') || | |
c->argc == 0)) | |
{processGopherRequest(c); | |
resetClient(c); | |
c->flags |= CLIENT_CLOSE_AFTER_REPLY; | |
break; | |
} | |
} else if (c->reqtype == PROTO_REQ_MULTIBULK) { // RESP 协定命令 | |
// 调用 processMultibulkBuffer 函数解析 | |
if (processMultibulkBuffer(c) != C_OK) break; | |
} else {serverPanic("Unknown request type"); | |
} | |
// 批量解决能够看 <= 0 长度。if (c->argc == 0) {resetClient(c); // 重置客户端 | |
} else { | |
// 如果咱们处于 I/O 线程的上下文中,咱们无奈真正执行此处的命令。// 咱们所能做的就是将客户端标记为须要解决命令的客户端。if (c->flags & CLIENT_PENDING_READ) { | |
c->flags |= CLIENT_PENDING_COMMAND; | |
break; | |
} | |
// 执行命令并重置客户端 | |
if (processCommandAndResetClient(c) == C_ERR) { | |
// 如果客户端不再无效,咱们将防止退出此循环并稍后修剪客户端缓冲区。// 所以在这种状况下咱们会尽快返回。return; | |
} | |
} | |
} | |
} |
// file: src/networking.c | |
/* | |
* 此函数调用 processCommand(),但也为客户端执行一些在该上下文中有用的子工作:* 1. 它将以后客户端设置为客户端“c”。* 2. 如果解决了命令,则调用 commandProcessed()。* | |
* 如果客户端因解决命令的副作用而被开释,则该函数返回 C_ERR,否则返回 C_OK。* | |
* @param *c 客户端 | |
*/ | |
int processCommandAndResetClient(client *c) { | |
int deadclient = 0; | |
server.current_client = c; | |
// 解决命令 | |
if (processCommand(c) == C_OK) {commandProcessed(c); | |
} | |
return deadclient ? C_ERR : C_OK; | |
} |
(2.4) 命令执行 -processCommand()
// file: src/server.c | |
/** | |
* 解决各种命令 get set del exits quit lpush sadd 等 | |
* | |
* @param *c | |
*/ | |
int processCommand(client *c) { | |
// 查找命令,并进行命令合法性检查,以及命令参数个数查看 | |
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); | |
// ... 省略其余命令解决逻辑 | |
// 解决命令 | |
if (c->flags & CLIENT_MULTI && | |
c->cmd->proc != execCommand && c->cmd->proc != discardCommand && | |
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) | |
{ // 如果是 MULTI 事务,则入队 | |
queueMultiCommand(c); | |
addReply(c,shared.queued); | |
} else { | |
// 调用 call 间接解决 | |
call(c,CMD_CALL_FULL); | |
c->woff = server.master_repl_offset; | |
if (listLength(server.ready_keys)) | |
handleClientsBlockedOnKeys();} | |
return C_OK; | |
} |
(2.4.1) 查找对应命令 -lookupCommand
/* | |
* 查找命令 | |
*/ | |
struct redisCommand *lookupCommand(sds name) { | |
// | |
return dictFetchValue(server.commands, name); | |
} |
server.commands
对应的 redisCommandTable
如下
struct redisCommand redisCommandTable[] = { | |
{"get",getCommand,2, | |
"read-only fast @string", | |
0,NULL,1,1,1,0,0,0}, | |
/* Note that we can't flag set as fast, since it may perform an | |
* implicit DEL of a large key. */ | |
{"set",setCommand,-3, | |
"write use-memory @string", | |
0,NULL,1,1,1,0,0,0}, | |
// 省略局部内容 | |
} |
server.commands
是在 populateCommandTable
函数里赋值的
/* Populates the Redis Command Table starting from the hard coded list | |
* we have on top of server.c file. */ | |
void populateCommandTable(void) { | |
int j; | |
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); | |
for (j = 0; j < numcommands; j++) { | |
struct redisCommand *c = redisCommandTable+j; | |
int retval1, retval2; | |
/* Translate the command string flags description into an actual | |
* set of flags. */ | |
if (populateCommandTableParseFlags(c,c->sflags) == C_ERR) | |
serverPanic("Unsupported command flag"); | |
c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */ | |
retval1 = dictAdd(server.commands, sdsnew(c->name), c); | |
/* Populate an additional dictionary that will be unaffected | |
* by rename-command statements in redis.conf. */ | |
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); | |
serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); | |
} | |
} |
(2.4.2) 执行命令 -call
/* | |
* call() 是 Redis 执行命令的外围。* | |
* @param *c | |
* @param flags | |
*/ | |
void call(client *c, int flags) { | |
// 要执行的 redis 命令 | |
struct redisCommand *real_cmd = c->cmd; | |
// 调用命令处理函数 | |
c->cmd->proc(c); | |
} |
// file: src/t_string.c | |
/** | |
* | |
* @param *c | |
* @param flags | |
* @param *key | |
* @param *val | |
* @param *expire | |
* @param unit | |
* @param *ok_reply | |
* @param *abort_reply | |
*/ | |
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { | |
// 64 位精度整数 | |
long long milliseconds = 0; /* initialized to avoid any harmness warning */ | |
if (expire) {if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) | |
return; | |
if (milliseconds <= 0) {addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); | |
return; | |
} | |
if (unit == UNIT_SECONDS) milliseconds *= 1000; | |
} | |
// 如果有 NX 选项,那么查找 key 是否曾经存在 | |
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || | |
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) | |
{ | |
// 如果已存在,返回空值 | |
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); | |
return; | |
} | |
// | |
genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1); | |
server.dirty++; | |
// 设置过期 | |
if (expire) setExpire(c,c->db,key,mstime()+milliseconds); | |
// 公布 key 事件 | |
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); | |
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, | |
"expire",key,c->db->id); | |
addReply(c, ok_reply ? ok_reply : shared.ok); | |
} |
(2.5) 后果返回 -addReply()
// file: src/networking.c | |
/* ----------------------------------------------------------------------------- | |
* 更高级别的函数用于在客户端输入缓冲区上对数据进行排队。* 以下函数是命令实现将调用的函数。* -------------------------------------------------------------------------- */ | |
/* | |
* 将对象“obj”字符串示意增加到客户端输入缓冲区。* | |
* @param *c redis client | |
* @param *obj 命令执行的后果 类型是 redisObject | |
*/ | |
void addReply(client *c, robj *obj) {// 判断 client 是否能够接管新数据 (假客户端不能接管) | |
if (prepareClientToWrite(c) != C_OK) return; | |
// 依据 redisobject 格局把数据写入缓存 | |
if (sdsEncodedObject(obj)) { // obj 如果是 row 或者 embstr 格局 | |
// 尝试将应答增加到客户端构造中的动态缓冲区。if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) | |
// 将回复增加到回复列表中。_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); | |
} else if (obj->encoding == OBJ_ENCODING_INT) { // obj 是数字格局 | |
// 对于整数编码字符串,咱们只需应用优化函数将其转换为字符串,并将后果字符串附加到输入缓冲区。char buf[32]; | |
// 数字转为字符串 | |
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); | |
if (_addReplyToBuffer(c,buf,len) != C_OK) | |
_addReplyProtoToList(c,buf,len); | |
} else {serverPanic("Wrong obj->encoding in addReply()"); | |
} | |
} |
(3) IO 多路复用对命令原子性的影响
IO 多路复用机制是在 readQueryFromClient 函数执行前发挥作用的。它理论是在事件驱动框架中调用 aeApiPoll 函数,获取一批曾经就绪的 socket 描述符。而后执行一个循环,针对每个就绪描述符上的读事件,触发执行 readQueryFromClient 函数。
在理论解决时,Redis 的主线程依然是针对每个事件逐个调用回调函数进行解决的。而且对于写事件来说,IO 多路复用机制也是针对每个事件逐个解决的。
/* | |
* 处理事件 返回解决完的事件个数 | |
* | |
* 0 不做任何解决 | |
* 1 AE_FILE_EVENTS 解决文件事件 | |
* 2 AE_TIME_EVENTS 解决工夫事件 | |
* 3 AE_ALL_EVENTS 所有事件 | |
* 4 AE_DONT_WAIT | |
* 8 AE_CALL_BEFORE_SLEEP | |
* 16 AE_CALL_AFTER_SLEEP | |
*/ | |
int aeProcessEvents(aeEventLoop *eventLoop, int flags) | |
{ | |
int processed = 0, numevents; | |
struct timeval tv, *tvp; | |
// 如果 eventLoop 解决前的函数不为空,就执行 | |
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) | |
eventLoop->beforesleep(eventLoop); | |
// 调用多路复用 API,仅在超时或某些事件触发时返回 | |
// 解决文件事件,阻塞工夫由 tvp 决定 | |
numevents = aeApiPoll(eventLoop, tvp); | |
// 解决后的函数不为空 | |
/* After sleep callback. */ | |
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) | |
eventLoop->aftersleep(eventLoop); | |
for (j = 0; j < numevents; j++) {// 先从 eventLoop->fired[j]获取已就绪事件构造体(aeFiredEvent) 获取 fd 后 再从 eventLoop->events 注册事件里获取对应的事件构造体(aeFileEvent) | |
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; | |
// ... | |
// 如果可读 | |
if (!invert && fe->mask & mask & AE_READABLE) { | |
// 调用读事件回调函数 对应 acceptTcpHandler | |
fe->rfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ | |
} | |
// 如果可写 触发写事件 | |
if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) { | |
// 调用写事件回调函数 对应 acceptTcpHandler | |
fe->wfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
} | |
} | |
processed++; | |
} | |
return processed; /* return the number of processed file/time events */ | |
} |
即便应用了 IO 多路复用机制,命令的整个处理过程依然能够由 IO 主线程来实现,也依然能够保障命令执行的原子性。
参考资料
RedisIO 模型 https://weikeqin.com/2022/01/…
Redis 源码分析与实战 学习笔记 Day14 14 | 从代码实现看分布式锁的原子性保障
https://time.geekbang.org/col…