关于数据结构:线程池浅谈

32次阅读

共计 3677 个字符,预计需要花费 10 分钟才能阅读完成。

线程池

概念

线程池就是首先创立一些线程,它们的汇合称为线程池。应用线程池能够很好的进步性能,线程池在系统启动时创立大量闲暇的线程,当工作池有工作时,线程池中的线程排队去支付工作池的工作,如果以后没有工作就阻塞期待。

工作机制

  1. 在有线程池的状况下,工作是交给线程池而不是间接交给某个线程,在线程池拿到工作后,寻找是否有闲暇的线程,如果有就交付给该线程执行。
  2. 一个线程,只能同时执行一个工作,执行结束后,线程重回闲暇状态,期待下一个工作。

应用线程池的起因

多线程运行工夫,零碎一直启动和敞开线程,老本很高,会适度耗费线程资源,以及适度切换线程的危险,从而导致系统资源的解体。

图解线程池

代码

线程池构造

typedef struct NMANAGER
{
    struct NWORKER *workers; // 治理的线程链表头节点
    struct NJOBS *jobs;      // 工作池头节点

    int totWorkers;  // 以后一共领有多少线程
    int idleWorkers; // 以后闲暇线程数量

    pthread_mutex_t pool_mtx; // 线程池互斥锁
    pthread_cond_t pool_cond; // 线程池条件锁,所有线程期待这个条件锁

} nManager;

线程构造

typedef struct NWORKER
{
    pthread_t pthreadid; // 该线程的线程 id
    struct NMANAGER *pool; // 处于哪一个线程池
    int terminate; // 是否被终止,1 为销毁该线程,0 则相同
    struct NWORKER *next; // 链表中的下一个线程
    struct NWORKER *prev;// 链表中的上一个线程
} nWorker;

工作池

typedef struct NJOBS
{void (*func)(void *arg); // 工作回调,线程执行该工作
    void *user_data;         // 工作的参数

    struct NJOBS *next; // 工作池的下一个工作
    struct NJOBS *prev; // 工作池上一个工作
} nJobs;

线程池的创立

int ThreadPoolCreate(threadpool *pool, int numsThread)
{
    //1. 初始化线程池相干属性
    //2. 创立肯定数量线程,扔进线程池
    if (pool == NULL)
    {printf("create error\n");
        return 1;
    }
    if (numsThread <= 0)
        numsThread = 1;
    memset(pool, 0, sizeof(nManager));
    pool->totWorkers = 0;
    pool->idleWorkers = 0;
    pthread_mutex_init(&pool->pool_mtx, NULL);
    pthread_cond_init(&pool->pool_cond, NULL);
    //pool->pool_mtx = PTHREAD_MUTEX_INITIALIZER;
    pthread_t managerpid;
    pthread_create(&managerpid, NULL, ManagerThread, (void *)pool);
    pthread_detach(managerpid);
    int i = 0;
    for (i = 0; i < numsThread; i++)
    {nWorker *worker = (nWorker *)malloc(sizeof(nWorker));
        if (worker == NULL)
        {printf("worker error\n");
            return 1;
        }
        memset(worker, 0, sizeof(nWorker));
        int ret = pthread_create(&worker->pthreadid, NULL, ThreadWorking, worker);
        if (ret)
        {perror("pthread create");
            return 1;
        }
        pthread_detach(worker->pthreadid);
        worker->terminate = 0;
        worker->pool = pool;
        LIST_ADD(worker, pool->workers); // 扔进线程池
        pthread_mutex_lock(&pool->pool_mtx);
        pool->totWorkers++;  // 总共线程减少
        pool->idleWorkers++; // 闲暇线程
        pthread_mutex_unlock(&pool->pool_mtx);
    }
    return 0;
}

销毁线程池

void ThreadPoolDestroy(threadpool *pool)
{
    nWorker *w = pool->workers;
    for (w = pool->workers; w != NULL; w = w->next)
    {w->terminate = 1; // 令 teminate = 1,在线程的执行函数中,会本人把本人给覆灭,耦合度低。}
    pthread_mutex_lock(&pool->pool_mtx);
    pthread_cond_broadcast(&pool->pool_cond); // 告诉所有期待条件锁的线程,没有期待的线程执行完工作之后,会自行销毁
    pthread_mutex_unlock(&pool->pool_mtx);
    free(pool);
}

线程池增加工作

void ThreadPushJob(threadpool *pool, nJobs *job)
{pthread_mutex_lock(&pool->pool_mtx);
    LIST_ADD(job, pool->jobs); // 向线程池增加工作
    printf("add jobs in pool\n");
    pthread_cond_signal(&pool->pool_cond); // 告诉一个线程来工作了
    pthread_mutex_unlock(&pool->pool_mtx);
}

线程执行函数

void *ThreadWorking(void *arg)
{nWorker *worker = (nWorker *)arg;
    while (1)
    {pthread_mutex_lock(&worker->pool->pool_mtx);
        while (worker->pool->jobs == NULL)
        {printf("work waiting------\n");
            if (worker->terminate == 1)
            {
                // 该线程被销毁
                break;
            }
            pthread_cond_wait(&worker->pool->pool_cond, &worker->pool->pool_mtx); // 等到工作池有工作,// 在这个函数之前解锁,而后阻塞在该条件变量上,该函数之后加上相应的互斥锁
        }
        if (worker->terminate == 1)
        {pthread_mutex_unlock(&worker->pool->pool_mtx);
            break;
        }
        nJobs *jobs = worker->pool->jobs;
        LIST_DEL(jobs, worker->pool->jobs);            // 把该工作从工作池取下来
        pthread_mutex_unlock(&worker->pool->pool_mtx); // 当初容许其余线程来工作池取工作啦
        pthread_mutex_lock(&worker->pool->pool_mtx);
        worker->pool->idleWorkers--; // 闲暇线程减一
        pthread_mutex_unlock(&worker->pool->pool_mtx);
        printf("working -------\n");
        jobs->func(jobs->user_data); // 执行该工作的回调函数
        pthread_mutex_lock(&worker->pool->pool_mtx);
        worker->pool->idleWorkers++; // 闲暇线程加一
        pthread_mutex_unlock(&worker->pool->pool_mtx);
        free(jobs); // 开释该工作内存
    }
    // 销毁该线程,同时批改与该线程相干的信息
    pthread_mutex_lock(&worker->pool->pool_mtx);
    worker->pool->totWorkers--;
    worker->pool->idleWorkers--;
    LIST_DEL(worker, worker->pool->workers); // 从线程池中取出
    pthread_mutex_unlock(&worker->pool->pool_mtx);
    free(worker); // 开释
    pthread_exit(NULL);
}

剩下问题

  • 目前线程池不反对,动静的放大和扩张。个别闲暇线程过多的时候,咱们思考缩小线程池容量,忙线程多的时候,咱们思考减少线程。
  • 在线程池的构造体中曾经有了总共线程个数和休闲线程个数,能够应用这个两个进行编写相应程序。
  • 线程池大小问题,咱们能够应用一个线程:管理者线程来保护线程池的大小.

正文完
 0