首发地址
day05 多线程实现都须要留神什么?
我的项目仓库地址
https://github.com/lzs123/CProxy,欢送fork and star!
往期教程
day01-从一个根底的socket服务说起
day02 真正的高并发还得看IO多路复用
day03 C++我的项目开发配置最佳实际(vscode近程开发配置、格式化、代码查看、cmake治理配置)
day04 高性能服务设计思路
工作线程如何初始化?
在咱们的设计中,工作线程自身是一个事件循环,启动后会陷入阻塞,期待事件产生。为了达到这个成果,线程启动时须要做一些初始化工作。
咱们定义了EventLoopThread
类,类的定义如下
class EventLoopThread { public: EventLoopThread() : thread_(std::bind(&EventLoopThread::ThreadFunc, this)), mutex_(), cond_(){}; ~EventLoopThread(); void StartLoop(); void ThreadFunc(); void AddChannel(SP_Channel); void AddConn(SP_Conn); SP_EventLoop GetLoop() { return loop_; } private: SP_EventLoop loop_; bool started_; std::thread thread_; std::mutex mutex_; std::condition_variable cond_;};
SP结尾的代表对应类的shared_ptr智能指针类型;比方SP_EventLoop =std::shared_ptr\<EventLoop>
在EventLoopThread
构造函数中,创立了一个std::thread
对象,并以EventLoopThread::ThreadFunc
作为线程执行函数。
void EventLoopThread::ThreadFunc() try { if (loop_) { throw "loop_ is not null"; } loop_ = SP_EventLoop(new EventLoop()); { std::unique_lock<std::mutex> lock(mutex_); started_ = true; cond_.notify_all(); } loop_->Loop();} catch (std::exception& e) { SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what()); abort();}
咱们先看看第5行:loop_ = SP_EventLoop(new EventLoop());
初始化了一个EventLoop
并赋值到EventLoopThread
成员变量loop_
上,咱们先看看EventLoop
的定义
class EventLoop { public: EventLoop() : poller_(SP_Epoll(new Epoll())){}; void Loop(); void AddToPoller(SP_Channel channel); void UpdateToPoller(SP_Channel channel); void RemoveFromPoller(SP_Channel channel); private: SP_EventDispatcher poller_;};
能够看到,在EventLoop
的构造函数中,初始化了一个Epoll
对象赋值到变量poller_
上。poller_
自身是个EventDispacther
对象,Epoll
继承了EventDispatcher
,示意基于epoll实现的事件散发;
这样做的益处是如果要新增一种事件散发机制,比方我的项目要反对mac环境,咱们须要用kqueue
代替epoll
实现事件散发。这个时候咱们只须要批改EventLoop
的构造函数,将新的事件散发对象Kqueue
赋值给poller_
即可。
咱们再看看Epoll
在初始化时做了什么。
Epoll::Epoll() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)), epoll_events_(EVENTSNUM) { assert(epoll_fd_ > 0);}
调用了epoll_create1
,创立了一个epoll
实例,也就是说,一个Epoll
对象初始化时,就曾经在内核筹备好了一个eventpoll
对象,咱们能够增加套接字并监听相干事件了。
看回EventLoopThread::ThreadFunc
, 在初始化一个EventLoop
对象后,EventLoopThread::ThreadFunc
会再调用loop_->Loop()
,这个就是咱们之前说的事件循环了。
void EventLoop::Loop() { std::vector<SP_Channel> ready_channels; for (;;) { ready_channels.clear(); ready_channels = poller_->WaitForReadyChannels(); for (SP_Channel chan : ready_channels) { chan->HandleEvents(); } }}
EventLoop::Loop
自身是个死循环,大略逻辑也比较简单,就是一直从poller_
中获取触发事件的channel
列表,再遍历该列表,调用对应的HandleEvent
事件处理函数。
当没有事件时,该循环会阻塞在WaitForReadyChannels
处,底层其实是阻塞在epoll_wait
。(阻塞过程中线程是挂起状态,并不会占用cpu)。
咱们简略回顾下线程的初始化。
- 创立了一个
EventLoop
对象,底层通过调用epoll_create1
创立了epoll
实例,能够通过该epoll
实例增加事件监听。 之后调用
EventLoop::Loop
,在没有事件时,线程会陷入阻塞;当有事件产生时,会调用注册时对应的handleEvents
办法进行解决。如何控制线程启动的程序?
下面咱们讲了线程的初始化,但初始化后,
EventLoopThread
还须要调用StartLoop
能力开始工作。这其实是为了让主线程期待线程池中的工作线程实现初始化。为什么要管制?
首先讲讲主线程为什么要期待工作线程实现初始化。
在咱们的线程模型设计中,主线程负责监听接管新连贯申请,而后抉择线程池中的一个工作线程,将新连贯套接字交给工作线程解决。
假如工作线程不须要StartLoop
,在工作线程初始化后间接退出到线程池。void EventLoopThreadPool::start() { for (int i = 0; i < thread_num_; i++) { SP_EventLoopThread t(new EventLoopThread()); // t->StartLoop(); threads_.emplace_back(t); }}
当有新连贯时,主线程通过从线程池中获取一个工作线程。但这个时候,咱们是没法保障选出的工作线程是曾经初始化了
loop_
的。因为EventLoopThread::ThreadFunc
的执行是异步的,执行程序可能如下。
在主线程选中将新连贯增加到工作线程中时,工作线程的loop_
此时还未初始化,可能会导致程序间接coredump。
所以,咱们必须想方法让工作线程的EventLoop
初始化在主线程开始接管新连贯申请之前。如何管制?
这实际上是一个多线程的告诉问题,咱们次要采纳的是
mutex
和condition
这两个武器,通过条件变量来实现告诉。在C++中,咱们通常应用
condition_variable
搭配互斥量mutex
来解决线程间同步问题。次要用到的是condition_variable::notify_xx
和condition_variable::wait
函数。
顾名思义,wait
就是一个期待的作用,假如咱们在线程A进行wait
,流程如下:- lock获取到锁,
- 调用
wait
,会先主动unlock开释锁,而后将线程阻塞住; - 被其余线程唤醒后,会主动lock获取锁,继续执行
wait
的下一行代码
唤醒线程的函数是
notify_all
和notify_one
,两者的区别在于notify_one()
每次只唤醒一个线程,那么notify_all()
函数会唤醒所有期待中的线程(当最终能抢到锁的只有一个线程)。调用流程如下:- lock获取到锁
- 调用
notify_all/notify_one
,唤醒期待中的线程 - 开释锁
咱们为EventLoopThread
引入了StartLoop
办法,大略成果如下
为了浏览不便,在这里再贴一遍EventLoopThread
相干的代码
// lib/event_loop_thread.hclass EventLoopThread { public: ... void StartLoop(); void ThreadFunc(); ... private: ... bool started_; std::mutex mutex_; std::condition_variable cond_;};// lib/event_loop_thread.cppvoid EventLoopThread::ThreadFunc() try { if (loop_) { throw "loop_ is not null"; } loop_ = SP_EventLoop(new EventLoop()); { std::unique_lock<std::mutex> lock(mutex_); started_ = true; cond_.notify_all(); } loop_->Loop();} catch (std::exception& e) { SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what()); abort();}void EventLoopThread::StartLoop() { std::unique_lock<std::mutex> lock(mutex_); while (!started_) cond_.wait(lock);}
首先,咱们须要明确,在工作线程初始化loop_
后,就代表该线程曾经筹备实现,能够接管解决套接字了。所以咱们在实现loop_
的初始化后,将started_
置为true
,而后就发送notify告诉唤醒期待线程。
而在StartLoop
函数中,咱们先查看started_
是否为false
,如果是true
,代表工作线程曾经初始化完loop_
了,这种状况StartLoop
不再须要wait,间接返回即可;如果started_
为false
,则陷入期待,直到工作线程实现loop_
初始化后唤醒。
如何将套接字增加到工作线程?
最初,咱们认真聊聊新连贯套接字是如何增加到工作线程中的。
没有申请时,主线程会阻塞在accept
调用,当有新连贯申请时,accept
会返回新连贯套接字accept_fd
。
主线程会先将accept_fd
封装成一个Conn
对象,上一节《day04 高性能服务设计思路》讲到我的项目中有多种连贯,这些连贯有一个独特的基类Conn
, Conn
次要是将套接字封装成一个Channel
,并设置该Channel
各种事件回调解决逻辑。不同类型的Conn
有各自的回调解决逻辑。
接下来,主线程通过EventLoopThreadPool::PickRandThread
获取一个工作线程。
SP_EventLoopThread EventLoopThreadPool::PickRandThread() { SP_EventLoopThread t; { std::unique_lock<std::mutex> lock(thread_mutex_); t = threads_[next_work_thread_Idx_]; next_work_thread_Idx_ = (next_work_thread_Idx_ + 1) % thread_num_; } return t;}
这里咱们间接采纳轮训策略选取线程池中的线程。
获取到工作线程后,咱们间接调用EventLoopThread::AddConn
,将该连贯交由工作线程解决。
// lib/event_loop_thread.cppvoid EventLoopThread::AddConn(SP_Conn conn) { loop_->AddToPoller(conn->GetChannel()); }// lib/event_loop.cppvoid EventLoop::AddToPoller(SP_Channel channel) { poller_->PollAdd(channel); }// lib/epoll.cppvoid Epoll::PollAdd(SP_Channel channel) { int fd = channel->getFd(); epoll_event event; event.data.fd = fd; event.events = channel->GetEvents(); if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) { SPDLOG_CRITICAL("epoll_ctl fd: {} err: {}", fd, strerror(errno)); } else { fd2chan_[fd] = channel; }}
能够发现,底层是调用epoll_ctl
将套接字fd
加到对应工作线程的epoll实例上。
这里值得注意的是,【套接字增加到工作线程的epoll实例】这个动作是在主线程上实现,因为epoll是线程平安的,所以在主线程间接操作工作线程的epoll实例是没有问题的。
持续思考
有没有方法将【套接字增加到工作线程的epoll实例】这个动作放到工作线程上实现呢?其实这种做法更为广泛,比方有些时候为了防止加锁,进步操作效率,某些操作须要由主线程触发,由工作线程执行。
这里的难点在于工作线程是自身是个有限循环,在没有事件产生时,会始终阻塞在epoll_wait
上,这种状况下,主线程如何告诉工作线程执行操作呢?
这里介绍一种思路,咱们能够在EventLoop
初始化时,通过eventfd()
调用创立一个套接字event_fd
,EventLoop
增加监听event_fd
的读事件。
在EventLoop::Loop
函数中,每次解决完一轮读写后,都会再执行一个函数doPendingFns()
, 伪代码如下
void EventLoop::Loop() { std::vector<SP_Channel> ready_channels; for (;;) { ready_channels.clear(); ready_channels = poller_->WaitForReadyChannels(); for (SP_Channel chan : ready_channels) { chan->HandleEvents(); } doPendingFns(); }}void doPendingFns() { std::vector<Functor> fns; { MutexLockGuard lock(mutex_); fns.swap(pendingFns_); } for (auto fn : fns) { fn(); }}
主线程在须要工作线程执行某个函数时,只须要往工作线程的pendingFns
列表增加对应的函数,再往event_fd
轻易写一些数据,让工作线程退出阻塞,工作线程最终会在doPendingFns
遍历执行pendingFns
列表中的全副函数。
如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容