(1) Redis里的事件

Redis事件驱动框架是如何以事件模式,解决 server 运行过程中面临的申请操作和多种工作的。

(1.1) 事件循环构造体

// file: src/ae.h/** * 基于事件的程序的状态 * State of an event based program  */typedef struct aeEventLoop {    int maxfd;   // 以后注册的最大文件描述符    int setsize; // 跟踪的最大文件描述符数    long long timeEventNextId;    time_t lastTime;     /* Used to detect system clock skew */    aeFileEvent *events; // 注册事件数组的指针 指向aeFileEvent数组       aeFiredEvent *fired; // 就绪事件数组的指针  指向aeFiredEvent数组    aeTimeEvent *timeEventHead;  // 工夫事件    int stop;    void *apidata; // 指向aeApiState构造体  创立的epoll对象就在aeApiState->epfd    aeBeforeSleepProc *beforesleep; // 在事件处理前执行的函数     aeBeforeSleepProc *aftersleep; // 在事件处理后执行的函数    int flags;} aeEventLoop;
// file: src/ae.h /** * 文件事件构造 * File event structure  */typedef struct aeFileEvent {    int mask;    // 标记 可读/可写/屏障    aeFileProc *rfileProc;  // 写事件回调    aeFileProc *wfileProc;  // 读事件回调    void *clientData;       // 扩大数据} aeFileEvent;

(1.2) 事件对象的初始化

//file: src/server.cvoid initServer(void) {    // 2.1 创立 epoll    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);        // 省略局部代码}

事件的个数对应入参里的 server.maxclients+CONFIG_FDSET_INCR
server.maxclients 变量的值大小,能够在 Redis 的配置文件 redis.conf 中进行定义,默认值是 1000。
CONFIG_FDSET_INCR 的大小 = 32 + 96

(1.2.1) IO多路复用模块初始化

Redis 在操作系统提供的 epoll 对象根底上又封装了一个 eventLoop 进去,所以创立的时候是先申请和创立 eventLoop。

// file: src/ae.c /** * 创立aeEventLoop构造体 * * @param setsize */aeEventLoop *aeCreateEventLoop(int setsize) {    aeEventLoop *eventLoop;    // ... 省略局部代码     eventLoop = zmalloc(sizeof(*eventLoop))    // 未来的各种回调事件就都会存在这里    // eventLoop->events是一个指针 指向数组 元素类型:aeFileEvent  大小:setsize    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);    // ... 省略局部代码    // 创立epoll    aeApiCreate(eventLoop)}

(1.3) IO事件处理

Redis 的 IO 事件次要包含三类,别离是可读事件、可写事件和屏障事件。

读事件:从客户端读取数据
写事件:向客户端写入数据。
屏障事件的次要作用是用来反转事件的解决程序。

(1.3.1) IO事件创立/注册事件

// file:  src/ae.c/** * @param *eventLoop * @param fd  * @param mask 0:未注册事件  1:描述符可读时触发  2:描述符可写时触发  3: * @param *proc aeFileProc类型  入参传的是 acceptTcpHandler函数 回调时会用到这个函数  * @param *clientData */ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,        aeFileProc *proc, void *clientData){    if (fd >= eventLoop->setsize) {        errno = ERANGE;        return AE_ERR;    }    // 从aeFileEvent事件数组里取出一个文件事件构造    aeFileEvent *fe = &eventLoop->events[fd];    // 监听指定fd的指定事件    if (aeApiAddEvent(eventLoop, fd, mask) == -1)        return AE_ERR;    // 设置文件事件类型 以及事件的处理器    fe->mask |= mask;    if (mask & AE_READABLE) fe->rfileProc = proc;  // 设置读事件回调    if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 设置写事件回调    // 公有数据    fe->clientData = clientData;    if (fd > eventLoop->maxfd)        eventLoop->maxfd = fd;    return AE_OK;}
//file: src/ae_epoll.c// 增加事件static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {    aeApiState *state = eventLoop->apidata;    struct epoll_event ee = {0}; /* avoid valgrind warning */    /* If the fd was already monitored for some event, we need a MOD     * operation. Otherwise we need an ADD operation. */    int op = eventLoop->events[fd].mask == AE_NONE ?            EPOLL_CTL_ADD : EPOLL_CTL_MOD;    // ...     // epoll_ctl 增加事件     epoll_ctl(state->epfd,op,fd,&ee);    return 0;}

(1.3.2) 读事件处理

当 Redis server 接管到客户端的连贯申请时,就会应用注册好的 acceptTcpHandler 函数进行解决。

acceptTcpHandler -> acceptCommonHandler -> createClient-> aeCreateFileEvent

aeCreateFileEvent 函数会针对已连贯套接字上,创立监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient。

到这里,事件驱动框架就减少了对一个客户端已连贯套接字的监听。一旦客户端有申请发送到 server,框架就会回调 readQueryFromClient 函数解决申请。这样一来,客户端申请就能通过事件驱动框架进行解决了。

// file: src/networking.c/** * @param *conn */client *createClient(connection *conn) {    // 为用户连贯创立client构造体    client *c = zmalloc(sizeof(client));    if (conn) {        // ... 解决连贯        // 注册读事件处理器,等连贯可读时调用   回调函数是readQueryFromClient         connSetReadHandler(conn, readQueryFromClient);        // 会把新创建的client构造体放到 conn构造体的private_data字段里        connSetPrivateData(conn, c);            }    // 设置client的一些参数    selectDb(c,0);    uint64_t client_id = ++server.next_client_id;    c->id = client_id;    c->resp = 2;    c->conn = conn;    // ...    return c;}

(1.3.3) 写事件处理

Redis 实例在收到客户端申请后,会在解决客户端命令后,将要返回的数据写入客户端输入缓冲区。

beforeSleep -> handleClientsWithPendingWrites

(1.4) 工夫事件处理

typedef struct aeTimeEvent {    long long id;  // 工夫事件ID    long when_sec;  // 事件达到的秒级工夫戳    long when_ms;  // 事件达到的毫秒级工夫戳    aeTimeProc *timeProc;  // 工夫事件触发后的处理函数    aeEventFinalizerProc *finalizerProc;  // 事件完结后的处理函数    void *clientData;  // 事件相干的公有数据    struct aeTimeEvent *prev;  // 工夫事件链表的前向指针    struct aeTimeEvent *next;  // 工夫事件链表的后向指针} aeTimeEvent;

(1.4.1) 工夫事件创立

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,        aeTimeProc *proc, void *clientData,        aeEventFinalizerProc *finalizerProc){    long long id = eventLoop->timeEventNextId++;    aeTimeEvent *te;    te = zmalloc(sizeof(*te));    if (te == NULL) return AE_ERR;    te->id = id;    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);    te->timeProc = proc;    te->finalizerProc = finalizerProc;    te->clientData = clientData;    te->prev = NULL;    te->next = eventLoop->timeEventHead;    te->refcount = 0;    if (te->next)        te->next->prev = te;    eventLoop->timeEventHead = te;    return id;}

(1.4.2) 工夫事件的触发

int aeProcessEvents(aeEventLoop *eventLoop, int flags){    int processed = 0, numevents;    // 省略局部代码    /* Check time events */    if (flags & AE_TIME_EVENTS)        processed += processTimeEvents(eventLoop);    return processed; /* return the number of processed file/time events */}
static int processTimeEvents(aeEventLoop *eventLoop) {    int processed = 0;    aeTimeEvent *te;    long long maxId;    time_t now = time(NULL);     // 链表节点    te = eventLoop->timeEventHead;    maxId = eventLoop->timeEventNextId-1;    // 遍历链表    while(te) {        long now_sec, now_ms;        long long id;         aeGetTime(&now_sec, &now_ms);        if (now_sec > te->when_sec ||            (now_sec == te->when_sec && now_ms >= te->when_ms))        {            int retval;            id = te->id;            te->refcount++;            // 解决            retval = te->timeProc(eventLoop, id, te->clientData);            te->refcount--;            processed++;            if (retval != AE_NOMORE) {                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);            } else {                te->id = AE_DELETED_EVENT_ID;            }        }        te = te->next;    }    return processed;}

Redis高性能IO模型 https://weikeqin.com/2022/01/...

Redis源码分析与实战 学习笔记 Day11 11 Redis事件驱动框架(下):Redis有哪些事件?
https://time.geekbang.org/col...