乐趣区

关于c++:C线程池

1. Boost.Asio 线程池

下载:https://sourceforge.net/proje…

VS 中应用:我的项目 – 属性 – VC 目录 – 蕴含目录,增加 YourPath\asio-1.18.2\include

官网文档:https://www.boost.org/doc/lib…

#include <iostream>
#include <atomic>
#include <asio.hpp>

using namespace std::literals;

static std::atomic_uint32_t count = 0;

int main()
{
    // 两个线程
    asio::thread_pool pool(2);

    auto work = []()
    {std::this_thread::sleep_for(1ns);
        count++;
    };

    int n = 1000;
    for (int i = 0; i < n; i++)
    {
        // 提交工作
        asio::post(pool, work);
    }
    
    // 期待所有线程执行实现
    pool.join();

    std::cout << "count =" << count << '\n';
}
count = 1000

其余操作:

void stop();

如果能够的话,立马终止线程,还未被执行的工作可能将不会被执行.

2. 自定义线程池实现

实现思路:

  • 保护一个工作队列,其中保留用户提交的工作;
  • 创立一个线程汇合,而后令每个线程去轮询工作队列,从工作队列中提取工作并执行.

ThreadPool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <vector>
#include <list>

using Task = std::function<void()>;

class ThreadPool
{
public:
    ThreadPool(int nThreads = 1);
    ~ThreadPool();

    // 提交工作
    bool submitTask(const Task& task);
    bool submitTask(Task&& task);

    // 期待线程执行结束
    void join();

private:
    void runTasks();

private:
    std::vector<std::thread> m_threads;             // 工作者线程汇合
    std::list<Task> m_taskQueue;                    // 工作队列
    std::atomic<bool> m_exit;                       // 是否要退出
    std::mutex m_taskQueueMutex;
    std::condition_variable m_taskQueueNotEmpty;
};

#endif

ThreadPool.cpp:

ThreadPool::ThreadPool(int nThreads) : m_exit(false)
{m_threads.reserve(nThreads);

    for (int i = 0; i < nThreads; i++)
    {m_threads.emplace_back(std::move(std::thread(&ThreadPool::runTasks, this)));
    }
}

ThreadPool::~ThreadPool()
{join();
}

bool ThreadPool::submitTask(const Task& task)
{std::lock_guard<std::mutex> qLock(m_taskQueueMutex);

    if (m_taskQueue.size() == m_taskQueue.max_size())
    {return false;}

    m_taskQueue.push_back(task);
    m_taskQueueNotEmpty.notify_one();
    return true;
}

bool ThreadPool::submitTask(Task&& task)
{std::lock_guard<std::mutex> qLock(m_taskQueueMutex);

    if (m_taskQueue.size() == m_taskQueue.max_size())
    {return false;}

    m_taskQueue.emplace_back(std::move(task));
    m_taskQueueNotEmpty.notify_one();
    return true;
}

void ThreadPool::join()
{m_exit.store(true);
    m_taskQueueNotEmpty.notify_all();

    for (auto&& t : m_threads)
    {t.join();
    }

    m_threads.clear();}

void ThreadPool::runTasks()
{
    Task task;

    while (true)
    {std::unique_lock<std::mutex> qLock(m_taskQueueMutex);

        while (m_taskQueue.empty() && !m_exit)
        {m_taskQueueNotEmpty.wait(qLock);
        }

        if (m_taskQueue.empty())
        {return;}

        task = m_taskQueue.front();
        m_taskQueue.pop_front();

        qLock.unlock();

        task();}
}

主程序:

#include <iostream>
#include <atomic>
#include "ThreadPool.h"

using namespace std::literals;

static std::atomic_uint32_t count = 0;

int main()
{ThreadPool pool(2);

    int n = 1000;
    for (int i = 0; i < n; i++)
    {pool.submitTask([]()
            {std::this_thread::sleep_for(1ns);
                count++;
            });
    }

    pool.join();

    std::cout << "count =" << count << '\n';
}
count = 1000
退出移动版