共计 23730 个字符,预计需要花费 60 分钟才能阅读完成。
Redis 作为一个 Client-Server 架构的数据库,其源码中少不了用来实现网络通信的部分。
为了实现高并发的网络通信,我们常用的 Linux 操作系统提供了 select、poll 和 epoll 三种编程模型,而在 Linux 上运行的 Redis,通常会采用其中的 epoll 模型来进行网络通信。
(1) 为什么 Redis 不使用基本的 Socket 编程模型?
使用 Socket 模型实现网络通信时,需要经过创建 Socket、监听端口、处理连接和读写请求等多个步骤。
需要注意的是,accept 函数是阻塞函数,也就是说,如果此时一直没有客户端连接请求,那么服务器端的执行流程会一直阻塞在 accept 函数。
虽然这种模型能够实现服务器端和客户端之间的通信,但是程序每调用一次 accept 函数,只能处理一个客户端连接。
这显然不满足高并发的要求。
IO 模型 https://weikeqin.com/2020/06/…
参考 IO 模型,要想满足高并发,一般都使用 IO 多路复用。
(2) RedisServer 网络请求处理流程
(2.1) 绑定地址并监听套接字(bind listen)
// file: src/server.c | |
// 监听端口 | |
/** | |
* @param port | |
* @param *fds | |
* @param *count | |
*/ | |
int listenToPort(int port, int *fds, int *count) { | |
// ... | |
for (j = 0; j < server.bindaddr_count || j == 0; j++) { | |
// ... 省略 绑定 IPV6 IPV4 的细节 anetTcp6Server anetTcpServer | |
// 绑定 | |
fds[*count] = anetTcpServer(server.neterr,port,NULL, | |
server.tcp_backlog); | |
// ... | |
} | |
return C_OK; | |
} |
Redis 支持同时在多个地址上监听(可以配置多个 bind 地址),所以在 listenToPort 中我们看到的是用一个循环来调用 anetTcpServer。
在 anetTcpServer 中,会逐步展开调用,直到执行到 bind 和 listen 系统调用。
// file: src/anet.c

/**
 * Bind socket `s` to `sa` and start listening on it.
 * On failure the socket is closed and an error message is written to `err`.
 *
 * @param err      buffer receiving a human-readable error message
 * @param s        socket fd created by the caller
 * @param sa       socket address (family, ip, port) to bind to
 * @param len      size of *sa
 * @param backlog  listen() backlog
 * @return ANET_OK on success, ANET_ERR on failure
 */
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    /* Bind the ip/port. */
    if (bind(s, sa, len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    /* Turn the socket into a passive (listening) socket. */
    if (listen(s, backlog) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

/**
 * Create a listening TCP socket of family `af` on `bindaddr:port`.
 *
 * @param err       buffer receiving an error message on failure
 * @param port      TCP port
 * @param bindaddr  address to bind, or NULL for the wildcard address
 * @param af        AF_INET or AF_INET6
 * @param backlog   listen() backlog
 * @return the listening fd, or ANET_ERR
 */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port, 6, "%d", port);
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    /* Resolve bindaddr/port into candidate addresses; p walks that list. */
    if ((rv = getaddrinfo(bindaddr, _port, &hints, &servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        /* Create the socket. */
        if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
            continue;

        if (af == AF_INET6 && anetV6Only(err, s) == ANET_ERR) goto error;
        /* Allow re-binding the port while old connections linger. */
        if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;
        /* bind + listen; anetListen() closes s itself on failure. */
        if (anetListen(err, s, p->ai_addr, p->ai_addrlen, backlog) == ANET_ERR) s = ANET_ERR;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket, errno: %d", errno);
        goto error;
    }

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}

/**
 * Create an IPv4 listening TCP socket.
 *
 * @param err       buffer receiving an error message on failure
 * @param port      TCP port
 * @param bindaddr  address to bind, or NULL for the wildcard address
 * @param backlog   listen() backlog
 * @return the listening fd, or ANET_ERR
 */
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
(2.2) 和客户端建立连接(accept)
// file: src/networking.c | |
/** | |
* 接管 tcp 处理器 | |
* | |
* @param *el | |
* @param fd | |
* @param *privdata | |
* @param mask | |
*/ | |
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { | |
// ... | |
// 接管 tcp 申请 | |
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); | |
// ... | |
// 接管通用解决 | |
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); | |
} |
(2.2.1) 接收 tcp 请求 -anetTcpAccept
// file: src/anet.c | |
/** | |
* 接管 tcp 申请 | |
* | |
* @param *err | |
* @param s fd | |
* @param *ip | |
* @param ip_len | |
* @param *port | |
*/ | |
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) { | |
int fd; | |
struct sockaddr_storage sa; // 套接字地址存储构造体 | |
socklen_t salen = sizeof(sa); | |
// 接管申请 | |
fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen) | |
// ... | |
return fd; | |
} |
// file: src/anet.c | |
/** | |
* @param *err | |
* @param s | |
* @param *sa | |
* @param *len | |
*/ | |
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { | |
int fd; | |
while(1) { | |
// 接管 socket 数据 | |
// fd 是 socket 返回的 socket,指向的定义的 SOCKADDR_IN 构造体指针,指针的大小 | |
fd = accept(s,sa,len); | |
// ... | |
break; | |
} | |
return fd; | |
} |
(2.2.2) 通用接收处理 -acceptCommonHandler
/**
 * Call site (from acceptTcpHandler): wrap the accepted fd `cfd` in a
 * connection object and hand it to acceptCommonHandler() with flags 0
 * and the peer IP `cip`.
 */ | |
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); |
// file: src/networking.c | |
/** | |
* @param *conn | |
* @param flags | |
* @param *ip | |
*/ | |
static void acceptCommonHandler(connection *conn, int flags, char *ip) { | |
client *c; | |
// 创立 redisClient 对象 | |
c = createClient(conn) | |
// 建设连贯 | |
connAccept(conn, clientAcceptHandler) | |
} |
(2.2.2.1) connCreateAcceptedSocket()
conn 是怎么创建的
// file: src/connection.c | |
/** | |
* @param fd | |
*/ | |
connection *connCreateAcceptedSocket(int fd) { | |
// 创立连贯 | |
connection *conn = connCreateSocket(); | |
// | |
conn->fd = fd; | |
// 设置连贯状态为接管中 | |
conn->state = CONN_STATE_ACCEPTING; | |
return conn; | |
} |
// file: src/connection.h | |
/** | |
* 连接结构体 | |
*/ | |
struct connection { | |
ConnectionType *type; // 连贯类型 在前面会用到 | |
ConnectionState state; // 连贯状态 | |
short int flags; | |
short int refs; | |
int last_errno; | |
void *private_data; // 公有数据 | |
ConnectionCallbackFunc conn_handler; // 连贯处理器 | |
ConnectionCallbackFunc write_handler; // 写处理器 | |
ConnectionCallbackFunc read_handler; // 读处理器 | |
int fd; // | |
}; |
(2.2.2.2) 创建 redisClient 对象 -createClient()
// file: src/networking.c | |
/** | |
* @param *conn | |
*/ | |
client *createClient(connection *conn) { | |
// 为用户连贯创立 client 构造体 | |
client *c = zmalloc(sizeof(client)); | |
if (conn) { | |
// ... 解决连贯 | |
// 注册读事件处理器,等连贯可读时调用 回调函数是 readQueryFromClient | |
connSetReadHandler(conn, readQueryFromClient); | |
// 会把新创建的 client 构造体放到 conn 构造体的 private_data 字段里 | |
connSetPrivateData(conn, c); | |
} | |
// 设置 client 的一些参数 | |
selectDb(c,0); | |
uint64_t client_id = ++server.next_client_id; | |
c->id = client_id; | |
c->resp = 2; | |
c->conn = conn; | |
// ... | |
return c; | |
} |
client 结构体主要属性
// file: src/server.h | |
typedef struct client { | |
uint64_t id; /* 客户端增量惟一 ID */ | |
connection *conn; /** 连贯 */ | |
int resp; /* 响应协定版本 可能是 2 或 3 */ | |
redisDb *db; /* 指向以后选中的 Db */ | |
robj *name; /* 由客户端设置的名称 */ | |
sds querybuf; /* 用来累积客户端查问的缓冲区 */ | |
size_t qb_pos; /* querybuf 中读取到的地位 */ | |
// ... 省略局部字段 | |
/* Response buffer */ | |
int bufpos; // | |
char buf[PROTO_REPLY_CHUNK_BYTES]; // | |
} client; |
// file: src/connection.h | |
/* | |
* 注册读事件处理器,等连贯可读时调用 | |
* | |
* @param *conn | |
* @param func ConnectionCallbackFunc 类型的回调函数 | |
*/ | |
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_read_handler(conn, func); | |
} |
(2.2.2.3) 接收连接 -connAccept()
/* The connection module does not deal with listening and accepting sockets, | |
* so we assume we have a socket when an incoming connection is created. | |
* | |
* The fd supplied should therefore be associated with an already accept()ed | |
* socket. | |
* | |
* connAccept() may directly call accept_handler(), or return and call it | |
* at a later time. This behavior is a bit awkward but aims to reduce the need | |
* to wait for the next event loop, if no additional handshake is required. | |
* | |
* IMPORTANT: accept_handler may decide to close the connection, calling connClose(). | |
* To make this safe, the connection is only marked with CONN_FLAG_CLOSE_SCHEDULED | |
* in this case, and connAccept() returns with an error. | |
* | |
* connAccept() callers must always check the return value and on error (C_ERR) | |
* a connClose() must be called. | |
* | |
* @param *conn 连贯 | |
* @param accept_handler 接管处理器 是一个回调函数 ConnectionCallbackFunc | |
*/ | |
static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {return conn->type->accept(conn, accept_handler); | |
} |
conn->type 对应 ConnectionType 结构体
// file: src/connection.h | |
typedef struct ConnectionType {void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask); | |
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler); | |
int (*write)(struct connection *conn, const void *data, size_t data_len); | |
int (*read)(struct connection *conn, void *buf, size_t buf_len); | |
void (*close)(struct connection *conn); | |
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); | |
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier); | |
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler); | |
const char *(*get_last_error)(struct connection *conn); | |
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout); | |
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout); | |
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); | |
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); | |
int (*get_type)(struct connection *conn); | |
} ConnectionType; |
file: src/networking.c | |
/** | |
* @param *conn | |
*/ | |
void clientAcceptHandler(connection *conn) { | |
// | |
client *c = connGetPrivateData(conn); | |
// | |
moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE, | |
REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED, | |
c); | |
} |
(2.3) 从连接的 socket 读取客户端发送的数据(recv/read)
上一步建立连接后,等到 client 向 redisServer 发送的数据到达,会触发之前设置的回调方法,redisServer 会调用 readQueryFromClient() 方法。
// file: src/networking.c | |
/** | |
* @param *conn | |
*/ | |
void readQueryFromClient(connection *conn) { | |
// 从连贯的公有数据获取 client // 在创立连贯时把 client 放到了 connection 的 private_data 字段 | |
client *c = connGetPrivateData(conn); | |
// | |
// 客户端输出缓冲区中有更多的数据,请持续解析它,以防查看是否有要执行的残缺命令。processInputBuffer(c); | |
} |
(2.4) 处理读到的数据
从输入缓冲区(querybuf)读到数据后,下面就开始处理数据
// file: src/networking.c | |
/* This function is called every time, in the client structure 'c', there is | |
* more query buffer to process, because we read more data from the socket | |
* or because a client was blocked and later reactivated, so there could be | |
* pending query buffer, already representing a full command, to process. */ | |
void processInputBuffer(client *c) { | |
// ... | |
processCommandAndResetClient(c); | |
} | |
/* This function calls processCommand(), but also performs a few sub tasks | |
* for the client that are useful in that context: | |
* | |
* 1. It sets the current client to the client 'c'. | |
* 2. calls commandProcessed() if the command was handled. | |
* | |
* The function returns C_ERR in case the client was freed as a side effect | |
* of processing the command, otherwise C_OK is returned. */ | |
int processCommandAndResetClient(client *c) { | |
int deadclient = 0; | |
server.current_client = c; | |
// 解决命令 | |
if (processCommand(c) == C_OK) {commandProcessed(c); | |
} | |
return deadclient ? C_ERR : C_OK; | |
} |
// file: src/server.c | |
/** | |
* 解决各种命令 get set del exits quit lpush sadd 等 | |
* | |
* @param *c | |
*/ | |
int processCommand(client *c) { | |
// 查找命令,并进行命令合法性检查,以及命令参数个数查看 | |
/* Now lookup the command and check ASAP about trivial error conditions | |
* such as wrong arity, bad command name and so forth. */ | |
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); | |
// ... 省略其余命令解决逻辑 | |
// 解决命令 | |
/* Exec the command */ | |
if (c->flags & CLIENT_MULTI && | |
c->cmd->proc != execCommand && c->cmd->proc != discardCommand && | |
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) | |
{ // 如果是 MULTI 事务,则入队 | |
queueMultiCommand(c); | |
addReply(c,shared.queued); | |
} else { // 调用 call 间接解决 | |
call(c,CMD_CALL_FULL); | |
c->woff = server.master_repl_offset; | |
if (listLength(server.ready_keys)) | |
handleClientsBlockedOnKeys();} | |
return C_OK; | |
} |
// file: src/server.c | |
/** | |
* 依据 key 查找值 | |
* | |
* @param name | |
*/ | |
struct redisCommand *lookupCommand(sds name) { | |
// | |
return dictFetchValue(server.commands, name); | |
} |
/** | |
* call() 是 Redis 执行命令的外围。* | |
* @param *c | |
* @param flags | |
*/ | |
void call(client *c, int flags) { | |
// 要执行的 redis 命令 | |
struct redisCommand *real_cmd = c->cmd; | |
// 调用命令处理函数 | |
c->cmd->proc(c); | |
} |
proc 对应的 command 有以下几种
/** | |
* | |
*/ | |
struct redisCommand redisCommandTable[] = { | |
{"module",moduleCommand,-2, | |
"admin no-script", | |
0,NULL,0,0,0,0,0,0}, | |
{"get",getCommand,2, | |
"read-only fast @string", | |
0,NULL,1,1,1,0,0,0}, | |
/* Note that we can't flag set as fast, since it may perform an | |
* implicit DEL of a large key. */ | |
{"set",setCommand,-3, | |
"write use-memory @string", | |
0,NULL,1,1,1,0,0,0}, | |
{"setnx",setnxCommand,3, | |
"write use-memory fast @string", | |
0,NULL,1,1,1,0,0,0}, | |
{"setex",setexCommand,4, | |
"write use-memory @string", | |
0,NULL,1,1,1,0,0,0}, | |
// .. | |
{"rpush",rpushCommand,-3, | |
"write use-memory fast @list", | |
0,NULL,1,1,1,0,0,0}, | |
{"lpush",lpushCommand,-3, | |
"write use-memory fast @list", | |
0,NULL,1,1,1,0,0,0}, | |
// ... | |
{"sadd",saddCommand,-3, | |
"write use-memory fast @set", | |
0,NULL,1,1,1,0,0,0}, | |
// ... | |
{"zadd",zaddCommand,-4, | |
"write use-memory fast @sortedset", | |
0,NULL,1,1,1,0,0,0}, | |
// ... | |
{"stralgo",stralgoCommand,-2, | |
"read-only @string", | |
0,lcsGetKeys,0,0,0,0,0,0} | |
}; |
如果命令是 get,其对应的命令处理函数就是 getCommand
(2.4.1) getCommand
// file: t_string.c | |
/** | |
* @param *c | |
*/ | |
void getCommand(client *c) {getGenericCommand(c); | |
} | |
/**
 * Look up argv[1] and reply with its string value.
 *
 * @param c  client whose argv[1] is the key
 * @return C_OK if the key is missing or a string was replied,
 *         C_ERR if the key holds a non-string value (WRONGTYPE replied)
 */
int getGenericCommand(client *c) {
    robj *val = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]);

    /* Missing key: lookupKeyReadOrReply() was handed the null reply to send. */
    if (val == NULL) return C_OK;

    /* GET only applies to string values. */
    if (val->type != OBJ_STRING) {
        addReply(c, shared.wrongtypeerr);
        return C_ERR;
    }

    /* Append the value to the client's output buffer. */
    addReplyBulk(c, val);
    return C_OK;
}
// file: src/networking.c | |
/* Add a Redis Object as a bulk reply */ | |
void addReplyBulk(client *c, robj *obj) {addReplyBulkLen(c,obj); | |
addReply(c,obj); | |
addReply(c,shared.crlf); | |
} |
(2.4.2) setCommand
// file: src.t_string.c | |
/** | |
* | |
* @param *c | |
* @param flags | |
* @param *key | |
* @param *val | |
* @param *expire | |
* @param unit | |
* @param *ok_reply | |
* @param *abort_reply | |
*/ | |
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { | |
// 64 位精度整数 | |
long long milliseconds = 0; /* initialized to avoid any harmness warning */ | |
if (expire) {if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) | |
return; | |
if (milliseconds <= 0) {addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); | |
return; | |
} | |
if (unit == UNIT_SECONDS) milliseconds *= 1000; | |
} | |
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || | |
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) | |
{addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); | |
return; | |
} | |
genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1); | |
server.dirty++; | |
if (expire) setExpire(c,c->db,key,mstime()+milliseconds); | |
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); | |
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, | |
"expire",key,c->db->id); | |
addReply(c, ok_reply ? ok_reply : shared.ok); | |
} |
(2.5) 给客户端返回结果(send)
无论是执行 get 命令还是 set 命令,最后执行完命令都会调用 addReply() 方法。
在 addReply() 方法中做了两件事情:
1、调用 prepareClientToWrite,判断是否需要返回数据,并且将当前 client 添加到等待写返回数据的队列中。
2、调用 _addReplyToBuffer 和 _addReplyProtoToList 方法将返回值写入到输出缓冲区中,等待写入 socket。
// file: src/networking.c | |
/* ----------------------------------------------------------------------------- | |
* 更高级别的函数用于在客户端输入缓冲区上对数据进行排队。* 以下函数是命令实现将调用的函数。* -------------------------------------------------------------------------- */ | |
/* | |
* 将对象“obj”字符串示意增加到客户端输入缓冲区。* | |
* @param *c redis client | |
* @param *obj 命令执行的后果 类型是 redisObject | |
*/ | |
void addReply(client *c, robj *obj) {// 判断 client 是否能够接管新数据 (假客户端不能接管) | |
if (prepareClientToWrite(c) != C_OK) return; | |
// 依据 redisobject 格局把数据写入缓存 | |
if (sdsEncodedObject(obj)) { // obj 如果是 row 或者 embstr 格局 | |
// 尝试将应答增加到客户端构造中的动态缓冲区。if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) | |
// 将回复增加到回复列表中。_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); | |
} else if (obj->encoding == OBJ_ENCODING_INT) { // obj 是数字格局 | |
/* 对于整数编码字符串,咱们只需应用优化函数将其转换为字符串,并将后果字符串附加到输入缓冲区。*/ | |
char buf[32]; | |
// 数字转为字符串 | |
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); | |
if (_addReplyToBuffer(c,buf,len) != C_OK) | |
_addReplyProtoToList(c,buf,len); | |
} else {serverPanic("Wrong obj->encoding in addReply()"); | |
} | |
} |
// file: src/networking.c | |
/* ----------------------------------------------------------------------------- | |
* 低级函数用于向输入缓冲区增加更多数据。* -------------------------------------------------------------------------- */ | |
/** | |
* 尝试将应答增加到客户端构造中的动态缓冲区。* 如果缓冲区已满或回复列表不为空,则返回 C_ERR,在这种状况下,必须将回复增加到回复列表中。* | |
* @param *c | |
* @param *s 要写入的数据 | |
* @param len 数据长度 | |
*/ | |
int _addReplyToBuffer(client *c, const char *s, size_t len) { | |
// 残余缓冲区大小 | |
size_t available = sizeof(c->buf)-c->bufpos; | |
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK; | |
/* 如果回复列表中曾经有内容,则无奈向动态缓冲区增加更多内容。*/ | |
if (listLength(c->reply) > 0) return C_ERR; | |
/* 查看缓冲区是否有足够的空间用于此字符串。*/ | |
if (len > available) return C_ERR; | |
// 把数据 *s(char[]类型) 拷贝到 client 对象的 Response buffer 中 | |
memcpy(c->buf+c->bufpos,s,len); | |
// 更新已应用缓冲区大小 | |
c->bufpos+=len; | |
return C_OK; | |
} |
/** | |
* 将回复增加到回复列表中。* 留神:对该函数的一些编辑须要转发到 AddReplyFromClient。* | |
* @param *c | |
* @param *s 要写入的数据 | |
* @param len 数据长度 | |
*/ | |
void _addReplyProtoToList(client *c, const char *s, size_t len) { | |
// 写入回复后敞开 | |
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; | |
// 双向链表尾部 | |
listNode *ln = listLast(c->reply); | |
// 链表里存的数据是 clientReplyBlock 类型 是一个 buf 数组,有大小限度 | |
clientReplyBlock *tail = ln? listNodeValue(ln): NULL; | |
/* Note that 'tail' may be NULL even if we have a tail node, because when | |
* addReplyDeferredLen() is used, it sets a dummy node to NULL just | |
* fo fill it later, when the size of the bulk length is set. */ | |
/* 尽可能追加到尾部字符串。*/ | |
if (tail) { | |
/* 复制咱们能够放入尾部的局部,并将其余部分留给新节点 */ | |
size_t avail = tail->size - tail->used; | |
// * s 要复制的局部 (可能是全副,可能是局部) | |
size_t copy = avail >= len? len: avail; | |
// 复制到 buf 数组里 | |
memcpy(tail->buf + tail->used, s, copy); | |
tail->used += copy; | |
s += copy; | |
len -= copy; | |
} | |
// len>0 | |
if (len) { | |
/* 创立一个新节点,确保至多为其调配了 16K */ | |
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; | |
tail = zmalloc(size + sizeof(clientReplyBlock)); | |
/* take over the allocation's internal fragmentation */ | |
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); | |
tail->used = len; | |
memcpy(tail->buf, s, len); | |
listAddNodeTail(c->reply, tail); | |
c->reply_bytes += tail->size; | |
} | |
// 缓冲区达到限度后异步敞开客户端 | |
asyncCloseClientOnOutputBufferLimitReached(c); | |
} |
(2.5.1) 输出缓冲区(buf 和回复列表)中的数据什么时候写回客户端 socket 呢?
// file: src/ae.c
/**
 * Main event loop: process events repeatedly until eventLoop->stop is set.
 *
 * @param eventLoop  the event loop to run
 */
void aeMain(aeEventLoop *eventLoop) {
    const int flags = AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP;

    for (eventLoop->stop = 0; !eventLoop->stop; )
        aeProcessEvents(eventLoop, flags);
}
/** | |
* 处理事件 | |
*/ | |
int aeProcessEvents(aeEventLoop *eventLoop, int flags) | |
{ | |
// | |
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) | |
eventLoop->beforesleep(eventLoop); | |
} |
// file: src/server.c
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors.
 *
 * Note: This function is (currently) called from two functions:
 * 1. aeMain - The main server loop
 * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
 *
 * If it was called from processEventsWhileBlocked we don't want
 * to perform all actions (For example, we don't want to expire
 * keys), but we do need to perform some actions.
 *
 * The most important is freeClientsInAsyncFreeQueue but we also
 * call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ... other pre-sleep work omitted

    /* Flush clients that have pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
}
// file: src/networking.c | |
int handleClientsWithPendingWritesUsingThreads(void) {int processed = listLength(server.clients_pending_write); | |
if (processed == 0) return 0; /* Return ASAP if there are no clients. */ | |
/* If I/O threads are disabled or we have few clients to serve, don't | |
* use I/O threads, but thejboring synchronous code. */ | |
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {return handleClientsWithPendingWrites(); | |
} | |
/* Start threads if needed. */ | |
if (!server.io_threads_active) startThreadedIO(); | |
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed); | |
/* Distribute the clients across N different lists. */ | |
listIter li; | |
listNode *ln; | |
/* 把 server.clients_pending_write 链表 赋值 给迭代器 &li */ | |
listRewind(server.clients_pending_write,&li); | |
int item_id = 0; | |
// 遍历链表 server.clients_pending_write | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
c->flags &= ~CLIENT_PENDING_WRITE; | |
// & 状态 = 尽快敞开 | |
if (c->flags & CLIENT_CLOSE_ASAP) { | |
// 删除双向链表里的以后节点 | |
listDelNode(server.clients_pending_write, ln); | |
continue; | |
} | |
int target_id = item_id % server.io_threads_num; | |
// 把 c 增加到 io_threads_list[target_id]链表尾部 前面会用到 | |
listAddNodeTail(io_threads_list[target_id],c); | |
item_id++; | |
} | |
/* 把 io_threads_list[0]链表 赋值 给迭代器 &li */ | |
listRewind(io_threads_list[0],&li); | |
// 遍历链表 io_threads_list[0] | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
// 将 client 的数据发送进来 | |
writeToClient(c,0); | |
} | |
listEmpty(io_threads_list[0]); | |
/* 把 server.clients_pending_write 链表 赋值 给迭代器 &li */ | |
listRewind(server.clients_pending_write,&li); | |
// 遍历链表 server.clients_pending_write | |
while((ln = listNext(&li))) { | |
// 获取节点 | |
client *c = listNodeValue(ln); | |
/* 如果某些客户端中存在挂起的写入,装置写入处理程序。*/ | |
// 如果一次发送不完则筹备下一次发送 | |
if (clientHasPendingReplies(c) && | |
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) | |
{freeClientAsync(c); | |
} | |
} | |
listEmpty(server.clients_pending_write); | |
return processed; | |
} |
(2.5.2) 写事件处理器 -sendReplyToClient
// file: src/networking.c | |
/** | |
* 写事件处理器 | |
* 仅仅发送数据到 client | |
* | |
* Write event handler. Just send data to the client. | |
*/ | |
void sendReplyToClient(connection *conn) {client *c = connGetPrivateData(conn); | |
// | |
writeToClient(c,1); | |
} |
(2.5.3) 把输出缓冲区中的数据写回客户端
// file: src/networking.c | |
/** | |
* 把数据写入客户端的输入缓冲区 | |
* | |
* Write data in output buffers to client. Return C_OK if the client | |
* is still valid after the call, C_ERR if it was freed because of some | |
* error. If handler_installed is set, it will attempt to clear the | |
* write event. | |
* | |
* This function is called by threads, but always with handler_installed | |
* set to 0. So when handler_installed is set to 0 the function must be | |
* thread safe. */ | |
int writeToClient(client *c, int handler_installed) { | |
/* Update total number of writes on server */ | |
server.stat_total_writes_processed++; | |
ssize_t nwritten = 0, totwritten = 0; | |
size_t objlen; | |
clientReplyBlock *o; | |
while(clientHasPendingReplies(c)) {if (c->bufpos > 0) { // 缓冲区有数据 | |
// 把缓冲区数据写入 socket | |
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); | |
} else { // 解决待发送链表 | |
o = listNodeValue(listFirst(c->reply)); | |
objlen = o->used; | |
if (objlen == 0) { | |
c->reply_bytes -= o->size; | |
listDelNode(c->reply,listFirst(c->reply)); | |
continue; | |
} | |
// 把链表节点里的数据写入 socket | |
nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); | |
} | |
} | |
return C_OK; | |
} |
// file: src/connection.h | |
/** | |
* 把数据写入到连贯里 | |
*/ | |
static inline int connWrite(connection *conn, const void *data, size_t data_len) {return conn->type->write(conn, data, data_len); | |
} |
(3) RedisServer 里 IO 多路复用代码详解
在 initServer 这个函数内,Redis 做了这么三件重要的事情:
1、创建一个 epoll 对象
2、对配置的端口进行监听(listen)
3、把 listen socket 交给 epoll 管理起来
//file: src/server.c | |
void initServer(void) { | |
// 2.1 创立 epoll | |
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); | |
// 2.2 绑定监听服务端口 | |
// Open the TCP listening socket for the user commands. | |
listenToPort(server.port,server.ipfd,&server.ipfd_count) | |
// 2.3 注册 accept 事件处理器 | |
// Create an event handler for accepting new connections in TCP and Unix domain sockets. | |
for (j = 0; j < server.ipfd_count; j++) {aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, | |
acceptTcpHandler,NULL) | |
} | |
} |
(3.1) 创建 epoll -aeCreateEventLoop
redisServer 结构
// file: src/server.h | |
// redisServer 构造体 | |
struct redisServer { | |
// ... 省略局部代码 | |
aeEventLoop *el; | |
// ... 省略局部代码 | |
} |
aeEventLoop 结构
// file: src/ae.h | |
// 基于事件的程序的状态 | |
/* State of an event based program */ | |
typedef struct aeEventLoop { | |
int maxfd; // 以后注册的最大文件描述符 | |
int setsize; // 跟踪的最大文件描述符数 | |
long long timeEventNextId; | |
time_t lastTime; /* Used to detect system clock skew */ | |
aeFileEvent *events; // 注册事件数组的指针 指向 aeFileEvent 数组 | |
aeFiredEvent *fired; // 就绪事件数组的指针 指向 aeFiredEvent 数组 | |
aeTimeEvent *timeEventHead; // 工夫事件 | |
int stop; | |
void *apidata; // 指向 aeApiState 构造体 创立的 epoll 对象就在 aeApiState->epfd | |
aeBeforeSleepProc *beforesleep; // 在事件处理前执行的函数 | |
aeBeforeSleepProc *aftersleep; // 在事件处理后执行的函数 | |
int flags; | |
} aeEventLoop; |
// file: src/ae.h | |
// 文件事件构造 | |
/* File event structure */ | |
typedef struct aeFileEvent { | |
int mask; // 标记 可读 / 可写 / 屏障 | |
aeFileProc *rfileProc; // 写事件回调 | |
aeFileProc *wfileProc; // 读事件回调 | |
void *clientData; // 扩大数据 | |
} aeFileEvent; |
Redis 在操作系统提供的 epoll 对象基础上又封装了一个 eventLoop 出来,所以创建的时候是先分配并初始化 eventLoop,再在其中创建 epoll 对象。
// file: src/ae.c | |
/** | |
* 创立 aeEventLoop 构造体 | |
* | |
* @param setsize | |
*/ | |
aeEventLoop *aeCreateEventLoop(int setsize) { | |
aeEventLoop *eventLoop; | |
// ... 省略局部代码 | |
eventLoop = zmalloc(sizeof(*eventLoop)) | |
// 未来的各种回调事件就都会存在这里 | |
// eventLoop->events 是一个指针 指向数组 元素类型:aeFileEvent 大小:setsize | |
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); | |
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); | |
// ... 省略局部代码 | |
// 创立 epoll | |
aeApiCreate(eventLoop) | |
} |
// file: src/ae_epoll.c
/**
 * Create the epoll backend state and attach it to eventLoop->apidata.
 *
 * @param eventLoop  loop whose setsize sizes the epoll_event array
 * @return 0 on success, -1 on failure
 */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    /* Buffer that epoll_wait() fills in aeApiPoll(). */
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* The actual epoll instance, created via Linux epoll_create(). */
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}
(3.2) 注册事件及回调函数 -aeCreateFileEvent
// file: src/ae.c | |
/** | |
* @param *eventLoop | |
* @param fd | |
* @param mask 0: 未注册事件 1: 描述符可读时触发 2: 描述符可写时触发 3: | |
* @param *proc aeFileProc 类型 入参传的是 acceptTcpHandler 函数 回调时会用到这个函数 | |
* @param *clientData | |
*/ | |
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, | |
aeFileProc *proc, void *clientData) | |
{if (fd >= eventLoop->setsize) { | |
errno = ERANGE; | |
return AE_ERR; | |
} | |
// 从 aeFileEvent 事件数组里取出一个文件事件构造 | |
aeFileEvent *fe = &eventLoop->events[fd]; | |
// 监听指定 fd 的指定事件 | |
if (aeApiAddEvent(eventLoop, fd, mask) == -1) | |
return AE_ERR; | |
// 设置文件事件类型 以及事件的处理器 | |
fe->mask |= mask; | |
if (mask & AE_READABLE) fe->rfileProc = proc; // 设置读事件回调 | |
if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置写事件回调 | |
// 公有数据 | |
fe->clientData = clientData; | |
if (fd > eventLoop->maxfd) | |
eventLoop->maxfd = fd; | |
return AE_OK; | |
} |
//file: src/ae_epoll.c
/**
 * Add `mask` to the events epoll watches on `fd` (merged with any events
 * already registered for it).
 *
 * @param eventLoop  the event loop
 * @param fd         file descriptor
 * @param mask       AE_READABLE and/or AE_WRITABLE to add
 * @return 0 on success, -1 if epoll_ctl() failed
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;   /* aeApiPoll() maps fired events back to fds via this */
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}
这个函数其实就是对 epoll_ctl 的一个封装:如果该 fd 之前没有注册过任何事件,就执行 epoll_ctl EPOLL_CTL_ADD,否则执行 EPOLL_CTL_MOD。
eventLoop->events 是一个以 fd 为下标的数组,每一个元素都是一个 aeFileEvent 对象。
在这个对象上,设置了三个关键的东西:
rfileProc:读事件回调
wfileProc:写事件回调
clientData:一些额外的扩展数据
将来当 epoll_wait 发现某个 fd 上有事件发生的时候,redis 首先根据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,然后再看 rfileProc、wfileProc,就可以找到读、写回调处理函数。
listen fd 对应的读回调函数 rfileProc 实际上被设置成了 acceptTcpHandler,私有数据 clientData 为 NULL。
(3.3) 获取就绪 socket 并处理事件 -aeMain()
// file: src/ae.c | |
/** | |
* 循环接管申请 | |
* | |
* @param *eventLoop | |
*/ | |
void aeMain(aeEventLoop *eventLoop) { | |
eventLoop->stop = 0; | |
// 循环 | |
while (!eventLoop->stop) { | |
// 处理事件 | |
aeProcessEvents(eventLoop, AE_ALL_EVENTS| | |
AE_CALL_BEFORE_SLEEP| | |
AE_CALL_AFTER_SLEEP); | |
} | |
} |
// 处理事件 返回解决完的事件个数 | |
0 不做任何解决 | |
1 AE_FILE_EVENTS 解决文件事件 | |
2 AE_TIME_EVENTS 解决工夫事件 | |
3 AE_ALL_EVENTS 所有事件 | |
4 AE_DONT_WAIT | |
8 AE_CALL_BEFORE_SLEEP | |
16 AE_CALL_AFTER_SLEEP | |
int aeProcessEvents(aeEventLoop *eventLoop, int flags) | |
{ | |
int processed = 0, numevents; | |
struct timeval tv, *tvp; | |
// 如果 eventLoop 解决前的函数不为空,就执行 | |
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) | |
eventLoop->beforesleep(eventLoop); | |
// 调用多路复用 API,仅在超时或某些事件触发时返回 | |
// 解决文件事件,阻塞工夫由 tvp 决定 | |
numevents = aeApiPoll(eventLoop, tvp); | |
// 解决后的函数不为空 | |
/* After sleep callback. */ | |
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) | |
eventLoop->aftersleep(eventLoop); | |
for (j = 0; j < numevents; j++) {// 先从 eventLoop->fired[j]获取已就绪事件构造体(aeFiredEvent) 获取 fd 后 再从 eventLoop->events 注册事件里获取对应的事件构造体(aeFileEvent) | |
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; | |
// ... | |
// 如果可读 | |
if (!invert && fe->mask & mask & AE_READABLE) { | |
// 调用读事件回调函数 对应 acceptTcpHandler | |
fe->rfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ | |
} | |
// 如果可写 触发写事件 | |
if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) { | |
// 调用写事件回调函数 对应 acceptTcpHandler | |
fe->wfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
} | |
} | |
processed++; | |
} | |
return processed; /* return the number of processed file/time events */ | |
} |
// file: src/ae_poll.c | |
/** | |
* 获取就绪事件 | |
* | |
* @param *eventLoop | |
* @param *tvp | |
*/ | |
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { | |
// 期待事件 | |
aeApiState *state = eventLoop->apidata; | |
int retval, numevents = 0; | |
// 调 linux epoll_wait 函数来获取已就绪 socket | |
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, | |
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); | |
// ... | |
return numevents; | |
} |
aeProcessEvents 就是通过 aeApiPoll 调用 epoll_wait 来获取就绪的 socket。
当发现某个 socket 上的数据就绪以后,就调用事先注册的事件处理器函数 rfileProc 和 wfileProc。
参考资料
Redis 高性能 IO 模型 https://weikeqin.com/2022/01/…
Redis 源码分析与实战 学习笔记 Day9 09 | Redis 事件驱动框架(上):何时使用 select、poll、epoll?https://time.geekbang.org/col…