前言:
假如服务器的硬件资源“富余”,那么进步服务器性能的一个很间接的办法就是空间换工夫,即“节约”服务器的硬件资源,以换取其运行效率。晋升服务器性能的一个重要办法就是采纳“池”的思路,即对一组资源在服务器启动之初就被齐全创立好并初始化,这称为动态资源分配。当服务器进入正式运行阶段,即开始解决客户端申请时,如果它须要相干资源就能够间接从池中获取,无需动态分配。很显然,间接从池中获得所须要资源比动态分配资源的速度快得多,因为调配系统资源的零碎调用都是很耗时的。当服务器解决完一个客户端连贯后,能够把相干资源放回池中,毋庸执行零碎调用开释资源。从最终成果来看,资源分配和回收的零碎调用只产生在服务器的启动和完结,这种“池”的形式防止了两头的工作处理过程对内核的频繁拜访,进步了服务器的性能。咱们罕用的线程池和内存池都是基于以上“池”的劣势所设计进去的晋升服务器性能的办法,明天打算以C++98设计一个基于Linux零碎的简略线程池。
一、提出疑难:为什么要采纳线程池?
首先想一想,咱们个别的服务器都是动态创建子线程来实现并发服务器的,比方每当有一个客户端申请建设连贯时咱们就动静调用pthread_create去创立线程去解决该连贯申请。这种模式有什么毛病呢?
动态创建线程是比拟费时的,这将会导致较慢的客户响应。
动态创建的子线程通常只用来为一个客户服务,这将导致系统上产生大量的轻微线程,线程切换也会消耗CPU工夫。
所以咱们为了进一步晋升服务器性能,能够采取“池”的思路,把线程的创立放在程序的初始化阶段一次实现,这就防止了动态创建线程导致服务器响应申请的性能降落。
二、解决疑难:线程池的设计思路
1、以单例模式设计线程池,保障线程池全剧惟一;
2、在获取线程池实例进行线程池初始化:线程事后创立+工作队列创立;
3、创立一个工作类,咱们实在的工作会继承该类,实现工作执行。
线程池模式
依据以上思路咱们能够给出这么一个线程池类的框架:
class ThreadPool{private: std::queue<Task*> taskQueue; //工作队列 bool isRunning; //线程池运行标记 pthread_t* pThreadSet; //指向线程id汇合的指针 int threadsNum; //线程数目 pthread_mutex_t mutex; //互斥锁 pthread_cond_t condition; //条件变量 //单例模式,保障全局线程池只有一个 ThreadPool(int num=10); void createThreads(); //创立内存池 void clearThreads(); //回收线程 void clearQueue(); //清空工作队列 static void* threadFunc(void* arg); Task* takeTask(); //工作线程获取工作public: void addTask(Task* pTask); //工作入队 static ThreadPool* createThreadPool(int num=10); //静态方法,用于创立线程池实例 ~ThreadPool(); int getQueueSize(); //获取工作队列中的工作数目 int getThreadlNum(); //获取线程池中线程总数目 };
(1)单例模式下的线程池的初始化
首先咱们以饿汉单例模式来设计这个线程池,以保障该线程池全局惟一:
1、构造函数私有化
2、提供一个动态函数来获取线程池对象
//饿汉模式,线程平安ThreadPool* ThreadPool::createThreadPool(int num){ static ThreadPool* pThreadPoolInstance = new ThreadPool(num); return pThreadPoolInstance;}123456
ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
线程池对象初始化时咱们须要做三件事:
相干变量的初始化(线程池状态、互斥锁、条件变量等)+工作队列的创立+线程事后创立
ThreadPool::ThreadPool(int num):threadsNum(num){ printf("creating threads pool...n"); isRunning = true; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&condition, NULL); createThreads(); printf("created threads pool successfully!n");}123456789
线程池的数目依据对象创立时输出的数目来创立,如果不指定数目,咱们就是应用默认数目10个。
void ThreadPool::createThreads(){ pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum); for(int i=0;i<threadsNum;i++) { pthread_create(&pThreadSet[i], NULL, threadFunc, this); }}12345678
(2)工作增加和线程调度
对于每一个服务申请咱们都能够看作是一个工作,一个工作来了咱们就将它送进线程池中的工作队列中,并通过条件变量的形式告诉线程池中的闲暇线程去拿工作去实现。那问题来了,这里的工作在编程的层面上看到底是什么?咱们能够将工作看成是一个回调函数,将要执行的函数指针往工作队列外面送就能够了,咱们线程拿到这个指针后运行该函数就等于实现服务申请。基于以上的思考,咱们设计了一个独自的形象工作类,让子类继承。类外面有个纯虚函数run(),用于执行相应操作。
思考到回调函数须要传参数进来,所以特意设置了个指针arg来存储参数地址,到时候咱们就能够依据该指针解析出传入的函数实参是什么了。
工作基类
class Task{public: Task(void* a = NULL): arg(a) { } void SetArg(void* a) { arg = a; } virtual int run()=0;protected: void* arg;};typedef struct{ int task_id; std::string task_name;}msg_t;class MyTask: public Task{public: int run() { msg_t* msg = (msg_t*)arg; printf("working thread[%lu] : task_id:%d task_name:%sn", pthread_self(), msg->task_id, msg->task_name.c_str()); sleep(10); return 0; }};123456789101112131415161718192021222324252627282930313233343536373839
真正应用该类时就本人定义一个子类继承Task类,并实现run()函数,并通过SetArg()办法去设置传入的参数。比方能够这么用:
msg_t msg[10];MyTask task_A[10];//模仿生产者生产工作for(int i=0;i<10;i++){ msg[i].task_id = i; sprintf(buf,"qq_task_%d",i); msg[i].task_name = buf; task_A[i].SetArg(&msg[i]); pMyPool->addTask(&task_A[i]); sleep(1);}12345678910111213
当初来到线程池设计中最难搞的中央:线程调度。一个工作来了,到底怎么让闲暇线程去拿工作去做呢?咱们又如何保障闲暇的线程一直地去拿工作呢?
形象而言,这是一个生产者消费者的模型,零碎一直往工作队列里送工作,咱们通过互斥锁和条件变量来管制工作的退出和获取,线程每当闲暇时就会去调用takeTask()去拿工作。如果队列没工作那么一些没取得互斥锁的线程就会拥塞期待(因为没锁),取得互斥锁的那个线程会因为没工作而拥塞期待。一旦有工作就会唤醒这个带锁线程拿走工作开释互斥锁。看看代码层面是如何操作的:
退出一个工作
void ThreadPool::addTask(Task* pTask){ pthread_mutex_lock(&mutex); taskQueue.push(pTask); printf("one task is put into queue! Current queue size is %lun",taskQueue.size()); pthread_mutex_unlock(&mutex); pthread_cond_signal(&condition);}12345678
取走一个工作
Task* ThreadPool::takeTask(){ Task* pTask = NULL; while(!pTask) { pthread_mutex_lock(&mutex); //线程池运行失常但工作队列为空,那就期待工作的到来 while(taskQueue.empty() && isRunning) { pthread_cond_wait(&condition, &mutex); } if(!isRunning) { pthread_mutex_unlock(&mutex); break; } else if(taskQueue.empty()) { pthread_mutex_unlock(&mutex); continue; } pTask = taskQueue.front(); taskQueue.pop(); pthread_mutex_unlock(&mutex); } return pTask;}12345678910111213141516171819202122232425262728293031
线程中的回调函数。这里留神的是,如果取到的工作为空,咱们认为是线程池敞开的信号(线程池销毁时咱们会在析构函数中调用pthread_cond_broadcast(&condition)来告诉线程来拿工作,拿到的当然是空指针),咱们退出该线程。
void* ThreadPool::threadFunc(void* arg){ ThreadPool* p = (ThreadPool*)arg; while(p->isRunning) { Task* task = p->takeTask(); //如果取到的工作为空,那么咱们完结这个线程 if(!task) { //printf("%lu thread will shutdown!n", pthread_self()); break; } printf("take one...n"); task->run(); }}123456789101112131415161718
(3)应用例子和测试
上面给出一个线程池的一个应用例子。能够看出,我首先定义了msg_t的构造体,这是因为咱们的服务响应函数是带参数的,所以咱们定义了这个构造体并把其地址作为参数传进线程池中去(通过SetArg办法)。而后咱们也定义了一个工作类MyTask继承于Task,并重写了run办法。咱们要执行的服务函数就能够写在run函数之中。当须要往工作队列投放工作时调用addTask()就能够了,而后线程池会本人安顿工作的散发,外界毋庸关怀。所以一个线程池执行工作的过程能够简化为:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool
#include <stdio.h>#include "thread_pool.h"#include <string>#include <stdlib.h>typedef struct{ int task_id; std::string task_name;}msg_t;class MyTask: public Task{public: int run() { msg_t* msg = (msg_t*)arg; printf("working thread[%lu] : task_id:%d task_name:%sn", pthread_self(), msg->task_id, msg->task_name.c_str()); sleep(10); return 0; }};int main() { ThreadPool* pMyPool = ThreadPool::createThreadPool(5); char buf[32] = {0}; msg_t msg[10]; MyTask task_A[10]; //模仿生产者生产工作 for(int i=0;i<10;i++) { msg[i].task_id = i; sprintf(buf,"qq_task_%d",i); msg[i].task_name = buf; task_A[i].SetArg(&msg[i]); pMyPool->addTask(&task_A[i]); sleep(1); } while(1) { //printf("there are still %d tasks need to processn", pMyPool->getQueueSize()); if (pMyPool->getQueueSize() == 0) { printf("Now I will exit from mainn"); break; } sleep(1); } delete pMyPool; return 0;}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
程序具体运行的逻辑是,咱们建设了一个5个线程大小的线程池,而后咱们又生成了10个工作,往工作队列里放。因为线程数小于工作数,所以当每个线程都拿到本人的工作时,工作队列中还有5个工作待处理,而后有些线程解决完本人的工作了,又去队列里取工作,直到所有工作被解决完了,循环完结,销毁线程池,退出程序。
本群收费分享学习材料(C/C++,Linux,golang,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,ffmpeg,TCP/IP,协程,DPDK,嵌入式)等,交换探讨支付材料请加群Q:1106675687。