乐趣区

关于c++:C高性能网络服务保姆级教程-day02-真正的高并发还得看IO多路复用

教程阐明

C++ 高性能网络服务保姆级教程

首发地址

day02 真正的高并发还得看 IO 多路复用

本节目的

应用 epoll 实现一个高并发的服务器

从单过程讲起

上节从一个根底的 socket 服务说起咱们实现了一个根本的 socket 服务器,并留了个思考题

先启动 server,而后启动一个 client,不输出数据,这个时候在另外一个终端上再启动一个 client,并在第二个 client 终端中输出数据,会产生什么呢?

实际操作后,咱们会发现,在第二个 client 输出后,服务端并没有响应,直到第一个 client 也输出数据实现交互后,第二个 client 才会有数据返回。

这是因为服务端 accept 获取到第一个 client 的套接字后,因为第一个 client 未输出数据,所以服务端过程会阻塞在期待客户端数据那一行。

...
int read_num = read(accept_fd, read_msg, 100);
...

所以,第二个 client 实现三次握手后,连贯始终在服务端的全连贯队列中,期待 accept 获取解决。

多线程,一个线程一个连贯

后续的 client 无奈失去解决是因为服务端只有一个线程,获取 client 套接字还有连贯通信全在一个线程中。

那咱们间接开多个线程就好了,主线程只负责 accept 获取客户端套接字。每来一个连贯,咱们就新起一个线程去解决客户端和服务端的通信。这样多个连贯之间就不会相互影响了。服务端程序如下:

// per_conn_per_thread_server.cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <thread>
#include <arpa/inet.h>
#include <string.h>
#include <cstdio>
#include <errno.h>

void handleConn(int accept_fd) {char read_msg[100];
  int read_num = read(accept_fd, read_msg, 100);
  printf("get msg from client: %s\n", read_msg);
  int write_num = write(accept_fd, read_msg, read_num);
  close(accept_fd);
}

int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_addr;
  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  server_addr.sin_port = htons(8888);
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {printf("bind err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }

  if (listen(listen_fd, 2048) < 0) {printf("listen err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }
  
  struct sockaddr_in client_addr;
  bzero(&client_addr, sizeof(struct sockaddr_in));
  socklen_t client_addr_len = sizeof(client_addr);
  int accept_fd = 0;
  while((accept_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len)) > 0) {printf("get accept_fd: %d from: %s:%d\n", accept_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    std::thread handleThread(handleConn, accept_fd);
    // 将线程设置为后盾线程,防止阻塞主线程
    handleThread.detach();}
}

应用 thread 库时,如果应用 g ++ 进行编译须要增加-lpthread, 残缺编译命令:

g++ -std=c++11 xxx.cpp -lpthread

看似解决阻塞问题了,但其实这种计划有大缺点,只有咱们略微加大下客户端的并发度,就会发现服务端会解决不过去。每来一个连贯都创立一个新线程,解决完后再销毁线程,这种解决形式老本太大。

IO 多路复用和 Reactor 模型

咱们仔细分析下,「per connection per thread」呈现性能瓶颈有以下几个起因:

  1. 一个零碎能同时创立的线程数量是无限的,而且线程数量越多,占用内存也会变多,容易导致 OOM。
  2. 每个连贯都用一个新线程去解决,解决完结后销毁对应线程,线程创立和销毁都须要较大开销。
  3. 一个线程当执行工夫片用完或者遇到零碎调用阻塞时,都会让出 CPU。CPU 会保留线程的现场信息,而后去执行其余线程(这个过程也称为 CPU 上下文切换)。所以当线程数很多时,CPU 的线程上下文切换也会越频繁,真正用于解决连贯通信的工夫也会越少。也就是 CPU 在瞎忙活。

既然是因为并发量高时线程太多导致的性能问题,那如果有一种技术,能让一个线程负责 N 个连贯就能完满解决了。伪代码如下:

class HandleThread {
    std::vector<int> handle_fds;
    void addFd(int fd) {handle_fds.push_back(fd)};
    void work();}
HandleThread::work() {for(;;) {int readyFd = getReadyIOFd();
        ...
        // 对 readyFd 读写解决
        ...
    }
}

auto pool = createThreadPool(4);
int accept_fd = accept(...);
HandleThread thread = pool.getThread();
thread.addFd(accept_fd);

下面代码大家应该很容易看懂,先创立一个指定线程数量的线程池,主线程获取到新连贯后,丢到线程池的一个线程去解决。每个线程初始化后会执行 work 函数,work 函数是一个 while 死循环,外面的 getReadyIOFd 会阻塞线程,直到有可读可写的套接字时,才会唤醒线程,去进行连贯的读写。

扫盲点:个别咱们讲的因为零碎调用(比方 read/write 等)导致阻塞,这个时候阻塞的线程状态会被置为挂起,不会占用 CPU。所以下面尽管有个 while 死循环,但在 getReadyIOFd 被阻塞了,getReadyIOFd 底层也是个零碎调用(具体实现咱们前面会讲到),在没有可读写的套接字时线程并不会占用 CPU。

下面的流程,其实就是赫赫有名的 IO 多路复用和 Reactor 多线程模型了。

epoll 退场

这一节咱们具体聊聊一个 handleThread 是如何治理多个套接字的。

IO 多路复用的实现模型大家多少听过一些,咱们先比拟下常见的 select 和 epoll

select

select 简略了解就是拿一个数组保留连贯套接字,调用 select 时,会将整个数组拷贝到内核空间中,如果以后数组中没有可读写的套接字,线程被阻塞。

等到数组中有可读写的套接字,或者超时(select 能够设置阻塞的超时工夫),select 调用会返回,而后线程遍历全副数组,找到可读写的套接字,进行读写解决。

select 存在以下几个毛病:

  1. 数组中的套接字数量有限度。最多 1024 个,这个数是 select 代码中写死的,具体可看 /usr/include/bits/typesizes.h 中有定义。
  2. select 返回后,只是通知咱们这些数组中有 fd 就绪了,但却没通知咱们具体是哪个 fd 可读写,咱们须要轮训整个数据,能力找到可操作的 fd。效率比拟低
  3. 须要保护一个用来寄存大量 fd 的数据结构,这样会使得用户空间和内核空间在传递该构造时复制开销大。

epoll

epoll 是 linux2.6 的时候提出的,epoll 在內核中保护了一个 eventpoll 对象,eventpoll 蕴含一个红黑树结构的期待队列 wq 和一个链表构造的就绪队列 rdlist。

新获取到一个套接字后,将该套接字增加到 wq 中,等到套接字可读写时,操作系统会将该套接字从 wq 转到 rdlist,而后线程间接解决 rdlist 中的套接字即可,不须要再遍历全副监听的套接字了。

与 select 相比,能够发现有以下几个长处:

  1. 没有套接字数量的限度
  2. 阻塞返回后,会明确告知哪些套接字是能够读写的,不须要全副轮训,效率较高

epoll 根本应用

因为咱们的我的项目选用 epoll,所以上面咱们具体讲讲 epoll 的应用办法

  1. epoll_create创立一个 epoll 实例

    int epoll_create(int size);
    int epoll_create1(int flags);
  2. size:用来告知內核冀望监控的套接字数量,但在 2.6.8 之后就废除了,由零碎自动化调配。
  3. flags: 如果设置为 0,和 epoll_create 性能雷同。能够设置为EPOLL_CLOEXEC, 示意当持有 epoll 句柄的过程 fork 出一个子过程时,子过程不会蕴含该 epoll_fd。
  4. 返回值:胜利返回 epoll_fd, 失败返回 -1
  5. epoll_ctl治理监听的描述符,并注册要监听的事件

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
  6. epfd: epoll_create创立的 epoll_fd
  7. op: 要操作的类型:
    a. EPOLL_CTL_ADD:注册事件
    b. EPOLL_CTL_MOD:更改事件
    c. EPOLL_CTL_DEL:删除事件
  8. fd: 要操作的文件描述符
  9. event: 要注册的事件类型

    typedef union epoll_data
    {
      void *ptr;
      int fd;
      uint32_t u32;
      uint64_t u64;
    } epoll_data_t;
    
    struct epoll_event
    {
      uint32_t events;    /* Epoll events */
      epoll_data_t data;    /* User data variable */
    }
    
    // epoll_event.event 示意具体的事件类型,常见有以下几种:// EPOLLIN:文件描述符可读
    // EPOLLOUT:文件描述符可写
    // EPOLLRDHUP:套接字对端断开
    // EPOLLET:边缘触发(前面细讲)
  10. epoll_wait 期待事件产生,没有事件时,调用者过程会被挂起,等到事件产生 / 超时后返回

    int epoll_wait(int epfd, struct epoll_event* evlist, int maxevents, int timeout);
  11. epfd: epoll_create创立的 epoll_fd
  12. evlist: 返回给用户空间的能够解决的 IO 事件数组,即后面说的就绪队列
  13. maxevents:示意一次 epoll_wait 最多能够返回的事件数量
  14. timeout:epoll_wait阻塞的超时值,如果设置为 -1,示意不超时,如果设置为 0,即便没有 IO 事件也会立刻返回

epoll 有 EPOLLLT(程度触发)和 EPOLLET(边缘触发)两种工作模式:

  • 程度触发:只有 socket 处于可读状态 (缓冲区有数据) 或可写状态,无论什么时候进行 epoll_wait 都会返回该 socket,也就是说咱们第一次 epoll_wait 返回后读了局部数据,在下一次的 epoll_wait 调用还是会返回之前那个没读完数据的 socket。
  • 边缘触发:只有套接字的状态由不可写到可写或由不可读到可读时,才会触发 epoll_wait 返回。如果咱们第一次 epoll_wait 返回中读了局部数据,如果该套接字没再收到新数据 ,那即便该套接字缓存区中还有一些数据没读,下一次的epoll_wait 也不会返回该套接字了。所以咱们须要在第一次读时通过循环 read 的形式把套接字中的数据全读出来。

边缘触发解决起来会比程度触发比拟麻烦,但性能会比程度触发高,因为缩小 epoll 相干零碎调用次数

讲完 epoll 的应用办法,咱们把后面的伪代码套上 epoll 的边缘触发模式,残缺代码如下:

#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <thread>
#include <arpa/inet.h>
#include <string.h>
#include <cstdio>
#include <errno.h>
#include <vector>
#include <assert.h>
#include <sys/epoll.h>
#include <fcntl.h>

int setfdNonBlock(int fd) {int flag = fcntl(fd, F_GETFL, 0);
  if (flag == -1) return -1;
  flag |= O_NONBLOCK;
  if (fcntl(fd, F_SETFL, flag) == -1) return -1;
  return 0;
};

void handleConn(int accept_fd) {char read_msg[100];
  char *buf_ptr = read_msg;
  int total_read_num = 0;
  int read_num = 0;
  // 应用的是 epollet 边缘触发模式,须要把套接字缓存区中的数据全读完
  do {read_num = read(accept_fd, buf_ptr, 100);
    buf_ptr += read_num;
    total_read_num += read_num;
  } while(read_num > 0);
  printf("get msg from client: %s\n", read_msg);
  int write_num = write(accept_fd, read_msg, total_read_num);
  close(accept_fd);
}

int listenServer(char *host, int port) {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_addr;
  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  server_addr.sin_port = htons(8888);
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {printf("bind err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }

  if (listen(listen_fd, 2048) < 0) {printf("listen err: %s\n", strerror(errno));
    close(listen_fd);
    return -1;
  }
  return listen_fd;
}

const int EPOLLWAIT_TIME = 10000;
const int EVENTSMAXNUM = 4096;

class HandleThread {
  public:
    HandleThread() 
    : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)),
      epoll_events_(EVENTSMAXNUM),
      thread_(std::bind(&HandleThread::work, this)) {assert(epoll_fd_ > 0);
      thread_.detach();}
    ~HandleThread() {close(epoll_fd_);
    }
    // 线程理论运行函数
    void work();
    // 增加监听套接字
    void addFd(int fd);
    // 不再监听指定套接字
    void rmFd(int fd);
  private:
    int epoll_fd_;
    std::vector<epoll_event>epoll_events_;
    std::thread thread_;
};

void HandleThread::work() {for(;;) {int event_count = epoll_wait(epoll_fd_, &*epoll_events_.begin(), epoll_events_.size(), EPOLLWAIT_TIME);
    if (event_count < 0) {perror("epoll wait error");
      continue;
    }
    for (int i = 0; i < event_count; i++) {epoll_event cur_event = epoll_events_[i];
      int fd = cur_event.data.fd;

      // 不再监听 fd,从 epoll 中去掉
      rmFd(fd);
      // 解决连贯读写
      handleConn(fd);
    }
  }
}

void HandleThread::addFd(int fd) {
  epoll_event event;
  event.data.fd = fd;
  // 只监听读事件
  event.events = EPOLLIN | EPOLLET;
  if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) {perror("epoll_add error");
  }
}

void HandleThread::rmFd(int fd) {
  epoll_event event;
  event.data.fd = fd;
  event.events = EPOLLIN | EPOLLET;
  if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) < 0) {perror("epoll_del error");
  }
}

typedef std::shared_ptr<HandleThread> SP_HandleThread;

class HandleThreadPool {
  public:
    HandleThreadPool(int thread_nums) : thread_nums_(thread_nums), next_thread_idx_(0) {for (int i = 0; i < thread_nums; i++) {SP_HandleThread t (new HandleThread());
        thread_pool_.push_back(t);
      }
    }
    SP_HandleThread getThread();
  private:
    int thread_nums_;
    int next_thread_idx_;
    std::vector<SP_HandleThread> thread_pool_;
};

// 从线程池中获取一个线程
SP_HandleThread HandleThreadPool::getThread() {SP_HandleThread t = thread_pool_[next_thread_idx_];
  next_thread_idx_ = (next_thread_idx_ + 1) % thread_nums_;
  return t;
}

int main() {int listen_fd = listenServer("127.0.0.1", 8888);

  // 创立线程池
  HandleThreadPool pool(4);
  // 期待 1 秒
  sleep(1);
  struct sockaddr_in client_addr;
  bzero(&client_addr, sizeof(struct sockaddr_in));
  socklen_t client_addr_len = sizeof(client_addr);
  int accept_fd = 0;
  while((accept_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len)) > 0) {printf("get accept_fd: %d from: %s:%d\n", accept_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    // 将 fd 设置为非阻塞 ?
    setfdNonBlock(accept_fd);
    // 从 pool 中获取一个线程解决连贯
    SP_HandleThread t = pool.getThread();
    t->addFd(accept_fd);
  }
}

代码比拟长,但不难,大家能够 fork 下来缓缓看。

应用了智能指针,防止遗记回收堆上的资源。

大家可能会发现代码有两次正文增加了 ”?”,第一处是在创立线程池后,sleep 了 1 秒,这个当老本节的思考题,大家能够先思考,并想想有没有什么更好的解决办法?

第二处是在获取到 accept_fd 后,将 fd 设置为非阻塞了。上面咱们开展具体讲讲。

非阻塞与 IO 多路复用更搭

首先咱们先聊聊阻塞 IO 调用和非阻塞 IO 调用的区别。

阻塞 IO 调用 :过程在调用 IO 操作时,如果没有数据可读或缓冲区没有闲暇空间可写,导致 IO 操作未实现,过程被阻塞挂起,后续操作将无奈执行。比方上面代码,如果客户端建设连贯后,始终不发送数据,那服务端执行就会阻塞在read 调用,前面的 printf 无奈被执行到。

int accept_fd = accept(...);
char read_msg[100];
int read_num = read(accept_fd, read_msg, 100);
printf("i am a log\n");

小提示:下面的代码即便客户端只发了 1 个字节的数据,服务端 read 调用也会返回,并不是要等到读满 100 个字节才会返回。

非阻塞 IO 调用 : 过程在调用 IO 操作时,即便 IO 操作未实现,该 IO 调用也会立即返回,之后过程能够进行后续操作。比方上面代码,将 accept_fd 设置为非阻塞后,再调用read,这时即便客户端没有发数据,服务端也不会始终卡在read 调用上,前面的 printf 能顺利打印进去。

int accept_fd = accept(...);
// 将 fd 设置为非阻塞
setfdNonBlock(accept_fd);
char read_msg[100];
int read_num = read(accept_fd, read_msg, 100);
printf("i am a log\n");

上面咱们再说下为什么 IO 多路复用要搭配非阻塞 IO?

在后面,咱们应用 epoll 实现了一个线程治理多个套接字,当某个套接字有读写事件时,epoll_wait调用返回,通知咱们哪些套接字能读,但并不会通知咱们某个套接字上有多少数据可读。

  • 应用非阻塞 IO 解决形式:咱们只有循环的 read,直到读完全副的数据即可(read 返回 0)。
  • 应用阻塞 IO 解决形式:每次只能调用一次 read,因为咱们并不知道下一次循环中还有没有数据可读,如果没数据就会阻塞整个过程了,所以只能期待下一次的 epoll_wait 返回了。这对于程度触发还可行,但对于边缘触发就不行了,因为咱们不晓得这个套接字还会不会有新数据写入,如果对端不再写入新数据,那缓冲区中剩下的数据就再也读不到了。

残缺源码已上传到 https://github.com/lzs123/CPr…, 欢送 fork and star!

参考文章

如果这篇文章说不清 epoll 的实质,那就过去掐死我吧!

写在最初

如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容。

退出移动版