Redis 6.0, released in May 2020, introduced multithreading into its execution model to handle I/O work. The goal of this design is to take full advantage of the multi-core nature of modern servers: by running multiple threads across cores, the extra threads help speed up reading client data, parsing commands, and writing data back, improving Redis's overall performance.
So when exactly are these threads started, and how do they handle I/O requests?
(1) I/O threads
In Redis, I/O multithreading means that when the Redis server reads client requests or writes data back to clients, it uses multiple threads to make better use of CPU resources and speed up overall reading and writing.
(2) How I/O multithreading works
The idea is simple: when CPU and memory resources are under-utilized, spawning several threads lets Redis use more of the CPU and speed up overall reads and writes.
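I/O threads are disabled by default. For reference, a minimal redis.conf sketch for turning them on; the two directive names are the actual Redis 6.0 options, while the thread count of 4 is only an illustrative value (these settings are read at startup):

```conf
# Number of I/O threads (the main thread counts as one).
# 1 keeps the classic single-threaded I/O path.
io-threads 4

# By default the extra threads only write replies back to clients.
# Enable this to also use them for reading requests and parsing commands.
io-threads-do-reads yes
```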
(3) Source code walkthrough
```c
// file: server.c

/* main() -- only the relevant call is shown. */
int main(int argc, char **argv) {
    // ...
    InitServerLast();
    // ...
}
```
```c
// file: server.c

/* Some steps in server initialization need to be done last (after modules
 * are loaded).
 * Specifically, creation of threads due to a race bug in ld.so, in which
 * Thread Local Storage initialization collides with dlopen call.
 * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
    bioInit();
    // Initialize the I/O threads
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
```
```c
// file: networking.c

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    // io_threads_active starts at 0: the I/O threads are not active yet
    server.io_threads_active = 0; /* We start with threads not active. */

    // If the user selected a single thread, don't spawn any threads:
    // we'll handle I/O directly from the main thread.
    // There is only the main I/O thread.
    if (server.io_threads_num == 1) return;

    // At most 128 I/O threads are allowed
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    // Spawn and initialize the I/O threads.
    for (int i = 0; i < server.io_threads_num; i++) {
        // Things we do for all the threads, including the main thread:
        // create the per-thread client list
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        // Things we do only for the additional threads.
        pthread_t tid;
        // Initialize this thread's entry in the io_threads_mutex array
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // Initialize this thread's entry in the io_threads_pending array
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // Call pthread_create to spawn the I/O thread; it runs IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        // Record the thread id in the io_threads array
        io_threads[i] = tid;
    }
}
```
```c
// file: networking.c

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        // List node
        listNode *ln;
        // Get the list of clients this I/O thread has to handle
        listRewind(io_threads_list[id],&li);
        // Iterate over the list
        while((ln = listNext(&li))) {
            // The node's value is a client
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                // Write operation: write data back to the client
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                // Read operation: read data from the client
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // After handling all clients, empty the list
        listEmpty(io_threads_list[id]);
        // Reset this thread's pending work counter to 0
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
```
While running, each I/O thread keeps checking whether there are clients waiting for it. If there are, it reads data from the client or writes data back to the client, depending on the operation type.
This is why these threads are called I/O threads.
(4) Reading data from a client
readQueryFromClient
```c
void readQueryFromClient(connection *conn) {
    // Get the client from the connection's private data
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // Decide whether to postpone reading from this client
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    server.stat_total_reads_processed++;

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // Pre-allocate memory: grow the free space at the end of the sds string
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // Read from the connection into the query buffer
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        // The client closed the connection
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer: keep parsing it to
     * check whether there is a complete command to execute. */
    processInputBuffer(c);
}
```
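The early return above goes through postponeClientRead(). Since the rest of this article only mentions it in passing, here is a simplified sketch of that check, paraphrased from the Redis 6.0 source rather than quoted verbatim: when threaded reads are active and the client is an ordinary one, the client is flagged CLIENT_PENDING_READ, queued on server.clients_pending_read, and the actual read is deferred to the threaded path.

```c
/* Sketch of postponeClientRead() (networking.c, Redis 6.0), paraphrased:
 * return 1 to defer the read to the I/O threads, 0 to read right away. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&        /* I/O threads are running */
        server.io_threads_do_reads &&      /* threaded reads are enabled */
        !clientsArePaused() &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;                 /* mark as deferred */
        listAddNodeTail(server.clients_pending_read,c);  /* queue for later */
        return 1;
    } else {
        return 0;
    }
}
```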
(5) Writing data back to a client
```c
// file: networking.c

/* -----------------------------------------------------------------------------
 * Higher level functions to queue data on the client output buffer.
 * The following functions are the ones that command implementations will call.
 * -------------------------------------------------------------------------- */

/*
 * Add the object 'obj' string representation to the client output buffer.
 *
 * @param *c   the redis client
 * @param *obj the result of the command, as a redisObject
 */
void addReply(client *c, robj *obj) {
    // Check whether the client may receive new data (fake clients may not)
    if (prepareClientToWrite(c) != C_OK) return;

    // Write the data into the buffer according to the redisObject encoding
    if (sdsEncodedObject(obj)) {
        // obj is RAW or EMBSTR encoded
        // Try to add the reply to the static buffer in the client structure.
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            // Otherwise, add the reply to the reply list.
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        // obj is integer encoded
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        // Convert the number to a string
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
```
```c
/* This function is called every time we are going to transmit new data
 * to the client. The behavior is the following:
 *
 * If the client should receive new data (normal clients will) the function
 * returns C_OK, and make sure to install the write handler in our event
 * loop so that when the socket is writable new data gets written.
 *
 * If the client should not receive new data, because it is a fake client
 * (used to load AOF in memory), a master or because the setup of the write
 * handler failed, the function returns C_ERR.
 *
 * The function may return C_OK without actually installing the write
 * event handler in the following cases:
 *
 * 1) The event handler should already be installed since the output buffer
 *    already contains something.
 * 2) The client is a slave but not yet online, so we want to just accumulate
 *    writes in the buffer but not actually sending them yet.
 *
 * Typically gets called every time a reply is built, before adding more
 * data to the clients output buffers. If the function returns C_ERR no
 * data should be appended to the output buffers. */
int prepareClientToWrite(client *c) {
    /* If it's the Lua client we always return ok without installing any
     * handler since there is no socket at all. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

    /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
     * is set. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */

    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data).
     *
     * If CLIENT_PENDING_READ is set, we're in an IO thread and should
     * not install a write handler. Instead, it will be done by
     * handleClientsWithPendingReadsUsingThreads() upon return. */
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);

    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}
```
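prepareClientToWrite() queues the client through clientInstallWriteHandler(), which is where the clients_pending_write list (used in section (7) below) gets filled. A simplified sketch of that helper, again paraphrased from the Redis 6.0 source rather than quoted verbatim:

```c
/* Sketch of clientInstallWriteHandler() (networking.c, Redis 6.0), paraphrased:
 * instead of registering a writable event right away, the client is flagged
 * and queued; the event loop drains clients_pending_write before sleeping. */
void clientInstallWriteHandler(client *c) {
    /* Schedule the client only once, and only if it may receive writes at
     * this stage (normal client, or an already-online replica). */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeTail(server.clients_pending_write,c);
    }
}
```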
(6) How are clients pending reads handed to the I/O threads?
```c
/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler will just put normal clients into a queue of clients to
 * process (instead of serving them synchronously). This function runs
 * the queue using the I/O threads, and process them in order to accumulate
 * the reads in the buffers, and also parse the first command available
 * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        /* Clients can become paused while executing the queued commands,
         * so we need to check in between each command. If a pause was
         * executed, we still remove the command and it will get picked up
         * later when clients are unpaused and we re-queue all clients. */
        if (clientsArePaused()) continue;

        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        processInputBuffer(c);

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't). */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
```
(7) How are clients pending writes handed to the I/O threads?
```c
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
```
(8) Summary
1. Before Redis 6.0, client requests were handled by a single thread. The drawback of this model is that it can only use a single CPU core; under high concurrency, reading and writing client data easily becomes a performance bottleneck. Redis 6.0 introduced multiple I/O threads to address this.
2. With io-threads N set in the config file, the Redis server spawns N - 1 extra I/O threads at startup (the main thread also counts as an I/O thread); these threads run the IOThreadMain function in networking.c. By default only multithreaded writes to client sockets are enabled; to enable multithreaded reads you also need io-threads-do-reads yes.
3. When reading a client request, if I/O multithreading is enabled, Redis puts the client into the clients_pending_read list (postponeClientRead); before each event loop iteration, the main thread distributes the clients in that list round-robin into the I/O threads' lists (io_threads_list).
4. Similarly, when writing responses back, the client is put into clients_pending_write (via prepareClientToWrite), and before the event loop runs, clients are distributed round-robin into the I/O threads' lists (io_threads_list).
5. After dispatching clients to the I/O threads, the main thread also reads and writes client sockets itself (it takes its own share of the work), then waits for all I/O threads to finish, and only then continues with the rest of the logic serially on the main thread.
6. Each I/O thread keeps taking clients from its io_threads_list and reads from or writes to the client socket according to the current operation type.
7. Reads and writes are handled slightly differently: if the number of clients pending write is less than io-threads * 2, the main thread handles the writes directly instead of handing them to the I/O threads, saving CPU (see the stopThreadedIOIfNeeded sketch after this list).
8. Redis officially recommends enabling I/O multithreading only on servers with at least 4 CPU cores: with 4 cores, use 2-3 I/O threads; with 8 cores, use 6; more than 8 threads brings little further gain.
9. According to the Redis team, enabling multiple I/O threads can roughly double performance. Of course, if Redis is already fast enough for your workload, there is no need to enable them.
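The threshold in point 7 is implemented by stopThreadedIOIfNeeded(), which handleClientsWithPendingWritesUsingThreads() calls before distributing any work. A sketch of its logic, paraphrased from the Redis 6.0 source rather than quoted verbatim:

```c
/* Sketch of stopThreadedIOIfNeeded() (networking.c, Redis 6.0), paraphrased:
 * returns 1 when the caller should fall back to the plain single-threaded
 * write path, and pauses the I/O threads when there is too little work. */
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}
```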
(9) References
https://weikeqin.com/tags/redis/
Study notes for Day 13 of "Redis源码分析与实战" — Lesson 13: Redis 6.0 多 IO 线程的效率提高了吗?
https://time.geekbang.org/col...