(1) Redis里的事件
Redis事件驱动框架是如何以事件模式,解决 server 运行过程中面临的申请操作和多种工作的。
(1.1) 事件循环构造体
// file: src/ae.h/** * 基于事件的程序的状态 * State of an event based program */typedef struct aeEventLoop { int maxfd; // 以后注册的最大文件描述符 int setsize; // 跟踪的最大文件描述符数 long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; // 注册事件数组的指针 指向aeFileEvent数组 aeFiredEvent *fired; // 就绪事件数组的指针 指向aeFiredEvent数组 aeTimeEvent *timeEventHead; // 工夫事件 int stop; void *apidata; // 指向aeApiState构造体 创立的epoll对象就在aeApiState->epfd aeBeforeSleepProc *beforesleep; // 在事件处理前执行的函数 aeBeforeSleepProc *aftersleep; // 在事件处理后执行的函数 int flags;} aeEventLoop;
// file: src/ae.h /** * 文件事件构造 * File event structure */typedef struct aeFileEvent { int mask; // 标记 可读/可写/屏障 aeFileProc *rfileProc; // 写事件回调 aeFileProc *wfileProc; // 读事件回调 void *clientData; // 扩大数据} aeFileEvent;
(1.2) 事件对象的初始化
//file: src/server.cvoid initServer(void) { // 2.1 创立 epoll server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); // 省略局部代码}
事件的个数对应入参里的 server.maxclients+CONFIG_FDSET_INCR
,server.maxclients
变量的值大小,能够在 Redis 的配置文件 redis.conf
中进行定义,默认值是 1000。CONFIG_FDSET_INCR
的大小 = 32 + 96
(1.2.1) IO多路复用模块初始化
Redis 在操作系统提供的 epoll 对象根底上又封装了一个 eventLoop 进去,所以创立的时候是先申请和创立 eventLoop。
// file: src/ae.c /** * 创立aeEventLoop构造体 * * @param setsize */aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; // ... 省略局部代码 eventLoop = zmalloc(sizeof(*eventLoop)) // 未来的各种回调事件就都会存在这里 // eventLoop->events是一个指针 指向数组 元素类型:aeFileEvent 大小:setsize eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); // ... 省略局部代码 // 创立epoll aeApiCreate(eventLoop)}
(1.3) IO事件处理
Redis 的 IO 事件次要包含三类,别离是可读事件、可写事件和屏障事件。
读事件:从客户端读取数据
写事件:向客户端写入数据。
屏障事件的次要作用是用来反转事件的解决程序。
(1.3.1) IO事件创立/注册事件
// file: src/ae.c/** * @param *eventLoop * @param fd * @param mask 0:未注册事件 1:描述符可读时触发 2:描述符可写时触发 3: * @param *proc aeFileProc类型 入参传的是 acceptTcpHandler函数 回调时会用到这个函数 * @param *clientData */ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData){ if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } // 从aeFileEvent事件数组里取出一个文件事件构造 aeFileEvent *fe = &eventLoop->events[fd]; // 监听指定fd的指定事件 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 设置文件事件类型 以及事件的处理器 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; // 设置读事件回调 if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置写事件回调 // 公有数据 fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK;}
//file: src/ae_epoll.c// 增加事件static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // ... // epoll_ctl 增加事件 epoll_ctl(state->epfd,op,fd,&ee); return 0;}
(1.3.2) 读事件处理
当 Redis server 接管到客户端的连贯申请时,就会应用注册好的 acceptTcpHandler 函数进行解决。
acceptTcpHandler -> acceptCommonHandler -> createClient-> aeCreateFileEvent
aeCreateFileEvent 函数会针对已连贯套接字上,创立监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient。
到这里,事件驱动框架就减少了对一个客户端已连贯套接字的监听。一旦客户端有申请发送到 server,框架就会回调 readQueryFromClient 函数解决申请。这样一来,客户端申请就能通过事件驱动框架进行解决了。
// file: src/networking.c/** * @param *conn */client *createClient(connection *conn) { // 为用户连贯创立client构造体 client *c = zmalloc(sizeof(client)); if (conn) { // ... 解决连贯 // 注册读事件处理器,等连贯可读时调用 回调函数是readQueryFromClient connSetReadHandler(conn, readQueryFromClient); // 会把新创建的client构造体放到 conn构造体的private_data字段里 connSetPrivateData(conn, c); } // 设置client的一些参数 selectDb(c,0); uint64_t client_id = ++server.next_client_id; c->id = client_id; c->resp = 2; c->conn = conn; // ... return c;}
(1.3.3) 写事件处理
Redis 实例在收到客户端申请后,会在解决客户端命令后,将要返回的数据写入客户端输入缓冲区。
beforeSleep -> handleClientsWithPendingWrites
(1.4) 工夫事件处理
typedef struct aeTimeEvent { long long id; // 工夫事件ID long when_sec; // 事件达到的秒级工夫戳 long when_ms; // 事件达到的毫秒级工夫戳 aeTimeProc *timeProc; // 工夫事件触发后的处理函数 aeEventFinalizerProc *finalizerProc; // 事件完结后的处理函数 void *clientData; // 事件相干的公有数据 struct aeTimeEvent *prev; // 工夫事件链表的前向指针 struct aeTimeEvent *next; // 工夫事件链表的后向指针} aeTimeEvent;
(1.4.1) 工夫事件创立
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc){ long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id;}
(1.4.2) 工夫事件的触发
int aeProcessEvents(aeEventLoop *eventLoop, int flags){ int processed = 0, numevents; // 省略局部代码 /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */}
static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; time_t now = time(NULL); // 链表节点 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; // 遍历链表 while(te) { long now_sec, now_ms; long long id; aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; te->refcount++; // 解决 retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { te->id = AE_DELETED_EVENT_ID; } } te = te->next; } return processed;}
Redis高性能IO模型 https://weikeqin.com/2022/01/...
Redis源码分析与实战 学习笔记 Day11 11 Redis事件驱动框架(下):Redis有哪些事件?
https://time.geekbang.org/col...