关于workflow:一个逻辑完备的线程池

70次阅读

共计 9843 个字符,预计需要花费 25 分钟才能阅读完成。

一个逻辑齐备的线程池

开源我的项目 Workflow 中有一个十分重要的根底模块:代码仅 300 行的 C 语言线程池

逻辑齐备的三个特点在第 3 局部开始解说,欢送跳阅,或间接到 Github 主页上围观代码。

https://github.com/sogou/work…

0 – Workflow 的 thrdpool

Workflow 的大招:计算通信融为一体的异步调度模式,而计算的外围:Executor 调度器,就是基于这个线程池实现的。能够说,一个通用而高效的线程池,是咱们写 C /C++ 代码时离不开的根底模块。

thrdpool代码地位在src/kernel/,不仅能够间接拿来应用,同时也适宜浏览学习。

而更重要的,秉承 Workflow 我的项目自身一贯的谨严极简的风格,这个 thrdpool 代码极致简洁,实现逻辑上亦十分齐备,构造精美,处处谨严,不得不让我惊叹:

妙啊!!!🤩

你可能会很好奇,线程池还能写出什么别致的新思路吗?先列出一些,你们细品:

  • 特点 1:创立完线程池后,无需记录任何线程 id 或对象,线程池能够通过一个等一个的形式优雅地去完结所有线程
  • 特点 2:线程工作能够由另一个线程工作调起;甚至线程池正在被销毁时也能够提交下一个工作;(这很重要,因为线程自身很可能是不晓得线程池的状态的;
  • 特点 3:同理,线程工作也能够销毁这个线程池;(十分残缺~

我真的急不可待为大家深层解读一下,这个我愿称之为“逻辑齐备”的线程池

1 – 前置常识

第一局部我先从最根本的内容梳理一些集体了解,有根底的小伙伴能够间接跳过。如果有不精确的中央,欢送大家斧正交换~

为什么须要线程池?(其实思路不仅对线程池,对任何无限资源的调度治理都是相似的)

咱们晓得,通过零碎提供的 pthread 或者 std::thread 创立线程,就能够实现多线程并发执行咱们的代码。

然而 CPU 的核数是固定的,所以真正并发执行的最大值也是固定的,过多的线程创立除了频繁产生创立的 overhead 以外,还会导致对系统资源进行争抢,这些都是不必要的节约。

因而咱们能够治理无限个线程,循环且正当地利用它们。♻️

那么线程池个别蕴含哪些内容呢?

  • 首先是治理若干个~ 工具人~ 线程;
  • 其次是治理交给线程去执行的工作,这个个别会有一个队列;
  • 再而后线程之间须要一些同步机制,比方 mutex、condition 等;
  • 最初就是各线程池实现上本身须要的其余内容了;

好了,接下来咱们看看 Workflowthrdpool是怎么做的。

2 – 代码概览

以下共 7 步罕用思路,足以让咱们把代码飞快过一遍。

第 1 步:先看头文件,模块提供什么接口。

咱们关上thrdpool.h,能够只关注三个接口:

// 创立线程池
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// 把工作交给线程池的入口
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool); 
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                      thrdpool_t *pool);
第 2 步:接口上有什么数据结构。

也就是,咱们如何形容一个交给线程池的工作。

struct thrdpool_task                                                            
{void (*routine)(void *);  // 一个函数指针
    void *context;            // 一个上下文
};  
第 3 步:再看实现.c,有什么外部数据结构。
struct __thrdpool
{
    struct list_head task_queue;   // 工作队列
    size_t nthreads;               // 线程个数
    size_t stacksize;              // 结构线程时的参数
    pthread_t tid;                 // 运行起来之后,pool 上记录的这个是 zero 值
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    pthread_key_t key;
    pthread_cond_t *terminate;
};

没有一个多余,每一个成员都很到位:

  1. tid:线程 id,整个线程池只有一个,它不会奇怪地去记录任何一个线程的 id,这样就不完满了,它平时运行的时候是空值,退出的时候,它是用来实现链式期待的要害。
  2. mutexcond是常见的线程间同步的工具,其中这个 cond 是用来给 生产者和消费者 去操作工作队列用的。
  3. key:是线程池的 key,而后会赋予给每个由线程池创立的线程作为他们的 thread local,用于辨别这个线程是否是线程池创立的。
  4. 咱们还看到一个pthread_cond_t *terminate,这有两个用处:不仅是退出时的标记位,而且还是调用退出的那个人要期待的 condition。

以上各个成员的用处,如同说了,又如同没说,🤔是因为 简直每一个成员都值得深挖一下,所以咱们记住它们,前面看代码的时候就会恍然大悟!😃

第 4 步:接口都调用了什么外围函数。
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
    thrdpool_t *pool;
    ret = pthread_key_create(&pool->key, NULL);
    if (ret == 0)
    {
        ... // 去掉了其余代码,然而留神到方才的 tid 和 terminate 的赋值
        memset(&pool->tid, 0, sizeof (pthread_t));
        pool->terminate = NULL;
        if (__thrdpool_create_threads(nthreads, pool) >= 0)
            return pool;
        ...

这里能够看到 __thrdpool_create_threads() 里边最要害的就是循环创立 nthreads 个线程。

        while (pool->nthreads < nthreads)                                       
        {ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
            ...
第 5 步:略读外围函数的性能。

所以咱们在上一步晓得了,每个线程执行的是 __thrdpool_routine()。不难想象,它会 不停从队列拿工作进去执行

static void *__thrdpool_routine(void *arg)                                      
{                                                                               
    ...                                   
    while (1)                                                                   
    {
        // 1. 从队列里拿一个工作进去,没有就期待
        pthread_mutex_lock(&pool->mutex);
        while (!pool->terminate && list_empty(&pool->task_queue))
            pthread_cond_wait(&pool->cond, &pool->mutex);

        if (pool->terminate) // 2. 线程池完结的标记位,记住它,先跳过
            break;

        // 3. 如果能走到这里,祝贺你,拿到了工作~                                                         
        entry = list_entry(*pos, struct __thrdpool_task_entry, list);
        list_del(*pos);
        pthread_mutex_unlock(&pool->mutex); // 4. 先解锁
                                                                                
        task_routine = entry->task.routine;
        task_context = entry->task.context;
        free(entry);                                                            
        task_routine(task_context); // 5. 再执行

        // 6. 这里也先记住它,意思是线程池里的线程能够销毁线程池
        if (pool->nthreads == 0) 
        {                                                                       
            /* Thread pool was destroyed by the task. */
            free(pool);
            return NULL;
        }                                                                       
    }
    ... // 前面还有魔法,留下一章解读~~~
第 6 步:把函数之间的关系分割起来。

方才看到的 __thrdpool_routine() 就是线程的外围函数了,它能够和谁关联起来呢?👉

👈能够和接口 thrdpool_schedule() 关联上。

咱们说过,线程池上有个队列治理工作,

  • 所以,每个执行 routine 的线程,都是消费者;
  • 而每个发动 schedule 的线程,都是生产者;

咱们曾经看过消费者了,来看看生产者的代码:

inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
                                thrdpool_t *pool)
{struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;  

    entry->task = *task;
    pthread_mutex_lock(&pool->mutex);
    list_add_tail(&entry->list, &pool->task_queue); // 增加到队列里
    pthread_cond_signal(&pool->cond);               // 叫醒在期待的线程
    pthread_mutex_unlock(&pool->mutex);
}

说到这里,特点 2 就十分清晰了:

开篇说的 特点 2 是说,”线程工作能够由另一个线程工作调起”。

只有对队列的治理做得好,显然咱们在消费者所执行的函数也能够做生产者。

第 7 步:看其余状况的解决,对于线程池来说就是比方销毁的状况。

只看咱们接口 thrdpool_destroy()的实现是非常简单的:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),            
                      thrdpool_t *pool)                                         
{
    ...
    // 1. 外部会设置 pool->terminate,并叫醒所有等在队列拿工作的线程
    __thrdpool_terminate(in_pool, pool);

    // 2. 把队列里还没有执行的工作都拿进去,通过 pending 返回给用户
    list_for_each_safe(pos, tmp, &pool->task_queue)                             
    {entry = list_entry(pos, struct __thrdpool_task_entry, list);            
        list_del(pos);                                                          
        if (pending)                                                            
            pending(&entry->task);                                              
        ... // 前面就是销毁各种内存,同样有魔法~

在退出的时候,咱们那些曾经提交然而还没有被执行的工作是相对不能就这么扔掉了的,于是咱们能够传入一个 pending() 函数,下层能够做本人的回收、回调、任何保障下层逻辑齐备的事件

设计的完整性,无处不在。

接下来咱们就能够跟着咱们的外围问题,针对性地看看每个特点都是怎么实现的。

3 – 特点 1: 一个期待一个的优雅退出

这里提出一个问题:线程池要退出,如何完结所有线程?

个别线程池的实现都是须要记录下所有的线程 id,或者 thread 对象,以便于咱们去 jion 期待它们完结。

然而咱们方才看,pool 里并没有记录所有的 tid 呀?正如开篇说的,pool 上只有一个 tid,而且还是个空的值

所以 特点 1 给出了 Workflowthrdpool的答案:

无需记录所有线程,我能够让线程挨个主动退出、且一个期待一个,最终达到我调用完 thrdpool_destroy()后内存能够回收洁净的目标。

这里先给一个简略的图,假如发动 destroy 的人是 main 线程,咱们如何做到一个等一个退出:

最简略的:内部线程发动 destroy👇

步骤如下:

  1. 线程的退出,由 thrdpool_destroy()设置 pool->terminate 开始。
  2. 咱们每个线程,在 while(1)里会第一工夫发现 terminate,线程池要退出了,而后会 break 出这个 while 循环。
  3. 留神这个时候,还持有着 mutex 锁,咱们拿出 pool 上惟一的那个 tid,放到我的长期变量,我会依据拿进去的值做不同的解决。且我会把我本人的 tid 放上去,而后再解 mutex 锁。
  4. 那么很显然,第一个从 pool 上拿 tid 的人,会发现这是个 0 值,就能够间接完结了,不必负责期待任何其他人,但我在齐全完结之前须要有人负责期待我的完结,所以我会把我的 id 放上去。
  5. 而如果发现自己从 pool 里拿到的 tid 不是 0 值,阐明我要负责 jion 上一个人 ,并且把我的 tid 放上去, 让下一个人负责我
  6. 最初的那个人,是那个发现 pool->nthreads 为 0 的人,那么我就能够通过这个 terminate(它自身是个 condition)去告诉发动 destroy 的人。
  7. 最初发起者就能够退了。🔚

是不是十分有意思!!!十分优雅的做法!!!

所以咱们会发现,其实 大家不太须要晓得太多信息,只须要晓得我要负责的上一个人

当然每一步都是十分谨严的,咱们联合方才跳过的第一段魔法🔮感受一下:

static void *__thrdpool_routine(void *arg)                                         
{while (1)
    {pthread_mutex_lock(&pool->mutex); // 1. 留神这里还持有锁
        ... // 等着队列拿工作进去
        if (pool->terminate) // 2. 这既是标识位,也是发动销毁的那个人所期待的 condition
            break;
        ... // 执行拿到的工作
    }

    /* One thread joins another. Don't need to keep all thread IDs. */
    tid = pool->tid; // 3. 把线程池上记录的那个 tid 拿下来,我来负责上一人
    pool->tid = pthread_self(); // 4. 把我本人记录到线程池上,下一个人来负责我
    if (--pool->nthreads == 0) // 5. 每个人都减 1,最初一个人负责叫醒发动 detroy 的人
        pthread_cond_signal(pool->terminate);

    pthread_mutex_unlock(&pool->mutex); // 6. 这里能够解锁进行期待了
    if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 7. 只有第一个人拿到 0 值
        pthread_join(tid, NULL); // 8. 只有不 0 值,我就要负责等上一个完结能力退

    return NULL; // 9. 退出,干干净净~
}

4 – 特点 2:线程工作能够由另一个线程工作调起

在第二局部咱们看过源码,只有队列治理得好,线程工作里提交下一个工作是齐全 OK 的。

这很正当。👌

那么问题来了,特点 1 又说,咱们每个线程,是不太须要晓得太多线程池的状态和信息的。而线程池的销毁是个过程,如果在这个过程间提交工作会怎么样呢?

因而 特点 2 的一个重要解读是:线程池被销毁时也能够提交下一个工作 。而且方才提过,还没有被执行的工作,能够通过咱们传入的 pending() 函数拿回来。

简略看看销毁时的谨严做法:

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 
{                                                                         
    pthread_cond_t term = PTHREAD_COND_INITIALIZER;                                                                      
    pthread_mutex_lock(&pool->mutex); // 1. 加锁设置标识位
    pool->terminate = &term;   // 2. 之后的增加工作不会被执行,但能够 pending 拿到
    pthread_cond_broadcast(&pool->cond); // 3. 播送所有期待的消费者

    if (in_pool) // 4. 这里的魔法等下讲 >_<~
    {                                                                 
        /* Thread pool destroyed in a pool thread is legal. */         
        pthread_detach(pthread_self());                                 
        pool->nthreads--;                                             
    }
                                                                 
    while (pool->nthreads > 0) // 5. 如果还有线程没有退完,我会等,留神这里是 while
        pthread_cond_wait(&term, &pool->mutex);                       
    pthread_mutex_unlock(&pool->mutex);                                         
    if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)     
        pthread_join(pool->tid, NULL); // 6. 同样地期待打算退出的上一个人
}

5 – 特点 3:同样能够在线程工作里销毁这个线程池

既然线程工作能够做任何事件,实践上,线程工作也能够销毁线程池

作为一个逻辑齐备的线程池,大胆一点,咱们把问号去掉。

而且,销毁并不会完结当前任务,它会等这个工作执行完

设想一下,方才的__thrdpool_routine(),while 里拿进去的那个工作,做的事件居然是发动thrdpool_destroy()

咱们来把下面的图改一下:

大胆点,咱们让一个 routine 来 destroy 线程池👇

如果发动销毁的人,是咱们本人外部的线程,那么咱们就不是等 n 个,而是等 n -1,少了一个内部线程期待咱们。如何实现能力让这些逻辑都完满交融呢?咱们把方才跳过的三段魔法串起来看看。

第一段魔法,销毁的发起者。

如果发现发动销毁的人是线程池外部的线程,那么它具备较强的自我管理意识(因为后面说了,会等它这个工作执行完),而咱们能够放心大胆地pthread_detach,无需任何人 jion 它期待它完结。

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 
{ 
    ...
    if (in_pool) // 每个由线程池创立的线程都设置了一个 key,由此判断是否是 in_pool
    {                                                                           
        /* Thread pool destroyed in a pool thread is legal. */                  
        pthread_detach(pthread_self());
        pool->nthreads--;
    }        
第二段魔法:线程池谁来 free?

肯定是发动销毁的那个人。所以这里用 in_pool 来管制 main 线程的回收:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                                       thrdpool_t *pool)
{// 曾经调用完第一段,且挨个 pending(未执行的 task)了
    ... // 销毁其余外部调配的内存
    if (!in_pool) // 如果不是外部线程发动的销毁,要负责回收线程池内存
        free(pool);
}

那当初不是 main 线程发动的销毁呢?发动的销毁的那个外部线程,怎么能保障我能够在最初关头把所有资源回收洁净、调 free(pool)、功成身退呢?

在后面浏览源码第 5 步,其实咱们看过,__thrdpool_routine()里有 free 的中央。

于是当初三段魔法终于串起来了。

第三段魔法:谨严的并发。
static void *__thrdpool_routine(void *arg)
{while (1) 
    {
        ... // 后面执行完一个工作,如果工作里做的事件,是销毁线程池...
        // 留神这个时候,其余内存都曾经被 destroy 的那个清掉了,万万不能够再用什么 mutex、cond
        if (pool->nthreads == 0) 
        {
            /* Thread pool was destroyed by the task. */
            free(pool);
            return NULL;                                                        
        }
        ...

十分重要的一点,因为并发,咱们是不晓得谁先操作的。假如咱们略微改一改这个程序,就又是另一番逻辑

比方我作为一个外部线程,在 routine 里调用 destroy 期间,发现还有线程没有执行完,我就要等在我的 terminate 上,待最初看到 nthreads== 0 的那个人叫醒我。而后我的代码继续执行,函数栈就会从 destroy 回到 routine,也就是下面那几行,而后,free(pool);,这时候我曾经放飞自我 detach 了,能够顺利完结。

你看,无论如何,都能够完满地销毁线程池:

是不是太妙了!我写到这里曾经要打动哭了!😭

6 – 简略的用法

这个线程池只有两个文件: thrdpool.hthrdpool.c,而且只依赖内核的数据结构list.h。咱们把它拿进去玩,本人写一段代码:

void my_routine(void *context) // 咱们要执行的函数                                                  
{printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
}                                                                               
                                                                                
void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的工作会到这里
{printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););                                    
} 

int main()                                                                         
{thrdpool_t *thrd_pool = thrdpool_create(3, 1024);  // 创立                          
    struct thrdpool_task task;
    unsigned long long i;
                               
    for (i = 0; i < 5; i++)
    {
        task.routine = &my_routine;                                             
        task.context = reinterpret_cast<void *>(i);                             
        thrdpool_schedule(&task, thrd_pool); // 调用
    }
    getchar(); // 卡住主线程,按回车持续
    thrdpool_destroy(&my_pending, thrd_pool); // 完结
    return 0;                                                                   
} 

咱们再打印几行 log,间接编译就能够跑起来:

简略水平堪比大一上学期 C 语言作业。👶

7 – 并发与构造之美

最初谈谈感触。

看完之后我有种很悔恨为什么没有早点看的感觉,并且有一种,我必定还没有齐全了解到里边的精华,毕竟我不能 粗浅地了解到设计者过后对并发的构思和模型上的抉择

我只能说,没有十多年 顶级的零碎调用和并发编程的功底 写不出这样的代码,没有 极致的审美与对品控的偏执 也写不出这样的代码。

并发编程有很多说道,就正如退出这个这么简略的事件,想要做到退出时回收洁净却很难。如果说你写业务逻辑本人管线程,退出什么的 sleep(1)都无所谓,但做框架的人如果不能把本人的框架做得完满得空逻辑自洽,就不免让人感觉差点意思。

而这个 thrdpool,它作为一个线程池,是如此地逻辑齐备。

再次让我深深地感到震撼:咱们身边那些原始的、底层的、根底的代码,还有很多新思路,还能够写得如此美。

Workflow 我的项目源码地址:GitHub – sogou/workflow: C++ Parallel Computing and Asynchronous Networking Engine

正文完
 0