1. Boost.Asio 线程池
下载:https://sourceforge.net/proje…
VS 中应用:我的项目 – 属性 – VC 目录 – 蕴含目录,增加 YourPath\asio-1.18.2\include
官网文档:https://www.boost.org/doc/lib…
#include <iostream>
#include <atomic>
#include <asio.hpp>
using namespace std::literals;
static std::atomic_uint32_t count = 0;
int main()
{
// 两个线程
asio::thread_pool pool(2);
auto work = []()
{std::this_thread::sleep_for(1ns);
count++;
};
int n = 1000;
for (int i = 0; i < n; i++)
{
// 提交工作
asio::post(pool, work);
}
// 期待所有线程执行实现
pool.join();
std::cout << "count =" << count << '\n';
}
count = 1000
其余操作:
void stop();
如果能够的话,立马终止线程,还未被执行的工作可能将不会被执行.
2. 自定义线程池实现
实现思路:
- 保护一个工作队列,其中保留用户提交的工作;
- 创立一个线程汇合,而后令每个线程去轮询工作队列,从工作队列中提取工作并执行.
ThreadPool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <vector>
#include <list>
using Task = std::function<void()>;
class ThreadPool
{
public:
ThreadPool(int nThreads = 1);
~ThreadPool();
// 提交工作
bool submitTask(const Task& task);
bool submitTask(Task&& task);
// 期待线程执行结束
void join();
private:
void runTasks();
private:
std::vector<std::thread> m_threads; // 工作者线程汇合
std::list<Task> m_taskQueue; // 工作队列
std::atomic<bool> m_exit; // 是否要退出
std::mutex m_taskQueueMutex;
std::condition_variable m_taskQueueNotEmpty;
};
#endif
ThreadPool.cpp:
ThreadPool::ThreadPool(int nThreads) : m_exit(false)
{m_threads.reserve(nThreads);
for (int i = 0; i < nThreads; i++)
{m_threads.emplace_back(std::move(std::thread(&ThreadPool::runTasks, this)));
}
}
ThreadPool::~ThreadPool()
{join();
}
bool ThreadPool::submitTask(const Task& task)
{std::lock_guard<std::mutex> qLock(m_taskQueueMutex);
if (m_taskQueue.size() == m_taskQueue.max_size())
{return false;}
m_taskQueue.push_back(task);
m_taskQueueNotEmpty.notify_one();
return true;
}
bool ThreadPool::submitTask(Task&& task)
{std::lock_guard<std::mutex> qLock(m_taskQueueMutex);
if (m_taskQueue.size() == m_taskQueue.max_size())
{return false;}
m_taskQueue.emplace_back(std::move(task));
m_taskQueueNotEmpty.notify_one();
return true;
}
void ThreadPool::join()
{m_exit.store(true);
m_taskQueueNotEmpty.notify_all();
for (auto&& t : m_threads)
{t.join();
}
m_threads.clear();}
void ThreadPool::runTasks()
{
Task task;
while (true)
{std::unique_lock<std::mutex> qLock(m_taskQueueMutex);
while (m_taskQueue.empty() && !m_exit)
{m_taskQueueNotEmpty.wait(qLock);
}
if (m_taskQueue.empty())
{return;}
task = m_taskQueue.front();
m_taskQueue.pop_front();
qLock.unlock();
task();}
}
主程序:
#include <iostream>
#include <atomic>
#include "ThreadPool.h"
using namespace std::literals;
static std::atomic_uint32_t count = 0;
int main()
{ThreadPool pool(2);
int n = 1000;
for (int i = 0; i < n; i++)
{pool.submitTask([]()
{std::this_thread::sleep_for(1ns);
count++;
});
}
pool.join();
std::cout << "count =" << count << '\n';
}
count = 1000