关于后端:系列教程多线程实现都需要注意什么

4次阅读

共计 6430 个字符,预计需要花费 17 分钟才能阅读完成。

首发地址

day05 多线程实现都须要留神什么?

我的项目仓库地址

https://github.com/lzs123/CProxy,欢送 fork and star!

往期教程

day01- 从一个根底的 socket 服务说起

day02 真正的高并发还得看 IO 多路复用

day03 C++ 我的项目开发配置最佳实际(vscode 近程开发配置、格式化、代码查看、cmake 治理配置)

day04 高性能服务设计思路


工作线程如何初始化?

在咱们的设计中,工作线程自身是一个事件循环,启动后会陷入阻塞,期待事件产生。为了达到这个成果,线程启动时须要做一些初始化工作。
咱们定义了 EventLoopThread 类,类的定义如下

class EventLoopThread {
    public:
    EventLoopThread()
    : thread_(std::bind(&EventLoopThread::ThreadFunc, this)),
      mutex_(),
      cond_(){};
    ~EventLoopThread();
    void StartLoop();
    void ThreadFunc();
    void AddChannel(SP_Channel);
    void AddConn(SP_Conn);
    SP_EventLoop GetLoop() { return loop_;}
    
    private:
    SP_EventLoop loop_;
    bool started_;
    std::thread thread_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

SP 结尾的代表对应类的 shared_ptr 智能指针类型;比方 SP_EventLoop =std::shared_ptr\<EventLoop>

EventLoopThread 构造函数中,创立了一个 std::thread 对象,并以 EventLoopThread::ThreadFunc 作为线程执行函数。

void EventLoopThread::ThreadFunc() try {if (loop_) {throw "loop_ is not null";}
  loop_ = SP_EventLoop(new EventLoop());
  {std::unique_lock<std::mutex> lock(mutex_);
    started_ = true;
    cond_.notify_all();}
  loop_->Loop();} catch (std::exception& e) {SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what());
  abort();}

咱们先看看第 5 行:loop_ = SP_EventLoop(new EventLoop());初始化了一个 EventLoop 并赋值到 EventLoopThread 成员变量 loop_ 上,咱们先看看 EventLoop 的定义

class EventLoop {
 public:
  EventLoop() : poller_(SP_Epoll(new Epoll())){};
  void Loop();
  void AddToPoller(SP_Channel channel);
  void UpdateToPoller(SP_Channel channel);
  void RemoveFromPoller(SP_Channel channel);

 private:
  SP_EventDispatcher poller_;
};

能够看到,在 EventLoop 的构造函数中,初始化了一个 Epoll 对象赋值到变量 poller_ 上。poller_自身是个 EventDispacther 对象,Epoll继承了 EventDispatcher,示意基于 epoll 实现的事件散发;
这样做的益处是如果要新增一种事件散发机制,比方我的项目要反对 mac 环境,咱们须要用 kqueue 代替 epoll 实现事件散发。这个时候咱们只须要批改 EventLoop 的构造函数,将新的事件散发对象 Kqueue 赋值给 poller_ 即可。
咱们再看看 Epoll 在初始化时做了什么。

Epoll::Epoll() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)), epoll_events_(EVENTSNUM) {assert(epoll_fd_ > 0);
}

调用了 epoll_create1,创立了一个epoll 实例,也就是说,一个 Epoll 对象初始化时,就曾经在内核筹备好了一个 eventpoll 对象,咱们能够增加套接字并监听相干事件了。
看回 EventLoopThread::ThreadFunc, 在初始化一个EventLoop 对象后,EventLoopThread::ThreadFunc会再调用loop_->Loop(),这个就是咱们之前说的事件循环了。

void EventLoop::Loop() {
  std::vector<SP_Channel> ready_channels;
  for (;;) {ready_channels.clear();
    ready_channels = poller_->WaitForReadyChannels();
    for (SP_Channel chan : ready_channels) {chan->HandleEvents();
    }
  }
}

EventLoop::Loop自身是个死循环,大略逻辑也比较简单,就是一直从 poller_ 中获取触发事件的 channel 列表,再遍历该列表,调用对应的 HandleEvent 事件处理函数。
当没有事件时,该循环会阻塞在 WaitForReadyChannels 处,底层其实是阻塞在 epoll_wait。(阻塞过程中线程是挂起状态,并不会占用 cpu)。
咱们简略回顾下线程的初始化。

  1. 创立了一个 EventLoop 对象,底层通过调用 epoll_create1 创立了 epoll 实例,能够通过该 epoll 实例增加事件监听。
  2. 之后调用 EventLoop::Loop,在没有事件时,线程会陷入阻塞;当有事件产生时,会调用注册时对应的handleEvents 办法进行解决。

    如何控制线程启动的程序?

    下面咱们讲了线程的初始化,但初始化后,EventLoopThread还须要调用 StartLoop 能力开始工作。这其实是为了让主线程期待线程池中的工作线程实现初始化。

    为什么要管制?

    首先讲讲主线程为什么要期待工作线程实现初始化。
    在咱们的线程模型设计中,主线程负责监听接管新连贯申请,而后抉择线程池中的一个工作线程,将新连贯套接字交给工作线程解决。
    假如工作线程不须要StartLoop,在工作线程初始化后间接退出到线程池。

    void EventLoopThreadPool::start() {for (int i = 0; i < thread_num_; i++) {SP_EventLoopThread t(new EventLoopThread());
     // t->StartLoop();
     threads_.emplace_back(t);
      }
    }

    当有新连贯时,主线程通过从线程池中获取一个工作线程。但这个时候,咱们是没法保障选出的工作线程是曾经初始化了 loop_ 的。因为 EventLoopThread::ThreadFunc 的执行是异步的,执行程序可能如下。

    在主线程选中将新连贯增加到工作线程中时,工作线程的 loop_ 此时还未初始化,可能会导致程序间接 coredump。
    所以,咱们必须想方法让工作线程的 EventLoop 初始化在主线程开始接管新连贯申请之前。

    如何管制?

    这实际上是一个多线程的告诉问题,咱们次要采纳的是 mutexcondition这两个武器,通过条件变量来实现告诉。

    在 C ++ 中,咱们通常应用 condition_variable 搭配互斥量 mutex 来解决线程间同步问题。次要用到的是 condition_variable::notify_xxcondition_variable::wait函数。
    顾名思义,wait就是一个期待的作用,假如咱们在线程 A 进行wait, 流程如下:

    1. lock 获取到锁,
    2. 调用wait,会先主动 unlock 开释锁,而后将线程阻塞住;
    3. 被其余线程唤醒后,会主动 lock 获取锁,继续执行 wait 的下一行代码

    唤醒线程的函数是 notify_allnotify_one, 两者的区别在于 notify_one()每次只唤醒一个线程,那么 notify_all() 函数会唤醒所有期待中的线程(当最终能抢到锁的只有一个线程)。调用流程如下:

    1. lock 获取到锁
    2. 调用notify_all/notify_one,唤醒期待中的线程
    3. 开释锁

咱们为 EventLoopThread 引入了 StartLoop 办法,大略成果如下

为了浏览不便,在这里再贴一遍 EventLoopThread 相干的代码

// lib/event_loop_thread.h
class EventLoopThread {
    public:
    ...
    void StartLoop();
    void ThreadFunc();
    ...
    private:
    ...
    bool started_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// lib/event_loop_thread.cpp
void EventLoopThread::ThreadFunc() try {if (loop_) {throw "loop_ is not null";}
  loop_ = SP_EventLoop(new EventLoop());
  {std::unique_lock<std::mutex> lock(mutex_);
    started_ = true;
    cond_.notify_all();}
  loop_->Loop();} catch (std::exception& e) {SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what());
  abort();}

void EventLoopThread::StartLoop() {std::unique_lock<std::mutex> lock(mutex_);
  while (!started_) cond_.wait(lock);
}

首先,咱们须要明确,在工作线程初始化 loop_ 后,就代表该线程曾经筹备实现,能够接管解决套接字了。所以咱们在实现 loop_ 的初始化后,将 started_ 置为 true, 而后就发送 notify 告诉唤醒期待线程。
而在 StartLoop 函数中, 咱们先查看 started_ 是否为 false,如果是true, 代表工作线程曾经初始化完loop_ 了,这种状况 StartLoop 不再须要 wait,间接返回即可;如果 started_false, 则陷入期待,直到工作线程实现 loop_ 初始化后唤醒。

如何将套接字增加到工作线程?

最初,咱们认真聊聊新连贯套接字是如何增加到工作线程中的。
没有申请时,主线程会阻塞在 accept 调用,当有新连贯申请时,accept会返回新连贯套接字 accept_fd
主线程会先将 accept_fd 封装成一个 Conn 对象,上一节《day04 高性能服务设计思路》讲到我的项目中有多种连贯,这些连贯有一个独特的基类 Conn, Conn 次要是将套接字封装成一个 Channel, 并设置该Channel 各种事件回调解决逻辑。不同类型的 Conn 有各自的回调解决逻辑。

接下来,主线程通过 EventLoopThreadPool::PickRandThread 获取一个工作线程。

SP_EventLoopThread EventLoopThreadPool::PickRandThread() {
  SP_EventLoopThread t;
  {std::unique_lock<std::mutex> lock(thread_mutex_);
    t = threads_[next_work_thread_Idx_];
    next_work_thread_Idx_ = (next_work_thread_Idx_ + 1) % thread_num_;
  }
  return t;
}

这里咱们间接采纳轮训策略选取线程池中的线程。
获取到工作线程后,咱们间接调用EventLoopThread::AddConn, 将该连贯交由工作线程解决。

// lib/event_loop_thread.cpp
void EventLoopThread::AddConn(SP_Conn conn) {loop_->AddToPoller(conn->GetChannel()); 
}

// lib/event_loop.cpp
void EventLoop::AddToPoller(SP_Channel channel) {poller_->PollAdd(channel); 
}

// lib/epoll.cpp
void Epoll::PollAdd(SP_Channel channel) {int fd = channel->getFd();
  epoll_event event;
  event.data.fd = fd;
  event.events = channel->GetEvents();
  if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) {SPDLOG_CRITICAL("epoll_ctl fd: {} err: {}", fd, strerror(errno));
  } else {fd2chan_[fd] = channel;
  }
}

能够发现,底层是调用 epoll_ctl 将套接字 fd 加到对应工作线程的 epoll 实例上。
这里值得注意的是,【套接字增加到工作线程的 epoll 实例】这个动作是在主线程上实现,因为 epoll 是线程平安的,所以在主线程间接操作工作线程的 epoll 实例是没有问题的。

持续思考

有没有方法将【套接字增加到工作线程的 epoll 实例】这个动作放到工作线程上实现呢?其实这种做法更为广泛,比方有些时候为了防止加锁,进步操作效率,某些操作须要由主线程触发,由工作线程执行。
这里的难点在于工作线程是自身是个有限循环,在没有事件产生时,会始终阻塞在 epoll_wait 上,这种状况下,主线程如何告诉工作线程执行操作呢?
这里介绍一种思路,咱们能够在 EventLoop 初始化时,通过 eventfd() 调用创立一个套接字 event_fdEventLoop 增加监听 event_fd 的读事件。
EventLoop::Loop 函数中,每次解决完一轮读写后,都会再执行一个函数doPendingFns(), 伪代码如下

void EventLoop::Loop() {
  std::vector<SP_Channel> ready_channels;
  for (;;) {ready_channels.clear();
    ready_channels = poller_->WaitForReadyChannels();
    for (SP_Channel chan : ready_channels) {chan->HandleEvents();
    }
    doPendingFns();}
}

void doPendingFns() {
  std::vector<Functor> fns;
  {MutexLockGuard lock(mutex_);
      fns.swap(pendingFns_);
  }
  for (auto fn : fns) {fn();
  }
}

主线程在须要工作线程执行某个函数时,只须要往工作线程的 pendingFns 列表增加对应的函数,再往 event_fd 轻易写一些数据,让工作线程退出阻塞,工作线程最终会在 doPendingFns 遍历执行 pendingFns 列表中的全副函数。


如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容

正文完
 0