基本概念
文章开篇先脑补一些知识, 有助于阅读,本篇文章主要以 select 为住,介绍 select 实现原理,并利用 select 来实现一个单进程阻塞复用的网络服务器。
IO 多路复用是指内核一旦发现进程指定的一个或者多个 IO 条件准备读取,它就通知该进程,目前支持 I / O 多路复用有 select,poll,epoll,I/ O 多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作,IO 多路复用适用如下场合:
当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用 I / O 复用。
当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
如果一个 TCP 服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到 I / O 复用。
如果一个服务器即要处理 TCP,又要处理 UDP,一般要使用 I / O 复用。
如果一个服务器要处理多个服务或多个协议,一般要使用 I / O 复用。
与多进程和多线程技术相比,I/ O 多路复用技术的最大优势是系统开销小,系统不必创建进程 / 线程,也不必维护这些进程 / 线程,从而大大减小了系统的开销。
select
描述
监视并等待多个文件描述符的属性变化(可读、可写或错误异常)。select 函数监视的文件描述符分 3 类,分别是 writefds、readfds、和 exceptfds。调用后 select 会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时(timeout 指定等待时间),函数才返回。当 select() 函数返回后,可以通过遍历 fdset,来找到就绪的描述符,并且描述符最大不能超过 1024
poll
描述
poll 的机制与 select 类似,与 select 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll 没有最大文件描述符数量的限制。poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
select 与 poll
select/poll 问题很明显,它们需要循环检测连接是否有事件。如果服务器有上百万个连接,在某一时间只有一个连接向服务器发送了数据,select/poll 需要做循环 100 万次,其中只有 1 次是命中的,剩下的 99 万 9999 次都是无效的,白白浪费了 CPU 资源。
epoll
描述
epoll 是在 2.6 内核中提出的,是之前的 select 和 poll 的增强版本。相对于 select 和 poll 来说,epoll 更加灵活,没有描述符限制, 无需轮询。epoll 使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中。简单点来说就是当连接有 I / O 流事件产生的时候,epoll 就会去告诉进程哪个连接有 I / O 流事件产生,然后进程就去处理这个事件。
网络服务器
单进程阻塞复用的网络服务器,如下图所示
描述
服务监听流程如上 1、保存所有的 socket, 通过 select 系统调用,监听 socket 描述符的可读事件 2、select 会在内核空间监听一旦发现 socket 可读,会从内核空间传递至用户空间,在用户空间通过逻辑判断是服务端 socket 可读,还是客户端的 socket 可读 3、如果是服务端的 socket 可读,说明有新的客户端建立,将 socket 保留到监听数组当中 4、如果是客户端的 socket 可读,说明当前已经可以去读取客户端发送过来的内容了,读取内容,然后响应给客户端。缺点:1、select 模式本身的缺点(1、循环遍历处理事件、2、内核空间传递数据的消耗)2、单进程对于大量任务处理乏力
代码实现
class Worker{
// 监听 socket
protected $socket = NULL;
// 连接事件回调
public $onConnect = NULL;
// 接收消息事件回调
public $onMessage = NULL;
public $workerNum=4; // 子进程个数
public $allSocket; // 存放所有 socket
public function __construct($socket_address) {
// 监听地址 + 端口
$this->socket=stream_socket_server($socket_address);
stream_set_blocking($this->socket,0); // 设置非阻塞
$this->allSocket[(int)$this->socket]=$this->socket;
}
public function start() {
// 获取配置文件
$this->fork();
}
public function fork(){
$this->accept();// 子进程负责接收客户端请求
}
public function accept(){
// 创建多个子进程阻塞接收服务端 socket
while (true){
$write=$except=[];
// 需要监听 socket
$read=$this->allSocket;
// 状态谁改变
stream_select($read,$write,$except,60);
// 怎么区分服务端跟客户端
foreach ($read as $index=>$val){
// 当前发生改变的是服务端,有连接进入
if($val === $this->socket){
$clientSocket=stream_socket_accept($this->socket); // 阻塞监听
// 触发事件的连接的回调
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
$this->allSocket[(int)$clientSocket]=$clientSocket;
}else{
// 从连接当中读取客户端的内容
$buffer=fread($val,1024);
// 如果数据为空,或者为 false, 不是资源类型
if(empty($buffer)){
if(feof($val) || !is_resource($val)){
// 触发关闭事件
fclose($val);
unset($this->allSocket[(int)$val]);
continue;
}
}
// 正常读取到数据, 触发消息接收事件, 响应内容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$val,$buffer);
}
}
}
}
}
}
$worker = new Worker(‘tcp://0.0.0.0:9805’);
// 连接事件
$worker->onConnect = function ($fd) {
//echo ‘ 连接事件触发 ’,(int)$fd,PHP_EOL;
};
// 消息接收
$worker->onMessage = function ($conn, $message) {
// 事件回调当中写业务逻辑
$content=” 回复的消息 ”;
$http_resonse = “HTTP/1.1 200 OK\r\n”;
$http_resonse .= “Content-Type: text/html;charset=UTF-8\r\n”;
$http_resonse .= “Connection: keep-alive\r\n”; // 连接保持
$http_resonse .= “Server: php socket server\r\n”;
$http_resonse .= “Content-length: “.strlen($content).”\r\n\r\n”;
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); // 启动
函数
stream_socket_server
在 PHP 中提供了一个非常方便的函数一次性创建、绑定端口、监听端口
stream_set_blocking (resource $stream , int $mode) : bool
为资源流设置阻塞或者阻塞模式,$mode 0 非阻塞,1 阻塞
stream_socket_accept (resource $server_socket \[, float $timeout = ini_get(“default_socket_timeout”) [, string &$peername]] ) : resource
接受由 stream_socket_server() 创建的套接字连接