共计 5911 个字符,预计需要花费 15 分钟才能阅读完成。
前言:
假如服务器的硬件资源“富余”,那么进步服务器性能的一个很间接的办法就是空间换工夫,即“节约”服务器的硬件资源,以换取其运行效率。晋升服务器性能的一个重要办法就是采纳“池”的思路,即对一组资源在服务器启动之初就被齐全创立好并初始化,这称为动态资源分配。当服务器进入正式运行阶段,即开始解决客户端申请时,如果它须要相干资源就能够间接从池中获取,无需动态分配。很显然,间接从池中获得所须要资源比动态分配资源的速度快得多,因为调配系统资源的零碎调用都是很耗时的。当服务器解决完一个客户端连贯后,能够把相干资源放回池中,毋庸执行零碎调用开释资源。从最终成果来看,资源分配和回收的零碎调用只产生在服务器的启动和完结,这种“池”的形式防止了两头的工作处理过程对内核的频繁拜访,进步了服务器的性能。咱们罕用的线程池和内存池都是基于以上“池”的劣势所设计进去的晋升服务器性能的办法,明天打算以 C ++98 设计一个基于 Linux 零碎的简略线程池。
一、提出疑难:为什么要采纳线程池?
首先想一想,咱们个别的服务器都是动态创建子线程来实现并发服务器的,比方每当有一个客户端申请建设连贯时咱们就动静调用 pthread_create 去创立线程去解决该连贯申请。这种模式有什么毛病呢?
动态创建线程是比拟费时的,这将会导致较慢的客户响应。
动态创建的子线程通常只用来为一个客户服务,这将导致系统上产生大量的轻微线程,线程切换也会消耗 CPU 工夫。
所以咱们为了进一步晋升服务器性能,能够采取“池”的思路,把线程的创立放在程序的初始化阶段一次实现,这就防止了动态创建线程导致服务器响应申请的性能降落。
二、解决疑难:线程池的设计思路
1、以单例模式设计线程池,保障线程池全剧惟一;
2、在获取线程池实例进行线程池初始化:线程事后创立 + 工作队列创立;
3、创立一个工作类,咱们实在的工作会继承该类,实现工作执行。
线程池模式
依据以上思路咱们能够给出这么一个线程池类的框架:
class ThreadPool
{
private:
std::queue<Task*> taskQueue; // 工作队列
bool isRunning; // 线程池运行标记
pthread_t* pThreadSet; // 指向线程 id 汇合的指针
int threadsNum; // 线程数目
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t condition; // 条件变量
// 单例模式,保障全局线程池只有一个
ThreadPool(int num=10);
void createThreads(); // 创立内存池
void clearThreads(); // 回收线程
void clearQueue(); // 清空工作队列
static void* threadFunc(void* arg);
Task* takeTask(); // 工作线程获取工作
public:
void addTask(Task* pTask); // 工作入队
static ThreadPool* createThreadPool(int num=10); // 静态方法,用于创立线程池实例
~ThreadPool();
int getQueueSize(); // 获取工作队列中的工作数目
int getThreadlNum(); // 获取线程池中线程总数目};
(1)单例模式下的线程池的初始化
首先咱们以饿汉单例模式来设计这个线程池,以保障该线程池全局惟一:
1、构造函数私有化
2、提供一个动态函数来获取线程池对象
// 饿汉模式,线程平安
ThreadPool* ThreadPool::createThreadPool(int num)
{static ThreadPool* pThreadPoolInstance = new ThreadPool(num);
return pThreadPoolInstance;
}
123456
ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
线程池对象初始化时咱们须要做三件事:
相干变量的初始化(线程池状态、互斥锁、条件变量等)+ 工作队列的创立 + 线程事后创立
ThreadPool::ThreadPool(int num):threadsNum(num)
{printf("creating threads pool...n");
isRunning = true;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condition, NULL);
createThreads();
printf("created threads pool successfully!n");
}
123456789
线程池的数目依据对象创立时输出的数目来创立,如果不指定数目,咱们就是应用默认数目 10 个。
void ThreadPool::createThreads()
{pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum);
for(int i=0;i<threadsNum;i++)
{pthread_create(&pThreadSet[i], NULL, threadFunc, this);
}
}
12345678
(2)工作增加和线程调度
对于每一个服务申请咱们都能够看作是一个工作,一个工作来了咱们就将它送进线程池中的工作队列中,并通过条件变量的形式告诉线程池中的闲暇线程去拿工作去实现。那问题来了,这里的工作在编程的层面上看到底是什么?咱们能够将工作看成是一个回调函数,将要执行的函数指针往工作队列外面送就能够了,咱们线程拿到这个指针后运行该函数就等于实现服务申请。基于以上的思考,咱们设计了一个独自的形象工作类,让子类继承。类外面有个纯虚函数 run(),用于执行相应操作。
思考到回调函数须要传参数进来,所以特意设置了个指针 arg 来存储参数地址,到时候咱们就能够依据该指针解析出传入的函数实参是什么了。
工作基类
class Task
{
public:
Task(void* a = NULL): arg(a)
{ }
void SetArg(void* a) {arg = a;}
virtual int run()=0;
protected:
void* arg;
};
typedef struct
{
int task_id;
std::string task_name;
}msg_t;
class MyTask: public Task
{
public:
int run() {msg_t* msg = (msg_t*)arg;
printf("working thread[%lu] : task_id:%d task_name:%sn", pthread_self(),
msg->task_id, msg->task_name.c_str());
sleep(10);
return 0;
}
};
123456789101112131415161718192021222324252627282930313233343536373839
真正应用该类时就本人定义一个子类继承 Task 类,并实现 run()函数,并通过 SetArg()办法去设置传入的参数。比方能够这么用:
msg_t msg[10];
MyTask task_A[10];
// 模仿生产者生产工作
for(int i=0;i<10;i++)
{msg[i].task_id = i;
sprintf(buf,"qq_task_%d",i);
msg[i].task_name = buf;
task_A[i].SetArg(&msg[i]);
pMyPool->addTask(&task_A[i]);
sleep(1);
}
12345678910111213
当初来到线程池设计中最难搞的中央:线程调度。一个工作来了,到底怎么让闲暇线程去拿工作去做呢?咱们又如何保障闲暇的线程一直地去拿工作呢?
形象而言,这是一个生产者消费者的模型,零碎一直往工作队列里送工作,咱们通过互斥锁和条件变量来管制工作的退出和获取,线程每当闲暇时就会去调用 takeTask()去拿工作。如果队列没工作那么一些没取得互斥锁的线程就会拥塞期待(因为没锁),取得互斥锁的那个线程会因为没工作而拥塞期待。一旦有工作就会唤醒这个带锁线程拿走工作开释互斥锁。看看代码层面是如何操作的:
退出一个工作
void ThreadPool::addTask(Task* pTask)
{pthread_mutex_lock(&mutex);
taskQueue.push(pTask);
printf("one task is put into queue! Current queue size is %lun",taskQueue.size());
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&condition);
}
12345678
取走一个工作
Task* ThreadPool::takeTask()
{
Task* pTask = NULL;
while(!pTask)
{pthread_mutex_lock(&mutex);
// 线程池运行失常但工作队列为空,那就期待工作的到来
while(taskQueue.empty() && isRunning)
{pthread_cond_wait(&condition, &mutex);
}
if(!isRunning)
{pthread_mutex_unlock(&mutex);
break;
}
else if(taskQueue.empty())
{pthread_mutex_unlock(&mutex);
continue;
}
pTask = taskQueue.front();
taskQueue.pop();
pthread_mutex_unlock(&mutex);
}
return pTask;
}
12345678910111213141516171819202122232425262728293031
线程中的回调函数。这里留神的是,如果取到的工作为空,咱们认为是线程池敞开的信号(线程池销毁时咱们会在析构函数中调用 pthread_cond_broadcast(&condition)来告诉线程来拿工作,拿到的当然是空指针),咱们退出该线程。
void* ThreadPool::threadFunc(void* arg)
{ThreadPool* p = (ThreadPool*)arg;
while(p->isRunning)
{Task* task = p->takeTask();
// 如果取到的工作为空,那么咱们完结这个线程
if(!task)
{//printf("%lu thread will shutdown!n", pthread_self());
break;
}
printf("take one...n");
task->run();}
}
123456789101112131415161718
(3)应用例子和测试
上面给出一个线程池的一个应用例子。能够看出,我首先定义了 msg_t 的构造体,这是因为咱们的服务响应函数是带参数的,所以咱们定义了这个构造体并把其地址作为参数传进线程池中去(通过 SetArg 办法)。而后咱们也定义了一个工作类 MyTask 继承于 Task,并重写了 run 办法。咱们要执行的服务函数就能够写在 run 函数之中。当须要往工作队列投放工作时调用 addTask()就能够了,而后线程池会本人安顿工作的散发,外界毋庸关怀。所以一个线程池执行工作的过程能够简化为:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool
#include <stdio.h>
#include "thread_pool.h"
#include <string>
#include <stdlib.h>
typedef struct
{
int task_id;
std::string task_name;
}msg_t;
class MyTask: public Task
{
public:
int run() {msg_t* msg = (msg_t*)arg;
printf("working thread[%lu] : task_id:%d task_name:%sn", pthread_self(),
msg->task_id, msg->task_name.c_str());
sleep(10);
return 0;
}
};
int main() {ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
char buf[32] = {0};
msg_t msg[10];
MyTask task_A[10];
// 模仿生产者生产工作
for(int i=0;i<10;i++)
{msg[i].task_id = i;
sprintf(buf,"qq_task_%d",i);
msg[i].task_name = buf;
task_A[i].SetArg(&msg[i]);
pMyPool->addTask(&task_A[i]);
sleep(1);
}
while(1)
{//printf("there are still %d tasks need to processn", pMyPool->getQueueSize());
if (pMyPool->getQueueSize() == 0)
{printf("Now I will exit from mainn");
break;
}
sleep(1);
}
delete pMyPool;
return 0;
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
程序具体运行的逻辑是,咱们建设了一个 5 个线程大小的线程池,而后咱们又生成了 10 个工作,往工作队列里放。因为线程数小于工作数,所以当每个线程都拿到本人的工作时,工作队列中还有 5 个工作待处理,而后有些线程解决完本人的工作了,又去队列里取工作,直到所有工作被解决完了,循环完结,销毁线程池,退出程序。
本群收费分享学习材料 (C/C++,Linux,golang,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,ffmpeg,TCP/IP,协程,DPDK,嵌入式) 等,交换探讨支付材料请加群 Q:1106675687。