共计 4487 个字符,预计需要花费 12 分钟才能阅读完成。
一提到高并发,就没有方法绕开 I / O 复用,再具体到特定的平台 linux, 就没方法绕开 epoll. epoll 为啥高效的原理就不讲了,感兴趣的同学能够自行搜寻钻研一下,也能够间接用现成的 Swoole,原理一样都挺不错的。
php 怎么玩 epoll?首先得装置个 libevent 库,再装个 event 扩大或者 libevent 扩大就能够欢快地游玩了.
有些人搞不清楚 libevent 库跟 libevent 扩大的区别,简略来说,libevent 库是 C 语言对 epoll 的封装,跟 PHP 毛关系都没有;
libevent 扩大就是 PHP 跟 libevent 库的沟通桥梁。实际上 PHP 的很多扩大就是干这个事的,有一些优良的 C 语言库,PHP 想间接拿来用,就通过 PHP 扩大的形式接入到 PHP。
libevent 扩大和 event 扩大轻易选一个装,event 扩大因为更面向对象一点。本人去 http://pecl.php.net 外面搜跟本人 PHP 版本对应的扩大,下好编译装置一下就 OK 了. 电脑装了多个版本的 PHP 编译的时候留神一下,phpize 的版本要对应上,别搞错了,典型的五步曲:
phpize
./configure
make
make install
php -m | grep event #看看装上了没
咱们要实现的服务器,传输层是 TCP 协定,应用层协定太多太简单,限于篇幅,会简略地以 HTTP 服务器举个例子,HTTP 协定自身就很简单,要实现起来细节上有很多讲究,咱们也不会齐全实现 HTTP 协定。
首先,创立一个 socket,三步曲,socket_create、socket_bind、socket_listen,为什么是这三步曲呢?
很简略,不论你传输层协定是啥,你上面的网络层协定你得选个版本吧,IPV4 还是 IPV6,传输层工作形式你得选一个吧,全双工、半双工还是单工,TCP 还是 UDP 你也得选一个吧,socket_create 就是这三个选项;
确定了网络层和传输层,你得通知我监听哪个端口吧,这就对应了 socket_bind;而后你得开启监听,并指定一个客户端的队列长度吧,这就是 socket_listen 干的事。
创立完了,同步阻塞咱就不介绍了,一个过程同时最多 hold 处一个连贯,多几个连贯同时申请,就得等呗,超过了 socket_listen 指定的队列长度,就得返回 504 了。多过程也一样,几个过程就有几个并发,过程又是低廉资源,而且过程的上下文切换费时费力,导致整个零碎效率低下。
没关系,咱有 epoll,hold 住万千申请不是梦,先实现一个 Reactor。libevent 库就是 Reactor 模式,间接调用函数就是在应用 Reactor 模式,所以无需纠结到底 php 怎么实现 Reactor 模式。
<?php
use Event;
use EventBase;
class Reactor
{
protected $reactor;
protected $events;
public static $instance = null;
const READ = Event::READ | Event::PERSIST;
const WRITE = Event::WRITE | Event::PERSIST;
public static function getInstance()
{if (is_null(self::$instance)) {self::$instance = new self();
self::$instance->reactor = new EventBase;
}
return self::$instance;
}
public function add($fd, $what, $cb, $arg = null)
{switch ($what) {
case self::READ:
$event = new Event($this->reactor, $fd, self::READ, $cb, $arg);
break;
case self::WRITE:
$event = new Event($this->reactor, $fd, self::WRITE, $cb, $arg);
break;
default:
$event = new Event($this->reactor, $fd, $what, $cb, $arg);
break;
}
$event->add();
$this->events[(int) $fd][$what] = $event;
}
public function del($fd, $what = 'all')
{$events = $this->events[(int) $fd];
if ($what == 'all') {foreach ($events as $event) {$event->free();
}
} else {if ($what != self::READ && $what != self::WRITE) {throw new \Exception('不存在的事件');
}
$events[$what]->free();}
}
public function run()
{$this->reactor->loop();
}
public function stop()
{foreach ($this->events as $events) {foreach ($events as $event) {$event->free();
}
}
$this->reactor->stop();}
}
下面的代码很简略,简略解释一下概念,EventBase 就是个容器,外面装的 Event 实例,这么一说,下面的代码就十分好懂了。而后一个 Server.
<?php
use Throwable;
use Monolog\Handler\StreamHandler;
class Server
{
protected $ip;
protected $port;
protected $socket;
protected $reactor;
public function __construct($ip, $port)
{
$this->ip = $ip;
$this->port = $port;
}
public function start()
{$socket = $this->createTcpConnection();
stream_set_blocking($socket, false);
Reactor::getInstance()->add($socket, Reactor::READ, function($socket) {$conn = stream_socket_accept($socket);
stream_set_blocking($conn, false);
(new Connection($conn))->handle();});
Reactor::getInstance()->run();
}
public function createTcpConnection()
{$schema = sprintf("tcp://%s:%d", $this->ip, $this->port);
$socket = stream_socket_server($schema, $errno, $errstr);
if ($errno) {throw new \Exception($errstr);
}
return $socket;
}
}
Connection
<?php
class Connection
{
protected $conn;
protected $read_buffer = '';
protected $write_buffer = '';
public function __construct($conn)
{$this->conn = $conn;}
public function handle()
{Reactor::getInstance()->add($this->conn, Reactor::READ, \Closure::fromCallable([$this, 'read']));
}
private function read($conn)
{
$this->read_buffer = '';
if (is_resource($conn)) {while ($content = fread($conn, 65535)) {$this->read_buffer .= $content;}
}
if ($this->read_buffer) {Reactor::getInstance()->add($conn, Reactor::WRITE, \Closure::fromCallable([$this, 'write']));
} else {Reactor::getInstance()->del($conn);
fclose($conn);
}
}
private function write($conn)
{if (is_resource($conn)) {fwrite($conn, "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf8\r\nContent-Length:11\r\nConnection: keep-alive\r\n\r\nHello!world");
}
}
}
先创立 Socket 的三步曲,设置成非阻塞模式。而后把 socket 加到 Reactor 中监听可读事件,可读的意思就是,缓冲区有数据了,才可读。可读事件产生,阐明有新连贯来了,用 stream_socket_accept
接管新连贯 Conn,把 Conn 放到 Reactor 中监听可读事件,可读事件产生,阐明客户端有数据发送过去了,循环读直到没数据,而后把 Conn 放到 Reactor 中监听可写事件,可写事件产生,阐明客户端数据发送完了,把协定组装一下写入响应。
应用层如果是 HTTP 协定要留神一下 Connection: keep-alive 头,因为要复用连贯,不要一写完就敞开连贯。
撸完出工,用 ab
测一下并发,加 -k
参数复用连贯,i5+8G,3W 的并发没啥问题,当然咱们这儿没有磁盘 I /O,理论状况要从磁盘读取文件,读文件要通过 linux 的零碎调用,而且有几次的文件拷贝操作,花销比拟大,罕用的解决思路是 sendfile,零拷贝间接从一个 FD 到另一个 FD,效率比拟高,毛病就是 PHP 没有现成的曾经实现 sendfile 的扩大,得本人入手,开发成本有点高。
ab 测试 PO 图:
这就是 PHP 实现高并发服务器的思路了,只有是用 EPOLL 解决的,思路都一样,都是三步曲,放到 Reactor 下监听 FD 事件。当然这个只是最简略的模型,还有很多能够改良的中央,比如说多过程,剽窃一下 nginx,一个主过程 + N 个工作过程,多过程的目标还是想利用多核并行工作。
C 语言实现也是这样,只是你可能不必 libevent 库,本人封装 EPOLL, 毕竟 libevent 库有点重,你也用不到 libevent 的很多货色;当然了,C 语言有一堆的数据结构以及定义在数据结构上的操作要写,没有 GC,本人治理内存,还要有良好的设计,上多过程还得搞一搞 IPC 过程间通信的货色,开发难度比 PHP 要大地多,开发周期也很长,有趣味的同学能够本人撸一个玩。