Redis 作为一个 Client-Server 架构的数据库,其源码中少不了用来实现网络通信的局部。

为了实现高并发的网络通信,咱们罕用的 Linux 操作系统,就提供了 select、poll 和 epoll 三种编程模型,而在 Linux 上运行的 Redis,通常就会采纳其中的 epoll 模型来进行网络通信。

(1) 为什么 Redis 不应用根本的 Socket 编程模型?

应用 Socket 模型实现网络通信时,须要通过 创立Socket、监听端口、解决连贯 和 读写申请等多个步骤

须要留神的是,accept 函数是阻塞函数,也就是说,如果此时始终没有客户端连贯申请,那么,服务器端的执行流程会始终阻塞在 accept 函数。

尽管它可能实现服务器端和客户端之间的通信,然而程序每调用一次 accept 函数,只能解决一个客户端连贯。
这个显然不满足高并发的要求

IO模型 https://weikeqin.com/2020/06/...

参考IO模型,要想满足高并发,个别都应用IO多路复用

(2) RedisServer网络申请解决流程

(2.1) 绑定地址并监听套接字(bind listen)

// file:  src/server.c // 监听端口/** * @param port * @param *fds * @param *count */int listenToPort(int port, int *fds, int *count) {    // ...     for (j = 0; j < server.bindaddr_count || j == 0; j++) {        // ...  省略 绑定IPV6 IPV4的细节  anetTcp6Server  anetTcpServer        // 绑定        fds[*count] = anetTcpServer(server.neterr,port,NULL,                    server.tcp_backlog);        // ...      }    return C_OK;}

Redis 是反对开启多个端口的,所以在 listenToPort 中咱们看到是启用一个循环来调用 anetTcpServer。
在 anetTcpServer 中,逐渐会开展调用,直到执行到 bind 和 listen 零碎调用。

// file:  src/anet.c /** * @param *err * @param port * @param *bindaddr * @param backlog */int anetTcpServer(char *err, int port, char *bindaddr, int backlog){    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);}/** * @param *err * @param port * @param *bindaddr * @param af * @param backlog */static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog){    int s = -1, rv;    char _port[6];  /* strlen("65535") */    struct addrinfo hints, *servinfo, *p;    // 创立套接字    s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)    // 设置端口重用    anetSetReuseAddr(err,s)    // 监听    anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog)
/** * 监听 *  * @param *err * @param s 对应创立的套接字fd * @param *sa socket地址信息 (协定 地址) * @param len * @param backlog  */static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {    // 绑定ip端口    bind(s,sa,len)    // 监听套接字    listen(s, backlog)    return ANET_OK;}


(2.2) 和客户端建设连贯(accept)

// file: src/networking.c /** * 接管tcp处理器 *  * @param *el * @param fd * @param *privdata * @param mask */void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {        // ...         // 接管tcp申请        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);        // ...         // 接管通用解决        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}

(2.2.1) 接管tcp申请-anetTcpAccept

// file: src/anet.c /** * 接管tcp申请  *  * @param *err * @param s fd * @param *ip * @param ip_len * @param *port  */int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {    int fd;    struct sockaddr_storage sa; // 套接字地址存储构造体     socklen_t salen = sizeof(sa);    // 接管申请    fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)            // ...     return fd;}
// file: src/anet.c /** * @param *err * @param s * @param *sa * @param *len */ static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {    int fd;    while(1) {        // 接管socket数据        // fd是socket返回的socket,指向的定义的SOCKADDR_IN 构造体指针,指针的大小        fd = accept(s,sa,len);        // ...        break;    }    return fd;}

(2.2.2) 接管公共处理器-acceptCommonHandler

    /**     *      */    acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
// file: src/networking.c/** * @param *conn * @param flags * @param *ip  */static void acceptCommonHandler(connection *conn, int flags, char *ip) {    client *c;    // 创立redisClient对象    c = createClient(conn)    // 建设连贯     connAccept(conn, clientAcceptHandler)}    
(2.2.2.1) connCreateAcceptedSocket()

conn是怎么创立的

// file: src/connection.c/** * @param fd */ connection *connCreateAcceptedSocket(int fd) {    // 创立连贯    connection *conn = connCreateSocket();    //     conn->fd = fd;    // 设置连贯状态为接管中    conn->state = CONN_STATE_ACCEPTING;    return conn;}
// file: src/connection.h/** * 连接结构体 */ struct connection {    ConnectionType *type;  // 连贯类型 在前面会用到    ConnectionState state; // 连贯状态    short int flags;           short int refs;     int last_errno;    void *private_data;    // 公有数据    ConnectionCallbackFunc conn_handler;  // 连贯处理器    ConnectionCallbackFunc write_handler; // 写处理器    ConnectionCallbackFunc read_handler;  // 读处理器    int fd;  // };
(2.2.2.2) 创立redisClient对象-createClient()
// file: src/networking.c/** * @param *conn */client *createClient(connection *conn) {    // 为用户连贯创立client构造体    client *c = zmalloc(sizeof(client));    if (conn) {        // ... 解决连贯        // 注册读事件处理器,等连贯可读时调用   回调函数是readQueryFromClient         connSetReadHandler(conn, readQueryFromClient);        // 会把新创建的client构造体放到 conn构造体的private_data字段里        connSetPrivateData(conn, c);            }    // 设置client的一些参数    selectDb(c,0);    uint64_t client_id = ++server.next_client_id;    c->id = client_id;    c->resp = 2;    c->conn = conn;    // ...    return c;}

client构造体次要属性

// file: src/server.htypedef struct client {    uint64_t id;            /* 客户端增量惟一ID */    connection *conn;       /** 连贯 */    int resp;               /* 响应协定版本 可能是2或3 */    redisDb *db;            /* 指向以后选中的Db */    robj *name;             /* 由客户端设置的名称 */    sds querybuf;           /* 用来累积客户端查问的缓冲区 */    size_t qb_pos;          /* querybuf中读取到的地位 */    // ... 省略局部字段         /* Response buffer */    int bufpos;    //     char buf[PROTO_REPLY_CHUNK_BYTES];    // } client;
// file: src/connection.h /*  * 注册读事件处理器,等连贯可读时调用  *  * @param *conn * @param func ConnectionCallbackFunc类型的回调函数 */static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {    return conn->type->set_read_handler(conn, func);}
(2.2.2.3) 接管申请-connAccept()
/* The connection module does not deal with listening and accepting sockets, * so we assume we have a socket when an incoming connection is created. * * The fd supplied should therefore be associated with an already accept()ed * socket. * * connAccept() may directly call accept_handler(), or return and call it * at a later time. This behavior is a bit awkward but aims to reduce the need * to wait for the next event loop, if no additional handshake is required. * * IMPORTANT: accept_handler may decide to close the connection, calling connClose(). * To make this safe, the connection is only marked with CONN_FLAG_CLOSE_SCHEDULED * in this case, and connAccept() returns with an error. * * connAccept() callers must always check the return value and on error (C_ERR) * a connClose() must be called. * * @param *conn 连贯 * @param accept_handler 接管处理器 是一个回调函数 ConnectionCallbackFunc */static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {    return conn->type->accept(conn, accept_handler);}

conn->type 对应 ConnectionType构造体

// file: src/connection.h typedef struct ConnectionType {    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);    int (*write)(struct connection *conn, const void *data, size_t data_len);    int (*read)(struct connection *conn, void *buf, size_t buf_len);    void (*close)(struct connection *conn);    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);    const char *(*get_last_error)(struct connection *conn);    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);    int (*get_type)(struct connection *conn);} ConnectionType;
file: src/networking.c /** * @param *conn */ void clientAcceptHandler(connection *conn) {    //    client *c = connGetPrivateData(conn);    //    moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,                          REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,                          c);}


(2.3) 从连贯的socket读取客户端发送的数据(recv/read)

上一步监听后,等到client向redisServer发送的数据达到,会触发设置的回调办法readQueryFromClient,redisServer会调用readQueryFromClient()办法

// file: src/networking.c/** * @param *conn */ void readQueryFromClient(connection *conn) {    // 从连贯的公有数据获取client  // 在创立连贯时把client放到了connection的private_data字段    client *c = connGetPrivateData(conn);    //     // 客户端输出缓冲区中有更多的数据,请持续解析它,以防查看是否有要执行的残缺命令。     processInputBuffer(c);    }

(2.4) 解决读到的数据

从输出缓冲区读到数据后,上面就开始解决数据

// file: src/networking.c/* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */void processInputBuffer(client *c) {    // ...    processCommandAndResetClient(c);}    /* This function calls processCommand(), but also performs a few sub tasks * for the client that are useful in that context: * * 1. It sets the current client to the client 'c'. * 2. calls commandProcessed() if the command was handled. * * The function returns C_ERR in case the client was freed as a side effect * of processing the command, otherwise C_OK is returned. */int processCommandAndResetClient(client *c) {    int deadclient = 0;    server.current_client = c;    // 解决命令    if (processCommand(c) == C_OK) {        commandProcessed(c);    }        return deadclient ? C_ERR : C_OK;}
// file: src/server.c/** * 解决各种命令 get set del exits quit lpush sadd  等 * * @param *c   */int processCommand(client *c) {    // 查找命令,并进行命令合法性检查,以及命令参数个数查看    /* Now lookup the command and check ASAP about trivial error conditions     * such as wrong arity, bad command name and so forth. */    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);    // ... 省略其余命令解决逻辑     // 解决命令    /* Exec the command */    if (c->flags & CLIENT_MULTI &&        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)    { // 如果是 MULTI 事务,则入队        queueMultiCommand(c);        addReply(c,shared.queued);    } else { // 调用 call 间接解决        call(c,CMD_CALL_FULL);        c->woff = server.master_repl_offset;        if (listLength(server.ready_keys))            handleClientsBlockedOnKeys();    }    return C_OK;}
// file: src/server.c/** * 依据key查找值 *  * @param name  */struct redisCommand *lookupCommand(sds name) {    //     return dictFetchValue(server.commands, name);}
/** * call() 是 Redis 执行命令的外围。 * * @param *c * @param flags  */void call(client *c, int flags) {    // 要执行的redis命令    struct redisCommand *real_cmd = c->cmd;    // 调用命令处理函数       c->cmd->proc(c);}

proc对应的command有以下几种

/** * */struct redisCommand redisCommandTable[] = {    {"module",moduleCommand,-2,     "admin no-script",     0,NULL,0,0,0,0,0,0},    {"get",getCommand,2,     "read-only fast @string",     0,NULL,1,1,1,0,0,0},    /* Note that we can't flag set as fast, since it may perform an     * implicit DEL of a large key. */    {"set",setCommand,-3,     "write use-memory @string",     0,NULL,1,1,1,0,0,0},    {"setnx",setnxCommand,3,     "write use-memory fast @string",     0,NULL,1,1,1,0,0,0},    {"setex",setexCommand,4,     "write use-memory @string",     0,NULL,1,1,1,0,0,0},       // ..     {"rpush",rpushCommand,-3,     "write use-memory fast @list",     0,NULL,1,1,1,0,0,0},    {"lpush",lpushCommand,-3,     "write use-memory fast @list",     0,NULL,1,1,1,0,0,0},        // ...     {"sadd",saddCommand,-3,     "write use-memory fast @set",     0,NULL,1,1,1,0,0,0},       // ...     {"zadd",zaddCommand,-4,     "write use-memory fast @sortedset",     0,NULL,1,1,1,0,0,0},          // ...     {"stralgo",stralgoCommand,-2,     "read-only @string",     0,lcsGetKeys,0,0,0,0,0,0}};

如果命令是get,其对应的命令处理函数就是 getCommand

(2.4.1) getCommand

// file: t_string.c /** * @param *c */void getCommand(client *c) {    getGenericCommand(c);}int getGenericCommand(client *c) {    robj *o;    // 查找key    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)        return C_OK;    // 找到key对应的值了    if (o->type != OBJ_STRING) { // key的对象类型不是string类型 返回谬误        addReply(c,shared.wrongtypeerr);        return C_ERR;    } else { // key的对象类型是string类型        // 将后果增加到输入缓冲区中        addReplyBulk(c,o);        return C_OK;    }}
// file: src/networking.c /* Add a Redis Object as a bulk reply */void addReplyBulk(client *c, robj *obj) {    addReplyBulkLen(c,obj);    addReply(c,obj);    addReply(c,shared.crlf);}

(2.4.2) setCommand

// file: src.t_string.c/** *  * @param *c * @param flags * @param *key * @param *val * @param *expire * @param unit * @param *ok_reply * @param *abort_reply */void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {    // 64位精度整数     long long milliseconds = 0; /* initialized to avoid any harmness warning */    if (expire) {        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)            return;        if (milliseconds <= 0) {            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);            return;        }        if (unit == UNIT_SECONDS) milliseconds *= 1000;    }    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))    {        addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);        return;    }    genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);    server.dirty++;    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,        "expire",key,c->db->id);    addReply(c, ok_reply ? ok_reply : shared.ok);}

(2.5) 给客户端返回后果(send)

无论是执行get命令还是set命令,最初执行完命令都会调用addReply()办法
addReply办法中做了两件事件:
1、prepareClientToWrite 判断是否须要返回数据,并且将以后 client 增加到期待写返回数据队列中。
2、调用 _addReplyToBuffer_addReplyObjectToList 办法将返回值写入到输入缓冲区中,期待写入 socekt

// file: src/networking.c /* ----------------------------------------------------------------------------- * 更高级别的函数用于在客户端输入缓冲区上对数据进行排队。 * 以下函数是命令实现将调用的函数。 * -------------------------------------------------------------------------- *//*  * 将对象“obj”字符串示意增加到客户端输入缓冲区。  *  * @param *c  redis client   * @param *obj  命令执行的后果   类型是redisObject */void addReply(client *c, robj *obj) {    // 判断client是否能够接管新数据 (假客户端不能接管)    if (prepareClientToWrite(c) != C_OK) return;    // 依据redisobject格局把数据写入缓存    if (sdsEncodedObject(obj)) { // obj如果是row或者embstr格局        // 尝试将应答增加到客户端构造中的动态缓冲区。        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)            // 将回复增加到回复列表中。            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));      } else if (obj->encoding == OBJ_ENCODING_INT) { // obj 是数字格局        /* 对于整数编码字符串,咱们只需应用优化函数将其转换为字符串,并将后果字符串附加到输入缓冲区。 */        char buf[32];        // 数字转为字符串        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);        if (_addReplyToBuffer(c,buf,len) != C_OK)            _addReplyProtoToList(c,buf,len);    } else {        serverPanic("Wrong obj->encoding in addReply()");    }}
// file: src/networking.c /* ----------------------------------------------------------------------------- * 低级函数用于向输入缓冲区增加更多数据。 * -------------------------------------------------------------------------- *//**  * 尝试将应答增加到客户端构造中的动态缓冲区。 * 如果缓冲区已满或回复列表不为空,则返回C_ERR,在这种状况下,必须将回复增加到回复列表中。  *  * @param *c  * @param *s 要写入的数据 * @param len 数据长度 */int _addReplyToBuffer(client *c, const char *s, size_t len) {    // 残余缓冲区大小    size_t available = sizeof(c->buf)-c->bufpos;    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;    /* 如果回复列表中曾经有内容,则无奈向动态缓冲区增加更多内容。 */    if (listLength(c->reply) > 0) return C_ERR;    /* 查看缓冲区是否有足够的空间用于此字符串。 */    if (len > available) return C_ERR;    // 把数据*s(char[]类型) 拷贝到 client对象的Response buffer中    memcpy(c->buf+c->bufpos,s,len);    // 更新已应用缓冲区大小    c->bufpos+=len;    return C_OK;}
/** * 将回复增加到回复列表中。  * 留神:对该函数的一些编辑须要转发到AddReplyFromClient。 *  * @param *c  * @param *s 要写入的数据 * @param len 数据长度 */ void _addReplyProtoToList(client *c, const char *s, size_t len) {    // 写入回复后敞开    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;    // 双向链表尾部    listNode *ln = listLast(c->reply);    // 链表里存的数据是 clientReplyBlock类型 是一个buf数组,有大小限度    clientReplyBlock *tail = ln? listNodeValue(ln): NULL;    /* Note that 'tail' may be NULL even if we have a tail node, because when     * addReplyDeferredLen() is used, it sets a dummy node to NULL just     * fo fill it later, when the size of the bulk length is set. */    /* 尽可能追加到尾部字符串。*/    if (tail) {        /* 复制咱们能够放入尾部的局部,并将其余部分留给新节点 */        size_t avail = tail->size - tail->used;        // *s要复制的局部 (可能是全副,可能是局部)        size_t copy = avail >= len? len: avail;        // 复制到buf数组里        memcpy(tail->buf + tail->used, s, copy);        tail->used += copy;        s += copy;        len -= copy;    }        // len>0    if (len) {        /* 创立一个新节点,确保至多为其调配了 16K */        size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;        tail = zmalloc(size + sizeof(clientReplyBlock));        /* take over the allocation's internal fragmentation */        tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);        tail->used = len;        memcpy(tail->buf, s, len);        listAddNodeTail(c->reply, tail);        c->reply_bytes += tail->size;    }    // 缓冲区达到限度后异步敞开客户端    asyncCloseClientOnOutputBufferLimitReached(c);}

(2.5.1) 回复列表中的数据什么时候写入到输入缓冲区呢?

// file: src/ae.c void aeMain(aeEventLoop *eventLoop) {    eventLoop->stop = 0;    // 循环    while (!eventLoop->stop) {        // 处理事件        aeProcessEvents(eventLoop, AE_ALL_EVENTS|                                   AE_CALL_BEFORE_SLEEP|                                   AE_CALL_AFTER_SLEEP);    }}/** * 处理事件 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags){        //         if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)            eventLoop->beforesleep(eventLoop);}
// file: src/server.c/* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. * * Note: This function is (currently) called from two functions: * 1. aeMain - The main server loop * 2. processEventsWhileBlocked - Process clients during RDB/AOF load * * If it was called from processEventsWhileBlocked we don't want * to perform all actions (For example, we don't want to expire * keys), but we do need to perform some actions. * * The most important is freeClientsInAsyncFreeQueue but we also * call some other low-risk functions. */void beforeSleep(struct aeEventLoop *eventLoop) {    /* 解决具备挂起的输入缓冲区的写入。 */    handleClientsWithPendingWritesUsingThreads(); }
// file: src/networking.cint handleClientsWithPendingWritesUsingThreads(void) {    int processed = listLength(server.clients_pending_write);    if (processed == 0) return 0; /* Return ASAP if there are no clients. */    /* If I/O threads are disabled or we have few clients to serve, don't     * use I/O threads, but thejboring synchronous code. */    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {        return handleClientsWithPendingWrites();    }    /* Start threads if needed. */    if (!server.io_threads_active) startThreadedIO();    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);    /* Distribute the clients across N different lists. */    listIter li;    listNode *ln;    /* 把server.clients_pending_write链表 赋值 给迭代器&li */    listRewind(server.clients_pending_write,&li);    int item_id = 0;    // 遍历链表 server.clients_pending_write    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_WRITE;        // & 状态=尽快敞开        if (c->flags & CLIENT_CLOSE_ASAP) {            // 删除双向链表里的以后节点            listDelNode(server.clients_pending_write, ln);            continue;        }        int target_id = item_id % server.io_threads_num;        // 把c增加到io_threads_list[target_id]链表尾部  前面会用到        listAddNodeTail(io_threads_list[target_id],c);        item_id++;    }    /* 把io_threads_list[0]链表 赋值 给迭代器&li */    listRewind(io_threads_list[0],&li);    // 遍历链表 io_threads_list[0]    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        // 将client的数据发送进来        writeToClient(c,0);    }    listEmpty(io_threads_list[0]);    /* 把server.clients_pending_write链表 赋值 给迭代器&li */    listRewind(server.clients_pending_write,&li);    // 遍历链表 server.clients_pending_write    while((ln = listNext(&li))) {        // 获取节点        client *c = listNodeValue(ln);        /* 如果某些客户端中存在挂起的写入,装置写入处理程序。*/        // 如果一次发送不完则筹备下一次发送        if (clientHasPendingReplies(c) &&                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)        {            freeClientAsync(c);        }    }    listEmpty(server.clients_pending_write);    return processed;}

(2.5.2) 写事件处理器-sendReplyToClient

// file: src/networking.c /** * 写事件处理器  * 仅仅发送数据到client *  * Write event handler. Just send data to the client. */void sendReplyToClient(connection *conn) {    client *c = connGetPrivateData(conn);    //     writeToClient(c,1);}

(2.5.3) 把数据写入客户端的输入缓冲区

// file: src/networking.c/** * 把数据写入客户端的输入缓冲区  *  *  Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed because of some * error.  If handler_installed is set, it will attempt to clear the * write event. * * This function is called by threads, but always with handler_installed * set to 0. So when handler_installed is set to 0 the function must be * thread safe. */int writeToClient(client *c, int handler_installed) {    /* Update total number of writes on server */    server.stat_total_writes_processed++;    ssize_t nwritten = 0, totwritten = 0;    size_t objlen;    clientReplyBlock *o;    while(clientHasPendingReplies(c)) {        if (c->bufpos > 0) { // 缓冲区有数据            // 把缓冲区数据写入socket            nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);                    } else { // 解决待发送链表            o = listNodeValue(listFirst(c->reply));            objlen = o->used;            if (objlen == 0) {                c->reply_bytes -= o->size;                listDelNode(c->reply,listFirst(c->reply));                continue;            }            // 把链表节点里的数据写入socket            nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);                    }    }    return C_OK;}
// file: src/connection.h /** *  把数据写入到连贯里 */static inline int connWrite(connection *conn, const void *data, size_t data_len) {    return conn->type->write(conn, data, data_len);}




(3) RedisServer里IO多路复用代码详解

initServer 这个函数内,Redis 做了这么三件重要的事件。
1、创立一个 epoll 对象
2、对配置的端口进行监听(listen)
3、把 listen socket 让 epoll 给治理起来

//file: src/server.cvoid initServer(void) {    // 2.1 创立 epoll    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);        // 2.2 绑定监听服务端口       // Open the TCP listening socket for the user commands.    listenToPort(server.port,server.ipfd,&server.ipfd_count)    // 2.3 注册accept事件处理器      // Create an event handler for accepting new connections in TCP and Unix domain sockets.    for (j = 0; j < server.ipfd_count; j++) {        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,            acceptTcpHandler,NULL)    }}

(3.1) 创立epoll-aeCreateEventLoop

redisServer构造

// file: src/server.h // redisServer构造体struct redisServer {    // ... 省略局部代码    aeEventLoop *el;    // ... 省略局部代码}

aeEventLoop构造

// file: src/ae.h// 基于事件的程序的状态 /* State of an event based program */typedef struct aeEventLoop {    int maxfd;   // 以后注册的最大文件描述符    int setsize; // 跟踪的最大文件描述符数    long long timeEventNextId;    time_t lastTime;     /* Used to detect system clock skew */    aeFileEvent *events; // 注册事件数组的指针 指向aeFileEvent数组       aeFiredEvent *fired; // 就绪事件数组的指针  指向aeFiredEvent数组    aeTimeEvent *timeEventHead;  // 工夫事件    int stop;    void *apidata; // 指向aeApiState构造体  创立的epoll对象就在aeApiState->epfd    aeBeforeSleepProc *beforesleep; // 在事件处理前执行的函数     aeBeforeSleepProc *aftersleep; // 在事件处理后执行的函数    int flags;} aeEventLoop;
// file: src/ae.h // 文件事件构造/* File event structure */typedef struct aeFileEvent {    int mask;    // 标记 可读/可写/屏障    aeFileProc *rfileProc;  // 写事件回调    aeFileProc *wfileProc;  // 读事件回调    void *clientData;       // 扩大数据} aeFileEvent;

Redis 在操作系统提供的 epoll 对象根底上又封装了一个 eventLoop 进去,所以创立的时候是先申请和创立 eventLoop。

// file: src/ae.c /** * 创立aeEventLoop构造体 * * @param setsize */aeEventLoop *aeCreateEventLoop(int setsize) {    aeEventLoop *eventLoop;    // ... 省略局部代码     eventLoop = zmalloc(sizeof(*eventLoop))    // 未来的各种回调事件就都会存在这里    // eventLoop->events是一个指针 指向数组 元素类型:aeFileEvent  大小:setsize    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);    // ... 省略局部代码    // 创立epoll    aeApiCreate(eventLoop)}
// file:  src/ae_epoll.c static int aeApiCreate(aeEventLoop *eventLoop) {    aeApiState *state = zmalloc(sizeof(aeApiState));    // ... 省略局部代码     // 真正创立epoll    // 调linux epoll_create()函数 创立epoll    state->epfd = epoll_create(2024); /* 1024 is just a hint for the kernel */        // ... 省略局部代码    eventLoop->apidata = state;    return 0;}

(3.2) 注册事件及回调函数-aeCreateFileEvent

// file:  src/ae.c/** * @param *eventLoop * @param fd  * @param mask 0:未注册事件  1:描述符可读时触发  2:描述符可写时触发  3: * @param *proc aeFileProc类型  入参传的是 acceptTcpHandler函数 回调时会用到这个函数  * @param *clientData */ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,        aeFileProc *proc, void *clientData){    if (fd >= eventLoop->setsize) {        errno = ERANGE;        return AE_ERR;    }    // 从aeFileEvent事件数组里取出一个文件事件构造    aeFileEvent *fe = &eventLoop->events[fd];    // 监听指定fd的指定事件    if (aeApiAddEvent(eventLoop, fd, mask) == -1)        return AE_ERR;    // 设置文件事件类型 以及事件的处理器    fe->mask |= mask;    if (mask & AE_READABLE) fe->rfileProc = proc;  // 设置读事件回调    if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 设置写事件回调    // 公有数据    fe->clientData = clientData;    if (fd > eventLoop->maxfd)        eventLoop->maxfd = fd;    return AE_OK;}
//file: src/ae_epoll.c// 增加事件static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {    aeApiState *state = eventLoop->apidata;    struct epoll_event ee = {0}; /* avoid valgrind warning */    /* If the fd was already monitored for some event, we need a MOD     * operation. Otherwise we need an ADD operation. */    int op = eventLoop->events[fd].mask == AE_NONE ?            EPOLL_CTL_ADD : EPOLL_CTL_MOD;    // ...     // epoll_ctl 增加事件     epoll_ctl(state->epfd,op,fd,&ee);    return 0;}

这个函数其实就是对 epoll_ctl 的一个封装。次要就是理论执行 epoll_ctl EPOLL_CTL_ADD。

每一个 eventLoop->events 元素都指向一个 aeFileEvent 对象。
在这个对象上,设置了三个要害货色
rfileProc:读事件回调
wfileProc:写事件回调
clientData:一些额定的扩大数据

未来 当 epoll_wait 发现某个 fd 上有事件产生的时候,这样 redis 首先依据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,而后再看 rfileProc、wfileProc 就能够找到读、写回调处理函数。

listen fd 对应的读回调函数 rfileProc 事实上就被设置成了 acceptTcpHandler,公有数据 client_data 也为 null。

(3.3) 获取就绪socket并处理事件-aeMain()

// file: src/ae.c/** * 循环接管申请 *  * @param *eventLoop */ void aeMain(aeEventLoop *eventLoop) {    eventLoop->stop = 0;    // 循环    while (!eventLoop->stop) {        // 处理事件         aeProcessEvents(eventLoop, AE_ALL_EVENTS|                                   AE_CALL_BEFORE_SLEEP|                                   AE_CALL_AFTER_SLEEP);    }}
// 处理事件   返回解决完的事件个数0 不做任何解决1 AE_FILE_EVENTS  解决文件事件2 AE_TIME_EVENTS  解决工夫事件 3 AE_ALL_EVENTS   所有事件 4 AE_DONT_WAIT 8 AE_CALL_BEFORE_SLEEP 16 AE_CALL_AFTER_SLEEP int aeProcessEvents(aeEventLoop *eventLoop, int flags){    int processed = 0, numevents;        struct timeval tv, *tvp;        // 如果eventLoop解决前的函数不为空,就执行        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)            eventLoop->beforesleep(eventLoop);        // 调用多路复用 API,仅在超时或某些事件触发时返回           // 解决文件事件,阻塞工夫由tvp决定         numevents = aeApiPoll(eventLoop, tvp);        // 解决后的函数不为空        /* After sleep callback. */        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)            eventLoop->aftersleep(eventLoop);        for (j = 0; j < numevents; j++) {            // 先从eventLoop->fired[j]获取已就绪事件构造体(aeFiredEvent) 获取fd后 再从eventLoop->events注册事件里获取对应的事件构造体(aeFileEvent)             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];                        // ...            // 如果可读            if (!invert && fe->mask & mask & AE_READABLE) {                // 调用读事件回调函数 对应 acceptTcpHandler                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);                fired++;                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */            }            // 如果可写 触发写事件            if (fe->mask & mask & AE_WRITABLE) {                if (!fired || fe->wfileProc != fe->rfileProc) {                    // 调用写事件回调函数 对应 acceptTcpHandler                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);                    fired++;                }            }            processed++;        }        return processed; /* return the number of processed file/time events */}
// file: src/ae_poll.c /** * 获取就绪事件 *  * @param *eventLoop * @param *tvp */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {    // 期待事件    aeApiState *state = eventLoop->apidata;    int retval, numevents = 0;    // 调linux epoll_wait函数来获取已就绪socket    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);    // ...    return numevents;}

aeProcessEvents 就是调用 epoll_wait 来获取就绪socket 。
当发现有某个socket上数据就绪当前,则调用当时注册的事件处理器函数 rfileProc 和 wfileProc。

参考资料

Redis高性能IO模型 https://weikeqin.com/2022/01/...

Redis源码分析与实战 学习笔记 Day9 09 | Redis事件驱动框架(上):何时应用select、poll、epoll? https://time.geekbang.org/col...