关于redis:Redis-事件机制是如何实现的

2次阅读

共计 5597 个字符,预计需要花费 14 分钟才能阅读完成。

前言

咱们都晓得,Redis 是单线程(非谨严),你是否想过,一个线程要如何解决来自各个客户端的各种申请呢?它忙的过去吗?没错,它还真的能忙过去,并且还东倒西歪。其中多亏了 IO 多路复用,而不仅仅是它,事件机制在其中也是一个不错的设计。

之前我提到过有对于 IO 多路复用对于 Redis 的影响,IO 多路复用和多线程会影响 Redis 分布式锁吗?其中有局部内容其实曾经提到了,所以本文会更加关注于事件机制自身。

PS:Redis 高版本曾经反对多线程解决某些事件,为了简化,这里不做探讨,故下文呈现的单线程仅是形容那些必须单线程执行的场景。

前置常识

  • IO 多路复用

尝试思考

首先,让咱们来思考一下,如果是咱们本人来实现,会尝试如何去做。

对于申请连贯解决的思考

最笨的办法,那么就是来一个客户端 accept 一次,而后给什么申请做什么事件,先来先做,做完走人,对吧。那显然这样太慢了,要晓得作为一个缓存,这样设计要把人给急死。

当然,咱们也能够说,来一个我开一个线程独自解决你,相当于你一来我就独自找人为你服务,而服务的人最终会将申请给到一个解决核心,让解决核心对立去解决,而后将后果返回。但显然 Redis 没有那么多资源让你节约。

于是要找人帮忙,那就是 IO 多路复用,至多它能帮我解决后面服务的问题,fd 我就不论了,间接通知我哪些人来了,并且通知我有事的是那些人。

反观机制的思考

既然 epoll_wait 能 通知咱们有那些 socket 曾经就绪,那么咱们就解决就绪的这些就能够了。但咱们须要一个正当的机制来帮咱们来优雅的解决他们,毕竟 Redis 前面只有个单线程在解决。因为解决没这么快,必定须要一个中央来寄存未解决的这些事件,那很正当就能想到须要一个相似 buffer 的货色。

所以,对于这个事件机制,我第一个想法就是弄个队列,或者 ringbuffer 来搞,那不就是一个生产消费者模型吗?

事件机制

那么上面咱们就来看看 Redis 它是如何设计。

分类

首先 Redis 分了两类事件

  • fileEvents 文件事件,就是咱们之前提到的申请的解决,咱们也次要探讨这个
  • timedEvents 定时事件,没错必定有一些定时工作触发的事件在外面

文件事件解决

OK,看完图咱们就有了一个大抵的印象,为了灵便的解决不同的事件,须要将事件调配给处理器去解决,这里也是咱们之前思考的时候没有想到的一个设计。通常来说对于任何的解决往往都有这样一个分配器去调配所有的工作,这样能够让扩大更加灵便,如果后续有新的类型,只须要扩大出一个新的处理器就能够了。

源码剖析

https://github.com/redis/redis/blob/9b1d4f003de1b141ea850f01e7104e7e5c670620/src/ae.c#L493
首先入口在 aeMain 这个简略,就是循环,也正是这个循环解决着所有的事件,咱们能够看到,只有不停 (stop),就会始终循环解决

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

而后就是咱们重点的 aeProcessEvents 办法,其中重点就是调用 aeApiPoll 获取以后就绪的事件,而后你就能看到咱们的 aeFileEvent 也就是文件事件了,最初还有 processTimeEvents 解决定时事件。那么事件自身,是如何解决的呢?就是 rfileProc 和 wfileProc 一个解决读一个解决写。那么问题来了,这两个办法具体是什么呢?卖个关子,咱们先瞅一眼 aeApiPoll

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
        int64_t usUntilTimer;

        if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
            eventLoop->beforesleep(eventLoop);

        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else if (flags & AE_TIME_EVENTS) {usUntilTimer = usUntilEarliestTimer(eventLoop);
            if (usUntilTimer >= 0) {
                tv.tv_sec = usUntilTimer / 1000000;
                tv.tv_usec = usUntilTimer % 1000000;
                tvp = &tv;
            }
        }
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. 留神这里!!!!!!!!!!!!!!*/
        numevents = aeApiPoll(eventLoop, tvp);

        /* Don't process file events if not requested. */
        if (!(flags & AE_FILE_EVENTS)) {numevents = 0;}

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                /* rfileProc 在解决什么事件呢?*/
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                /* wfileProc 在解决什么事件呢?*/
                if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

这里其余都不重要,重点就在咱们相熟的 epoll_wait,获取所有就绪的 fd 也就能晓得所有须要解决的事件了。


static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}

好了,咱们来解密到底 rfileProcwfileProc 是什么,aeCreateFileEvent 办法是用于创立 FileEvent 的办法,其中的入参外面有 aeFileProc 没错就是它了。依据不同的类型用不同的 handler 创立不同的 event。也就是说,最终的解决形式是通过参数传递进去的。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

小思考🤔

如果是我设计,或者绝大多数状况下就是弄一个对象,而对象依据具体的事件类型执行不同的解决逻辑。最多用一个 <mark style=”background: #FFB86CA6;”> 策略模式 </mark> 可能就入地了。而 Redis 的这样的设计思路,相似一种闭包的设计,或者说函数式编程的一种思路吧,将具体的解决对象,解决形式,处理结果,统统蕴含在内。咱们先不说这样的设计好不好,但给我的第一印象是,这样的设计会让我感觉最终执行的整个解决会更加连贯, 并且解决的时候执行的全副逻辑是高度一致的,而解决形式的自身真正做到了可扩大

总结

那咱们通过 Redis 的事件机制能学到什么呢?

  1. 这个事件机制的模型很通用也很清晰,蕴含:接管、循环、解决,三个局部,很规范的设计
  2. 其中对于工作的解决有一个专门的分配器去调配,这在很多 handler 的设计中十分实用,相熟 java 的同学应该晓得 DispatcherServlet 没错这样的模型会更加的清晰
  3. 易于扩大,这里的扩大有两方面一方面是对于处理器的扩大,之后有其余事件类型只须要减少事件处理器就能够了;而另一方面这里的扩大还包含了多线程的扩大,不便了同时反对多个事件的解决。
    其实,Redis 的事件机制是一个规范的 Reactor 模式 是一种基于事件驱动的设计模式,所以咱们更多的是要学到这样设计模式,来使用到当前的编码中,能够更清晰也易扩大。

参考链接

  • 这一篇是写的真的好,从 IO 多路复用的根底原理始终推导到 Redis 的事件机制,如果没有后面铺垫的同学倡议肯定看一下,真正的由浅到深 https://betterprogramming.pub/internals-workings-of-redis-718…
  • Redis 设计与实现 http://redisbook.com/preview/event/file_event.html
  • Reactor 模式 https://zhuanlan.zhihu.com/p/347779760
正文完
 0