【Redis源码分析】Redis命令处理生命周期

50次阅读

共计 34978 个字符,预计需要花费 88 分钟才能阅读完成。

运营研发团队 李乐
前言
本文主要讲解服务器处理客户端命令请求的整个流程,包括服务器启动监听,接收命令请求并解析,执行命令请求,返回命令回复等,这也是本文的主题“命令处理的生命周期”。Redis 服务器作为典型的事件驱动程序,事件处理显得尤为重要,而 Redis 将事件分为两大类:文件事件与时间事件。文件事件即 socket 的可读可写事件,时间事件用于处理一些需要周期性执行的定时任务,本文将对这两种事件作详细介绍。
基本知识
为了更好的理解服务器与客户端的交互,还需要学习一些基础知识,比如客户端信息的存储,Redis 对外支持的命令集合,客户端与服务器 socket 读写事件的处理,Redis 内部定时任务的执行等,本小节将对这些知识作简要介绍。
1.1 对象结构体 robj 简介
Redis 是一个 Key-Value 数据库,key 只能是字符串,value 可能是字符串、哈希表、列表、集合和有序集合,这 5 种数据类型用结构体 robj 表示,我们称之为 redis 对象。结构体 robj 的 type 字段表示对象类型,5 种对象类型在 server.h 文件定义:
#define OBJ_STRING 0
#define OBJ_LIST 1
#define OBJ_SET 2
#define OBJ_ZSET 3
#define OBJ_HASH 4
针对某一种类型的对象,redis 在不同情况下可能采用不同的数据结构存储,结构体 robj 的的 encoding 字段表示当前对象底层存储采用的数据结构,即对象的编码,总共定义了 10 种 encoding 常量,如下表 - 1 所示:表 -1 对象编码类型表

encoding 常量
数据结构
可存储对象类型

OBJ_ENCODING_RAW
简单动态字符串 sds
字符串

OBJ_ENCODING_INT
整数
字符串

OBJ_ENCODING_HT
字典 dict
集合、哈希表、有序集合

OBJ_ENCODING_ZIPMAP
未使用

OBJ_ENCODING_LINKEDLIST
不再使用

OBJ_ENCODING_ZIPLIST
压缩列表 ziplist
哈希表、有序集合

BJ_ENCODING_INTSET
整数集合 intset
集合

OBJ_ENCODING_SKIPLIST
跳跃表 skiplist
有序集合

OBJ_ENCODING_EMBSTR
简单动态字符串 sds
字符串

OBJ_ENCODING_QUICKLIST
快速链表 quicklist
列表

对象的整个生命周期中,编码不是一成不变的,比如集合对象。当集合中所有元素都可以用整数表示时,底层数据结构采用整数集合;执行 SADD 命令往集合添加元素时,redis 总会校验待添加元素是否可以解析为整数,如果解析失败,则会将集合存储结构转换为字典。
if (subject->encoding == OBJ_ENCODING_INTSET) {
    if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
         
        subject->ptr = intsetAdd(subject->ptr,llval,&success);
         
    } else {
        // 编码转换
        setTypeConvert(subject,OBJ_ENCODING_HT);
    }
}
对象在不同情况下可能采用不同的数据结构存储,那对象可能同时采用多种数据结构存储吗?根据上面的表格,有序集合可能采用压缩列表、跳跃表和字典存储。使用字典存储时,根据成员查找分值的时间复杂度为 O(1),而对于 ZRANGE 与 ZRANK 等命令,需要排序才能实现,时间复杂度至少为 O(NlogN);使用跳跃表存储时,ZRANGE 与 ZRANK 等命令的时间复杂度为 O(logN),而根据成员查找分值的时间复杂度同样是 O(logN)。字典与跳跃表各有优势,因此 Redis 会同时采用字典与跳跃表存储有序集合。这里有读者可能会有疑问,同时采用两种数据结构存储不浪费空间吗?数据都是通过指针引用的,两种存储方式只需要额外存储一些指针即可,空间消耗是可以接受的。有序集合存储结构定义如下:
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
观察表 -1,注意到编码 OBJ_ENCODING_RAW 和 OBJ_ENCODING_EMBSTR 都表示的是简单动态字符串,那么这两种编码有什么区别吗?在回答此问题之前需要先了解结构体 robj 的定义:
#define LRU_BITS 24
 
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS;  // 缓存淘汰使用
    int refcount;           // 引用计数
    void *ptr;
} robj;
下面详细分析结构体各字段含义:ptr 是 void* 类型的指针,指向实际存储的某一种数据结构,但是当 robj 存储的是数据可以用 long 类型表示时,数据直接存储在 ptr 字段。可以看出,为了创建一个字符串对象,必须分配两次内存,robj 与 sds 存储空间;两次内存分配效率低下,且数据分离存储降低了计算机高速缓存的效率。因此提出 OBJ_ENCODING_EMBSTR 编码的字符串,当字符串内容比较短时,只分配一次内存,robj 与 sds 连续存储,以此提升内存分配效率与数据访问效率。OBJ_ENCODING_EMBSTR 编码的字符串内存结构如下图 - 1 所示:

图 -1 EMBSTR 编码字符串对象内存结构 refcount 存储当前对象的引用次数,用于实现对象的共享。共享对象时,refcount 加 1;删除对象时,refcount 减 1,当 refcount 值为 0 时释放对象空间。删除对象的代码如下:
void decrRefCount(robj *o) {
    if (o->refcount == 1) {
        switch(o->type) {// 根据对象类型,释放其指向数据结构空间
        case OBJ_STRING: freeStringObject(o); break;
        case OBJ_LIST: freeListObject(o); break;
        case OBJ_SET: freeSetObject(o); break;
        …………
        }
        zfree(o); // 释放对象空间
    } else {
        // 引用计数减 1
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount–;  
    }
}
lru 字段占 24 比特,用于实现缓存淘汰策略,可以在配置文件中使用 maxmemory-policy 指令配置已用内存达到最大内存限制时的缓存淘汰策略。lru 根据用户配置缓存淘汰策略存储不同数据,常用的策略就是 LRU 与 LFU,LRU 的核心思想是,如果数据最近被访问过,那么将来被访问的几率也更高,此时 lru 字段存储的是对象访问时间;LFU 的核心思想是,如果数据过去被访问多次,那么将来被访问的频率也更高,此时 lru 字段存储的是上次访问时间与访问次数。假如使用 GET 命令访问数据时,会执行下面代码更新对象的 lru 字段:
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
    updateLFU(val);
} else {
    val->lru = LRU_CLOCK();
}
LRU_CLOCK 函数用于获取当前时间,注意此时间不是实时获取的,redis1 秒为周期执行系统调用获取精确时间,缓存在全局变量 server.lruclock,LRU_CLOCK 函数获取的只是缓存在此变量中的时间。updateLFU 函数用于更新对象的上次访问时间与访问次数,函数实现如下:
void updateLFU(robj *val) {
    unsigned long counter = LFUDecrAndReturn(val);
    counter = LFULogIncr(counter);
    val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
可以发现 lru 的低 8 比特存储的是对象的访问次数,高 16 比特存储的是对象的上次访问时间,以分钟为单位;需要特别注意的是函数 LFUDecrAndReturn,其返回计数值 counter,对象的访问次数在此值上累加。为什么不直接累加呢?假设每次只是简单的对访问次数累加,那么越老的数据一般情况下访问次数越大,即使该对象可能很长时间已经没有访问。因此访问次数应该有一个随时间衰减的过程,函数 LFUDecrAndReturn 实现了此衰减功能。
1.2 客户端结构体 client 简介
Redis 是典型的客户端服务器结构,客户端通过 socket 与服务端建立网络连接并发送命令请求,服务端处理命令请求并回复。Redis 使用结构体 client 存储客户端连接的所有信息,包括但不限于客户端的名称、客户端连接的套接字描述符、客户端当前选择的数据库 ID、客户端的输入缓冲区与输出缓冲区等。结构体 client 字段较多,此处只介绍命令处理主流程所需的关键字段。
typedef struct client {
    uint64_t id;           
    int fd;                
    redisDb *db;           
    robj *name;
time_t lastinteraction
              
    sds querybuf;   
    int argc;              
    robj **argv;
    struct redisCommand *cmd;          
     
    list *reply;           
    unsigned long long reply_bytes;
    size_t sentlen;        
    char buf[PROTO_REPLY_CHUNK_BYTES];
    int bufpos;
        
} client;
各字段含义如下:

1) id:客户端唯一 ID,通过全局对象 server 的 next_client_id 字段实现;
2) fd:客户端 socket 的文件描述符;
3) db:客户端使用 select 命令选择的数据库对象,其结构体定义如下:

typedef struct redisDb {
    int id;                    
    long long avg_ttl;
    
    dict *dict;                
    dict *expires;             
    dict *blocking_keys;       
    dict *ready_keys;          
    dict *watched_keys;           
} redisDb;
其中 id 为数据库序号,默认情况下 Redis 有 16 个数据库,id 序号为 0~15;dict 存储数据库所有键值对;expires 存储键的过期时间;avg_ttl 存储数据库对象的平均 TTL,用于统计;使用命令 BLPOP 阻塞获取列表元素时,如果链表为空,会阻塞客户端,同时将此列表键记录在 blocking_keys;当使用命令 PUSH 向列表添加元素时,会从字典 blocking_keys 中查找该列表键,如果找到说明有客户端正阻塞等待获取此列表键,于是将此列表键记录到字典 ready_keys,以便后续响应正在阻塞的客户端;Redis 支持事务,命令用于 MULTI 开启事务,命令 EXEC 用于执行事务;但是开启事务到执行事务期间,如何保证关心的数据不会被修改呢?Redis 采用乐观锁实现。开启事务的同时可以使用 WATCH key 命令监控关心的数据键,而 watched_keys 字典存储的就是被 WATCH 命令监控的所有数据键,其中 key-value 分别为数据键与客户端对象。当 Redis 服务器接收到写命令时,会从字典 watched_keys 中查找该数据键,如果找到说明有客户端正在监控此数据键,于是会标记客户端对象为 dirty;待 Redis 服务器收到客户端 EXEC 命令时,如果客户端带有 dirty 标记,则会拒绝执行事务。

4) name:客户端名称,可以使用命令 CLIENT SETNAME 设置;
5) lastinteraction:客户端上次与服务器交互的时间,以此实现客户端的超时处理;
6) querybuf:输入缓冲区,recv 函数接收的客户端命令请求会暂时缓存在此缓冲区;
7) argc:输入缓冲区的命令请求是按照 Redis 协议格式编码字符串,需要解析出命令请求的所有参数,参数个数存储在 argc 字段,参数内容被解析为 robj 对象,存储在 argv 数组;
8) cmd:待执行的客户端命令;解析命令请求后,会根据命令名称查找该命令对应的命令对象,存储在客户端 cmd 字段,可以看到其类型为 struct redisCommand;
9) reply:输出链表,链表节点的类型是 robj,存储待返回给客户端的命令回复数据;reply_bytes 表示已返回给客户端的字节数;
10) sentlen:当输出数据缓存在 reply 字段时,表示已返回给客户端的对象数目;当输出数据缓存在 buf 字段时,表示已返回给客户端的字节数目;看到这里读者可能会有疑问,为什么同时需要 reply 和 buf 的存在呢?其实二者只是用于返回不同的数据类型而已,详情参见 3.3 节;
11) buf:输出缓冲区,存储待返回给客户端的命令回复数据,bufpos 表示输出缓冲区中数据的最大字节位置,显然 sentlen~bufpos 区间的数据都是需要返回给客户端的。

1.3 服务端结构体 redisServer 简介
结构体 redisServer 存储 Redis 服务器的所有信息,包括但不限于数据库、配置参数、命令表、监听端口与地址、客户端列表、若干统计信息、RDB 与 AOF 持久化相关信息、主从复制相关信息、集群相关信息等。结构体 redisServer 的段非常多,这里只对部分字段做简要说明,以便读者对于服务端有个粗略了解,至于其他字段在讲解各知识点时会做说明。
struct redisServer {
char *configfile;
int hz;

int dbnum;
redisDb *db;
dict *commands;
 
aeEventLoop *el;
 
int port;
char *bindaddr[CONFIG_BINDADDR_MAX];
int bindaddr_count;
int ipfd[CONFIG_BINDADDR_MAX];
int ipfd_count;
 
list *clients;
int maxidletime;
}
各字段含义如下:

1) configfile:配置文件绝对路径;
2) hz:serverCron 函数的执行频率,默认为 10,可通过参数 hz 配置,最小值 1 最大值 500。Redis 服务器有很多任务需要定时执行,比如说定时清除过期键,定时处理超时客户端链接等,直接使用系统定时器开销较大,函数 serverCron 就用于执行这些定时任务,详情参见 1.4.2 节。当 serverCron 函数的执行频率确定时,通过函数的执行次数就可以判断是否需要执行某个定时任务,宏定义 run_with_period 就实现了此功能,其中 server.cronloops 字段就表示 serverCron 函数已经执行的次数;

#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz)))
当然由于 hz 是用户配置的,其并不能代表真实的 serverCron 函数执行频率。

3) dbnum:数据库的数目,可通过参数 databases 配置,默认 16;
4) db:数据库数组,数组的每个元素都是 redisDb 类型;
5) commands:命令字典,Redis 支持的所有命令都存储在这个字典中,key 为命令名称,vaue 为 struct redisCommand 对象;
6) el:Redis 是典型的事件驱动程序,el 即代表着 Redis 的事件循环;
7) port:服务器监听端口号,可通过参数 port 配置,默认端口号 6379;
8) bindaddr:绑定的所有 IP 地址,可以通过参数 bind 配置多个,例如 bind 192.168.1.100 10.0.0.1,bindaddr_count 为用户配置的 IP 地址数目;CONFIG_BINDADDR_MAX 常量为 16,即绑定 16 个 IP 地址;Redis 默认会绑定到当前机器所有可用的 Ip 地址;
9) ipfd:针对 bindaddr 字段的所有 IP 地址创建的 socket 文件描述符,ipfd_count 为创建的 socket 文件描述符数目;
10) clients:当前连接到 Redis 服务器的所有客户端;
11) maxidletime:最大空闲时间,可通过参数 timeout 配置,结合 client 对象的 lastinteraction 字段,当客户端超过 maxidletime 没有与服务器交互时,会认为客户端超时并释放该客户端连接;

1.4 命令结构体 redisCommand 简介
Redis 支持的所有命令初始都存储在全局变量 redisCommandTable,类型为 struct redisCommand[],定义及初始化如下:
struct redisCommand redisCommandTable[] = {
{“get”,getCommand,2,”rF”,0,NULL,1,1,1,0,0},
{“set”,setCommand,-3,”wm”,0,NULL,1,1,1,0,0},
…………
}
结构体 redisCommand 相对简单,主要定义了命令的名称、命令处理函数以及命令标志等:
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags;
int flags;

long long microseconds, calls;
};
各字段含义如下:

1) name:命令名称;
2) proc:命令处理函数;
3) arity:命令参数数目,用于校验命令请求格式是否正确;当 arity 小于 0 时,表示命令参数数目大于等于 arity;当 arity 大于 0 时,表示命令参数数目必须为 arity;注意命令请求中命令的名称本身也是一个参数,如 GET 命令的参数数目为 2,命令请求格式为“GET key”;
4) sflags:命令标志,例如标识命令时读命令还是写命令,详情参见表 -2;注意到 sflags 的类型为字符串,此处只是为了良好的可读性;
5) flags:命令的二进制标志,服务器启动时解析 sflags 字段生成;
6) calls:从服务器启动至今命令执行的次数,用于统计;
7) microseconds:从服务器启动至今命令总的执行时间,microseconds/calls 即可计算出该命令的平均处理时间,用于统计;

表 -2 命令标志类型

字符标识
二进制标识
含义
相关命令

w
CMD_WRITE
写命令
set、del、incr、lpush

r
CMD_READONLY
读命令
get、exists、llen

m
CMD_DENYOOM
内存不足时,拒绝执行此类命令
set、append、lpush

a
CMD_ADMIN
管理命令
save、shutdown、slaveof

p
CMD_PUBSUB
发布订阅相关命令
subscribe、unsubscribe

s
CMD_NOSCRIPT
命令不可以在 lua 脚本使用
auth、save、brpop

R
CMD_RANDOM
随机命令,即使命令请求参数完全相同,返回结果也可能不容
srandmember、scan、time

S
CMD_SORT_FOR_SCRIPT
当在 lua 脚本使用此类命令时,需要对输出结果做排序
sinter、sunion、sdiff

l
CMD_LOADING
服务器启动载入过程中,只能执行此类命令
select、auth、info

t
CMD_STALE
当从服务器与主服务器断开链接,且从服务器配置 slave-serve-stale-data no 时,从服务器只能执行此类命令
auth、shutdown、info

M
CMD_SKIP_MONITOR
此类命令不会传播给监视器
exec

k
CMD_ASKING

restore-asking

F
CMD_FAST
命令执行时间超过阈值时,会记录延迟事件,此标志用于区分延迟事件类型,F 表示 fast-command
get、setnx、strlen、exists

当服务器接收到一条命令请求时,需要从命令表中查找命令,而 redisCommandTable 命令表是一个数组,意味着查询命令的时间复杂度为 O(N),效率低下。因此 Redis 在服务器初始化时,会将 redisCommandTable 转换为一个字典存储在 redisServer 对象的 commands 字段,key 为命令名称,value 为命令 redisCommand 对象。populateCommandTable 函数实现了命令表从数组到字典的转化,同时解析 sflags 生成 flags:
void populateCommandTable(void) {
int numcommands =
sizeof(redisCommandTable)/sizeof(structredisCommand);

for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;

char *f = c->sflags;
while(*f != ‘\0’) {
switch(*f) {
case ‘w’: c->flags |= CMD_WRITE; break;
case ‘r’: c->flags |= CMD_READONLY; break;
}
f++;
}
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
}
}
同时对于经常使用的命令,Redis 甚至会在服务器初始化的时候将命令缓存在 redisServer 对象,这样使用的时候就不需要每次都从 commands 字典中查找了:
struct redisServer {
struct redisCommand *delCommand,*multiCommand,*lpushCommand,
*lpopCommand,*rpopCommand, *sremCommand, *execCommand,
*expireCommand,*pexpireCommand;
}
1.5 事件处理
Redis 服务器是典型的事件驱动程序,而事件又分为文件事件(socket 的可读可写事件)与时间事件(定时任务)两大类。无论是文件事件还是时间事件都封装在结构体 aeEventLoop:
typedef struct aeEventLoop {
int stop;

aeFileEvent *events;
aeFiredEvent *fired;
aeTimeEvent *timeEventHead;

aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
stop 标识事件循环是否结束;events 为文件事件数组,存储已经注册的文件事件;fired 存储被触发的文件事件;Redis 有多个定时任务,因此理论上应该有多个时间事件节点,多个时间事件形成链表,timeEventHead 即为时间事件链表头结点;Redis 服务器需要阻塞等待文件事件的发生,进程阻塞之前会调用 beforesleep 函数,进程因为某种原因被唤醒之后会调用 aftersleep 函数。事件驱动程序通常存在 while/for 循环,循环等待事件发生并处理,Redis 也不例外,其事件循环如下:
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
函数 aeProcessEvents 为事件处理主函数,其第二个参数是一个标志位,AE_ALL_EVENTS 表示函数需要处理文件事件与时间事件,AE_CALL_AFTER_SLEEP 表示阻塞等待文件事件之后需要执行 aftersleep 函数。
1.5.1 文件事件
Redis 客户端通过 TCP socket 与服务端交互,文件事件指的就是 socket 的可读可写事件。socket 读写操作有阻塞与非阻塞之分,采用阻塞模式时,一个进程只能处理一条网络连接的读写事件,为了同时处理多条网络连接,通常会采用多线程或者多进程,效率低下;非阻塞模式下,可以使用目前比较成熟的 IO 多路复用模型 select/epoll/kqueue 等,视不同操作系统而定。这里只对 epoll 作简要介绍。epoll 是 linux 内核为处理大量并发网络连接而提出的解决方案,能显著提升系统 CPU 利用率。epoll 使用非常简单,总共只有三个 API,epoll_create 函数创建一个 epoll 专用的文件描述符,用于后续 epoll 相关 API 调用;epoll_ctl 函数向 epoll 注册、修改或删除需要监控的事件;epoll_wait 函数会阻塞进程,直到监控的某个网络连接有事件发生。
int epoll_create(int size)
输入参数 size 通知内核程序期望注册的网络连接数目,内核以此判断初始分配空间大小;注意在 linux2.6.8 版本以后,内核动态分配空间,此参数会被忽略。返回参数为 epoll 专用的文件描述符,不再使用时应该及时关闭此文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
函数执行成功时返回 0,否则返回 -1,错误码设置在变量 errno;输入参数含义如下:

1) epfd:函数 epoll_create 返回的 epoll 文件描述符;
2) op:需要进行的操作,EPOLL_CTL_ADD 表示注册事件,EPOLL_CTL_MOD 表示修改网络连接事件,EPOLL_CTL_DEL 表示删除事件;
3) fd:网络连接的 socket 文件描述符;
4) event:需要监控的事件或者已触发的事件,结构体 epoll_event 定义如下:

struct epoll_event {
__uint32_t events;
epoll_data_t data;
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
其中 events 表示需要监控的事件类型或已触发的事件类型,比较常用的是 EPOLLIN 文件描述符可读事件,EPOLLOUT 文件描述符可写事件;data 保存与文件描述符关联的数据。
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int
timeout)
函数执行成功时返回 0,否则返回 -1,错误码设置在变量 errno;输入参数含义如下:

1) epfd:函数 epoll_create 返回的 epoll 文件描述符;
2) epoll_event:作为输出参数使用,用于回传已触发的事件数组;
3) maxevents:每次能处理的最大事件数目;
4) timeout:epoll_wait 函数阻塞超时时间,如果超过 timeout 时间还没有事件发生,函数不再阻塞直接返回;当 timeout 等于 0 时函数立即返回,timeout 等于 - 1 时函数会一直阻塞直到有事件发生。

Redis 并没有直接使用 epoll 提供的的 API,而是同时支持四种 IO 多路复用模型,并将每种模型的 API 进一步统一封装,由文件 ae_evport.c、ae_epoll.c、ae_kqueue.c 和 ae_select.c 实现。
static int aeApiCreate(aeEventLoop *eventLoop);
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
以 epoll 为例,aeApiCreate 函数是对 epoll_create 的封装;aeApiAddEvent 函数用于添加事件,是对 epoll_ctl 的封装;aeApiDelEvent 函数用于删除事件,是对 epoll_ctl 的封装;aeApiPoll 是对 epoll_wait 的封装。四个函数输入参数含义如下:
1) eventLoop:事件循环,与文件事件相关最主要有三个字段,apidata 指向 IO 多路复用模型对象,注意四种 IO 多路复用模型对象的类型不同,因此此字段是 void* 类型;events 存储需要监控的事件数组,以 socket 文件描述符作为数组索引存取元素;fired 存储已出发的事件数组。
以 epoll 模型为例,apidata 字段指向的 IO 多路复用模型对象定义如下:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
其中 epfd 函数 epoll_create 返回的 epoll 文件描述符,events 存储 epoll_wait 函数返回时已触发的事件数组。

2) fd:操作的 socket 文件描述符;
3) mask 或 delmask:添加或者删除的事件类型,AE_NONE 表示没有任何事件;AE_READABLE 表示可读事件;AE_WRITABLE 表示可写事件;
4) tvp:阻塞等待文件事件的超时时间;

这里只对等待事件函数 aeApiPoll 实现作简要介绍:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
// 阻塞等待事件的发生
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;

numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
// 转换事件类型为 Redis 定义的
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
// 记录已发生事件到 fired 数组
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
函数首先需要通过 eventLoop->apidata 字段获取到 epoll 模型对应的 aeApiState 结构体对象,才能调用 epoll_wait 函数等待事件的发生;而 epoll_wait 函数将已触发的事件存储到 aeApiState 对象的 events 字段,Redis 再次遍历所有已触发事件,将其封装在 eventLoop->fired 数组,数组元素类型为结构体 aeFiredEvent,只有两个字段,fd 表示发生事件的 socket 文件描述符,mask 表示发生的事件类型,如 AE_READABLE 可读事件和 AE_WRITABLE 可写事件。上面简单介绍了 epoll 的使用,以及 Redis 对 epoll 等 IO 多路复用模型的封装,下面我们回到本小节的主题,文件事件。结构体 aeEventLoop 有一个关键字段 events,类型为 aeFileEvent 数组,存储所有需要监控的文件事件。文件事件结构体定义如下:
typedef struct aeFileEvent {
int mask;
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
其中 mask 存储监控的文件事件类型,如 AE_READABLE 可读事件和 AE_WRITABLE 可写事件;rfileProc 为函数指针,指向读事件处理函数;wfileProc 同样为函数指针,指向写事件处理函数;clientData 指向对应的客户端对象。调用 aeApiAddEvent 函数添加事件之前之前,首先需要调用 aeCreateFileEvent 函数创建对应的文件事件,并存储在 aeEventLoop 结构体的 events 字段,aeCreateFileEvent 函数简单实现如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData){

aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;

return AE_OK;
}
Redis 服务器启动时需要创建 socket 并监听,等待客户端连接;客户端与服务器建立 socket 连接之后,服务器会等待客户端的命令请求;服务器处理完成客户端的命令请求之后,命令回复会暂时缓存在 client 结构体的 buf 缓冲区,待客户端文件描述符的可写事件发生时,才会真正往客户端发送命令回复。这些都需要创建对应的文件事件:
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL);

aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c);

aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c);
可以发现接收客户端连接的处理函数为 acceptTcpHandler,此时还没有创建对应的客户端对象,因此函数 aeCreateFileEvent 第四个参数为 NULL;接收客户端命令请求的处理函数为 readQueryFromClient;向发送命令回复的处理函数为 sendReplyToClient。
最后思考一个问题,aeApiPoll 函数的第二个参数是时间结构体 timeval,存储调用 epoll_wait 时传入的超时时间,那么这个函数怎么计算出来的呢?我们之前提过,Redis 除了要处理各种文件事件外,还需要处理很多定时任务(时间事件),那么当 Redis 由于执行 epoll_wait 而阻塞时,恰巧定时任务到期而需要处理怎么办?要回答这个问题需要分析下 Redis 事件循环的执行函数 aeProcessEvents,函数在调用 aeApiPoll 之前会遍历 Redis 的时间事件链表,查找最早会发生的时间事件,以此作为 aeApiPoll 需要传入的超时时间。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
shortest = aeSearchNearestTimer(eventLoop);
long long ms =
shortest->when_sec – now_sec)*1000 +
shortest->when_ms – now_ms;

// 阻塞等待文件事件发生
numevents = aeApiPoll(eventLoop, tvp);

for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// 处理文件事件,即根据类型执行 rfileProc 或 wfileProc
}

// 处理时间事件
processed += processTimeEvents(eventLoop);
}
1.5.2 时间事件
1.5.1 节介绍了 Redis 文件事件,已经知道事件循环执行函数 aeProcessEvents 的主要逻辑:1)查找最早会发生的时间事件,计算超时时间;2)阻塞等待文件事件的产生;3)处理文件事件;4)处理时间事件。时间事件的执行函数为 processTimeEvents。Redis 服务器内部有很多定时任务需要执行,比如说定时清除超时客户端连接,定时删除过期键等,定时任务被封装为时间事件结构体 aeTimeEvent 存储,多个时间事件形成链表,存储在 aeEventLoop 结构体的 timeEventHead 字段,其指向链表首节点。时间事件 aeTimeEvent 定义如下:
typedef struct aeTimeEvent {
long long id;
long when_sec;
long when_ms;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
各字段含义如下:

1) id:时间事件唯一 ID,通过字段 eventLoop->timeEventNextId 实现;
2) when_sec 与 when_ms:时间事件触发的秒数与毫秒数;
3) timeProc:函数指针,指向时间事件处理函数;
4) finalizerProc:函数指针,删除时间事件节点之前会调用此函数;
5) clientData:指向对应的客户端对象;
6) next:指向下一个时间事件节点。

时间事件执行函数 processTimeEvents 的处理逻辑比较简单,只是遍历时间事件链表,判断当前时间事件是否已经到期,如果到期则执行时间事件处理函数 timeProc:
static int processTimeEvents(aeEventLoop *eventLoop) {
te = eventLoop->timeEventHead;
while(te) {
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms)) {
// 处理时间事件
retval = te->timeProc(eventLoop, id, te->clientData);
// 重新设置时间事件到期时间
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,
&te->when_sec,&te->when_ms);
}
}
te = te->next;
}
}
注意时间事件处理函数 timeProc 返回值 retval,其表示此时间事件下次应该被触发的时间,单位毫秒,且是一个相对时间,即从当前时间算起,retval 毫秒后此时间事件会被触发。其实 Redis 只有一个时间事件节点,看到这里读者可能会有疑惑,服务器内部不是有很多定时任务吗,为什么只有一个时间事件呢?回答此问题之前我们需要先分析这个唯一的时间事件节点。Redis 创建时间事件节点的函数为 aeCreateTimeEvent,内部实现非常简单,只是创建时间事件节点并添加到时间事件链表。aeCreateTimeEvent 函数定义如下:
long long aeCreateTimeEvent(aeEventLoop *eventLoop,
long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
其中输入参数 eventLoop 指向事件循环结构体;milliseconds 表示此时间事件触发时间,单位毫秒,注意这是一个相对时间,即从当前时间算起,milliseconds 毫秒后此时间事件会被触发;proc 指向时间事件的处理函数;clientData 指向对应的结构体对象;finalizerProc 同样是函数指针,删除时间事件节点之前会调用此函数。读者可以在代码目录全局搜索 aeCreateTimeEvent,会发现确实只创建了一个时间事件节点:
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
该时间事件在 1 毫秒后会被触发,处理函数为 serverCron,参数 clientData 与 finalizerProc 都为 NULL。而函数 serverCron 实现了 Redis 服务器所有定时任务的周期执行。
int serverCron(struct aeEventLoop *eventLoop, long long id, void
*clientData) {
run_with_period(100) {
//100 毫秒周期执行
}
run_with_period(5000) {
//5000 毫秒周期执行
}
// 清除超时客户端链接
clientsCron();
// 处理数据库
databasesCron();

server.cronloops++;
return 1000/server.hz;
}
变量 server.cronloops 用于记录 serverCron 函数的执行次数,变量 server.hz 表示 serverCron 函数的执行频率,用户可配置,最小为 1 最大为 500,默认为 10。假设 server.hz 取默认值 10,函数返回 1000/server.hz 会更新当前时间事件的触发时间为 100 毫秒后,即 serverCron 的执行周期为 100 毫秒。run_with_period 宏定义实现了定时任务按照指定时间周期执行,其会被替换为一个 if 条件判断,条件为真才会执行定时任务,定义如下:
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz)
|| !(server.cronloops%((_ms_)/(1000/server.hz))))
另外我们可以看到 serverCron 函数会无条件执行某些定时任务,比如清除超时客户端连接,以及处理数据库(清除数据库过期键等)。需要特别注意一点,serverCron 函数的执行时间不能过长,否则会导致服务器不能及时响应客户端的命令请求。以过期键删除为例,分析下 Redis 是如何保证 serverCron 函数的执行时间。过期键删除由函数 activeExpireCycle 实现,由函数 databasesCron 调用,其函数是实现如下:
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25

void activeExpireCycle(int type) {
timelimit = 
 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;

for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
do {
// 查找过期键并删除

if ((iteration & 0xf) == 0) {
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
break;
}
}
}while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4)
}
}
函数 activeExpireCycle 最多遍历 dbs_per_call 个数据库,并记录每个数据库删除的过期键数目;当删除过期键数目大于门限时,认为此数据库过期键较多,需要再次处理。考虑到极端情况,当数据库键数目非常多且基本都过期时,do-while 循环会一直执行下去。因此我们添加 timelimit 时间限制,每执行 16 次 do-while 循环,检测函数 activeExpireCycle 执行时间是否超过 timelimit,如果超过则强制结束循环。初看 timelimit 的计算方式可能会比较疑惑,其计算结果使得函数 activeExpireCycle 的总执行时间占 CPU 时间的 25%。仍然假设 server.hz 取默认值 10,即每秒钟函数 activeExpireCycle 执行 10 次,那么每秒钟函数 activeExpireCycle 的总执行时间为 100000025/100,每次函数 activeExpireCycle 的执行时间为 100000025/100/10,单位微妙。
2 sever 启动过程
上一节我们讲述了客户端,服务端,事件处理等基础知识,下面开始学习 Redis 服务器的启动过程,这里主要分为 server 初始化,监听端口以及等待命令三个小节。
2.1 server 初始化
服务器初始化主流程可以简要分为 7 个步骤:1)初始化配置,包括用户可配置的参数,以及命令表的初始化;2)加载并解析配置文件;3)初始化服务端内部变量,其中就包括数据库;4)创建事件循环 eventLoop;5)创建 socket 并启动监听;6)创建文件事件与时间事件;7)开启事件循环。下面详细介绍步骤 1~4,至于步骤 5~7 将会在 2.2 小节介绍。

图 -2 server 初始化流程步骤 1)初始化配置,由函数 initServerConfig 实现,其实就是给配置参数赋初始值:
void initServerConfig(void) {
//serverCron 函数执行频率,默认 10
server.hz = CONFIG_DEFAULT_HZ;
// 监听端口,默认 6379
server.port = CONFIG_DEFAULT_SERVER_PORT;
// 最大客户端数目,默认 10000
server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
// 客户端超时时间,默认 0,即永不超时
server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
// 数据库数目,默认 16
server.dbnum = CONFIG_DEFAULT_DBNUM;

// 初始化命令表,1.4 小节已经讲过,这里不再详述
populateCommandTable();

…………
}
步骤 2)加载并解析配置文件,入口函数为 loadServerConfig,函数声明如下:
void loadServerConfig(char *filename, char *options)
输入参数 filename 表示配置文件全路径名称,options 表示命令行输入的配置参数,例如我们通常以以下命令启动 Redis 服务器:
/home/user/redis/redis-server /home/user/redis/redis.conf -p 4000
使用 GDB 启动 redis-server,打印函数 loadServerConfig 输入参数如下:
(gdb) p filename
$1 = 0x778880 “/home/user/redis/redis.conf”
(gdb) p options
$2 = 0x7ffff1a21d33 “\”-p\” \”4000\” ”
Redis 的配置文件语法相对简单,每一行是一条配置,格式如“配置 参数 1 [参数 2] [……]”,加载配置文件只需要一行一行将文件内容读取到内存中即可,GDB 打印加载到内存中的配置如下:
(gdb) p config
“bind 127.0.0.1\n\nprotected-mode yes\n\nport 6379\ntcp-backlog 511\n\ntcp-keepalive 300\n\n………”
加载完成后会调用 loadServerConfigFromString 函数解析配置,输入参数 config 即配置字符串,实现如下:
void loadServerConfigFromString(char *config) {
// 分割配置字符串多行,totlines 记录行数
lines = sdssplitlen(config,strlen(config),”\n”,1,&totlines);

for (i = 0; i < totlines; i++) {
// 跳过注释行与空行
if (lines[i][0] == ‘#’ || lines[i][0] == ‘\0′) continue;
argv = sdssplitargs(lines[i],&argc); // 解析配置参数
// 赋值
if (!strcasecmp(argv[0],”timeout”) && argc == 2) {
server.maxidletime = atoi(argv[1]);
}else if (!strcasecmp(argv[0],”port”) && argc == 2) {
server.port = atoi(argv[1]);
}
// 其他配置
}
}
函数首先将输入配置字符串以“n”为分隔符划分为多行,totlines 记录总行数,lines 数组存储分割后的配置,数组元素类型为字符串 SDS;for 循环遍历所有配置行,解析配置参数,并根据参数内容设置结构体 server 各字段。注意 Redis 配置文件中行开始“#”字符标识本行内容为注释,解析时需要跳过。步骤 3)初始化服务器内部变量,比如客户端链表,数据库,全局变量共享对象等;入口函数为 initServer,函数逻辑相对简单,这里只做简要说明;
void initServer(void) {
server.clients = listCreate(); // 初始化客户端链表
// 创建数据库字典
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
…………
}
}
注意数据库字典的 dictType 指向的是结构体 dbDictType,其中定义了数据库字典键的哈希函数,键比较函数,以及键与值的析构函数,定义如下:
dictType dbDictType = {
dictSdsHash,
NULL,
NULL,
dictSdsKeyCompare,
dictSdsDestructor,
dictObjectDestructor
};
数据库的键都是 SDS 类型,键哈希函数为 dictSdsHash,,键比较函数为 dictSdsKeyCompare,键析构函数为 dictSdsDestructor;数据库的值是 robj 对象,值析构函数为 dictObjectDestructor;键和值的内容赋值函数都为 NULL。1.1 节提到对象 robj 的 refcount 字段存储当前对象的引用次数,意味着对象是可以共享的。要注意的是,只有当对象 robj 存储的是 0~10000 以内的整数,对象 robj 才会被共享,且这些共享整数对象的引用计数初始化为 INT_MAX,保证不会被释放。执行命令时 Redis 会返回一些字符串回复,这些字符串对象同样在服务器初始化时创建,且永远不会尝试释放这类对象。所有共享对象都存储在全局结构体变量 shared。
void createSharedObjects(void) {
// 创建命令回复字符串对象
shared.ok = createObject(OBJ_STRING,sdsnew(“+OK\r\n”));
shared.err = createObject(OBJ_STRING,sdsnew(“-ERR\r\n”));
// 创建 0~10000 整数对象
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
shared.integers[j]->encoding = OBJ_ENCODING_INT;
}
}
步骤 4)创建事件循环 eventLoop,即分配结构体所需内存,并初始化结构体各字段;epoll 就是在此时创建的:
aeEventLoop *aeCreateEventLoop(int setsize) {
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

if (aeApiCreate(eventLoop) == -1) goto err;
}
输入参数 setsize 理论上等于用户配置的虽大客户端数目即可,但是为了确保安全,这里设置 setsize 等于最大客户端数目加 128。函数 aeApiCreate 内部调用 epoll_create 创建 epoll,并初始化结构体 eventLoop 的字段 apidata。
2.2 启动监听
上节介绍了服务器初始化的前面 4 个步骤,初始化配置;加载并解析配置文件;初始化服务端内部遍历,包括数据库,全局共享变量等;创建时间循环 eventLoop。完成这些操作之后,Redis 将创建 socket 并启动监听,同时创建对应的文件事件与时间事件并开始事件循环。下面将详细介绍步骤 5~7。步骤 5)创建 socket 并启动监听;用户可通过指令 port 配置 socket 绑定端口号,指令 bind 配置 socket 绑定 IP 地址;注意指令 bind 可配置多个 IP 地址,中间用空格隔开;创建 socket 时只需要循环所有 IP 地址即可。
int listenToPort(int port, int *fds, int *count) {
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
// 创建 socket 并启动监听,文件描述符存储在 fds 数组作为返回参数
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
// 设置 socket 非阻塞
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
}
输入参数 port 表示用户配置的端口号,server 结构体的 bindaddr_count 字段存储用户配置的 IP 地址数目,bindaddr 字段存储用户配置的所有 IP 地址。函数 anetTcpServer 实现了 socket 的创建,绑定,以及监听流程,这里不做过多详述。参数 fds 与 count 可用作输出参数,fds 数组存储创建的所有 socket 文件描述符,count 存储 socket 数目。注意到所有创建的 socket 都会设置为非阻塞模式,原因在于 Redis 使用了 IO 多路复用模式,其要求 socket 读写必须是非阻塞的,函数 anetNonBlock 通过系统调用 fcntl 设置 socket 非阻塞模式。步骤 6)创建文件事件与时间事件;步骤 5 中已经完成了 socket 的创建与监听,1.5.1 节提到 socket 的读写事件被抽象为文件事件,因为对于监听的 socket 还需要创建对应的文件事件。
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR){
}
}
server 结构体的 ipfd_count 字段存储创建的监听 socket 数目,ipfd 数组存储创建的所有监听 socket 文件描述符,需要循环所有的监听 socket,为其创建对应的文件事件。可以看到监听事件的处理函数为 acceptTcpHandler,实现了 socket 连接请求的 accept,以及客户端对象的创建。1.5.2 小节提到定时任务被抽象为时间事件,且 Redis 只创建了一个时间事件,在服务端初始化时创建。此时间事件的处理函数为 serverCron,初次创建时 1 毫秒后备触发。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
exit(1);
}
步骤 7)开启事件循环;前面 6 个步骤已经完成了服务端的初始化工作,并在指定 IP 地址、端口监听客户端连接,同时创建了文件事件与时间事件;此时只需要开启事件循环等待事件发生即可。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 开始事件循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 事件处理主函数
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
事件处理主函数 aeProcessEvents 已经详细介绍过,这里需要重点关注函数 beforesleep,其在每次事件循环开始,即 Redis 阻塞等待文件事件之前执行。函数 beforesleep 会执行一些不是很费时的操作,集群相关操作,过期键删除操作(这里可称为快速过期键删除),向客户端返回命令回复等。这里简要介绍下快速过期键删除操作。
void beforeSleep(struct aeEventLoop *eventLoop) {
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
}
Redis 过期键删除有两种策略:1)访问数据库键时,校验该键是否过期,如果过期则删除;2)周期性删除过期键,beforeSleep 函数与 serverCron 函数都会执行。server 结构体的 active_expire_enabled 字段表示是否开启周期性删除过期键策略,用户可通过 set-active-expire 指令配置;masterhost 字段存储当前 Redis 服务器的 master 服务器的域名,如果为 NULL 说明当前服务器不是某个 Redis 服务器的 slaver。注意到这里依然是调用函数 activeExpireCycle 执行过期键删除,只是参数传递的是 ACTIVE_EXPIRE_CYCLE_FAST,表示快速过期键删除。回顾下 1.5.2 节讲述函数 activeExpireCycle 的实现,函数计算出 timelimit,即函数最大执行时间,循环删除过期键时会校验函数执行时间是否超过此限制,超过则结束循环。显然快速过期键删除时只需要缩短 timelimit 即可,计算策略如下:
void activeExpireCycle(int type) {
static int timelimit_exit = 0;
static long long last_fast_cycle = 0

if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
// 上次 activeExpireCycle 函数是否已经执行完毕
if (!timelimit_exit) return;
// 当前时间距离上次执行快速过期键删除是否已经超过 2000 微妙
if (start < last_fast_cycle + 1000*2) return;
last_fast_cycle = start;
}
// 快速过期键删除时,函数执行时间不超过 1000 微妙
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = 1000;
}
执行快速过期键删除有很多限制,当函数 activeExpireCycle 正在执行时直接返回;当上次执行快速过期键删除的时间距离当前时间小于 2000 微妙时直接返回。思考下为什么可以通过变量 timelimit_exit 判断函数 activeExpireCycle 是否正在执行呢?注意到变量 timelimit_exit 声明为 static,即函数执行完毕不会释放变量空间。那么可以在函数 activeExpireCycle 入口赋值 timelimit_exit 为 0,返回之前赋值 timelimit_exit 为 1,由此便可通过变量 timelimit_exit 判断函数 activeExpireCycle 是否正在执行。变量 last_fast_cycle 声明为 static 也是同样的原因。同时可以看到当执行快速过期键删除时,设置函数 activeExpireCycle 的最大执行时间为 1000 微妙。函数 aeProcessEvents 为事件处理主函数,首先查找最近发生的时间事件,调用 epoll_wait 阻塞等待文件事件的发生并设置超时事件;待 epoll_wait 返回时,处理触发的文件事件;最后处理时间事件。步骤 6 中已经创建了文件事件,为监听 socket 的读事件,事件处理函数为 acceptTcpHandler,即当客户端发起 socket 连接请求时,服务端会执行函数 acceptTcpHandler 处理。acceptTcpHandler 函数主要做了两件事:1)accept 客户端的连接请求;2)创建客户端对象;3)创建文件事件。步骤 2 与步骤 3 由函数 createClient 实现,输入参数 fd 为 accept 客户端连接请求后生成的 socket 文件描述符。
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// 设置 socket 为非阻塞模式
anetNonBlock(NULL,fd);
// 设置 TCP_NODELAY
anetEnableTcpNoDelay(NULL,fd);
// 如果服务端配置了 tcpkeepalive,则设置 SO_KEEPALIVE
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR){
}
}
为了使用 IO 多路复用模式,此处同样需要设置 socket 为非阻塞模式。TCP 是基于字节流的可靠传输层协议,为了提升网络利用率,一般默认都会开启 Nagle。当应用层调用 write 函数发送数据时,TCP 并不一定会立刻将数据发送出去,根据 Nagle 算法,还必须满足一定条件才行。Nagle 是这样规定的:如果数据包长度大于一定门限时,则立即发送;如果数据包中含有 FIN(表示断开 TCP 链接)字段,则立即发送;如果当前设置了 TCP_NODELAY 选项,则立即发送;如果所有条件都不满足,默认需要等待 200 毫秒超时后才会发送。Redis 服务器向客户端返回命令回复时,希望 TCP 能立即将该回复发送给客户端,因此需要设置 TCP_NODELAY。思考下如果不设置会怎么样呢?从客户端分析,命令请求的响应时间会大大加长。TCP 是可靠的传输层协议,每次都需要经历三次握手与四次挥手,为了提升效率,可以设置 SO_KEEPALIVE,即 TCP 长连接,这样 TCP 传输层会定时发送心跳包确认该连接的可靠性。应用层也不再需要频繁的创建于释放 TCP 连接了。server 结构体的 tcpkeepalive 字段表示是否启用 TCP 长连接,用户可通过参数 tcp-keepalive 配置。接收到客户端连接请求之后,服务器需要创建文件事件等待客户端的命令请求,可以看到文件事件的处理函数为 readQueryFromClient,当服务器接收到客户端的命令请求时,会执行此此函数。
3 命令处理过程
上一节分析了服务器的启动过程,包括配置文件的解析,创建 socket 并启动监听,创建文件事件与时间事件并开启事件循环。服务器启动完成后,只需要等待客户端连接并发送命令请求即可。本小节主要介绍命令的处理过程,可以分为三个阶段,解析命令请求,命令调用和返回结果给客户端。
3.1 命令解析
TCP 是一种基于字节流的传输层通信协议,因此接收到的 TCP 数据不一定是一个完整的数据包,其有可能是多个数据包的组合,也有可能是某一个数据包的部分,这种现象被称为半包与粘包。如图 - 3 所示。

图 -3 TCP 半包与粘包客户端应用层分别发送三个数据包,data3、data2 和 data1,但是 TCP 传输层在真正发送数据时,将 data3 数据包分割为 data3_1 与 data3_2,并且将 data1 与 data2 数据合并,此时服务器接收到的数据包就不是一个完整的数据包。为了区分一个完整的数据包,通常有如下三种方法:1)数据包长度固定;2)通过特定的分隔符区分,比如 HTTP 协议就是通过换行符区分的;3)通过在数据包头部设置长度长度字段区分数据包长度,比如 FastCGI 协议。Redis 采用自定义协议格式实现不同命令请求的区分,例如当用户在 redis-cli 客户端键入下面命令:
SET redis-key value1 vlaue2 value3
客户端会将该命令请求转换为以下协议格式,然后发送给服务器:
*5\r\n$3\r\n$9redis-key\r\n$6value1\r\n$6vlaue2\r\n$6value3\r\n
其中,换行符 rn 用于区分命令请求的若干参数,“*5”表示该命令请求有 5 个参数,“$3”、“$9”和“$6”等表示该参数字符串长度,多个请求参数之间用“rn”分隔开需要注意的是,Redis 还支持在 telnet 会话输入命令的方式,只是此时没有了请求协议中的“*”来声明参数的数量,因此必须使用空格来分割各个参数,服务器在接收到数据之后,会将空格作为参数分隔符解析命令请求。这种方式的命令请求称为内联命令。Redis 服务器接收到的命令请求首先存储在客户端对象的 querybuf 输入缓冲区,然后解析命令请求各个参数,并存储在客户端对象的 argv(参数对象数组)和 argc(参数数目)字段。参考 2.2 小节可以知道解析客户端命令请求的入口函数为 readQueryFromClient,会读取 socket 数据存储到客户端对象的输入缓冲区,并调用函数 processInputBuffer 解析命令请求。processInputBuffer 函数主要逻辑如图 - 4 所示。

图 -4 命令解析流程图下面简要分析通过 redis-cli 客户端发送的命令请求的解析过程。假设客户端命令请求为“SET redis-key value1”,在函数 processMultibulkBuffer 添加断点,GDB 打印客户端输入缓冲区内容如下:
(gdb) p c->querybuf
$3 = (sds) 0x7ffff1b45505
“*3\r\n$3\r\nSET\r\n$9\r\nredis-key\r\n$6\r\nvalue1\r\n”
解析该命令请求可以分为 2 个步骤,1)解析命令请求参数数目;2)循环解析每个请求参数。下面详细分析每个步骤的源码实现步骤 1)解析命令请求参数数目;querybuf 指向命令请求首地址,命令请求参数数目的协议格式为“3rn”,即首字符必须是“”,并且可以使用字符“r”定位到行尾位置;解析后的参数数目暂存在客户端对象的 multibulklen 字段,表示等待解析的参数数目,变量 pos 记录已解析命令请求的长度。
// 定位到行尾
newline = strchr(c->querybuf,’\r’);

// 解析命令请求参数数目,并存储在客户端对象的 multibulklen 字段
serverAssertWithInfo(c,NULL,c->querybuf[0] == ‘*’);
string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
c->multibulklen = ll;

// 记录已解析位置偏移量
pos = (newline-c->querybuf)+2;
// 分配请求参数存储空间
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
GDB 打印主要变量内容如下:
(gdb) p c->multibulklen
$9 = 3
(gdb) p pos
$10 = 4
步骤 2)循环解析每个请求参数:命令请求各参数的协议格式为“$3\r\nSET\r\n”,即首字符必须是“$”。解析当前参数之前需要解析出参数的字符串长度,可以使用字符“r”定位到行尾位置;注意到解析参数长度时,字符串开始位置为 querybuf+pos+1;字符串参数长度暂存在客户端对象的 bulklen 字段,同时更新已解析字符串长度 pos。
// 定位到行尾
newline = strchr(c->querybuf+pos,’\r’);
// 解析当前参数字符串长度,字符串首字符偏移量为 pos
if (c->querybuf[pos] != ‘$’) {
return C_ERR;
}
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
pos += newline-(c->querybuf+pos)+2;
c->bulklen = ll;
GDB 打印主要变量内容如下:
(gdb) p c->querybuf+pos
$13 = 0x7ffff1b4550d “SET\r\n$9\r\nredis-key\r\n$6\r\nvalue1\r\n”
(gdb) p c->bulklen
$15 = 3
(gdb) p pos
$16 = 8
解析出参数字符串长度之后,可直接读取该长度的参数内容,并创建字符串对象;同时需要更新待解析参数 multibulklen。
// 解析参数
c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen);
pos += c->bulklen+2;

// 待解析参数数目减一
c->multibulklen–;
当 multibulklen 值更新尾 0 时,说明参数解析完成,结束循环。读者可以思考下,待解析参数数目,当前参数长度为什么都需要暂存在客户端结构体,使用函数局部变量行不行?肯定是不行的,原因就在于上面提到的 TCP 半包与粘包现象,服务器可能只接收到部分命令请求,例如“3rn$3\r\nSET\r\n$9rnredis”。当函数 processMultibulkBuffer 执行完毕时,同样只会解析部分命令请求“3rn$3\r\nSET\r\n$9rn”,此时就需要记录该命令请求待解析的参数数目,以及待解析参数的长度;而剩余待解析的参数“redis”会继续缓存在客户端的输入缓冲区。
3.2 命令调用
参考图 -4,解析完成命令请求之后,会调用函数 processCommand 处理该命令请求,而处理命令请求之前还有很多校验逻辑,比如说客户端是否已经完成认证,命令请求参数是否合法等。下面简要列出若干校验规则。校验 1)如果是 quit 命令直接返回并关闭客户端;
if (!strcasecmp(c->argv[0]->ptr,”quit”)) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
校验 2)执行函数 lookupCommand 查找命令后,如果命令不存在返回错误;
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
addReplyErrorFormat(c,”unknown command ‘%s'”,(char*)c->argv[0]->ptr);
return C_OK;
}
校验 3)如果命令参数数目不合法,返回错误。命令结构体的 arity 用于校验参数数目是否合法,当 arity 小于 0 时,表示命令参数数目大于等于 arity;当 arity 大于 0 时,表示命令参数数目必须为 arity;注意命令请求中命令的名称本身也是一个参数。
if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
addReplyErrorFormat(c,”wrong number of arguments for ‘%s’ command”,
c->cmd->name);
return C_OK;
}
校验 4)如果使用指令“requirepass password”设置了密码,且客户端没未认证通过,只能执行 auth 命令,auth 命令格式为“AUTH password”。
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){
addReply(c,shared.noautherr);
return C_OK;
}
校验 5)如果使用指令“maxmemory <bytes>”设置了最大内存限制,且当前内存使用量超过了该配置门限,服务器会拒绝执行带有“m”(CMD_DENYOOM)标识的命令,如 SET 命令、APPEND 命令和 LPUSH 命令等。命令标识参见 1.4 小节。
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
addReply(c, shared.oomerr);
return C_OK;
}
}
校验 6)除了上面的 5 种校验,还有很多校验规则,比如集群相关校验,持久化相关校验,主从复制相关校验,发布订阅相关校验,以及事务操作等。这些校验规则会在相关章节会作详细介绍。当所有校验规则都通过后,才会调用命令处理函数执行命令,代码如下:
start = ustime();
c->cmd->proc(c);
duration = ustime()-start;

// 更新统计信息:当前命令执行时间与调用次数
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;

// 记录慢查询日志
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
执行命令完成后,如果有必要,还需要更新统计信息,记录慢查询日志,AOF 持久化该命令请求,传播命令请求给所有的从服务器等。持久化与主从复制会在相关章节会作详细介绍,这里主要介绍慢查询日志的实现方式。
void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc,
long long duration) {
// 执行时间超过门限,记录该命令
if (duration >= server.slowlog_log_slower_than)
listAddNodeHead(server.slowlog,
slowlogCreateEntry(c,argv,argc,duration));

// 慢查询日志最多记录条数为 slowlog_max_len,超过需删除
while (listLength(server.slowlog) > server.slowlog_max_len)
listDelNode(server.slowlog,listLast(server.slowlog));
}
可以使用指令“slowlog-log-slower-than 10000”配置执行时间超过多少毫秒才会记录慢查询日志,指令“slowlog-max-len 128”配置慢查询日志最大数目,超过会删除最早的日志记录。可以看到慢查询日志记录在服务端结构体的 slowlog 字段,即存储速度非常快,并不会影响命令执行效率。用户可通过“SLOWLOG subcommand [argument]”命令查看服务器记录的慢查询日志。
3.3 返回结果
Redis 服务器返回结果类型不同,协议格式不同,而客户端可以根据返回结果的第一个字符判断返回类型。Redis 的返回结果可以分为 5 类:
1)状态回复,第一个字符是“+”;例如,SET 命令执行完毕会向客户端返回“+OKrn”。
addReply(c, ok_reply ? ok_reply : shared.ok);
变量 ok_reply 通常为 NULL,则返回的是共享变量 shared.ok,在服务器启动时就完成了共享变量的初始化。
shared.ok = createObject(OBJ_STRING,sdsnew(“+OK\r\n”));
2)错误回复,第一个字符是“-”;例如,当客户端请求命令不存在时,会向客户端返回“-ERR unknown command ‘testcmd’”。
addReplyErrorFormat(c,”unknown command ‘%s'”,(char*)c->argv[0]->ptr);
而函数 addReplyErrorFormat 内部实现会拼装错误回复字符串。
addReplyString(c,”-ERR “,5);
addReplyString(c,s,len);
addReplyString(c,”\r\n”,2);
3)整数回复,第一个字符是“:”;例如,INCR 命令执行完毕向客户端返回“:100rn”。
addReply(c,shared.colon);
addReply(c,new);
addReply(c,shared.crlf);
其中共享变量 shared.colon 与 shared.crlf 同样都是在服务器启动时就完成了初始化。
shared.colon = createObject(OBJ_STRING,sdsnew(“:”));
shared.crlf = createObject(OBJ_STRING,sdsnew(“\r\n”));
4)批量回复,第一个字符是“$”;例如,GET 命令查找键向客户端返回结果“$5rnhellorn”,其中 $5 表示返回字符串长度。
// 计算返回对象 obj 长度,并拼接为字符串“$5\r\n”
addReplyBulkLen(c,obj);
addReply(c,obj);
addReply(c,shared.crlf);
5)多条批量回复,第一个字符是“”;例如,LRANGE 命令可能会返回多个多个值,格式为“3rn$6\r\nvalue1\r\n$6rnvalue2rn$6\r\nvalue3\r\n”,与命令请求协议格式相同,“*3”表示返回值数目,“$6”表示当前返回值字符串长度,多个返回值用“rn”分隔开。
// 拼接返回值数目“*3\r\n”
addReplyMultiBulkLen(c,rangelen);
// 循环输出所有返回值
while(rangelen–) {
// 拼接当前返回值长度“$6\r\n”
addReplyLongLongWithPrefix(c,len,’$’);
addReplyString(c,p,len);
addReply(c,shared.crlf);
}
可以看到 5 种类型的返回结果都是调用类似于 addReply 函数返回的,那么是这些方法将返回结果发送给客户端的吗?其实不是。回顾 1.2 小节讲述的客户端结构体 client,其中有两个关键字段 reply 和 buf,分别表示输出链表与输出缓冲区,而函数 addReply 会直接或者间接的调用以下两个函数将返回结果暂时缓存在 reply 或者 buf 字段。
// 添加字符串都输出缓冲区
int _addReplyToBuffer(client *c, const char *s, size_t len)

// 添加各种类型的对象到输出链表
void _addReplyObjectToList(client *c, robj *o)
void _addReplySdsToList(client *c, sds s)
void _addReplyStringToList(client *c, const char *s, size_t len)
需要特别注意的是,reply 和 buf 字段不可能同时缓存待返回给客户端的数据。从客户端结构体的 sentlen 字段就能看出,当输出数据缓存在 reply 字段时,sentlen 表示已返回给客户端的对象数目;当输出数据缓存在 buf 字段时,sentlen 表示已返回给客户端的字节数目。那么当 reply 和 buf 字段同时缓存有输出数据呢?只有 sentlen 字段显然是不够的。从_addReplyToBuffer 函数实现同样可以看出该结论。
int _addReplyToBuffer(client *c, const char *s, size_t len) {
if (listLength(c->reply) > 0) return C_ERR;
}
调用函数_addReplyToBuffer 缓存数据到输出缓冲区时,如果检测到 reply 字段有待返回给客户端的数据,函数返回错误。而通常缓存数据时都会先尝试缓存到 buf 输出缓冲区,如果失败会再次尝试缓存到 reply 输出链表。
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
而函数 addReply 在将待返回给客户端的数据暂时缓存在输出缓冲区或者输出链表的同时,会将当前客户端添加到服务端结构体的 clients_pending_write 链表,以便后续能快速查找出哪些客户端有数据需要发送。
listAddNodeHead(server.clients_pending_write,c);
看到这里读者可能会有疑问,函数 addReply 只是将待返回给客户端的数据暂时缓存在输出缓冲区或者输出链表,那么什么时候将这些数据发送给客户端呢?读者是否还记得在介绍开启事件循环时,提到函数 beforesleep 在每次事件循环阻塞等待文件事件之前执行,主要执行一些不是很费时的操作,比如过期键删除操作,向客户端返回命令回复等。函数 beforesleep 会遍历 clients_pending_write 链表中每一个客户端节点,并发送输出缓冲区或者输出链表中的数据。
// 遍历 clients_pending_write 链表
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
listDelNode(server.clients_pending_write,ln);
// 向客户端发送数据
if (writeToClient(c->fd,c,0) == C_ERR) continue;
}
看到这里我想大部分读者可能都会认为返回结果已经发送给客户端,命令请求也已经处理完成了。其实不然,读者可以思考这么一个问题,当返回结果数据量非常大时,是无法一次性将所有数据都发送给客户端的,即函数 writeToClient 执行之后,客户端输出缓冲区或者输出链表中可能还有部分数据未发送给客户端。这时候怎么办呢?很简单,只需要添加文件事件,监听当前客户端 socket 文件描述符的可写事件即可。
if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR){
}
可以看到该文件事件的事件处理函数为 sendReplyToClient,即当客户端可写时,函数 sendReplyToClient 会发送剩余部分的数据给客户端。至此,命令请求才算是真正处理完成了。
4 本文小结
为了更好的理解服务器与客户端的交互,本文首先介绍了一些基础结构体,如对象结构体 robj,客户端结构体 client,服务端结构体 redisServer 以及命令结构体 redisCommand。Redis 服务器是典型的事件驱动程序,将事件处理分为两大类:文件事件与时间事件。文件事件即 socket 的可读可写事件,时间事件即需要周期性执行的一些定时任务。Redis 采用比较成熟的 IO 多路复用模型(select/epoll 等)处理文件事件,并对这些 IO 多路复用模型做了简单封装。Redis 服务器只维护了一个时间事件节点,该时间事件处理函数为 serverCron,执行了所有需要周期性执行的一些定时任务。事件是理解 Redis 的基石,希望读者能认真学习。最后本文介绍了服务器处理客户端命令请求的整个流程,包括服务器启动监听,接收命令请求并解析,执行命令请求,返回命令回复等。

正文完
 0