一个逻辑齐备的线程池

开源我的项目Workflow中有一个十分重要的根底模块:代码仅300行的C语言线程池

逻辑齐备的三个特点在第3局部开始解说,欢送跳阅,或间接到Github主页上围观代码。

https://github.com/sogou/work...

0 - Workflow的thrdpool

Workflow的大招:计算通信融为一体的异步调度模式,而计算的外围:Executor调度器,就是基于这个线程池实现的。能够说,一个通用而高效的线程池,是咱们写C/C++代码时离不开的根底模块。

thrdpool代码地位在src/kernel/,不仅能够间接拿来应用,同时也适宜浏览学习。

而更重要的,秉承Workflow我的项目自身一贯的谨严极简的风格,这个thrdpool代码极致简洁,实现逻辑上亦十分齐备,构造精美,处处谨严,不得不让我惊叹:

妙啊!!!

你可能会很好奇,线程池还能写出什么别致的新思路吗?先列出一些,你们细品:

  • 特点1:创立完线程池后,无需记录任何线程id或对象,线程池能够通过一个等一个的形式优雅地去完结所有线程
  • 特点2:线程工作能够由另一个线程工作调起;甚至线程池正在被销毁时也能够提交下一个工作;(这很重要,因为线程自身很可能是不晓得线程池的状态的;
  • 特点3:同理,线程工作也能够销毁这个线程池;(十分残缺~

我真的急不可待为大家深层解读一下,这个我愿称之为“逻辑齐备”的线程池

1 - 前置常识

第一局部我先从最根本的内容梳理一些集体了解,有根底的小伙伴能够间接跳过。如果有不精确的中央,欢送大家斧正交换~

为什么须要线程池?(其实思路不仅对线程池,对任何无限资源的调度治理都是相似的)

咱们晓得,通过零碎提供的pthread或者std::thread创立线程,就能够实现多线程并发执行咱们的代码。

然而CPU的核数是固定的,所以真正并发执行的最大值也是固定的,过多的线程创立除了频繁产生创立的overhead以外,还会导致对系统资源进行争抢,这些都是不必要的节约。

因而咱们能够治理无限个线程,循环且正当地利用它们。♻️

那么线程池个别蕴含哪些内容呢?

  • 首先是治理若干个~工具人~线程;
  • 其次是治理交给线程去执行的工作,这个个别会有一个队列;
  • 再而后线程之间须要一些同步机制,比方mutex、condition等;
  • 最初就是各线程池实现上本身须要的其余内容了;

好了,接下来咱们看看Workflowthrdpool是怎么做的。

2 - 代码概览

以下共7步罕用思路,足以让咱们把代码飞快过一遍。

第1步:先看头文件,模块提供什么接口。

咱们关上thrdpool.h,能够只关注三个接口:

// 创立线程池thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);// 把工作交给线程池的入口int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool); // 销毁线程池void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),                      thrdpool_t *pool);
第2步:接口上有什么数据结构。

也就是,咱们如何形容一个交给线程池的工作。

struct thrdpool_task                                                            {                                                                                   void (*routine)(void *);  // 一个函数指针    void *context;            // 一个上下文};  
第3步:再看实现.c,有什么外部数据结构。
struct __thrdpool{    struct list_head task_queue;   // 工作队列    size_t nthreads;               // 线程个数    size_t stacksize;              // 结构线程时的参数    pthread_t tid;                 // 运行起来之后,pool上记录的这个是zero值    pthread_mutex_t mutex;    pthread_cond_t cond;    pthread_key_t key;    pthread_cond_t *terminate;};

没有一个多余,每一个成员都很到位:

  1. tid:线程id,整个线程池只有一个,它不会奇怪地去记录任何一个线程的id,这样就不完满了,它平时运行的时候是空值,退出的时候,它是用来实现链式期待的要害。
  2. mutexcond是常见的线程间同步的工具,其中这个cond是用来给生产者和消费者去操作工作队列用的。
  3. key:是线程池的key,而后会赋予给每个由线程池创立的线程作为他们的thread local,用于辨别这个线程是否是线程池创立的。
  4. 咱们还看到一个pthread_cond_t *terminate,这有两个用处:不仅是退出时的标记位 ,而且还是调用退出的那个人要期待的condition。

以上各个成员的用处,如同说了,又如同没说,是因为简直每一个成员都值得深挖一下,所以咱们记住它们,前面看代码的时候就会恍然大悟!

第4步:接口都调用了什么外围函数。
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize){    thrdpool_t *pool;    ret = pthread_key_create(&pool->key, NULL);    if (ret == 0)    {        ... // 去掉了其余代码,然而留神到方才的tid和terminate的赋值        memset(&pool->tid, 0, sizeof (pthread_t));        pool->terminate = NULL;        if (__thrdpool_create_threads(nthreads, pool) >= 0)            return pool;        ...

这里能够看到__thrdpool_create_threads()里边最要害的就是循环创立nthreads个线程。

        while (pool->nthreads < nthreads)                                               {                                                                                   ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);            ...
第5步:略读外围函数的性能。

所以咱们在上一步晓得了,每个线程执行的是__thrdpool_routine()。不难想象,它会不停从队列拿工作进去执行

static void *__thrdpool_routine(void *arg)                                      {                                                                                   ...                                       while (1)                                                                       {        // 1. 从队列里拿一个工作进去,没有就期待        pthread_mutex_lock(&pool->mutex);        while (!pool->terminate && list_empty(&pool->task_queue))            pthread_cond_wait(&pool->cond, &pool->mutex);        if (pool->terminate) // 2. 线程池完结的标记位,记住它,先跳过            break;        // 3. 如果能走到这里,祝贺你,拿到了工作~                                                                 entry = list_entry(*pos, struct __thrdpool_task_entry, list);        list_del(*pos);        pthread_mutex_unlock(&pool->mutex); // 4. 先解锁                                                                                        task_routine = entry->task.routine;        task_context = entry->task.context;        free(entry);                                                                    task_routine(task_context); // 5. 再执行        // 6. 这里也先记住它,意思是线程池里的线程能够销毁线程池        if (pool->nthreads == 0)         {                                                                                   /* Thread pool was destroyed by the task. */            free(pool);            return NULL;        }                                                                           }    ... // 前面还有魔法,留下一章解读~~~
第6步:把函数之间的关系分割起来。

方才看到的__thrdpool_routine()就是线程的外围函数了,它能够和谁关联起来呢?

能够和接口thrdpool_schedule()关联上。

咱们说过,线程池上有个队列治理工作,

  • 所以,每个执行routine的线程,都是消费者;
  • 而每个发动schedule的线程,都是生产者;

咱们曾经看过消费者了,来看看生产者的代码:

inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,                                thrdpool_t *pool){    struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;      entry->task = *task;    pthread_mutex_lock(&pool->mutex);    list_add_tail(&entry->list, &pool->task_queue); // 增加到队列里    pthread_cond_signal(&pool->cond);               // 叫醒在期待的线程    pthread_mutex_unlock(&pool->mutex);}

说到这里,特点2就十分清晰了:

开篇说的特点2是说,”线程工作能够由另一个线程工作调起”。

只有对队列的治理做得好,显然咱们在消费者所执行的函数也能够做生产者。

第7步:看其余状况的解决,对于线程池来说就是比方销毁的状况。

只看咱们接口thrdpool_destroy()的实现是非常简单的:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),                                  thrdpool_t *pool)                                         {    ...    // 1. 外部会设置pool->terminate,并叫醒所有等在队列拿工作的线程    __thrdpool_terminate(in_pool, pool);    // 2. 把队列里还没有执行的工作都拿进去,通过pending返回给用户    list_for_each_safe(pos, tmp, &pool->task_queue)                                 {        entry = list_entry(pos, struct __thrdpool_task_entry, list);                    list_del(pos);                                                                  if (pending)                                                                        pending(&entry->task);                                                      ... // 前面就是销毁各种内存,同样有魔法~

在退出的时候,咱们那些曾经提交然而还没有被执行的工作是相对不能就这么扔掉了的,于是咱们能够传入一个pending()函数,下层能够做本人的回收、回调、任何保障下层逻辑齐备的事件

设计的完整性,无处不在。

接下来咱们就能够跟着咱们的外围问题,针对性地看看每个特点都是怎么实现的。

3 - 特点1: 一个期待一个的优雅退出

这里提出一个问题:线程池要退出,如何完结所有线程?

个别线程池的实现都是须要记录下所有的线程id,或者thread对象,以便于咱们去jion期待它们完结。

然而咱们方才看,pool里并没有记录所有的tid呀?正如开篇说的,pool上只有一个tid,而且还是个空的值

所以特点1给出了Workflowthrdpool的答案:

无需记录所有线程,我能够让线程挨个主动退出、且一个期待一个,最终达到我调用完thrdpool_destroy()后内存能够回收洁净的目标。

这里先给一个简略的图,假如发动destroy的人是main线程,咱们如何做到一个等一个退出:

最简略的:内部线程发动destroy

步骤如下:

  1. 线程的退出,由thrdpool_destroy()设置pool->terminate开始。
  2. 咱们每个线程,在while(1)里会第一工夫发现terminate,线程池要退出了,而后会break出这个while循环。
  3. 留神这个时候,还持有着mutex锁,咱们拿出pool上惟一的那个tid,放到我的长期变量,我会依据拿进去的值做不同的解决。且我会把我本人的tid放上去,而后再解mutex锁。
  4. 那么很显然,第一个从pool上拿tid的人,会发现这是个0值,就能够间接完结了,不必负责期待任何其他人,但我在齐全完结之前须要有人负责期待我的完结,所以我会把我的id放上去。
  5. 而如果发现自己从pool里拿到的tid不是0值,阐明我要负责jion上一个人,并且把我的tid放上去,让下一个人负责我
  6. 最初的那个人,是那个发现pool->nthreads为0的人,那么我就能够通过这个terminate(它自身是个condition)去告诉发动destroy的人。
  7. 最初发起者就能够退了。

是不是十分有意思!!!十分优雅的做法!!!

所以咱们会发现,其实大家不太须要晓得太多信息,只须要晓得我要负责的上一个人

当然每一步都是十分谨严的,咱们联合方才跳过的第一段魔法感受一下:

static void *__thrdpool_routine(void *arg)                                         {    while (1)    {         pthread_mutex_lock(&pool->mutex); // 1.留神这里还持有锁        ... // 等着队列拿工作进去        if (pool->terminate) // 2. 这既是标识位,也是发动销毁的那个人所期待的condition            break;        ... // 执行拿到的工作    }    /* One thread joins another. Don't need to keep all thread IDs. */    tid = pool->tid; // 3. 把线程池上记录的那个tid拿下来,我来负责上一人    pool->tid = pthread_self(); // 4. 把我本人记录到线程池上,下一个人来负责我    if (--pool->nthreads == 0) // 5. 每个人都减1,最初一个人负责叫醒发动detroy的人        pthread_cond_signal(pool->terminate);    pthread_mutex_unlock(&pool->mutex); // 6. 这里能够解锁进行期待了    if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 7. 只有第一个人拿到0值        pthread_join(tid, NULL); // 8. 只有不0值,我就要负责等上一个完结能力退    return NULL; // 9. 退出,干干净净~}

4 - 特点2:线程工作能够由另一个线程工作调起

在第二局部咱们看过源码,只有队列治理得好,线程工作里提交下一个工作是齐全OK的。

这很正当。

那么问题来了,特点1又说,咱们每个线程,是不太须要晓得太多线程池的状态和信息的。而线程池的销毁是个过程,如果在这个过程间提交工作会怎么样呢?

因而特点2的一个重要解读是:线程池被销毁时也能够提交下一个工作。而且方才提过,还没有被执行的工作,能够通过咱们传入的pending()函数拿回来。

简略看看销毁时的谨严做法:

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 {                                                                             pthread_cond_t term = PTHREAD_COND_INITIALIZER;                                                                          pthread_mutex_lock(&pool->mutex); // 1. 加锁设置标识位    pool->terminate = &term;   // 2. 之后的增加工作不会被执行,但能够pending拿到    pthread_cond_broadcast(&pool->cond); // 3. 播送所有期待的消费者    if (in_pool) // 4. 这里的魔法等下讲>_<~    {                                                                         /* Thread pool destroyed in a pool thread is legal. */                 pthread_detach(pthread_self());                                         pool->nthreads--;                                                 }                                                                     while (pool->nthreads > 0) // 5. 如果还有线程没有退完,我会等,留神这里是while        pthread_cond_wait(&term, &pool->mutex);                           pthread_mutex_unlock(&pool->mutex);                                             if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)             pthread_join(pool->tid, NULL); // 6.同样地期待打算退出的上一个人}

5 - 特点3:同样能够在线程工作里销毁这个线程池

既然线程工作能够做任何事件,实践上,线程工作也能够销毁线程池

作为一个逻辑齐备的线程池,大胆一点,咱们把问号去掉。

而且,销毁并不会完结当前任务,它会等这个工作执行完

设想一下,方才的__thrdpool_routine(),while里拿进去的那个工作,做的事件居然是发动thrdpool_destroy()...

咱们来把下面的图改一下:

大胆点,咱们让一个routine来destroy线程池

如果发动销毁的人,是咱们本人外部的线程,那么咱们就不是等n个,而是等n-1,少了一个内部线程期待咱们。如何实现能力让这些逻辑都完满交融呢?咱们把方才跳过的三段魔法串起来看看。

第一段魔法,销毁的发起者。

如果发现发动销毁的人是线程池外部的线程,那么它具备较强的自我管理意识(因为后面说了,会等它这个工作执行完),而咱们能够放心大胆地pthread_detach,无需任何人jion它期待它完结。

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 {     ...    if (in_pool) // 每个由线程池创立的线程都设置了一个key,由此判断是否是in_pool    {                                                                                   /* Thread pool destroyed in a pool thread is legal. */                          pthread_detach(pthread_self());        pool->nthreads--;    }        
第二段魔法:线程池谁来free?

肯定是发动销毁的那个人。所以这里用in_pool来管制main线程的回收:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),                                       thrdpool_t *pool){    // 曾经调用完第一段,且挨个pending(未执行的task)了    ... // 销毁其余外部调配的内存    if (!in_pool) // 如果不是外部线程发动的销毁,要负责回收线程池内存        free(pool);}

那当初不是main线程发动的销毁呢?发动的销毁的那个外部线程,怎么能保障我能够在最初关头把所有资源回收洁净、调free(pool)、功成身退呢?

在后面浏览源码第5步,其实咱们看过,__thrdpool_routine()里有free的中央。

于是当初三段魔法终于串起来了。

第三段魔法:谨严的并发。
static void *__thrdpool_routine(void *arg){    while (1)     {        ... // 后面执行完一个工作,如果工作里做的事件,是销毁线程池...        // 留神这个时候,其余内存都曾经被destroy的那个清掉了,万万不能够再用什么mutex、cond        if (pool->nthreads == 0)         {            /* Thread pool was destroyed by the task. */            free(pool);            return NULL;                                                                }        ...

十分重要的一点,因为并发,咱们是不晓得谁先操作的。假如咱们略微改一改这个程序,就又是另一番逻辑

比方我作为一个外部线程,在routine里调用destroy期间,发现还有线程没有执行完,我就要等在我的terminate上,待最初看到nthreads==0的那个人叫醒我。而后我的代码继续执行,函数栈就会从destroy回到routine,也就是下面那几行,而后,free(pool);,这时候我曾经放飞自我detach了,能够顺利完结。

你看,无论如何,都能够完满地销毁线程池:

是不是太妙了!我写到这里曾经要打动哭了!

6 - 简略的用法

这个线程池只有两个文件: thrdpool.hthrdpool.c,而且只依赖内核的数据结构list.h。咱们把它拿进去玩,本人写一段代码:

void my_routine(void *context) // 咱们要执行的函数                                                  {                                                                                   printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );}                                                                                                                                                               void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的工作会到这里{    printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););                                    } int main()                                                                         {    thrdpool_t *thrd_pool = thrdpool_create(3, 1024);  // 创立                              struct thrdpool_task task;    unsigned long long i;                                   for (i = 0; i < 5; i++)    {        task.routine = &my_routine;                                                     task.context = reinterpret_cast<void *>(i);                                     thrdpool_schedule(&task, thrd_pool); // 调用    }    getchar(); // 卡住主线程,按回车持续    thrdpool_destroy(&my_pending, thrd_pool); // 完结    return 0;                                                                   } 

咱们再打印几行log,间接编译就能够跑起来:

简略水平堪比大一上学期C语言作业。

7 - 并发与构造之美

最初谈谈感触。

看完之后我有种很悔恨为什么没有早点看的感觉,并且有一种,我必定还没有齐全了解到里边的精华,毕竟我不能粗浅地了解到设计者过后对并发的构思和模型上的抉择

我只能说,没有十多年顶级的零碎调用和并发编程的功底写不出这样的代码,没有极致的审美与对品控的偏执也写不出这样的代码。

并发编程有很多说道,就正如退出这个这么简略的事件,想要做到退出时回收洁净却很难。如果说你写业务逻辑本人管线程,退出什么的sleep(1)都无所谓,但做框架的人如果不能把本人的框架做得完满得空逻辑自洽,就不免让人感觉差点意思。

而这个thrdpool,它作为一个线程池,是如此地逻辑齐备。

再次让我深深地感到震撼:咱们身边那些原始的、底层的、根底的代码,还有很多新思路,还能够写得如此美。

Workflow我的项目源码地址:GitHub - sogou/workflow: C++ Parallel Computing and Asynchronous Networking Engine