浅析redis命令执行的生命周期

21次阅读

共计 11314 个字符,预计需要花费 29 分钟才能阅读完成。

baiyan

引入

首先看一张我们非常熟悉的 redis 命令执行图:

那么思考这样一个问题,当我们连接了 redis 服务端之后,然后输入并执行某条 redis 命令:如 set key1 value1。这条命令究竟是如何被发送到 redis 服务端的,redis 服务端又是如何解析,并作出相应处理,并返回执行成功的呢?

客户端到服务端的命令传输(请求)

redis 在 TCP 协议基础之上,封装了自己的一套协议规范,方便服务端与客户端去接收与解析数据,划清命令参数之间的边界,方便最终对以 TCP 字节流传输的数据进行处理。下面我们使用 tcpdump 来捕获 redis-cli 发送命令时的数据包:

tcpdump port 6379 -i lo -X

这时,我们在客户端中输入 set key1 value1 命令。在 tcpdump 中捕获的数据包如下:

第一个是客户端发送命令到 redis 服务端时的数据包,而第二个是 redis 服务端响应给客户端的数据包。我们首先只看第一个数据包,它从客户端 43856 端口发送到 redis 服务端的 6379 端口。首先前 20 个字节是 IP 头部,后 32 字节是 TCP 头部(由于 TCP 头部后面存在可选项)。
我们主要关注从“2a33”开始的数据信息,从这里开始就是 redis 具体的数据格式了。从右边对数据的一个 ASCII 码翻译也可以看到 set、key1、value1 的字样,中间还有一些用. 表示的字符,那么这里,我们根据抓包结果分析一下 redis 数据传输的协议格式。

  • 2a33:0x2a 是字符 ”*” 的 ASCII 码值,0x33 是 ”3″ 的 ASCII 码值(十进制值是 51)
  • 0d0a:0d 是 ”r” 的 ASCII 码值,0a 是 ”n” 的 ASCII 码值
  • 7365:是 ”s” 和 ”e” 的 ASCII 码值
  • 740d:是 ”t” 和 ”r” 的 ASCII 码值
  • 0a24:是 ”n” 和 ”$” 的 ASCII 码值
  • 340d:是 ”4″ 和 ”r” 的 ASCII 码值
  • 0a6b:是 ”n” 和 ”k” 的 ASCII 码值
  • 6579:是 ”e” 和 ”y” 的 ASCII 码值
  • 310d:是 ”1″ 和 ”r” 的 ASCII 码值
  • 0a24:是 ”n” 和 ”$” 的 ASCII 码值
  • 360d:是”6″ 和 ”r” 的 ASCII 码值
  • 0a76:是 ”n” 和 ”v” 的 ASCII 码值
  • 616c:是 ”a” 和 ”l” 的 ASCII 码值
  • 7565:是 ”u” 和 ”e” 的 ASCII 码值
  • 310d:是 ”1″ 和 ”r” 的 ASCII 码值
  • 0a:是 ”n” 的 ASCII 码值

看到这里,我们是否能够发现以下规律:

  • redis 以 ”*” 作为标志,表示命令的开始。在 * 后面紧跟的数字代表参数的个数(set key1 value1 有 3 个参数所以为 3)
  • redis 以 ”$” 作为命令参数的开始,后面紧跟的数字代表参数的长度(如 key1 的长度为 4 所以为 $4)
  • redis 以 ”rn” 作为参数之间的分隔符,方便解析 TCP 字节流数据时定位边界位置

综合来看,客户端向服务端发送的 redis 数据包格式如下:

*3 \r\n set \r\n $4 \r\n key1 \r\n $6 \r\n value1 \r\n

相比 FastCGI 协议,redis 仅仅使用几个分隔符和特殊字符,就完成了对命令的传输语法及数据格式的规范化,同时服务端通过其中定义好的分隔符,也能够方便高效地从字节流数据中解析并读取出正确的数据。这种通信协议简单高效,能够满足 redis 对高性能的要求。

服务端对命令的处理

既然命令已经通过 redis 数据传输协议安全地送达到了服务端,那么,服务端就要开始对传输过来的字节流数据进行处理啦。由于我们在协议中清晰地定义了每个参数的边界(\r\n),所以,redis 服务端解析起来也非常轻松。

第一步:回调函数的使用

redis 是典型事件驱动程序。为了提高单进程的 redis 的性能,redis 采用 IO 多路复用技术来处理客户端的命令请求。redis 会在创建客户端实例的时,指定服务端接收到客户端命令请求的事件时,所要执行的事件处理函数:

client *createClient(int fd) {client *c = zmalloc(sizeof(client));

    if (fd != -1) {anetNonBlock(NULL,fd); // 设置非阻塞
        anetEnableTcpNoDelay(NULL,fd); // 设置不采用 Nagle 算法,避免半包与粘包现象
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive); // 设置 keep-alive
        // 注意这里创建了一个文件事件。当客户端读事件就绪的时候,回调 readQueryFromClient() 函数
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) {close(fd);
            zfree(c);
            return NULL;
        }
    }
    ... 
}

为了暂存客户端请求到服务端的字节流数据,redis 封装了一个接收缓冲区,来缓存从套接字中读取的数据。后续的命令处理流程从缓冲区中读取命令数据并处理即可。缓冲区的好处在于不用一直维持读写套接字。在后续的流程中,我们只需要从缓冲区中读取数据,而不是仍从套接字中读取。这样就可以提前释放套接字,节省资源。缓冲区的建立与使用就是在之前讲过的客户端回调函数 readQueryFromClient() 中完成的:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
   ...
    qblen = sdslen(c->querybuf); // 获取缓冲区长度
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; 
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 创建一个 sds 结构作为缓冲区
    nread = read(fd, c->querybuf+qblen, readlen); // 从套接字中读取数据到缓冲区中暂存
    ...
    // 真正地处理命令
    processInputBufferAndReplicate(c);
}

第二步:分发器的使用

这段代码创建并往缓冲区中写入了字节流数据,然后调用 processInputBufferAndReplicate() 去真正地处理命令。processInputBufferAndReplicate() 函数中只是简单的调用了 ==processInputBuffer() 函数。由于我们之前的缓冲区中已经有了客户端发给服务端的字节流数据,所以我们需要在这一层进行数据初步的筛选与处理:

void processInputBuffer(client *c) {
    // 如果缓冲区还没有处理完,继续循环处理
    while(c->qb_pos < sdslen(c->querybuf)) {
         ...
        // 对字节流数据进行定制化分发处理
        if (c->reqtype == PROTO_REQ_INLINE) { // 如果是 INLINE 类型的请求
            if (processInlineBuffer(c) != C_OK) break;  // 调用 processInlineBuffer 解析缓冲区数据
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {// 如果是 MULTIBULK 类型的请求
            if (processMultibulkBuffer(c) != C_OK) break; // 调用 processMultibulkBuffer 解析缓冲区数据
        } else {serverPanic("Unknown request type");
        }

       // 开始处理具体的命令
        if (c->argc == 0) { // 命令参数为 0 个,非法
            resetClient(c);
        } else { // 命令参数不为 0,合法
            // 调用 processCommand() 真正处理命令
            if (processCommand(c) == C_OK) { //
                ...
            }
        }
    }
}

看到这里,读者可能会有些疑惑。什么是 INLINE、什么是 MULTIBULK?在 redis 中,有两种请求命令类型:

  • INLINE 类型:简单字符串格式,如 ping 命令
  • MULTIBULK 类型:字符串数组格式。如 set、get 等等大部分命令都是这种类型

这个函数其实就是一个分发器。由于底层的字节流数据是无规则的,所以我们需要根据客户端的 reqtype 字段,去区分请求字节流数据属于那种请求类型,进而分发到对应的函数中进行处理。由于我们经常执行的命令都是 MULTIBULK 类型,我们也以 MULTIBULK 类型为例。对于 set、get 这种 MULTIBULK 请求类型,会被分发到 processMultibulkBuffer() 函数中进行处理。

第三步:检查接收缓冲区的数据完整性

在开启 TCP 的 Nagle 算法时,TCP 会将多个 redis 命令请求的数据包合并或者拆分发送。这样就会出现在一个数据包中命令不完整、或者一个数据包中包含多个命令的情况。为了解决这个问题,processMultibulkBuffer() 函数保证,当只有在缓冲区中包含一个完整请求时,这个函数才会成功解析完字节流中的命令参数,并返回成功状态码。否则,会 break 出外部的 while 循环,等待下一次事件循环再从套接字中读取剩余的数据,再进行对命令的解析。这样就保证了 redis 协议中的数据的完整性,也保证了实际命令参数的完整性。

int processMultibulkBuffer(client *c) {while(c->multibulklen) {
        ...
        /* 读取命令参数字节流 */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { // 如果 $ 后面代表参数长度的数字与实际命令长度不匹配(+ 2 的位置是 \r\n),说明数据不完整,直接跳出循环,等待下一次读取剩余数据
            break;
        } else { // 命令完整,进行一些执行命令之前的初始化工作
            if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) {c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); 
                sdsIncrLen(c->querybuf,-2); 
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--; // 处理下一个命令参数
        }
    }
}

第四步:真正地处理命令

我们回到外层。当我们成功执行 processMultibulkBuffer() 函数之后,说明当前命令已经完整,可以对命令进行处理了。我们想一下,加入要我们去设计根据不同的命令,调用不同的处理函数,从而完成不同的功能,我们应该怎么做呢?想了想,我们可以简单写出以下代码:

if (command == "get") {doGetCommand(); //get 命令处理函数
} else if (command == "set") {doSetCommand(); //set 命令处理函数
} else {printf("非法命令")
}

以上代码非常简单,只是根据我们得到的不同命令请求,分发到不同的命令处理函数中进行定制化处理。那么 redis 其实也是同样的道理,那究竟 redis 是怎么做的呢:

int processCommand(client *c) {
    // 如果是退出命令直接返回
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    // 去字典里查找命令,并把要执行的命令处理函数赋值到 c 结构体中的 cmd 字段
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // 返回值校验
    if (!c->cmd) { // 没有找到该命令
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`,", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || // 命令参数不匹配
               (c->argc < -c->cmd->arity)) {flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for'%s'command",
            c->cmd->name);
        return C_OK;
    }
   // 真正执行命令
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {queueMultiCommand(c);
        addReply(c,shared.queued);
    } else { // 真正执行命令
        call(c,CMD_CALL_FULL); // 核心函数
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();}
    return C_OK;
}

在这个函数中,最重要的就是 lookupCommand() 函数和 call() 函数的调用了。在 redis 中,所有命令都存储在一个字典中,这个字典长这样:

struct redisCommand redisCommandTable[] = {{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    ...
};

我们可以看到,这个字典是所有命令的集合,我们调用 lookupCommand 就是从这里获取命令及命令的相关信息的。它是一个结构体数组,包含所有命令名称、命令处理函数、参数个数、以及种种标记。其实这里就相当于一个配置信息的维护,以及命令道处理函数名称的映射关系,从而很好的解决了我们一开始使用 if-else 来分发命令处理函数的难以维护、可扩展性差的问题。
在我们成功在字典中找到一个命令的处理函数之后,我们只需要去调用相应的命令处理函数就好啦。上面最后的 call() 函数中就对相应的命令处理函数进行了调用,并返回调用结果给客户端。比如,setCommand() 就是 set 命令的实际处理函数:

void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    for (j = 3; j < c->argc; j++) {char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {flags |= OBJ_SET_NX;} else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {flags |= OBJ_SET_XX;} else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {addReply(c,shared.syntaxerr);
            return;
        }
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

这个函数首先对 NX、EX 参数进行了判断及处理,最终调用了 setGenericCommand(),来执行 set 命令的通用逻辑部分:

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    setKey(c->db,key,val);
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

最终会调用 addReply() 通用返回函数,应该是要把执行结果返回给客户端了。我们看看该函数里面做了些什么:

void addReply(client *c, robj *obj) {if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {serverPanic("Wrong obj->encoding in addReply()");
    }
}

我们仔细阅读这段代码,好像并没有找到执行结果是什么时候返回给客户端的。在这个函数中,只是将返回结果添加到了输出缓冲区中,一个命令就执行完了。那么究竟是什么时候返回的呢?是否还记得在介绍开启事件循环时,提到函数 beforesleep() 会在每次事件循环阻塞等待文件事件之前执行,主要执行一些不是很费时的操作,比如过期键删除操作,向客户端返回命令回复等。这样,就可以减节省返回执行结果时的网络通信开销,将同一个客户端上的多个命令的多次返回,对多个命令做一个缓存,最终一次性统一返回,减少了返回的次数,提高了性能。

客户端到服务端的命令传输(响应)

执行完 set key1 value1 命令之后,我们得到了一个 ”OK” 的返回,代表命令执行成功。其实我们仔细观察上面返回的第二个数据包,其实底层是一个 ”+OK” 的返回值。那么为什么要有一个 + 号呢?因为除了我们上面讲过的 set 命令,还有 get 命令、lpush 命令等等,他们的返回值都是不一样的。get 会返回数据集合、lpush 会返回一个整数,代表列表的长度等等。一个字符串的表示是远远不能满足需要的。所以在 redis 通信协议中,一共定义了五种返回值结构。客户端通过每种返回结构的第一个字符,来判断是哪种类型的返回值:

  • 状态回复:第一个字符是“+”;例如,SET 命令执行完毕会向客户端返回“+OK\r\n”。
  • 错误回复:第一个字符是“-”;例如,当客户端请求命令不存在时,会向客户端返回“-ERR unknown command ‘testcmd’”。
  • 整数回复:第一个字符是“:”;例如,INCR 命令执行完毕向客户端返回“:100\r\n”。
  • 批量回复:第一个字符是“$”;例如,GET 命令查找键向客户端返回结果“$5\r\nhello\r\n”,其中 $5 表示返回字符串长度。
  • 多条批量回复:第一个字符是“”;例如,LRANGE 命令可能会返回多个多个值,格式为“3\r\n$6\r\nvalue1\r\n$6rnvalue2rn$6\r\nvalue3\r\n”,与命令请求协议格式相同,“\*3”表示返回值数目,“$6”表示当前返回值字符串长度,多个返回值用“\r\n”分隔开。

我们执行 set 命令就是第一种类型,即状态回复。客户端通过 + 号,就能够知道这是状态回复,从而就知道该如何读取后面的字节流内容了。

总结

至此,我们就走完了一个 redis 命令的完整生命周期,同时也了解了 redis 通信协议的格式与规范。接下来,我将会深入每一个命令的实现,大家加油。

参考资料

  • 【Redis 源码分析】Redis 命令处理生命周期

正文完
 0