共计 16707 个字符,预计需要花费 42 分钟才能阅读完成。
前言
为了晋升执行命令前后的网络 I / O 性能,Redis6.0 引入了 Threaded I/O。
上面就来一起学习一下新个性 Threaded I/O。
本文浏览程序
- Redis 是如何运行的
- 命令执行前后产生了什么
- Threaded I/O 模型
- Threaded I/O 的实现于机制
- 总结与思考
- Redis 源码学习思路
Redis 是如何运行的
循环处理事件
Redis 的函数入口在 server.c 中,main()办法流程如下图所示
在 main()办法中 Redis 首先须要做的是 初始化各种库以及服务配置。具体举例:
- crc64_init()会初始化一个 crc 校验用的 Lookup Table
- getRandomBytes()为 hashseed 填充随机元素作为初始化值,用作哈希表的 seed
- …
- initServerConfig()中执行了大量对 server 对象属性的初始化操作:
- 初始化 server.runid,如 29e05f486b8d41e68234a68c8b77edaff101c194
- 获取以后的时区信息,寄存至 server.timezone 中
- 初始化 server.next_client_id 值,使得连贯进来的客户端 id 从 1 开始自增
- …
- ACLInit()是对 Redis 6.0 新增的 ACL 零碎的初始化操作,包含初始化用户列表、ACL 日志、默认用户等信息
- 通过 moduleInitModulesSystem()和 tlsInit()初始化模块零碎和 SSL 等
- …
初始化完结后,开始 读取用户的启动参数,和大多数配置加载过程相似,Redis 也通过字符串匹配等剖析用户输出的 argc 和 argv[],这个过程中可能会产生:
- 获取到配置文件门路,批改 server.configfile 的值,后续用于加载配置文件
- 获取到启动选项参数,如 loadmodule 和对应的 Module 文件门路,保留至 options 变量中
解析完参数之后,执行 loadServerConfig(),读取配置文件并与命令行参数 options 的内容进行合并,组成一个 config 变量,并且一一将 name 和 value 设置进 configs 列表中。对于每个 config,有对应的 switch-case 的代码。
例如对于 loadmodule,会执行 queueLoadModule()办法,以实现真正的配置加载:
... | |
} else if (!strcasecmp(argv[0],"logfile") && argc == 2) {...} else if (!strcasecmp(argv[0],"loadmodule") && argc >= 2) {queueLoadModule(argv[1],&argv[2],argc-2); | |
} else if (!strcasecmp(argv[0],"sentinel")) { | |
... |
回到 main 办法的流程,Redis 会开始打印启动的日志,执行 initServer()办法,服务依据配置项,持续 为server对象初始化内容,例如:
- 创立事件循环构造体 aeEventLoop(定义在 ae.h),赋值给 server.el
- 依据配置的 db 数目,调配大小为 sizeof(redisDb) * dbnum 的内存空间,server.db 保留这块空间的地址指针
- 每个 db 都是一个 redisDb 构造,将这个构造中的保留 key、保留过期工夫等的字典初始化为空 dict
- …
尔后就是一些依据不同运行模式的初始化,例如惯例模式运行时会记录惯例日志、加载磁盘长久化的数据;而在 sentinel 模式运行时记录哨兵日志,不加载数据等。
在所有筹备操作都实现后,Redis 开始陷入 aeMain() 的事件循环,在这个循环中会一直执行 aeProcessEvents() 解决产生的各种事件,直到 Redis 完结退出。
事件类型
Redis 中存在有两种类型的事件:
- 工夫事件
- 文件事件
工夫事件也就是到了肯定工夫会产生的事件,在 Redis 中它们被记录成一个链表,每次创立新的事件事件的时候,都会在链表头部插入一个 aeTimeEvent 节点(头插法),其中保留了该事件会在何时产生,须要调用什么样的办法解决。
遍历整个链表咱们能够晓得离最近要产生的工夫事件还有多久,因为链表外面的节点依照自增 id 顺序排列,而在产生工夫的维度上时乱序的。
文件事件能够看作 I / O 引起的事件,客户端发送命令会让服务端产生一个读 I /O,对应一个读事件;同样当客户端期待服务端音讯的时候须要变得可写,让服务端写入内容,因而会对应一个写事件。AE_READABLE 事件会在客户端建设连贯、发送命令或其余连贯变得可读的时候产生,而 AE_WRITABLE 事件则会在客户端连贯变得可写的时候产生。
文件事件的构造简略很多,aeFileEvent 记录了这是一个可读事件还是可写事件,对应的解决办法,以及用户数据。
/* File event structure */ | |
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ | |
aeFileProc *rfileProc; /* 读事件处理办法 */ | |
aeFileProc *wfileProc; /* 写事件处理办法 */ | |
void *clientData; | |
} aeFileEvent; |
如果同时产生了两种事件,Redis 会优先解决 AE_READABLE 事件。
aeProcessEvents
aeProcessEvents()办法解决曾经产生和行将产生的各种事件。
在 aeMain()循环进入 aeProcessEvents()后,Redis 首先查看下一次的工夫事件会在什么时候产生,在还没有工夫事件产生的这段时间内,能够调用多路复用的 API aeApiPoll()阻塞并期待文件事件的产生。如果没有文件事件产生,那么超时后返回 0,否则返回已产生的文件事件数量 numevents。
在有文件事件可解决的状况下,Redis 会调用 AE_READABLE 事件的 rfileProc 办法以及 AE_WRITABLE 事件的 wfileProc 办法进行解决:
- 通常状况为先进行读事件,而后进行写事件
- 如果设置了 AE_BARRIER,则做相同的事件,读事件永远在写事件之后。
... | |
if (!invert && fe->mask & mask & AE_READABLE) {fe->rfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
fe = &eventLoop->events[fd]; | |
} | |
if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
} | |
} | |
if (invert) {fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ | |
if ((fe->mask & mask & AE_READABLE) && | |
(!fired || fe->wfileProc != fe->rfileProc)) | |
{fe->rfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
} | |
} | |
... |
在实现后面的解决后,Redis 会持续调用 processTimeEvents()解决工夫事件。遍历整个工夫事件链表,如果此时曾经过了一段时间(阻塞期待或解决文件事件耗时),有工夫事件产生,那么就调用对应工夫事件的 timeProc 办法,将所有曾经过期的工夫事件处理掉:
... | |
if (te->when <= now) { | |
... | |
retval = te->timeProc(eventLoop, id, te->clientData); | |
... | |
processed++; | |
... | |
} | |
... |
如果执行了文件事件之后还没有到最近的工夫事件产生点,那么本次 aeMain()循环中将没有工夫事件被执行,进入下一次循环。
附上源码
/* Process every pending time event, then every pending file event | |
* (that may be registered by time event callbacks just processed). | |
* Without special flags the function sleeps until some file event | |
* fires, or when the next time event occurs (if any). | |
* | |
* If flags is 0, the function does nothing and returns. | |
* if flags has AE_ALL_EVENTS set, all the kind of events are processed. | |
* if flags has AE_FILE_EVENTS set, file events are processed. | |
* if flags has AE_TIME_EVENTS set, time events are processed. | |
* if flags has AE_DONT_WAIT set the function returns ASAP until all | |
* the events that's possible to process without to wait are processed. | |
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. | |
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. | |
* | |
* The function returns the number of events processed. */ | |
int aeProcessEvents(aeEventLoop *eventLoop, int flags) | |
{ | |
int processed = 0, numevents; | |
/* Nothing to do? return ASAP */ | |
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; | |
/* Note that we want to call select() even if there are no | |
* file events to process as long as we want to process time | |
* events, in order to sleep until the next time event is ready | |
* to fire. */ | |
if (eventLoop->maxfd != -1 || | |
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { | |
int j; | |
struct timeval tv, *tvp; | |
long msUntilTimer = -1; | |
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) | |
msUntilTimer = msUntilEarliestTimer(eventLoop); | |
if (msUntilTimer >= 0) { | |
tv.tv_sec = msUntilTimer / 1000; | |
tv.tv_usec = (msUntilTimer % 1000) * 1000; | |
tvp = &tv; | |
} else { | |
/* If we have to check for events but need to return | |
* ASAP because of AE_DONT_WAIT we need to set the timeout | |
* to zero */ | |
if (flags & AE_DONT_WAIT) { | |
tv.tv_sec = tv.tv_usec = 0; | |
tvp = &tv; | |
} else { | |
/* Otherwise we can block */ | |
tvp = NULL; /* wait forever */ | |
} | |
} | |
if (eventLoop->flags & AE_DONT_WAIT) { | |
tv.tv_sec = tv.tv_usec = 0; | |
tvp = &tv; | |
} | |
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) | |
eventLoop->beforesleep(eventLoop); | |
/* Call the multiplexing API, will return only on timeout or when | |
* some event fires. */ | |
numevents = aeApiPoll(eventLoop, tvp); | |
/* After sleep callback. */ | |
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) | |
eventLoop->aftersleep(eventLoop); | |
for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; | |
int mask = eventLoop->fired[j].mask; | |
int fd = eventLoop->fired[j].fd; | |
int fired = 0; /* Number of events fired for current fd. */ | |
/* Normally we execute the readable event first, and the writable | |
* event later. This is useful as sometimes we may be able | |
* to serve the reply of a query immediately after processing the | |
* query. | |
* | |
* However if AE_BARRIER is set in the mask, our application is | |
* asking us to do the reverse: never fire the writable event | |
* after the readable. In such a case, we invert the calls. | |
* This is useful when, for instance, we want to do things | |
* in the beforeSleep() hook, like fsyncing a file to disk, | |
* before replying to a client. */ | |
int invert = fe->mask & AE_BARRIER; | |
/* Note the "fe->mask & mask & ..." code: maybe an already | |
* processed event removed an element that fired and we still | |
* didn't processed, so we check if the event is still valid. | |
* | |
* Fire the readable event if the call sequence is not | |
* inverted. */ | |
if (!invert && fe->mask & mask & AE_READABLE) {fe->rfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ | |
} | |
/* Fire the writable event. */ | |
if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
} | |
} | |
/* If we have to invert the call, fire the readable event now | |
* after the writable one. */ | |
if (invert) {fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ | |
if ((fe->mask & mask & AE_READABLE) && | |
(!fired || fe->wfileProc != fe->rfileProc)) | |
{fe->rfileProc(eventLoop,fd,fe->clientData,mask); | |
fired++; | |
} | |
} | |
processed++; | |
} | |
} | |
/* Check time events */ | |
if (flags & AE_TIME_EVENTS) | |
processed += processTimeEvents(eventLoop); | |
return processed; /* return the number of processed file/time events */ | |
} |
命令执行前后产生了什么
在客户端连贯上 Redis 的时候,通过执行 connSetReadHandler(conn, readQueryFromClient),设置了当读事件产生时,应用 readQueryFromClient()作为读事件的 Handler。
在收到客户端的命令申请时,Redis 进行一些检查和统计后,调用 read()办法将连贯中的数据读取进 client.querybuf 音讯缓冲区中:
void readQueryFromClient(connection *conn) { | |
... | |
nread = connRead(c->conn, c->querybuf+qblen, readlen); | |
... | |
static inline int connRead(connection *conn, void *buf, size_t buf_len) {return conn->type->read(conn, buf, buf_len); | |
} | |
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {int ret = read(conn->fd, buf, buf_len); | |
... | |
} |
而后进入 processInputBuffer(c)开始读取输出缓冲区中的音讯,最初进入 processCommand(c)开始解决输出的命令。
在命令执行失去后果后,首先会寄存在 client.buf 中,并且调用调用 addReply(client c, robj obj)办法,将这个 client 对象追加到 server.clients_pending_write 列表中。此时当次的命令,或者说 AE_READABLE 事件就曾经根本处理完毕了,除了一些额定的统计数据、后处理以外,不会再进行发送响应音讯的动作。
在以后 aeProcessEvents()办法完结后,进入 下一次的循环,第二次循环调用 I / O 多路复用接口期待文件事件产生前,Redis 会查看 server.clients_pending_write 是否有客户端须要进行回复,若有,遍历指向各个待回复客户端的 server.clients_pending_write 列表,通过 listDelNode()一一将客户端从中删除,并将待回复的内容通过 writeToClient(c,0)回复进来。
int writeToClient(client *c, int handler_installed) { | |
... | |
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); | |
... | |
static inline int connWrite(connection *conn, const void *data, size_t data_len) {return conn->type->write(conn, data, data_len); | |
} | |
static int connSocketWrite(connection *conn, const void *data, size_t data_len) {int ret = write(conn->fd, data, data_len); | |
... | |
} |
Threaded I/ O 模型
I/ O 问题与 Threaded I/ O 的引入
如果要说 Redis 会有什么性能问题,那么从 I / O 角度,因为它没有像其余 Database 一样应用磁盘,所以不存在磁盘 I / O 的问题。
在数据进入缓冲区前及从缓冲区写至 Socket 时,存在肯定的网络 I /O,特地是写 I / O 对性能影响比拟大。以往咱们会思考做管道化来减小网络 I / O 的开销,或者将 Redis 部署成 Redis 集群来晋升性能。
在 Redis 6.0 之后,因为 Threaded I/ O 的引入,Redis 开始反对对网络读写的线程化,让更多的线程参加进这部分动作中,同时放弃命令的单线程执行。这样的改变从某种程度上说能够既晋升性能,但又防止将命令执行线程化而须要引入锁或者其余形式解决并行执行的竞态问题。
Threaded I/O 在做什么?
在老版本的实现中,Redis 将不同 client 的命令执行后果保留在各自的 client.buf 中,而后把待回复的 client 寄存在一个列表里,最初在事件循环中一一将 buf 的内容写至对应 Socket。对应在新版本中,Redis 应用多个线程实现这部分操作。
对读操作,Redis 同样地为 server 对象新增了一个 clients_pending_read 属性,当读事件来长期,判断是否满足线程化读的条件,如果满足,那么执行提早读操作,将这个 client 对象增加到 server.clients_pending_read 列表中。和写操作一样,留到下一次事件循环时应用多个线程实现读操作。
Threaded I/ O 的实现与限度
Init 阶段
在 Redis 启动时,如果满足对应参数配置,会进行 I / O 线程初始化的操作。
Redis 会进行一些惯例查看,配置数是否合乎开启多线程 I / O 的要求。
/* Initialize the data structures needed for threaded I/O. */ | |
void initThreadedIO(void) { | |
server.io_threads_active = 0; /* We start with threads not active. */ | |
/* Don't spawn any thread if the user selected a single thread: | |
* we'll handle I/O directly from the main thread. */ | |
if (server.io_threads_num == 1) return; | |
if (server.io_threads_num > IO_THREADS_MAX_NUM) { | |
serverLog(LL_WARNING,"Fatal: too many I/O threads configured." | |
"The maximum number is %d.", IO_THREADS_MAX_NUM); | |
exit(1); | |
} | |
... |
创立一个长度为线程数的 io_threads_list 列表,列表的每个元素都是另一个列表 L,L 将会用来寄存对应线程待处理的多个 client 对象。
... | |
/* Spawn and initialize the I/O threads. */ | |
for (int i = 0; i < server.io_threads_num; i++) { | |
/* Things we do for all the threads including the main thread. */ | |
io_threads_list[i] = listCreate(); | |
if (i == 0) continue; /* Thread 0 is the main thread. */ | |
... |
对于主线程,初始化操作到这里就完结了。
... | |
/* Things we do only for the additional threads. */ | |
pthread_t tid; | |
pthread_mutex_init(&io_threads_mutex[i],NULL); | |
setIOPendingCount(i, 0); | |
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ | |
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); | |
exit(1); | |
} | |
io_threads[i] = tid; | |
} | |
} | |
... |
io_threads_mutex 是一个互斥锁列表,io_threads_mutex[i] 即第 i 个线程的锁,用于后续阻塞 I / O 线程操作,初始化之后将其临时锁定。而后再对每个线程执行创立操作,tid 即其指针,保留至 io_threads 列表中。新的线程会始终执行 IOThreadMain 办法。
Reads/Writes
多线程的读写次要在 handleClientsWithPendingReadsUsingThreads()和 handleClientsWithPendingWritesUsingThreads()中实现,因为两者简直是对称的,所以这里只对读操作进行解说。
同样,Redis 会进行惯例查看,是否启用线程化读写并且启用线程化读(只开启前者则只有写操作是线程化),以及是否有期待读取的客户端。
/* When threaded I/O is also enabled for the reading + parsing side, the | |
* readable handler will just put normal clients into a queue of clients to | |
* process (instead of serving them synchronously). This function runs | |
* the queue using the I/O threads, and process them in order to accumulate | |
* the reads in the buffers, and also parse the first command available | |
* rendering it in the client structures. */ | |
int handleClientsWithPendingReadsUsingThreads(void) {if (!server.io_threads_active || !server.io_threads_do_reads) return 0; | |
int processed = listLength(server.clients_pending_read); | |
if (processed == 0) return 0; | |
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed); | |
... |
这里将 server.clients_pending_read 的列表转化为不便遍历的链表,而后将列表的每个节点(*client 对象)以相似 Round-Robin(轮询调度)的形式调配个各个线程,线程执行各个 client 的读写程序并不需要保障,命令到达的先后顺序曾经由 server.clients_pending_read/write 列表记录,后续也会按这个程序执行。
/* Distribute the clients across N different lists. */ | |
listIter li; | |
listNode *ln; | |
listRewind(server.clients_pending_read,&li); | |
int item_id = 0; | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
int target_id = item_id % server.io_threads_num; | |
listAddNodeTail(io_threads_list[target_id],c); | |
item_id++; | |
} | |
... |
设置状态标记,标识以后处于多线程读的状态。因为标记的存在,Redis 的 Threaded I/ O 刹时只能处于读或写的状态,不能局部线程读,局部写。
... | |
/* Give the start condition to the waiting threads, by setting the | |
* start condition atomic var. */ | |
io_threads_op = IO_THREADS_OP_READ; | |
... |
为每个线程记录下各自须要解决的客户端数量。当不同线程读取到本人的 pending 长度不为 0 时,就会开始进行解决。留神 j 从 1 开始,意味着主线程的 pending 长度始终为 0,因为主线程马上要在这个办法中同步实现本人的工作,不须要晓得期待的工作数。
... | |
for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]); | |
setIOPendingCount(j, count); | |
} | |
... |
主线程此时将本人要解决的 client 解决完。
... | |
/* Also use the main thread to process a slice of clients. */ | |
listRewind(io_threads_list[0],&li); | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
readQueryFromClient(c->conn); | |
} | |
listEmpty(io_threads_list[0]); | |
... |
陷入循环期待,pending 等于各个线程残余工作数之和,当所有线程都没有工作的时候,本轮 I / O 解决完结。
... | |
/* Wait for all the other threads to end their work. */ | |
while(1) { | |
unsigned long pending = 0; | |
for (int j = 1; j < server.io_threads_num; j++) | |
pending += getIOPendingCount(j); | |
if (pending == 0) break; | |
} | |
if (tio_debug) printf("I/O READ All threads finshed\n"); | |
... |
咱们曾经在各自线程中将 conn 中的内容读取至对应 client 的 client.querybuf 输出缓冲区中,所以能够遍历 server.clients_pending_read 列表,串行地进行命令执行操作,同时将 client 从列表中移除。
... | |
/* Run the list of clients again to process the new buffers. */ | |
while(listLength(server.clients_pending_read)) {ln = listFirst(server.clients_pending_read); | |
client *c = listNodeValue(ln); | |
c->flags &= ~CLIENT_PENDING_READ; | |
listDelNode(server.clients_pending_read,ln); | |
if (processPendingCommandsAndResetClient(c) == C_ERR) { | |
/* If the client is no longer valid, we avoid | |
* processing the client later. So we just go | |
* to the next. */ | |
continue; | |
} | |
processInputBuffer(c); | |
/* We may have pending replies if a thread readQueryFromClient() produced | |
* replies and did not install a write handler (it can't). | |
*/ | |
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) | |
clientInstallWriteHandler(c); | |
} | |
... |
解决实现,将解决的数量加到统计属性上,而后返回。
... | |
/* Update processed count on server */ | |
server.stat_io_reads_processed += processed; | |
return processed; |
IOThreadMain
后面还有每个线程具体的工作内容没有解释,它们会始终陷在 IOThreadMain 的循环中,期待执行读写的机会。
照常执行一些初始化内容。
void *IOThreadMain(void *myid) {/* The ID is the thread number (from 0 to server.iothreads_num-1), and is | |
* used by the thread to just manipulate a single sub-array of clients. */ | |
long id = (unsigned long)myid; | |
char thdname[16]; | |
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); | |
redis_set_thread_title(thdname); | |
redisSetCpuAffinity(server.server_cpulist); | |
... |
线程会检测本人的待处理的 client 列表长度,当期待队列长度大于 0 时往下执行,否则会到死循环终点。
这里利用互斥锁,让主线程有机会加锁,使得 I / O 线程卡在执行 pthread_mutex_lock(),达到让 I / O 线程进行工作的成果。
... | |
while(1) { | |
/* Wait for start */ | |
for (int j = 0; j < 1000000; j++) {if (getIOPendingCount(id) != 0) break; | |
} | |
/* Give the main thread a chance to stop this thread. */ | |
if (getIOPendingCount(id) == 0) {pthread_mutex_lock(&io_threads_mutex[id]); | |
pthread_mutex_unlock(&io_threads_mutex[id]); | |
continue; | |
} | |
serverAssert(getIOPendingCount(id) != 0); | |
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); | |
... |
将 io_threads_list[i]的客户端列表转化为不便遍历的链表,一一遍历,借助 io_threads_op 标记判断以后是要执行多线程读还是多线程写,实现对本人要解决的客户端的操作。
... | |
/* Process: note that the main thread will never touch our list | |
* before we drop the pending count to 0. */ | |
listIter li; | |
listNode *ln; | |
listRewind(io_threads_list[id],&li); | |
while((ln = listNext(&li))) {client *c = listNodeValue(ln); | |
if (io_threads_op == IO_THREADS_OP_WRITE) {writeToClient(c,0); | |
} else if (io_threads_op == IO_THREADS_OP_READ) {readQueryFromClient(c->conn); | |
} else {serverPanic("io_threads_op value is unknown"); | |
} | |
} | |
... |
清空本人要解决的客户端列表,并且将本人的待处理数量批改为 0,完结本轮操作。
... | |
listEmpty(io_threads_list[id]); | |
setIOPendingCount(id, 0); | |
if (tio_debug) printf("[%ld] Done\n", id); | |
} | |
} |
Limitation
通过查看代码,应用上 Threaded I/ O 的启用受以下条件影响:
- 配置项 io-threads 须要大于 1,否则会持续应用单线程操作读写 I /O
- 配置项 io-threads-do-reads 管制读 I / O 是否应用线程化
- 对于提早读取,由 postponeClientRead()办法管制。办法中除了配置要求外,还须要以后 client 不能是主从模型的角色,也不能处于曾经期待下次事件循环线程化读取 CLIENT_PENDING_READ 的状态。在这个办法中 client 对象会被增加到期待队列中,并且将 client 的状态改为 CLIENT_PENDING_READ。
- 对于多线程写 I /O,由 handleClientsWithPendingWritesUsingThreads()中的 stopThreadedIOIfNeeded()办法加以限度。除了对应配置项要满足要求外,server.clients_pending_write 的长度须要大于等于配置线程数的两倍,例如配置应用 6 线程,当写队列长度小于 12 时会持续应用单线程 I /O。
- I/ O 线程在 initThreadedIO()被创立前,互斥锁处于加锁状态,因而线程不能进行理论的工作解决。server 对象的 io_threads_active 属性默认会处于敞开状态,在进行首次多线程写之前才会被开启。这意味着服务启动后的读操作依然会应用单线程读,产生执行后果到写的 pending list 中,在第二次循环中,服务判断是否有配置启用 TIO,将 server.io_threads_active 属性关上,而后进行多线程写操作,从下一次循环开始 TIO 能力被作用于读操作上。上一点说过写 I / O 会有配置和队列长度断定,在断定不须要 TIO 写时,会从新把 server.io_threads_active 敞开,意味着只管你曾经在配置文件外面关上 TIO 读,然而 Redis 依然会依据负载时不时跳过应用它。
总结与思考
Redis 6.0 引入的 Threaded I/O,将 Socket 读写提早和线程化,在网络 I / O 的方向上给 Redis 带来了肯定的性能晋升,并且应用门槛比拟低,用户无需做太多的变更,即可在不影响业务的状况下白嫖闲暇的线程资源。
一方面,这部分的晋升可能还难以让处于 Redis 5 甚至 Redis 3 版本的用户有足够的能源进行降级,特地是思考到很多业务场景中 Redis 的性能并没有差到成为瓶颈,而且新版本的福利也未通过大规模验证,势必会影响到企业级利用中更多用户关注的服务稳定性。同时,TIO 的晋升比照集群性能仿佛还有肯定的差距,这可能更加会让本来就处于集群架构的企业用户疏忽这个性能。
从稳定性的角度上来看,新版本的性能未通过大规模验证,是否值得降级还有待考据。
本次的版本能够说是 Redis 从诞生至今最大的更新,不只有 Threaded I/O,包含 RESP3、ACLs 和 SSL,有趣味的同学能够自行去理解。
Redis 源码学习思路
办法
README.md 应该是咱们理解 Redis 的入口,而不是全局搜寻 main()办法。请关注 Redis internals 大节下的内容,这里介绍了 Redis 的代码构造,Redis 每个文件都是一个“general idea”,其中 server.c 和 network.c 的局部逻辑和代码在本文曾经介绍过了,长久化相干的 aof.c 和 rdb.c、数据库相干的 db.c、Redis 对象相干的 object.c、复制相干的 replication.c 等都值得注意。其余包含 Redis 的命令是以什么样的模式编码的,也能在 README.md 中找到答案,这样能够不便咱们进一步浏览代码时疾速定位。
工具
工具的话我举荐 visual code 即可,装置好 C /C++ 后,Mac 基本上能够无压力浏览源码。Windows 装置好 mingw-w64 即可。
另外几个代码的关键点,其实也在本文中呈现过:
- main(),终点
- initServer(),初始化
- aeMain(),事件循环
- readQueryFromClient(),读事件的 Handler
- processInputBuffer(),命令解决的入口
如果像本文一样想理解 Network 的内容,能够在 aeMain()处打断点,而后关注中 network.c 中的办法;如果想关注具体命令相干的内容,能够在 processInputBuffer()处打断点,而后关注 $command.c 或者相似文件中的办法,README.md 文件里也曾经介绍过命令办法的命名格局,定位非常容易。其余经常出现的其余动作,例如长久化、复制等,大略会呈现在命令执行的前后,或者工夫事件内,也可能在 beforeSleep()中。server.h 中定义的 redisServer 和 client 是 Redis 中两个十分重要的构造,在业务上很多内容都是转化为对它们的属性的相干操作,要特地注意。
参考文档
https://github.com/redis/redis
https://github.com/redis/redis-doc
https://redis.io/documentation
https://www.youtube.com/c/Redislabs/videos
文|夜铭
关注得物技术,携手走向技术的云端