关于linux:大牛的Linux编程线程池的设计与实现详细完整版

31次阅读

共计 5911 个字符,预计需要花费 15 分钟才能阅读完成。

前言:

假如服务器的硬件资源“富余”,那么进步服务器性能的一个很间接的办法就是空间换工夫,即“节约”服务器的硬件资源,以换取其运行效率。晋升服务器性能的一个重要办法就是采纳“池”的思路,即对一组资源在服务器启动之初就被齐全创立好并初始化,这称为动态资源分配。当服务器进入正式运行阶段,即开始解决客户端申请时,如果它须要相干资源就能够间接从池中获取,无需动态分配。很显然,间接从池中获得所须要资源比动态分配资源的速度快得多,因为调配系统资源的零碎调用都是很耗时的。当服务器解决完一个客户端连贯后,能够把相干资源放回池中,毋庸执行零碎调用开释资源。从最终成果来看,资源分配和回收的零碎调用只产生在服务器的启动和完结,这种“池”的形式防止了两头的工作处理过程对内核的频繁拜访,进步了服务器的性能。咱们罕用的线程池和内存池都是基于以上“池”的劣势所设计进去的晋升服务器性能的办法,明天打算以 C ++98 设计一个基于 Linux 零碎的简略线程池。

一、提出疑难:为什么要采纳线程池?

首先想一想,咱们个别的服务器都是动态创建子线程来实现并发服务器的,比方每当有一个客户端申请建设连贯时咱们就动静调用 pthread_create 去创立线程去解决该连贯申请。这种模式有什么毛病呢?

动态创建线程是比拟费时的,这将会导致较慢的客户响应。
动态创建的子线程通常只用来为一个客户服务,这将导致系统上产生大量的轻微线程,线程切换也会消耗 CPU 工夫。
所以咱们为了进一步晋升服务器性能,能够采取“池”的思路,把线程的创立放在程序的初始化阶段一次实现,这就防止了动态创建线程导致服务器响应申请的性能降落。

二、解决疑难:线程池的设计思路

1、以单例模式设计线程池,保障线程池全剧惟一;
2、在获取线程池实例进行线程池初始化:线程事后创立 + 工作队列创立;
3、创立一个工作类,咱们实在的工作会继承该类,实现工作执行。

线程池模式

依据以上思路咱们能够给出这么一个线程池类的框架:

class ThreadPool
{
private:
    std::queue<Task*> taskQueue;   // 工作队列
    bool isRunning;           // 线程池运行标记
    pthread_t* pThreadSet;  // 指向线程 id 汇合的指针
    int threadsNum;         // 线程数目
    pthread_mutex_t mutex;    // 互斥锁
    pthread_cond_t condition;   // 条件变量
    // 单例模式,保障全局线程池只有一个
    ThreadPool(int num=10);
    void createThreads();  // 创立内存池
    void clearThreads();  // 回收线程
    void clearQueue();  // 清空工作队列
    static void* threadFunc(void* arg);
    Task* takeTask();  // 工作线程获取工作
public:
    void addTask(Task* pTask);   // 工作入队
    static ThreadPool* createThreadPool(int num=10); // 静态方法,用于创立线程池实例
    ~ThreadPool();
    int getQueueSize(); // 获取工作队列中的工作数目
    int getThreadlNum();  // 获取线程池中线程总数目};

(1)单例模式下的线程池的初始化

首先咱们以饿汉单例模式来设计这个线程池,以保障该线程池全局惟一:

1、构造函数私有化
2、提供一个动态函数来获取线程池对象

// 饿汉模式,线程平安
ThreadPool* ThreadPool::createThreadPool(int num)
{static ThreadPool* pThreadPoolInstance = new ThreadPool(num);
    return pThreadPoolInstance;
}
123456

ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
线程池对象初始化时咱们须要做三件事:

相干变量的初始化(线程池状态、互斥锁、条件变量等)+ 工作队列的创立 + 线程事后创立

ThreadPool::ThreadPool(int num):threadsNum(num)
{printf("creating threads pool...n");
    isRunning = true;
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&condition, NULL);
    createThreads();
    printf("created threads pool successfully!n");
}
123456789

线程池的数目依据对象创立时输出的数目来创立,如果不指定数目,咱们就是应用默认数目 10 个。

void ThreadPool::createThreads()
{pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum);
    for(int i=0;i<threadsNum;i++)
    {pthread_create(&pThreadSet[i], NULL, threadFunc, this);
    }
}
12345678

(2)工作增加和线程调度

对于每一个服务申请咱们都能够看作是一个工作,一个工作来了咱们就将它送进线程池中的工作队列中,并通过条件变量的形式告诉线程池中的闲暇线程去拿工作去实现。那问题来了,这里的工作在编程的层面上看到底是什么?咱们能够将工作看成是一个回调函数,将要执行的函数指针往工作队列外面送就能够了,咱们线程拿到这个指针后运行该函数就等于实现服务申请。基于以上的思考,咱们设计了一个独自的形象工作类,让子类继承。类外面有个纯虚函数 run(),用于执行相应操作。

思考到回调函数须要传参数进来,所以特意设置了个指针 arg 来存储参数地址,到时候咱们就能够依据该指针解析出传入的函数实参是什么了。

工作基类

class Task
{
public:
    Task(void* a = NULL): arg(a)
    { }

    void SetArg(void* a) {arg = a;}

    virtual int run()=0;

protected:
    void* arg;

};
typedef struct
{
    int task_id;
    std::string task_name;
}msg_t;

class MyTask: public Task
{
public:
    int run() {msg_t* msg = (msg_t*)arg;
        printf("working thread[%lu] : task_id:%d  task_name:%sn", pthread_self(),
               msg->task_id, msg->task_name.c_str());
        sleep(10);
        return 0;
    }
};
123456789101112131415161718192021222324252627282930313233343536373839

真正应用该类时就本人定义一个子类继承 Task 类,并实现 run()函数,并通过 SetArg()办法去设置传入的参数。比方能够这么用:

msg_t msg[10];
MyTask task_A[10];

// 模仿生产者生产工作
for(int i=0;i<10;i++)
{msg[i].task_id = i;
    sprintf(buf,"qq_task_%d",i);
    msg[i].task_name = buf;
    task_A[i].SetArg(&msg[i]);
    pMyPool->addTask(&task_A[i]);
    sleep(1);
}
12345678910111213

当初来到线程池设计中最难搞的中央:线程调度。一个工作来了,到底怎么让闲暇线程去拿工作去做呢?咱们又如何保障闲暇的线程一直地去拿工作呢?

形象而言,这是一个生产者消费者的模型,零碎一直往工作队列里送工作,咱们通过互斥锁和条件变量来管制工作的退出和获取,线程每当闲暇时就会去调用 takeTask()去拿工作。如果队列没工作那么一些没取得互斥锁的线程就会拥塞期待(因为没锁),取得互斥锁的那个线程会因为没工作而拥塞期待。一旦有工作就会唤醒这个带锁线程拿走工作开释互斥锁。看看代码层面是如何操作的:

退出一个工作

void ThreadPool::addTask(Task* pTask)
{pthread_mutex_lock(&mutex);
    taskQueue.push(pTask);
    printf("one task is put into queue! Current queue size is %lun",taskQueue.size());
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&condition);
}
12345678

取走一个工作

Task* ThreadPool::takeTask()
{
    Task* pTask = NULL;
    while(!pTask)
    {pthread_mutex_lock(&mutex);
        // 线程池运行失常但工作队列为空,那就期待工作的到来
        while(taskQueue.empty() && isRunning)
        {pthread_cond_wait(&condition, &mutex);
        }

        if(!isRunning)
        {pthread_mutex_unlock(&mutex);
            break;
        }
        else if(taskQueue.empty())
        {pthread_mutex_unlock(&mutex);
            continue;
        }

        pTask = taskQueue.front();
        taskQueue.pop();
        pthread_mutex_unlock(&mutex);

    }

    return pTask;
}
12345678910111213141516171819202122232425262728293031

线程中的回调函数。这里留神的是,如果取到的工作为空,咱们认为是线程池敞开的信号(线程池销毁时咱们会在析构函数中调用 pthread_cond_broadcast(&condition)来告诉线程来拿工作,拿到的当然是空指针),咱们退出该线程。

void* ThreadPool::threadFunc(void* arg)
{ThreadPool* p = (ThreadPool*)arg;
    while(p->isRunning)
    {Task* task = p->takeTask();
        // 如果取到的工作为空,那么咱们完结这个线程
        if(!task)
        {//printf("%lu thread will shutdown!n", pthread_self());
            break;
        }

        printf("take one...n");

        task->run();}
}
123456789101112131415161718

(3)应用例子和测试

上面给出一个线程池的一个应用例子。能够看出,我首先定义了 msg_t 的构造体,这是因为咱们的服务响应函数是带参数的,所以咱们定义了这个构造体并把其地址作为参数传进线程池中去(通过 SetArg 办法)。而后咱们也定义了一个工作类 MyTask 继承于 Task,并重写了 run 办法。咱们要执行的服务函数就能够写在 run 函数之中。当须要往工作队列投放工作时调用 addTask()就能够了,而后线程池会本人安顿工作的散发,外界毋庸关怀。所以一个线程池执行工作的过程能够简化为:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool

#include <stdio.h>
#include "thread_pool.h"
#include <string>
#include <stdlib.h>

typedef struct
{
    int task_id;
    std::string task_name;
}msg_t;

class MyTask: public Task
{
public:
    int run() {msg_t* msg = (msg_t*)arg;
        printf("working thread[%lu] : task_id:%d  task_name:%sn", pthread_self(),
               msg->task_id, msg->task_name.c_str());
        sleep(10);
        return 0;
    }
};

int main() {ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
    char buf[32] = {0};

    msg_t msg[10];
    MyTask task_A[10];

    // 模仿生产者生产工作
    for(int i=0;i<10;i++)
    {msg[i].task_id = i;
        sprintf(buf,"qq_task_%d",i);
        msg[i].task_name = buf;
        task_A[i].SetArg(&msg[i]);
        pMyPool->addTask(&task_A[i]);
        sleep(1);
    }

    while(1)
    {//printf("there are still %d tasks need to processn", pMyPool->getQueueSize());
        if (pMyPool->getQueueSize() == 0)
        {printf("Now I will exit from mainn");
            break;
        }

        sleep(1);
    }

    delete pMyPool;
    return 0;
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758

程序具体运行的逻辑是,咱们建设了一个 5 个线程大小的线程池,而后咱们又生成了 10 个工作,往工作队列里放。因为线程数小于工作数,所以当每个线程都拿到本人的工作时,工作队列中还有 5 个工作待处理,而后有些线程解决完本人的工作了,又去队列里取工作,直到所有工作被解决完了,循环完结,销毁线程池,退出程序。

本群收费分享学习材料 (C/C++,Linux,golang,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,ffmpeg,TCP/IP,协程,DPDK,嵌入式) 等,交换探讨支付材料请加群 Q:1106675687。

正文完
 0