理解一下Redis命令处理过程 及 Redis分布式锁

(1) Redis实现分布式锁

通过Redis SET key value NX 能够简略地实现分布式锁。

127.0.0.1:6379>  SET key_lock value1 NXOK127.0.0.1:6379>127.0.0.1:6379>  SET key_lock value1 NX(nil)127.0.0.1:6379>

在解锁时能够先判断key是否存在,而后对比值是否相等,相等后再删除key,开释锁。

有没有想过一个问题,Redis SET key value NX 是怎么保障分布式锁的原子性的?


(2) Redis命令的处理过程

Redis Server 和客户端建设连贯后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令申请。
而对于整个命令解决的过程来说,我认为次要能够分成五个阶段,它们别离对应了 Redis 源码中的不同函数。

  1. 接管申请,对应 acceptTcpHandler 函数
  2. 命令读取,对应 readQueryFromClient 函数;
  3. 命令解析,对应 processInputBuffer 函数;
  4. 命令执行,对应 processCommand 函数;
  5. 后果返回,对应 addReply 函数;

(2.1) 接管申请阶段-acceptTcpHandler()

// file: src/networking.c /* * 接管tcp处理器 *  * @param *el * @param fd * @param *privdata * @param mask */void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {        // ...         // 接管tcp申请        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);        // ...         // 接管通用解决        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}
  acceptTcpHandler -> anetTcpAccept -> acceptCommonHandler -> createClient -> readQueryFromClient

(2.2) 命令读取阶段-readQueryFromClient()

readQueryFromClient 函数会从客户端连贯的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。

// file: src/networking.c/** * @param *conn */ void readQueryFromClient(connection *conn) {    // 从连贯的公有数据获取client      // 在创立连贯时把client放到了connection的private_data字段    client *c = connGetPrivateData(conn);    // 1024*16 = 16KB    readlen = PROTO_IOBUF_LEN;    //     qblen = sdslen(c->querybuf);    // 为查问缓冲区调配空间    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);    // 调用read从描述符为fd的客户端socket中读取数据    nread = connRead(c->conn, c->querybuf+qblen, readlen);    // 客户端输出缓冲区中有更多的数据,请持续解析它,以防查看是否有要执行的残缺命令。     processInputBuffer(c);    }


(2.3) 命令解析-processInputBuffer()

// file: src/networking.c/*  * 这个函数每次都被调用,在客户端构造'c'中,有更多的查问缓冲区须要解决, * 因为咱们从socket中读取了更多的数据,或者因为客户端被阻塞并稍后从新激活, * 所以可能有待解决的查问缓冲区,曾经 示意要解决的残缺命令。 *  * @param *c 客户端  */void processInputBuffer(client *c) {    // 输出缓冲区中有内容时持续解决     while(c->qb_pos < sdslen(c->querybuf)) {                // 省略局部代码        // 判断申请类型        if (!c->reqtype) {            // 依据客户端输出缓冲区的命令结尾字符判断命令类型            if (c->querybuf[c->qb_pos] == '*') {                // 合乎RESP协定的命令                c->reqtype = PROTO_REQ_MULTIBULK;            } else {                // 管道类型命令                c->reqtype = PROTO_REQ_INLINE;            }        }        if (c->reqtype == PROTO_REQ_INLINE) {  // 管道类型命令            // 调用processInlineBuffer函数解析            if (processInlineBuffer(c) != C_OK) break;            // 如果 Gopher 模式并且咱们失去零个或一个参数,则以 Gopher 模式解决申请。             // 为防止数据竞争,如果启用 io 线程读取查问,Redis 将不反对 Gopher。             if (server.gopher_enabled && !server.io_threads_do_reads &&                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||                  c->argc == 0))            {                processGopherRequest(c);                resetClient(c);                c->flags |= CLIENT_CLOSE_AFTER_REPLY;                break;            }        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {  // RESP协定命令            // 调用processMultibulkBuffer函数解析            if (processMultibulkBuffer(c) != C_OK) break;        } else {            serverPanic("Unknown request type");        }        // 批量解决能够看 <= 0 长度。        if (c->argc == 0) {            resetClient(c); // 重置客户端        } else {            // 如果咱们处于 I/O 线程的上下文中,咱们无奈真正执行此处的命令。             // 咱们所能做的就是将客户端标记为须要解决命令的客户端。             if (c->flags & CLIENT_PENDING_READ) {                c->flags |= CLIENT_PENDING_COMMAND;                break;            }            // 执行命令并重置客户端             if (processCommandAndResetClient(c) == C_ERR) {                // 如果客户端不再无效,咱们将防止退出此循环并稍后修剪客户端缓冲区。                 // 所以在这种状况下咱们会尽快返回。                 return;            }        }    }}    
// file: src/networking.c/*  * 此函数调用 processCommand(),但也为客户端执行一些在该上下文中有用的子工作: *   1. 它将以后客户端设置为客户端“c”。 *   2. 如果解决了命令,则调用 commandProcessed()。 * * 如果客户端因解决命令的副作用而被开释,则该函数返回 C_ERR,否则返回 C_OK。 * * @param *c 客户端  */int processCommandAndResetClient(client *c) {    int deadclient = 0;    server.current_client = c;    // 解决命令    if (processCommand(c) == C_OK) {        commandProcessed(c);    }        return deadclient ? C_ERR : C_OK;}


(2.4) 命令执行-processCommand()

// file: src/server.c/** * 解决各种命令 get set del exits quit lpush sadd  等 * * @param *c   */int processCommand(client *c) {    // 查找命令,并进行命令合法性检查,以及命令参数个数查看    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);    // ... 省略其余命令解决逻辑     // 解决命令    if (c->flags & CLIENT_MULTI &&        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)    {   // 如果是 MULTI 事务,则入队        queueMultiCommand(c);        addReply(c,shared.queued);    } else {         // 调用 call 间接解决        call(c,CMD_CALL_FULL);        c->woff = server.master_repl_offset;        if (listLength(server.ready_keys))            handleClientsBlockedOnKeys();    }    return C_OK;}

(2.4.1) 查找对应命令-lookupCommand

/* * 查找命令 */struct redisCommand *lookupCommand(sds name) {    //     return dictFetchValue(server.commands, name);}

server.commands对应的redisCommandTable如下

struct redisCommand redisCommandTable[] = {    {"get",getCommand,2,     "read-only fast @string",     0,NULL,1,1,1,0,0,0},    /* Note that we can't flag set as fast, since it may perform an     * implicit DEL of a large key. */    {"set",setCommand,-3,     "write use-memory @string",     0,NULL,1,1,1,0,0,0},     // 省略局部内容}

server.commands是在populateCommandTable函数里赋值的

/* Populates the Redis Command Table starting from the hard coded list * we have on top of server.c file. */void populateCommandTable(void) {    int j;    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);    for (j = 0; j < numcommands; j++) {        struct redisCommand *c = redisCommandTable+j;        int retval1, retval2;        /* Translate the command string flags description into an actual         * set of flags. */        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)            serverPanic("Unsupported command flag");        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */        retval1 = dictAdd(server.commands, sdsnew(c->name), c);        /* Populate an additional dictionary that will be unaffected         * by rename-command statements in redis.conf. */        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);    }}

(2.4.2) 执行命令-call

/* * call() 是 Redis 执行命令的外围。 * * @param *c * @param flags  */void call(client *c, int flags) {    // 要执行的redis命令    struct redisCommand *real_cmd = c->cmd;    // 调用命令处理函数       c->cmd->proc(c);}
// file: src/t_string.c/** *  * @param *c * @param flags * @param *key * @param *val * @param *expire * @param unit * @param *ok_reply * @param *abort_reply */void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {    // 64位精度整数     long long milliseconds = 0; /* initialized to avoid any harmness warning */    if (expire) {        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)            return;        if (milliseconds <= 0) {            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);            return;        }        if (unit == UNIT_SECONDS) milliseconds *= 1000;    }    // 如果有NX选项,那么查找key是否曾经存在    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))    {        // 如果已存在,返回空值        addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);        return;    }    //     genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);    server.dirty++;    // 设置过期    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);    // 公布key事件    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,        "expire",key,c->db->id);    addReply(c, ok_reply ? ok_reply : shared.ok);}

(2.5) 后果返回-addReply()

// file: src/networking.c /* ----------------------------------------------------------------------------- * 更高级别的函数用于在客户端输入缓冲区上对数据进行排队。 * 以下函数是命令实现将调用的函数。 * -------------------------------------------------------------------------- *//*  * 将对象“obj”字符串示意增加到客户端输入缓冲区。  *  * @param *c  redis client   * @param *obj  命令执行的后果   类型是redisObject */void addReply(client *c, robj *obj) {    // 判断client是否能够接管新数据 (假客户端不能接管)    if (prepareClientToWrite(c) != C_OK) return;    // 依据redisobject格局把数据写入缓存    if (sdsEncodedObject(obj)) { // obj如果是row或者embstr格局        // 尝试将应答增加到客户端构造中的动态缓冲区。        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)            // 将回复增加到回复列表中。            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));      } else if (obj->encoding == OBJ_ENCODING_INT) { // obj 是数字格局        // 对于整数编码字符串,咱们只需应用优化函数将其转换为字符串,并将后果字符串附加到输入缓冲区。        char buf[32];        // 数字转为字符串        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);        if (_addReplyToBuffer(c,buf,len) != C_OK)            _addReplyProtoToList(c,buf,len);    } else {        serverPanic("Wrong obj->encoding in addReply()");    }}

(3) IO多路复用对命令原子性的影响

IO 多路复用机制是在 readQueryFromClient 函数执行前发挥作用的。它理论是在事件驱动框架中调用 aeApiPoll 函数,获取一批曾经就绪的 socket 描述符。而后执行一个循环,针对每个就绪描述符上的读事件,触发执行 readQueryFromClient 函数。

在理论解决时,Redis 的主线程依然是针对每个事件逐个调用回调函数进行解决的。而且对于写事件来说,IO 多路复用机制也是针对每个事件逐个解决的。

/* * 处理事件   返回解决完的事件个数 * * 0 不做任何解决 * 1 AE_FILE_EVENTS  解决文件事件 * 2 AE_TIME_EVENTS  解决工夫事件  * 3 AE_ALL_EVENTS   所有事件  * 4 AE_DONT_WAIT  * 8 AE_CALL_BEFORE_SLEEP  * 16 AE_CALL_AFTER_SLEEP  */int aeProcessEvents(aeEventLoop *eventLoop, int flags){    int processed = 0, numevents;        struct timeval tv, *tvp;        // 如果eventLoop解决前的函数不为空,就执行        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)            eventLoop->beforesleep(eventLoop);        // 调用多路复用 API,仅在超时或某些事件触发时返回           // 解决文件事件,阻塞工夫由tvp决定         numevents = aeApiPoll(eventLoop, tvp);        // 解决后的函数不为空        /* After sleep callback. */        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)            eventLoop->aftersleep(eventLoop);        for (j = 0; j < numevents; j++) {            // 先从eventLoop->fired[j]获取已就绪事件构造体(aeFiredEvent) 获取fd后 再从eventLoop->events注册事件里获取对应的事件构造体(aeFileEvent)             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];                        // ...            // 如果可读            if (!invert && fe->mask & mask & AE_READABLE) {                // 调用读事件回调函数 对应 acceptTcpHandler                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);                fired++;                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */            }            // 如果可写 触发写事件            if (fe->mask & mask & AE_WRITABLE) {                if (!fired || fe->wfileProc != fe->rfileProc) {                    // 调用写事件回调函数 对应 acceptTcpHandler                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);                    fired++;                }            }            processed++;        }        return processed; /* return the number of processed file/time events */}

即便应用了 IO 多路复用机制,命令的整个处理过程依然能够由 IO 主线程来实现,也依然能够保障命令执行的原子性。

参考资料

RedisIO模型 https://weikeqin.com/2022/01/...

Redis源码分析与实战 学习笔记 Day14 14 | 从代码实现看分布式锁的原子性保障
https://time.geekbang.org/col...