前言:

假如服务器的硬件资源“富余”,那么进步服务器性能的一个很间接的办法就是空间换工夫,即“节约”服务器的硬件资源,以换取其运行效率。晋升服务器性能的一个重要办法就是采纳“池”的思路,即对一组资源在服务器启动之初就被齐全创立好并初始化,这称为动态资源分配。当服务器进入正式运行阶段,即开始解决客户端申请时,如果它须要相干资源就能够间接从池中获取,无需动态分配。很显然,间接从池中获得所须要资源比动态分配资源的速度快得多,因为调配系统资源的零碎调用都是很耗时的。当服务器解决完一个客户端连贯后,能够把相干资源放回池中,毋庸执行零碎调用开释资源。从最终成果来看,资源分配和回收的零碎调用只产生在服务器的启动和完结,这种“池”的形式防止了两头的工作处理过程对内核的频繁拜访,进步了服务器的性能。咱们罕用的线程池和内存池都是基于以上“池”的劣势所设计进去的晋升服务器性能的办法,明天打算以C++98设计一个基于Linux零碎的简略线程池。

一、提出疑难:为什么要采纳线程池?

首先想一想,咱们个别的服务器都是动态创建子线程来实现并发服务器的,比方每当有一个客户端申请建设连贯时咱们就动静调用pthread_create去创立线程去解决该连贯申请。这种模式有什么毛病呢?

动态创建线程是比拟费时的,这将会导致较慢的客户响应。
动态创建的子线程通常只用来为一个客户服务,这将导致系统上产生大量的轻微线程,线程切换也会消耗CPU工夫。
所以咱们为了进一步晋升服务器性能,能够采取“池”的思路,把线程的创立放在程序的初始化阶段一次实现,这就防止了动态创建线程导致服务器响应申请的性能降落。

二、解决疑难:线程池的设计思路

1、以单例模式设计线程池,保障线程池全剧惟一;
2、在获取线程池实例进行线程池初始化:线程事后创立+工作队列创立;
3、创立一个工作类,咱们实在的工作会继承该类,实现工作执行。

线程池模式

依据以上思路咱们能够给出这么一个线程池类的框架:

class ThreadPool{private:    std::queue<Task*> taskQueue;   //工作队列    bool isRunning;           //线程池运行标记    pthread_t* pThreadSet;  //指向线程id汇合的指针    int threadsNum;         //线程数目    pthread_mutex_t mutex;    //互斥锁    pthread_cond_t condition;   //条件变量    //单例模式,保障全局线程池只有一个    ThreadPool(int num=10);    void createThreads();  //创立内存池    void clearThreads();  //回收线程    void clearQueue();  //清空工作队列    static void* threadFunc(void* arg);    Task* takeTask();  //工作线程获取工作public:    void addTask(Task* pTask);   //工作入队    static ThreadPool* createThreadPool(int num=10); //静态方法,用于创立线程池实例    ~ThreadPool();    int getQueueSize(); //获取工作队列中的工作数目    int getThreadlNum();  //获取线程池中线程总数目 };

(1)单例模式下的线程池的初始化

首先咱们以饿汉单例模式来设计这个线程池,以保障该线程池全局惟一:

1、构造函数私有化
2、提供一个动态函数来获取线程池对象

//饿汉模式,线程平安ThreadPool* ThreadPool::createThreadPool(int num){    static ThreadPool* pThreadPoolInstance = new ThreadPool(num);    return pThreadPoolInstance;}123456

ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
线程池对象初始化时咱们须要做三件事:

相干变量的初始化(线程池状态、互斥锁、条件变量等)+工作队列的创立+线程事后创立

ThreadPool::ThreadPool(int num):threadsNum(num){    printf("creating threads pool...n");    isRunning = true;    pthread_mutex_init(&mutex, NULL);    pthread_cond_init(&condition, NULL);    createThreads();    printf("created threads pool successfully!n");}123456789

线程池的数目依据对象创立时输出的数目来创立,如果不指定数目,咱们就是应用默认数目10个。

void ThreadPool::createThreads(){    pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum);    for(int i=0;i<threadsNum;i++)    {        pthread_create(&pThreadSet[i], NULL, threadFunc, this);    }}12345678

(2)工作增加和线程调度

对于每一个服务申请咱们都能够看作是一个工作,一个工作来了咱们就将它送进线程池中的工作队列中,并通过条件变量的形式告诉线程池中的闲暇线程去拿工作去实现。那问题来了,这里的工作在编程的层面上看到底是什么?咱们能够将工作看成是一个回调函数,将要执行的函数指针往工作队列外面送就能够了,咱们线程拿到这个指针后运行该函数就等于实现服务申请。基于以上的思考,咱们设计了一个独自的形象工作类,让子类继承。类外面有个纯虚函数run(),用于执行相应操作。

思考到回调函数须要传参数进来,所以特意设置了个指针arg来存储参数地址,到时候咱们就能够依据该指针解析出传入的函数实参是什么了。

工作基类

class Task{public:    Task(void* a = NULL): arg(a)    {    }    void SetArg(void* a) {        arg = a;    }    virtual int run()=0;protected:    void* arg;};typedef struct{    int task_id;    std::string task_name;}msg_t;class MyTask: public Task{public:    int run() {        msg_t* msg = (msg_t*)arg;        printf("working thread[%lu] : task_id:%d  task_name:%sn", pthread_self(),               msg->task_id, msg->task_name.c_str());        sleep(10);        return 0;    }};123456789101112131415161718192021222324252627282930313233343536373839

真正应用该类时就本人定义一个子类继承Task类,并实现run()函数,并通过SetArg()办法去设置传入的参数。比方能够这么用:

msg_t msg[10];MyTask task_A[10];//模仿生产者生产工作for(int i=0;i<10;i++){    msg[i].task_id = i;    sprintf(buf,"qq_task_%d",i);    msg[i].task_name = buf;    task_A[i].SetArg(&msg[i]);    pMyPool->addTask(&task_A[i]);    sleep(1);}12345678910111213

当初来到线程池设计中最难搞的中央:线程调度。一个工作来了,到底怎么让闲暇线程去拿工作去做呢?咱们又如何保障闲暇的线程一直地去拿工作呢?

形象而言,这是一个生产者消费者的模型,零碎一直往工作队列里送工作,咱们通过互斥锁和条件变量来管制工作的退出和获取,线程每当闲暇时就会去调用takeTask()去拿工作。如果队列没工作那么一些没取得互斥锁的线程就会拥塞期待(因为没锁),取得互斥锁的那个线程会因为没工作而拥塞期待。一旦有工作就会唤醒这个带锁线程拿走工作开释互斥锁。看看代码层面是如何操作的:

退出一个工作

void ThreadPool::addTask(Task* pTask){    pthread_mutex_lock(&mutex);    taskQueue.push(pTask);    printf("one task is put into queue! Current queue size is %lun",taskQueue.size());    pthread_mutex_unlock(&mutex);    pthread_cond_signal(&condition);}12345678

取走一个工作

Task* ThreadPool::takeTask(){    Task* pTask = NULL;    while(!pTask)    {        pthread_mutex_lock(&mutex);        //线程池运行失常但工作队列为空,那就期待工作的到来        while(taskQueue.empty() && isRunning)        {            pthread_cond_wait(&condition, &mutex);        }        if(!isRunning)        {            pthread_mutex_unlock(&mutex);            break;        }        else if(taskQueue.empty())        {            pthread_mutex_unlock(&mutex);            continue;        }        pTask = taskQueue.front();        taskQueue.pop();        pthread_mutex_unlock(&mutex);    }    return pTask;}12345678910111213141516171819202122232425262728293031

线程中的回调函数。这里留神的是,如果取到的工作为空,咱们认为是线程池敞开的信号(线程池销毁时咱们会在析构函数中调用pthread_cond_broadcast(&condition)来告诉线程来拿工作,拿到的当然是空指针),咱们退出该线程。

void* ThreadPool::threadFunc(void* arg){    ThreadPool* p = (ThreadPool*)arg;    while(p->isRunning)    {        Task* task = p->takeTask();        //如果取到的工作为空,那么咱们完结这个线程        if(!task)        {            //printf("%lu thread will shutdown!n", pthread_self());            break;        }        printf("take one...n");        task->run();    }}123456789101112131415161718

(3)应用例子和测试

上面给出一个线程池的一个应用例子。能够看出,我首先定义了msg_t的构造体,这是因为咱们的服务响应函数是带参数的,所以咱们定义了这个构造体并把其地址作为参数传进线程池中去(通过SetArg办法)。而后咱们也定义了一个工作类MyTask继承于Task,并重写了run办法。咱们要执行的服务函数就能够写在run函数之中。当须要往工作队列投放工作时调用addTask()就能够了,而后线程池会本人安顿工作的散发,外界毋庸关怀。所以一个线程池执行工作的过程能够简化为:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool

#include <stdio.h>#include "thread_pool.h"#include <string>#include <stdlib.h>typedef struct{    int task_id;    std::string task_name;}msg_t;class MyTask: public Task{public:    int run() {        msg_t* msg = (msg_t*)arg;        printf("working thread[%lu] : task_id:%d  task_name:%sn", pthread_self(),               msg->task_id, msg->task_name.c_str());        sleep(10);        return 0;    }};int main() {    ThreadPool* pMyPool = ThreadPool::createThreadPool(5);    char buf[32] = {0};    msg_t msg[10];    MyTask task_A[10];    //模仿生产者生产工作    for(int i=0;i<10;i++)    {        msg[i].task_id = i;        sprintf(buf,"qq_task_%d",i);        msg[i].task_name = buf;        task_A[i].SetArg(&msg[i]);        pMyPool->addTask(&task_A[i]);        sleep(1);    }    while(1)    {        //printf("there are still %d tasks need to processn", pMyPool->getQueueSize());        if (pMyPool->getQueueSize() == 0)        {            printf("Now I will exit from mainn");            break;        }        sleep(1);    }    delete pMyPool;    return 0;}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758

程序具体运行的逻辑是,咱们建设了一个5个线程大小的线程池,而后咱们又生成了10个工作,往工作队列里放。因为线程数小于工作数,所以当每个线程都拿到本人的工作时,工作队列中还有5个工作待处理,而后有些线程解决完本人的工作了,又去队列里取工作,直到所有工作被解决完了,循环完结,销毁线程池,退出程序。

本群收费分享学习材料(C/C++,Linux,golang,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,ffmpeg,TCP/IP,协程,DPDK,嵌入式)等,交换探讨支付材料请加群Q:1106675687。