1. Boost.Asio 线程池
下载:https://sourceforge.net/proje...
VS 中使用:项目 - 属性 - VC++ 目录 - 包含目录,添加 YourPath\asio-1.18.2\include
官网文档:https://www.boost.org/doc/lib...
#include <iostream>#include <atomic>#include <asio.hpp>using namespace std::literals;static std::atomic_uint32_t count = 0;int main(){ // 两个线程 asio::thread_pool pool(2); auto work = []() { std::this_thread::sleep_for(1ns); count++; }; int n = 1000; for (int i = 0; i < n; i++) { // 提交工作 asio::post(pool, work); } // 期待所有线程执行实现 pool.join(); std::cout << "count = " << count << '\n';}
count = 1000
其他操作:
void stop();
如果可能的话,立即停止线程,尚未被执行的任务可能不会被执行。
2. 自定义线程池实现
实现思路:
- 维护一个任务队列,其中保存用户提交的任务;
- 创建一个线程集合,然后令每个线程去轮询任务队列,从任务队列中取出任务并执行。
ThreadPool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <vector>

// A unit of work: any callable taking no arguments and returning nothing.
using Task = std::function<void()>;

// Minimal fixed-size thread pool.
//
// The constructor starts the worker threads; each worker repeatedly takes a
// task from a shared queue and runs it. join() asks the workers to exit once
// the queue is empty and waits for them.
class ThreadPool
{
public:
    // Starts `nThreads` worker threads.
    ThreadPool(int nThreads = 1);

    // Calls join(), so queued tasks are finished before destruction completes.
    ~ThreadPool();

    // The worker threads are started with a pointer to this object, and the
    // mutex / condition variable members cannot be copied or moved, so the
    // pool is pinned to its address. Spelled out explicitly (Rule of Five).
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    // Submits a task. Returns false only if the queue cannot grow any further,
    // true otherwise.
    // NOTE: a task submitted after join() has returned is queued but never
    // executed, because no worker threads remain.
    bool submitTask(const Task& task);
    bool submitTask(Task&& task);

    // Signals the workers to exit once the queue is empty and waits for all
    // of them to finish.
    void join();

private:
    // Worker loop executed by every pool thread.
    void runTasks();

private:
    std::vector<std::thread> m_threads;          // worker threads
    std::list<Task> m_taskQueue;                 // pending tasks (FIFO)
    std::atomic<bool> m_exit;                    // set when the workers should exit
    std::mutex m_taskQueueMutex;                 // guards m_taskQueue
    std::condition_variable m_taskQueueNotEmpty; // signalled on new task / exit
};

#endif
ThreadPool.cpp:
ThreadPool::ThreadPool(int nThreads) : m_exit(false){ m_threads.reserve(nThreads); for (int i = 0; i < nThreads; i++) { m_threads.emplace_back(std::move(std::thread(&ThreadPool::runTasks, this))); }}ThreadPool::~ThreadPool(){ join();}bool ThreadPool::submitTask(const Task& task){ std::lock_guard<std::mutex> qLock(m_taskQueueMutex); if (m_taskQueue.size() == m_taskQueue.max_size()) { return false; } m_taskQueue.push_back(task); m_taskQueueNotEmpty.notify_one(); return true;}bool ThreadPool::submitTask(Task&& task){ std::lock_guard<std::mutex> qLock(m_taskQueueMutex); if (m_taskQueue.size() == m_taskQueue.max_size()) { return false; } m_taskQueue.emplace_back(std::move(task)); m_taskQueueNotEmpty.notify_one(); return true;}void ThreadPool::join(){ m_exit.store(true); m_taskQueueNotEmpty.notify_all(); for (auto&& t : m_threads) { t.join(); } m_threads.clear();}void ThreadPool::runTasks(){ Task task; while (true) { std::unique_lock<std::mutex> qLock(m_taskQueueMutex); while (m_taskQueue.empty() && !m_exit) { m_taskQueueNotEmpty.wait(qLock); } if (m_taskQueue.empty()) { return; } task = m_taskQueue.front(); m_taskQueue.pop_front(); qLock.unlock(); task(); }}
主程序:
#include <iostream>#include <atomic>#include "ThreadPool.h"using namespace std::literals;static std::atomic_uint32_t count = 0;int main(){ ThreadPool pool(2); int n = 1000; for (int i = 0; i < n; i++) { pool.submitTask([]() { std::this_thread::sleep_for(1ns); count++; }); } pool.join(); std::cout << "count = " << count << '\n';}
count = 1000