前言
为了晋升执行命令前后的网络I/O性能,Redis6.0引入了Threaded I/O。
上面就来一起学习一下新个性Threaded I/O。
本文浏览程序
- Redis是如何运行的
- 命令执行前后产生了什么
- Threaded I/O 模型
- Threaded I/O 的实现于机制
- 总结与思考
- Redis源码学习思路
Redis是如何运行的
循环处理事件
Redis的函数入口在server.c中,main()办法流程如下图所示
在main()办法中Redis首先须要做的是初始化各种库以及服务配置。具体举例:
- crc64_init()会初始化一个crc校验用的Lookup Table
- getRandomBytes()为hashseed填充随机元素作为初始化值,用作哈希表的seed
- …
- initServerConfig()中执行了大量对server对象属性的初始化操作:
- 初始化server.runid,如29e05f486b8d41e68234a68c8b77edaff101c194
- 获取以后的时区信息,寄存至server.timezone中
- 初始化server.next_client_id值,使得连贯进来的客户端id从1开始自增
- …
- ACLInit()是对Redis 6.0新增的ACL零碎的初始化操作,包含初始化用户列表、ACL日志、默认用户等信息
- 通过moduleInitModulesSystem()和tlsInit()初始化模块零碎和SSL等
- …
初始化完结后,开始读取用户的启动参数,和大多数配置加载过程相似,Redis也通过字符串匹配等剖析用户输出的argc和argv[],这个过程中可能会产生:
- 获取到配置文件门路,批改server.configfile的值,后续用于加载配置文件
- 获取到启动选项参数,如loadmodule和对应的Module文件门路,保留至options变量中
解析完参数之后,执行loadServerConfig(),读取配置文件并与命令行参数options的内容进行合并,组成一个config变量,并且一一将name和value设置进configs列表中。对于每个config,有对应的switch-case的代码。
例如对于loadmodule,会执行queueLoadModule()办法,以实现真正的配置加载:
... } else if (!strcasecmp(argv[0],"logfile") && argc == 2) { ... } else if (!strcasecmp(argv[0],"loadmodule") && argc >= 2) { queueLoadModule(argv[1],&argv[2],argc-2); } else if (!strcasecmp(argv[0],"sentinel")) {...
回到main办法的流程,Redis会开始打印启动的日志,执行initServer()办法,服务依据配置项,持续为server对象初始化内容,例如:
- 创立事件循环构造体aeEventLoop(定义在ae.h),赋值给server.el
- 依据配置的db数目,调配大小为sizeof(redisDb) * dbnum的内存空间,server.db保留这块空间的地址指针
- 每个db都是一个redisDb构造,将这个构造中的保留key、保留过期工夫等的字典初始化为空dict
- …
尔后就是一些依据不同运行模式的初始化,例如惯例模式运行时会记录惯例日志、加载磁盘长久化的数据;而在sentinel模式运行时记录哨兵日志,不加载数据等。
在所有筹备操作都实现后,Redis开始陷入aeMain()的事件循环,在这个循环中会一直执行aeProcessEvents()解决产生的各种事件,直到Redis完结退出。
事件类型
Redis中存在有两种类型的事件:
- 工夫事件
- 文件事件
工夫事件也就是到了肯定工夫会产生的事件,在Redis中它们被记录成一个链表,每次创立新的事件事件的时候,都会在链表头部插入一个aeTimeEvent节点(头插法),其中保留了该事件会在何时产生,须要调用什么样的办法解决。
遍历整个链表咱们能够晓得离最近要产生的工夫事件还有多久,因为链表外面的节点依照自增id顺序排列,而在产生工夫的维度上时乱序的。
文件事件能够看作I/O引起的事件,客户端发送命令会让服务端产生一个读I/O,对应一个读事件;同样当客户端期待服务端音讯的时候须要变得可写,让服务端写入内容,因而会对应一个写事件。AE_READABLE事件会在客户端建设连贯、发送命令或其余连贯变得可读的时候产生,而AE_WRITABLE事件则会在客户端连贯变得可写的时候产生。
文件事件的构造简略很多,aeFileEvent记录了这是一个可读事件还是可写事件,对应的解决办法,以及用户数据。
/* File event structure */typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; /* 读事件处理办法 */ aeFileProc *wfileProc; /* 写事件处理办法 */ void *clientData;} aeFileEvent;
如果同时产生了两种事件,Redis会优先解决AE_READABLE事件。
aeProcessEvents
aeProcessEvents()办法解决曾经产生和行将产生的各种事件。
在aeMain()循环进入aeProcessEvents()后,Redis首先查看下一次的工夫事件会在什么时候产生,在还没有工夫事件产生的这段时间内,能够调用多路复用的API aeApiPoll()阻塞并期待文件事件的产生。如果没有文件事件产生,那么超时后返回0,否则返回已产生的文件事件数量numevents。
在有文件事件可解决的状况下,Redis会调用AE_READABLE事件的rfileProc办法以及AE_WRITABLE事件的wfileProc办法进行解决:
- 通常状况为先进行读事件,而后进行写事件
- 如果设置了AE_BARRIER,则做相同的事件,读事件永远在写事件之后。
... if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } }...
在实现后面的解决后,Redis会持续调用processTimeEvents()解决工夫事件。遍历整个工夫事件链表,如果此时曾经过了一段时间(阻塞期待或解决文件事件耗时),有工夫事件产生,那么就调用对应工夫事件的timeProc办法,将所有曾经过期的工夫事件处理掉:
... if (te->when <= now) { ... retval = te->timeProc(eventLoop, id, te->clientData); ... processed++; ... }...
如果执行了文件事件之后还没有到最近的工夫事件产生点,那么本次aeMain()循环中将没有工夫事件被执行,进入下一次循环。
附上源码
/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * If flags is 0, the function does nothing and returns. * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * if flags has AE_FILE_EVENTS set, file events are processed. * if flags has AE_TIME_EVENTS set, time events are processed. * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */int aeProcessEvents(aeEventLoop *eventLoop, int flags){ int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want to call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; struct timeval tv, *tvp; long msUntilTimer = -1; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) msUntilTimer = msUntilEarliestTimer(eventLoop); if (msUntilTimer >= 0) { tv.tv_sec = msUntilTimer / 1000; tv.tv_usec = (msUntilTimer % 1000) * 1000; tvp = &tv; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } if (eventLoop->flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ /* Normally we execute the readable event first, and the writable * event later. This is useful as sometimes we may be able * to serve the reply of a query immediately after processing the * query. * * However if AE_BARRIER is set in the mask, our application is * asking us to do the reverse: never fire the writable event * after the readable. In such a case, we invert the calls. * This is useful when, for instance, we want to do things * in the beforeSleep() hook, like fsyncing a file to disk, * before replying to a client. */ int invert = fe->mask & AE_BARRIER; /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * * Fire the readable event if the call sequence is not * inverted. */ if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */}
命令执行前后产生了什么
在客户端连贯上Redis的时候,通过执行connSetReadHandler(conn, readQueryFromClient),设置了当读事件产生时,应用readQueryFromClient()作为读事件的Handler。
在收到客户端的命令申请时,Redis进行一些检查和统计后,调用read()办法将连贯中的数据读取进client.querybuf音讯缓冲区中:
void readQueryFromClient(connection *conn) { ... nread = connRead(c->conn, c->querybuf+qblen, readlen); ...static inline int connRead(connection *conn, void *buf, size_t buf_len) { return conn->type->read(conn, buf, buf_len);}static int connSocketRead(connection *conn, void *buf, size_t buf_len) { int ret = read(conn->fd, buf, buf_len); ...}
而后进入processInputBuffer(c)开始读取输出缓冲区中的音讯,最初进入processCommand(c)开始解决输出的命令。
在命令执行失去后果后,首先会寄存在client.buf中,并且调用调用addReply(client c, robj obj)办法,将这个client对象追加到server.clients_pending_write列表中。此时当次的命令,或者说AE_READABLE事件就曾经根本处理完毕了,除了一些额定的统计数据、后处理以外,不会再进行发送响应音讯的动作。
在以后aeProcessEvents()办法完结后,进入下一次的循环,第二次循环调用I/O多路复用接口期待文件事件产生前,Redis会查看server.clients_pending_write是否有客户端须要进行回复,若有,遍历指向各个待回复客户端的server.clients_pending_write列表,通过listDelNode()一一将客户端从中删除,并将待回复的内容通过writeToClient(c,0)回复进来。
int writeToClient(client *c, int handler_installed) { ... nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); ...static inline int connWrite(connection *conn, const void *data, size_t data_len) { return conn->type->write(conn, data, data_len);}static int connSocketWrite(connection *conn, const void *data, size_t data_len) { int ret = write(conn->fd, data, data_len); ...}
Threaded I/O模型
I/O问题与Threaded I/O的引入
如果要说Redis会有什么性能问题,那么从I/O角度,因为它没有像其余Database一样应用磁盘,所以不存在磁盘I/O的问题。
在数据进入缓冲区前及从缓冲区写至Socket时,存在肯定的网络I/O,特地是写I/O对性能影响比拟大。以往咱们会思考做管道化来减小网络I/O的开销,或者将Redis部署成Redis集群来晋升性能。
在Redis 6.0之后,因为Threaded I/O的引入,Redis开始反对对网络读写的线程化,让更多的线程参加进这部分动作中,同时放弃命令的单线程执行。这样的改变从某种程度上说能够既晋升性能,但又防止将命令执行线程化而须要引入锁或者其余形式解决并行执行的竞态问题。
Threaded I/O 在做什么?
在老版本的实现中,Redis将不同client的命令执行后果保留在各自的client.buf中,而后把待回复的client寄存在一个列表里,最初在事件循环中一一将buf的内容写至对应Socket。对应在新版本中,Redis应用多个线程实现这部分操作。
对读操作,Redis同样地为server对象新增了一个clients_pending_read属性,当读事件来长期,判断是否满足线程化读的条件,如果满足,那么执行提早读操作,将这个client对象增加到server.clients_pending_read列表中。和写操作一样,留到下一次事件循环时应用多个线程实现读操作。
Threaded I/O的实现与限度
Init阶段
在Redis启动时,如果满足对应参数配置,会进行I/O线程初始化的操作。
Redis会进行一些惯例查看,配置数是否合乎开启多线程I/O的要求。
/* Initialize the data structures needed for threaded I/O. */void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); }...
创立一个长度为线程数的io_threads_list列表,列表的每个元素都是另一个列表L,L将会用来寄存对应线程待处理的多个client对象。
... /* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */...
对于主线程,初始化操作到这里就完结了。
... /* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); setIOPendingCount(i, 0); pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; }}...
io_threads_mutex 是一个互斥锁列表,io_threads_mutex[i] 即第 i 个线程的锁,用于后续阻塞I/O线程操作,初始化之后将其临时锁定。而后再对每个线程执行创立操作,tid即其指针,保留至io_threads列表中。新的线程会始终执行IOThreadMain办法。
Reads/Writes
多线程的读写次要在handleClientsWithPendingReadsUsingThreads()和handleClientsWithPendingWritesUsingThreads()中实现,因为两者简直是对称的,所以这里只对读操作进行解说。
同样,Redis会进行惯例查看,是否启用线程化读写并且启用线程化读(只开启前者则只有写操作是线程化),以及是否有期待读取的客户端。
/* When threaded I/O is also enabled for the reading + parsing side, the * readable handler will just put normal clients into a queue of clients to * process (instead of serving them synchronously). This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available * rendering it in the client structures. */int handleClientsWithPendingReadsUsingThreads(void) { if (!server.io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); if (processed == 0) return 0; if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);...
这里将server.clients_pending_read的列表转化为不便遍历的链表,而后将列表的每个节点(*client对象)以相似Round-Robin(轮询调度)的形式调配个各个线程,线程执行各个client的读写程序并不需要保障,命令到达的先后顺序曾经由server.clients_pending_read/write列表记录,后续也会按这个程序执行。
/* Distribute the clients across N different lists. */ listIter li; listNode *ln; listRewind(server.clients_pending_read,&li); int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; }...
设置状态标记,标识以后处于多线程读的状态。因为标记的存在,Redis的Threaded I/O刹时只能处于读或写的状态,不能局部线程读,局部写。
.../* Give the start condition to the waiting threads, by setting the * start condition atomic var. */ io_threads_op = IO_THREADS_OP_READ;...
为每个线程记录下各自须要解决的客户端数量。当不同线程读取到本人的pending长度不为0时,就会开始进行解决。留神 j 从1开始,意味着主线程的pending长度始终为0,因为主线程马上要在这个办法中同步实现本人的工作,不须要晓得期待的工作数。
...for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); setIOPendingCount(j, count); }...
主线程此时将本人要解决的client解决完。
.../* Also use the main thread to process a slice of clients. */ listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); readQueryFromClient(c->conn); } listEmpty(io_threads_list[0]);...
陷入循环期待,pending等于各个线程残余工作数之和,当所有线程都没有工作的时候,本轮I/O解决完结。
... /* Wait for all the other threads to end their work. */ while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) pending += getIOPendingCount(j); if (pending == 0) break; } if (tio_debug) printf("I/O READ All threads finshed\n");...
咱们曾经在各自线程中将conn中的内容读取至对应client的client.querybuf输出缓冲区中,所以能够遍历server.clients_pending_read列表,串行地进行命令执行操作,同时将client从列表中移除。
... /* Run the list of clients again to process the new buffers. */ while(listLength(server.clients_pending_read)) { ln = listFirst(server.clients_pending_read); client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_READ; listDelNode(server.clients_pending_read,ln); if (processPendingCommandsAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go * to the next. */ continue; } processInputBuffer(c); /* We may have pending replies if a thread readQueryFromClient() produced * replies and did not install a write handler (it can't). */ if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) clientInstallWriteHandler(c); }...
解决实现,将解决的数量加到统计属性上,而后返回。
... /* Update processed count on server */ server.stat_io_reads_processed += processed; return processed;
IOThreadMain
后面还有每个线程具体的工作内容没有解释,它们会始终陷在IOThreadMain的循环中,期待执行读写的机会。
照常执行一些初始化内容。
void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is * used by the thread to just manipulate a single sub-array of clients. */ long id = (unsigned long)myid; char thdname[16]; snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); redis_set_thread_title(thdname); redisSetCpuAffinity(server.server_cpulist);...
线程会检测本人的待处理的client列表长度,当期待队列长度大于0时往下执行,否则会到死循环终点。
这里利用互斥锁,让主线程有机会加锁,使得I/O线程卡在执行pthread_mutex_lock(),达到让I/O线程进行工作的成果。
... while(1) { /* Wait for start */ for (int j = 0; j < 1000000; j++) { if (getIOPendingCount(id) != 0) break; } /* Give the main thread a chance to stop this thread. */ if (getIOPendingCount(id) == 0) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; } serverAssert(getIOPendingCount(id) != 0); if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));...
将io_threads_list[i]的客户端列表转化为不便遍历的链表,一一遍历,借助io_threads_op标记判断以后是要执行多线程读还是多线程写,实现对本人要解决的客户端的操作。
... /* Process: note that the main thread will never touch our list * before we drop the pending count to 0. */ listIter li; listNode *ln; listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c,0); } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } }...
清空本人要解决的客户端列表,并且将本人的待处理数量批改为0,完结本轮操作。
...listEmpty(io_threads_list[id]); setIOPendingCount(id, 0); if (tio_debug) printf("[%ld] Done\n", id); }}
Limitation
通过查看代码,应用上Threaded I/O的启用受以下条件影响:
- 配置项io-threads须要大于1,否则会持续应用单线程操作读写I/O
- 配置项io-threads-do-reads管制读I/O是否应用线程化
- 对于提早读取,由postponeClientRead()办法管制。办法中除了配置要求外,还须要以后client不能是主从模型的角色,也不能处于曾经期待下次事件循环线程化读取CLIENT_PENDING_READ的状态。在这个办法中client对象会被增加到期待队列中,并且将client的状态改为CLIENT_PENDING_READ。
- 对于多线程写I/O,由handleClientsWithPendingWritesUsingThreads()中的stopThreadedIOIfNeeded()办法加以限度。除了对应配置项要满足要求外,server.clients_pending_write的长度须要大于等于配置线程数的两倍,例如配置应用6线程,当写队列长度小于12时会持续应用单线程I/O。
- I/O线程在initThreadedIO()被创立前,互斥锁处于加锁状态,因而线程不能进行理论的工作解决。server对象的io_threads_active属性默认会处于敞开状态,在进行首次多线程写之前才会被开启。这意味着服务启动后的读操作依然会应用单线程读,产生执行后果到写的pending list中,在第二次循环中,服务判断是否有配置启用TIO,将server.io_threads_active属性关上,而后进行多线程写操作,从下一次循环开始TIO能力被作用于读操作上。上一点说过写I/O会有配置和队列长度断定,在断定不须要TIO写时,会从新把server.io_threads_active敞开,意味着只管你曾经在配置文件外面关上TIO读,然而Redis依然会依据负载时不时跳过应用它。
总结与思考
Redis 6.0引入的Threaded I/O,将Socket读写提早和线程化,在网络I/O的方向上给Redis带来了肯定的性能晋升,并且应用门槛比拟低,用户无需做太多的变更,即可在不影响业务的状况下白嫖闲暇的线程资源。
一方面,这部分的晋升可能还难以让处于Redis 5甚至Redis 3版本的用户有足够的能源进行降级,特地是思考到很多业务场景中Redis的性能并没有差到成为瓶颈,而且新版本的福利也未通过大规模验证,势必会影响到企业级利用中更多用户关注的服务稳定性。同时,TIO的晋升比照集群性能仿佛还有肯定的差距,这可能更加会让本来就处于集群架构的企业用户疏忽这个性能。
从稳定性的角度上来看,新版本的性能未通过大规模验证,是否值得降级还有待考据。
本次的版本能够说是Redis从诞生至今最大的更新,不只有Threaded I/O,包含RESP3、ACLs和SSL,有趣味的同学能够自行去理解。
Redis源码学习思路
办法
README.md应该是咱们理解Redis的入口,而不是全局搜寻main()办法。请关注Redis internals大节下的内容,这里介绍了Redis的代码构造,Redis每个文件都是一个“general idea”,其中server.c和network.c的局部逻辑和代码在本文曾经介绍过了,长久化相干的aof.c和rdb.c、数据库相干的db.c、Redis对象相干的object.c、复制相干的replication.c等都值得注意。其余包含Redis的命令是以什么样的模式编码的,也能在README.md中找到答案,这样能够不便咱们进一步浏览代码时疾速定位。
工具
工具的话我举荐visual code即可,装置好C/C++后,Mac基本上能够无压力浏览源码。Windows装置好mingw-w64即可。
另外几个代码的关键点,其实也在本文中呈现过:
- main(),终点
- initServer(),初始化
- aeMain(),事件循环
- readQueryFromClient(),读事件的Handler
- processInputBuffer(),命令解决的入口
如果像本文一样想理解Network的内容,能够在aeMain()处打断点,而后关注中network.c中的办法;如果想关注具体命令相干的内容,能够在processInputBuffer()处打断点,而后关注$command.c或者相似文件中的办法,README.md文件里也曾经介绍过命令办法的命名格局,定位非常容易。其余经常出现的其余动作,例如长久化、复制等,大略会呈现在命令执行的前后,或者工夫事件内,也可能在beforeSleep()中。server.h中定义的redisServer和client是Redis中两个十分重要的构造,在业务上很多内容都是转化为对它们的属性的相干操作,要特地注意。
参考文档
https://github.com/redis/redis
https://github.com/redis/redis-doc
https://redis.io/documentation
https://www.youtube.com/c/Redislabs/videos
文|夜铭
关注得物技术,携手走向技术的云端