关于redis:Redis有哪些事件

51次阅读

共计 5298 个字符,预计需要花费 14 分钟才能阅读完成。

(1) Redis 里的事件

Redis 事件驱动框架是如何以事件模式,解决 server 运行过程中面临的申请操作和多种工作的。

(1.1) 事件循环构造体

// file: src/ae.h

/**
 * 基于事件的程序的状态
 * State of an event based program 
 */
typedef struct aeEventLoop {
    int maxfd;   // 以后注册的最大文件描述符
    int setsize; // 跟踪的最大文件描述符数
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; // 注册事件数组的指针 指向 aeFileEvent 数组   
    aeFiredEvent *fired; // 就绪事件数组的指针  指向 aeFiredEvent 数组
    aeTimeEvent *timeEventHead;  // 工夫事件
    int stop;
    void *apidata; // 指向 aeApiState 构造体  创立的 epoll 对象就在 aeApiState->epfd
    aeBeforeSleepProc *beforesleep; // 在事件处理前执行的函数 
    aeBeforeSleepProc *aftersleep; // 在事件处理后执行的函数
    int flags;
} aeEventLoop;
// file: src/ae.h 

/**
 * 文件事件构造
 * File event structure 
 */
typedef struct aeFileEvent {
    int mask;    // 标记 可读 / 可写 / 屏障
    aeFileProc *rfileProc;  // 写事件回调
    aeFileProc *wfileProc;  // 读事件回调
    void *clientData;       // 扩大数据
} aeFileEvent;

(1.2) 事件对象的初始化

//file: src/server.c

void initServer(void) {

    // 2.1 创立 epoll
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    
    // 省略局部代码

}

事件的个数对应入参里的 server.maxclients+CONFIG_FDSET_INCR
server.maxclients 变量的值大小,能够在 Redis 的配置文件 redis.conf 中进行定义,默认值是 1000。
CONFIG_FDSET_INCR 的大小 = 32 + 96

(1.2.1) IO 多路复用模块初始化

Redis 在操作系统提供的 epoll 对象根底上又封装了一个 eventLoop 进去,所以创立的时候是先申请和创立 eventLoop。

// file: src/ae.c 

/**
 * 创立 aeEventLoop 构造体
 *
 * @param setsize
 */
aeEventLoop *aeCreateEventLoop(int setsize) {

    aeEventLoop *eventLoop;
    // ... 省略局部代码 

    eventLoop = zmalloc(sizeof(*eventLoop))
    // 未来的各种回调事件就都会存在这里
    // eventLoop->events 是一个指针 指向数组 元素类型:aeFileEvent  大小:setsize
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

    // ... 省略局部代码

    // 创立 epoll
    aeApiCreate(eventLoop)
}

(1.3) IO 事件处理

Redis 的 IO 事件次要包含三类,别离是可读事件、可写事件和屏障事件。

读事件:从客户端读取数据
写事件:向客户端写入数据。
屏障事件的次要作用是用来反转事件的解决程序。

(1.3.1) IO 事件创立 / 注册事件

// file:  src/ae.c

/**
 * @param *eventLoop
 * @param fd 
 * @param mask 0: 未注册事件  1: 描述符可读时触发  2: 描述符可写时触发  3:
 * @param *proc aeFileProc 类型  入参传的是 acceptTcpHandler 函数 回调时会用到这个函数 
 * @param *clientData
 */ 
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    // 从 aeFileEvent 事件数组里取出一个文件事件构造
    aeFileEvent *fe = &eventLoop->events[fd];

    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    // 设置文件事件类型 以及事件的处理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;  // 设置读事件回调
    if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 设置写事件回调

    // 公有数据
    fe->clientData = clientData;

    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;

    return AE_OK;
}
//file: src/ae_epoll.c

// 增加事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // ... 

    // epoll_ctl 增加事件 
    epoll_ctl(state->epfd,op,fd,&ee);
    return 0;
}

(1.3.2) 读事件处理

当 Redis server 接管到客户端的连贯申请时,就会应用注册好的 acceptTcpHandler 函数进行解决。

acceptTcpHandler -> acceptCommonHandler -> createClient-> aeCreateFileEvent

aeCreateFileEvent 函数会针对已连贯套接字上,创立监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient。

到这里,事件驱动框架就减少了对一个客户端已连贯套接字的监听。一旦客户端有申请发送到 server,框架就会回调 readQueryFromClient 函数解决申请。这样一来,客户端申请就能通过事件驱动框架进行解决了。

// file: src/networking.c

/**
 * @param *conn
 */
client *createClient(connection *conn) {
    // 为用户连贯创立 client 构造体
    client *c = zmalloc(sizeof(client));

    if (conn) {
        // ... 解决连贯

        // 注册读事件处理器,等连贯可读时调用   回调函数是 readQueryFromClient 
        connSetReadHandler(conn, readQueryFromClient);
        // 会把新创建的 client 构造体放到 conn 构造体的 private_data 字段里
        connSetPrivateData(conn, c);
        
    }

    // 设置 client 的一些参数
    selectDb(c,0);
    uint64_t client_id = ++server.next_client_id;
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    // ...

    return c;
}

(1.3.3) 写事件处理

Redis 实例在收到客户端申请后,会在解决客户端命令后,将要返回的数据写入客户端输入缓冲区。

beforeSleep -> handleClientsWithPendingWrites

(1.4) 工夫事件处理


typedef struct aeTimeEvent {
    long long id;  // 工夫事件 ID
    long when_sec;  // 事件达到的秒级工夫戳
    long when_ms;  // 事件达到的毫秒级工夫戳
    aeTimeProc *timeProc;  // 工夫事件触发后的处理函数
    aeEventFinalizerProc *finalizerProc;  // 事件完结后的处理函数
    void *clientData;  // 事件相干的公有数据
    struct aeTimeEvent *prev;  // 工夫事件链表的前向指针
    struct aeTimeEvent *next;  // 工夫事件链表的后向指针
} aeTimeEvent;

(1.4.1) 工夫事件创立

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

(1.4.2) 工夫事件的触发


int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    // 省略局部代码

    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

 
    // 链表节点
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    // 遍历链表
    while(te) {
        long now_sec, now_ms;
        long long id;
 
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            te->refcount++;

            // 解决
            retval = te->timeProc(eventLoop, id, te->clientData);

            te->refcount--;
            processed++;
            if (retval != AE_NOMORE) {aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {te->id = AE_DELETED_EVENT_ID;}
        }
        te = te->next;
    }
    return processed;
}

Redis 高性能 IO 模型 https://weikeqin.com/2022/01/…

Redis 源码分析与实战 学习笔记 Day11 11 Redis 事件驱动框架(下):Redis 有哪些事件?
https://time.geekbang.org/col…

正文完
 0