关于c++:C高性能网络服务保姆级教程-day02-真正的高并发还得看IO多路复用

教程阐明

C++高性能网络服务保姆级教程

首发地址

day02 真正的高并发还得看IO多路复用

本节目的

应用epoll实现一个高并发的服务器

从单过程讲起

上节从一个根底的socket服务说起咱们实现了一个根本的socket服务器,并留了个思考题

先启动server,而后启动一个client,不输出数据,这个时候在另外一个终端上再启动一个client,并在第二个client终端中输出数据,会产生什么呢?

实际操作后,咱们会发现,在第二个client输出后,服务端并没有响应,直到第一个client也输出数据实现交互后,第二个client才会有数据返回。

这是因为服务端accept获取到第一个client的套接字后,因为第一个client未输出数据,所以服务端过程会阻塞在期待客户端数据那一行。

...
int read_num = read(accept_fd, read_msg, 100);
...

所以,第二个client实现三次握手后,连贯始终在服务端的全连贯队列中,期待accept获取解决。

多线程,一个线程一个连贯

后续的client无奈失去解决是因为服务端只有一个线程,获取client套接字还有连贯通信全在一个线程中。

那咱们间接开多个线程就好了,主线程只负责accept获取客户端套接字。每来一个连贯,咱们就新起一个线程去解决客户端和服务端的通信。这样多个连贯之间就不会相互影响了。服务端程序如下:

// per_conn_per_thread_server.cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <thread>
#include <arpa/inet.h>
#include <string.h>
#include <cstdio>
#include <errno.h>

void handleConn(int accept_fd) {
  char read_msg[100];
  int read_num = read(accept_fd, read_msg, 100);
  printf("get msg from client: %s\n", read_msg);
  int write_num = write(accept_fd, read_msg, read_num);
  close(accept_fd);
}

int main() {
  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_addr;
  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  server_addr.sin_port = htons(8888);
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
    printf("bind err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }

  if (listen(listen_fd, 2048) < 0) {
    printf("listen err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }
  
  struct sockaddr_in client_addr;
  bzero(&client_addr, sizeof(struct sockaddr_in));
  socklen_t client_addr_len = sizeof(client_addr);
  int accept_fd = 0;
  while((accept_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len)) > 0) {
    printf("get accept_fd: %d from: %s:%d\n", accept_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    std::thread handleThread(handleConn, accept_fd);
    // 将线程设置为后盾线程,防止阻塞主线程
    handleThread.detach();
  }
}

应用thread库时,如果应用g++进行编译须要增加-lpthread,残缺编译命令:

g++ -std=c++11 xxx.cpp -lpthread

看似解决阻塞问题了,但其实这种计划有大缺点,只有咱们略微加大下客户端的并发度,就会发现服务端会解决不过去。每来一个连贯都创立一个新线程,解决完后再销毁线程,这种解决形式老本太大。

IO多路复用和Reactor模型

咱们仔细分析下,「per connection per thread」呈现性能瓶颈有以下几个起因:

  1. 一个零碎能同时创立的线程数量是无限的,而且线程数量越多,占用内存也会变多,容易导致OOM。
  2. 每个连贯都用一个新线程去解决,解决完结后销毁对应线程,线程创立和销毁都须要较大开销。
  3. 一个线程当执行工夫片用完或者遇到零碎调用阻塞时,都会让出CPU。CPU会保留线程的现场信息,而后去执行其余线程(这个过程也称为CPU上下文切换)。所以当线程数很多时,CPU的线程上下文切换也会越频繁,真正用于解决连贯通信的工夫也会越少。也就是CPU在瞎忙活。

既然是因为并发量高时线程太多导致的性能问题,那如果有一种技术,能让一个线程负责N个连贯就能完满解决了。伪代码如下:

class HandleThread {
    std::vector<int> handle_fds;
    void addFd(int fd) {handle_fds.push_back(fd)};
    void work();
}
HandleThread::work() {
    for(;;) {
        int readyFd = getReadyIOFd();
        ...
        // 对readyFd读写解决
        ...
    }
}

auto pool = createThreadPool(4);
int accept_fd = accept(...);
HandleThread thread = pool.getThread();
thread.addFd(accept_fd);

下面代码大家应该很容易看懂,先创立一个指定线程数量的线程池,主线程获取到新连贯后,丢到线程池的一个线程去解决。每个线程初始化后会执行work函数,work函数是一个while死循环,外面的getReadyIOFd会阻塞线程,直到有可读可写的套接字时,才会唤醒线程,去进行连贯的读写。

扫盲点:个别咱们讲的因为零碎调用(比方read/write等)导致阻塞,这个时候阻塞的线程状态会被置为挂起,不会占用CPU。所以下面尽管有个while死循环,但在getReadyIOFd被阻塞了,getReadyIOFd底层也是个零碎调用(具体实现咱们前面会讲到),在没有可读写的套接字时线程并不会占用CPU。

下面的流程,其实就是赫赫有名的IO多路复用和Reactor多线程模型了。

epoll退场

这一节咱们具体聊聊一个handleThread是如何治理多个套接字的。

IO多路复用的实现模型大家多少听过一些,咱们先比拟下常见的select和epoll

select

select简略了解就是拿一个数组保留连贯套接字,调用select时,会将整个数组拷贝到内核空间中,如果以后数组中没有可读写的套接字,线程被阻塞。

等到数组中有可读写的套接字,或者超时(select能够设置阻塞的超时工夫),select调用会返回,而后线程遍历全副数组,找到可读写的套接字,进行读写解决。

select存在以下几个毛病:

  1. 数组中的套接字数量有限度。最多1024个,这个数是select代码中写死的,具体可看/usr/include/bits/typesizes.h中有定义。
  2. select返回后,只是通知咱们这些数组中有fd就绪了,但却没通知咱们具体是哪个fd可读写,咱们须要轮训整个数据,能力找到可操作的fd。效率比拟低
  3. 须要保护一个用来寄存大量fd的数据结构,这样会使得用户空间和内核空间在传递该构造时复制开销大。

epoll

epoll是linux2.6的时候提出的,epoll在內核中保护了一个eventpoll对象,eventpoll蕴含一个红黑树结构的期待队列wq和一个链表构造的就绪队列rdlist。

新获取到一个套接字后,将该套接字增加到wq中,等到套接字可读写时,操作系统会将该套接字从wq转到rdlist,而后线程间接解决rdlist中的套接字即可,不须要再遍历全副监听的套接字了。

与select相比,能够发现有以下几个长处:

  1. 没有套接字数量的限度
  2. 阻塞返回后,会明确告知哪些套接字是能够读写的,不须要全副轮训,效率较高

epoll根本应用

因为咱们的我的项目选用epoll,所以上面咱们具体讲讲epoll的应用办法

  1. epoll_create创立一个epoll实例

    int epoll_create(int size);
    int epoll_create1(int flags);
  2. size:用来告知內核冀望监控的套接字数量,但在2.6.8之后就废除了,由零碎自动化调配。
  3. flags: 如果设置为0,和epoll_create性能雷同。能够设置为EPOLL_CLOEXEC, 示意当持有epoll句柄的过程fork出一个子过程时,子过程不会蕴含该epoll_fd。
  4. 返回值:胜利返回epoll_fd,失败返回-1
  5. epoll_ctl治理监听的描述符,并注册要监听的事件

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
  6. epfd: epoll_create创立的epoll_fd
  7. op: 要操作的类型:
    a. EPOLL_CTL_ADD :注册事件
    b. EPOLL_CTL_MOD:更改事件
    c. EPOLL_CTL_DEL:删除事件
  8. fd: 要操作的文件描述符
  9. event: 要注册的事件类型

    typedef union epoll_data
    {
      void *ptr;
      int fd;
      uint32_t u32;
      uint64_t u64;
    } epoll_data_t;
    
    struct epoll_event
    {
      uint32_t events;    /* Epoll events */
      epoll_data_t data;    /* User data variable */
    }
    
    // epoll_event.event示意具体的事件类型,常见有以下几种:
    // EPOLLIN:文件描述符可读
    // EPOLLOUT:文件描述符可写
    // EPOLLRDHUP:套接字对端断开
    // EPOLLET:边缘触发(前面细讲)
  10. epoll_wait 期待事件产生,没有事件时,调用者过程会被挂起,等到事件产生/超时后返回

    int epoll_wait(int epfd, struct epoll_event* evlist, int maxevents, int timeout);
  11. epfd: epoll_create创立的epoll_fd
  12. evlist: 返回给用户空间的能够解决的IO事件数组,即后面说的就绪队列
  13. maxevents:示意一次epoll_wait最多能够返回的事件数量
  14. timeout: epoll_wait阻塞的超时值,如果设置为-1,示意不超时,如果设置为0,即便没有IO事件也会立刻返回

epoll有EPOLLLT(程度触发)和EPOLLET(边缘触发)两种工作模式:

  • 程度触发:只有socket处于可读状态(缓冲区有数据)或可写状态,无论什么时候进行epoll_wait都会返回该socket,也就是说咱们第一次epoll_wait返回后读了局部数据,在下一次的epoll_wait调用还是会返回之前那个没读完数据的socket。
  • 边缘触发:只有套接字的状态由不可写到可写或由不可读到可读时,才会触发epoll_wait返回。如果咱们第一次epoll_wait返回中读了局部数据,如果该套接字没再收到新数据,那即便该套接字缓存区中还有一些数据没读,下一次的epoll_wait也不会返回该套接字了。所以咱们须要在第一次读时通过循环read的形式把套接字中的数据全读出来。

边缘触发解决起来会比程度触发比拟麻烦,但性能会比程度触发高,因为缩小 epoll 相干零碎调用次数

讲完epoll的应用办法,咱们把后面的伪代码套上epoll的边缘触发模式,残缺代码如下:

#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <thread>
#include <arpa/inet.h>
#include <string.h>
#include <cstdio>
#include <errno.h>
#include <vector>
#include <assert.h>
#include <sys/epoll.h>
#include <fcntl.h>

int setfdNonBlock(int fd) {
  int flag = fcntl(fd, F_GETFL, 0);
  if (flag == -1) return -1;
  flag |= O_NONBLOCK;
  if (fcntl(fd, F_SETFL, flag) == -1) return -1;
  return 0;
};

void handleConn(int accept_fd) {
  char read_msg[100];
  char *buf_ptr = read_msg;
  int total_read_num = 0;
  int read_num = 0;
  // 应用的是epollet边缘触发模式,须要把套接字缓存区中的数据全读完
  do {
    read_num = read(accept_fd, buf_ptr, 100);
    buf_ptr += read_num;
    total_read_num += read_num;
  } while(read_num > 0);
  printf("get msg from client: %s\n", read_msg);
  int write_num = write(accept_fd, read_msg, total_read_num);
  close(accept_fd);
}

int listenServer(char *host, int port) {
  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_addr;
  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  server_addr.sin_port = htons(8888);
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
    printf("bind err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }

  if (listen(listen_fd, 2048) < 0) {
    printf("listen err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }
  return listen_fd;
}

const int EPOLLWAIT_TIME = 10000;
const int EVENTSMAXNUM = 4096;

class HandleThread {
  public:
    HandleThread() 
    : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)),
      epoll_events_(EVENTSMAXNUM),
      thread_(std::bind(&HandleThread::work, this)) {
      assert(epoll_fd_ > 0);
      thread_.detach();
    }
    ~HandleThread() {
      close(epoll_fd_);
    }
    // 线程理论运行函数
    void work();
    // 增加监听套接字
    void addFd(int fd);
    // 不再监听指定套接字
    void rmFd(int fd);
  private:
    int epoll_fd_;
    std::vector<epoll_event>epoll_events_;
    std::thread thread_;
};

void HandleThread::work() {
  for(;;) {
    int event_count = epoll_wait(epoll_fd_, &*epoll_events_.begin(), epoll_events_.size(), EPOLLWAIT_TIME);
    if (event_count < 0) {
      perror("epoll wait error");
      continue;
    }
    for (int i = 0; i < event_count; i++) {
      epoll_event cur_event = epoll_events_[i];
      int fd = cur_event.data.fd;

      // 不再监听fd,从epoll中去掉
      rmFd(fd);
      // 解决连贯读写
      handleConn(fd);
    }
  }
}

void HandleThread::addFd(int fd) {
  epoll_event event;
  event.data.fd = fd;
  // 只监听读事件
  event.events = EPOLLIN | EPOLLET;
  if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) {
    perror("epoll_add error");
  }
}

void HandleThread::rmFd(int fd) {
  epoll_event event;
  event.data.fd = fd;
  event.events = EPOLLIN | EPOLLET;
  if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) < 0) {
    perror("epoll_del error");
  }
}

typedef std::shared_ptr<HandleThread> SP_HandleThread;

class HandleThreadPool {
  public:
    HandleThreadPool(int thread_nums) : thread_nums_(thread_nums), next_thread_idx_(0) {
      for (int i = 0; i < thread_nums; i++) {
        SP_HandleThread t (new HandleThread());
        thread_pool_.push_back(t);
      }
    }
    SP_HandleThread getThread();
  private:
    int thread_nums_;
    int next_thread_idx_;
    std::vector<SP_HandleThread> thread_pool_;
};

// 从线程池中获取一个线程
SP_HandleThread HandleThreadPool::getThread() {
  SP_HandleThread t = thread_pool_[next_thread_idx_];
  next_thread_idx_ = (next_thread_idx_ + 1) % thread_nums_;
  return t;
}

int main() {
  int listen_fd = listenServer("127.0.0.1", 8888);

  // 创立线程池
  HandleThreadPool pool(4);
  // 期待1秒
  sleep(1);
  struct sockaddr_in client_addr;
  bzero(&client_addr, sizeof(struct sockaddr_in));
  socklen_t client_addr_len = sizeof(client_addr);
  int accept_fd = 0;
  while((accept_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len)) > 0) {
    printf("get accept_fd: %d from: %s:%d\n", accept_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    // 将fd设置为非阻塞 ?
    setfdNonBlock(accept_fd);
    // 从pool中获取一个线程解决连贯
    SP_HandleThread t = pool.getThread();
    t->addFd(accept_fd);
  }
}

代码比拟长,但不难,大家能够fork下来缓缓看。

应用了智能指针,防止遗记回收堆上的资源。

大家可能会发现代码有两次正文增加了”?”,第一处是在创立线程池后,sleep了1秒,这个当老本节的思考题,大家能够先思考,并想想有没有什么更好的解决办法?

第二处是在获取到accept_fd后,将fd设置为非阻塞了。上面咱们开展具体讲讲。

非阻塞与IO多路复用更搭

首先咱们先聊聊阻塞IO调用和非阻塞IO调用的区别。

阻塞IO调用:过程在调用IO操作时,如果没有数据可读或缓冲区没有闲暇空间可写,导致IO操作未实现,过程被阻塞挂起,后续操作将无奈执行。比方上面代码,如果客户端建设连贯后,始终不发送数据,那服务端执行就会阻塞在read调用,前面的printf无奈被执行到。

int accept_fd = accept(...);
char read_msg[100];
int read_num = read(accept_fd, read_msg, 100);
printf("i am a log\n");

小提示:下面的代码即便客户端只发了1个字节的数据,服务端read调用也会返回,并不是要等到读满100个字节才会返回。

非阻塞IO调用: 过程在调用IO操作时,即便IO操作未实现,该IO调用也会立即返回,之后过程能够进行后续操作。比方上面代码,将accept_fd设置为非阻塞后,再调用read,这时即便客户端没有发数据,服务端也不会始终卡在read调用上,前面的printf能顺利打印进去。

int accept_fd = accept(...);
// 将fd设置为非阻塞
setfdNonBlock(accept_fd);
char read_msg[100];
int read_num = read(accept_fd, read_msg, 100);
printf("i am a log\n");

上面咱们再说下为什么IO多路复用要搭配非阻塞IO?

在后面,咱们应用epoll实现了一个线程治理多个套接字,当某个套接字有读写事件时,epoll_wait调用返回,通知咱们哪些套接字能读,但并不会通知咱们某个套接字上有多少数据可读。

  • 应用非阻塞IO解决形式:咱们只有循环的read,直到读完全副的数据即可(read返回0)。
  • 应用阻塞IO解决形式:每次只能调用一次read,因为咱们并不知道下一次循环中还有没有数据可读,如果没数据就会阻塞整个过程了,所以只能期待下一次的epoll_wait返回了。这对于程度触发还可行,但对于边缘触发就不行了,因为咱们不晓得这个套接字还会不会有新数据写入,如果对端不再写入新数据,那缓冲区中剩下的数据就再也读不到了。

残缺源码已上传到https://github.com/lzs123/CPr…,欢送fork and star!

参考文章

如果这篇文章说不清epoll的实质,那就过去掐死我吧!

写在最初

如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理