homestead-安装swoole扩展

执行以下命令 vagrant sshsudo pecl channel-update pecl.php.netsudo pecl install swoole会看到如下提示意思就是扩展已经安装好了,需要在php.ini里面进行配置。输入命令 php -i|grep php.ini会显示php.ini 所在的目录vim /目录/php.ini, 在其中加入extension=swoole.so 重启php-fpm sudo service php7.3-fpm reload检查扩展是否安装完毕 php -m|grep swoole

April 23, 2019 · 1 min · jiezi

(二)单进程阻塞复用的网络服务器

基本概念文章开篇先脑补一些知识,有助于阅读,本篇文章主要以select为住,介绍select实现原理,并利用select来实现一个单进程阻塞复用的网络服务器。IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程,目前支持I/O多路复用有 select,poll,epoll,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作,IO多路复用适用如下场合:当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。select描述监视并等待多个文件描述符的属性变化(可读、可写或错误异常)。select函数监视的文件描述符分 3 类,分别是writefds、readfds、和 exceptfds。调用后 select会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时( timeout 指定等待时间),函数才返回。当 select()函数返回后,可以通过遍历 fdset,来找到就绪的描述符,并且描述符最大不能超过1024poll描述poll的机制与select类似,与select在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量的限制。poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。select 与 pollselect/poll问题很明显,它们需要循环检测连接是否有事件。如果服务器有上百万个连接,在某一时间只有一个连接向服务器发送了数据,select/poll需要做循环100万次,其中只有1次是命中的,剩下的99万9999次都是无效的,白白浪费了CPU资源。epoll描述epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制,无需轮询。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中。简单点来说就是当连接有I/O流事件产生的时候,epoll就会去告诉进程哪个连接有I/O流事件产生,然后进程就去处理这个事件。网络服务器单进程阻塞复用的网络服务器 ,如下图所示描述服务监听流程如上1、保存所有的socket,通过select系统调用,监听socket描述符的可读事件2、select会在内核空间监听一旦发现socket可读,会从内核空间传递至用户空间,在用户空间通过逻辑判断是服务端socket可读,还是客户端的socket可读3、如果是服务端的socket可读,说明有新的客户端建立,将socket保留到监听数组当中4、如果是客户端的socket可读,说明当前已经可以去读取客户端发送过来的内容了,读取内容,然后响应给客户端。 缺点:1、select模式本身的缺点(1、循环遍历处理事件、2、内核空间传递数据的消耗)2、单进程对于大量任务处理乏力代码实现class Worker{ //监听socket protected $socket = NULL; //连接事件回调 public $onConnect = NULL; //接收消息事件回调 public $onMessage = NULL; public $workerNum=4; //子进程个数 public $allSocket; //存放所有socket public function __construct($socket_address) { //监听地址+端口 $this->socket=stream_socket_server($socket_address); stream_set_blocking($this->socket,0); //设置非阻塞 $this->allSocket[(int)$this->socket]=$this->socket; } public function start() { //获取配置文件 $this->fork(); } public function fork(){ $this->accept();//子进程负责接收客户端请求 } public function accept(){ //创建多个子进程阻塞接收服务端socket while (true){ $write=$except=[]; //需要监听socket $read=$this->allSocket; //状态谁改变 stream_select($read,$write,$except,60); //怎么区分服务端跟客户端 foreach ($read as $index=>$val){ //当前发生改变的是服务端,有连接进入 if($val === $this->socket){ $clientSocket=stream_socket_accept($this->socket); //阻塞监听 //触发事件的连接的回调 if(!empty($clientSocket) && is_callable($this->onConnect)){ call_user_func($this->onConnect,$clientSocket); } $this->allSocket[(int)$clientSocket]=$clientSocket; }else{ //从连接当中读取客户端的内容 $buffer=fread($val,1024); //如果数据为空,或者为false,不是资源类型 if(empty($buffer)){ if(feof($val) || !is_resource($val)){ //触发关闭事件 fclose($val); unset($this->allSocket[(int)$val]); continue; } } //正常读取到数据,触发消息接收事件,响应内容 if(!empty($buffer) && is_callable($this->onMessage)){ call_user_func($this->onMessage,$val,$buffer); } } } } }}$worker = new Worker(’tcp://0.0.0.0:9805’);//连接事件$worker->onConnect = function ($fd) { //echo ‘连接事件触发’,(int)$fd,PHP_EOL;};//消息接收$worker->onMessage = function ($conn, $message) { //事件回调当中写业务逻辑 $content=“回复的消息”; $http_resonse = “HTTP/1.1 200 OK\r\n”; $http_resonse .= “Content-Type: text/html;charset=UTF-8\r\n”; $http_resonse .= “Connection: keep-alive\r\n”; //连接保持 $http_resonse .= “Server: php socket server\r\n”; $http_resonse .= “Content-length: “.strlen($content)."\r\n\r\n”; $http_resonse .= $content; fwrite($conn, $http_resonse);};$worker->start(); //启动函数stream_socket_server在PHP中提供了一个非常方便的函数一次性创建、绑定端口、监听端口stream_set_blocking ( resource $stream , int $mode ) : bool为资源流设置阻塞或者阻塞模式,$mode 0非阻塞,1阻塞stream_socket_accept ( resource $server_socket [, float $timeout = ini_get(“default_socket_timeout”) [, string &$peername ]] ) : resource接受由 stream_socket_server() 创建的套接字连接 ...

April 20, 2019 · 1 min · jiezi

Mix PHP V2 实例:协程池异步邮件发送守护程序

去年 Mix PHP V1 发布时,我写了一个多进程的邮件发送实例: 使用 mixphp 打造多进程异步邮件发送,今年 Mix PHP V2 发布,全面的协程支持让我们可以使用一个进程就可达到之前多个进程都无法达到的更高 IO 性能,所以今天重写一个协程池版本的邮件发送实例。邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。下面演示一个异步邮件发送系统的开发过程,涉及知识点:异步消息队列守护进程协程池如何使用消息队列实现异步PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:redisrabbitmqkafka本次我们选用 Redis 来实现异步邮件发送,Redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:// 入列$redis->lpush($key, $data);// 出列$data = $redis->rpop($key);// 阻塞出列$data = $redis->brpop($key, 10);架构设计本实例由传统 MVC 框架投递邮件发送需求(生产者),Mix PHP 编写的守护程序执行发送任务(消费者)。邮件发送库选型以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。由于发送任务是由 Mix PHP 执行,所以 swiftmailer 是安装在 Mix PHP 项目中,在项目根目录中执行以下命令安装:composer require swiftmailer/swiftmailer生产者开发在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 Mix PHP 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。在传统 MVC 框架的控制器中增加如下代码:通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。// 连接$redis = new \Redis();if (!$redis->connect(‘127.0.0.1’, 6379)) { throw new \Exception(‘Redis connect failed.’);}$redis->auth(’’);$redis->select(0);// 投递任务$data = [ ’to’ => ‘@qq.com’, ‘body’ => ‘The message content’, ‘subject’ => ‘The title content’,];$redis->lpush(‘queue:email’, serialize($data));通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有在生产者中执行,而是待消息被消费者获取后才执行。消费者开发使用本例时,请确保你使用的 Swoole 编译时开启了 openssl本例我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的邮件发送程序。因为我们是开发一个守护程序,所以我们在 applications/daemon 模块中开发,首先我们在配置 applications/daemon/config/main.php 中注册一个命令:// 命令’commands’ => [ ‘mailer’ => [‘Mailer’, ‘description’ => ‘Mailer daemon.’],],注册的命令中指定的 Mailer 命令类,接下来我们编写一个 MailerCommand 类:applications/daemon/src/Commands/MailerCommand.php<?phpnamespace Daemon\Commands;use Daemon\Libraries\MailerWorker;use Mix\Concurrent\CoroutinePool\Dispatcher;use Mix\Core\Coroutine\Channel;use Mix\Helper\ProcessHelper;/ * Class MailerCommand * @package Daemon\Commands * @author liu,jian <coder.keda@gmail.com> /class MailerCommand{ / * 退出 * @var bool / public $quit = false; /* * 主函数 / public function main() { // 捕获信号 ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) { $this->quit = true; ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null); }); // 协程池执行任务 xgo(function () { $maxWorkers = 20; $maxQueue = 20; $jobQueue = new Channel($maxQueue); $dispatch = new Dispatcher([ ‘jobQueue’ => $jobQueue, ‘maxWorkers’ => $maxWorkers, ]); $dispatch->start(MailerWorker::class); // 投放任务 $redis = app()->redisPool->getConnection(); while (true) { if ($this->quit) { $dispatch->stop(); return; } try { $data = $redis->brPop([‘queue:email’], 3); } catch (\Throwable $e) { $dispatch->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop命令最后一个键才是值 $jobQueue->push($data); } }); }}从 $data = $redis->brPop([‘queue:email’], 3); 外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisor、pm2 等工具重启守护进程。上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的邮件发送代码逻辑就在 MailerWorker 类中:applications/daemon/src/Libraries/MailerWorker.php<?phpnamespace Daemon\Libraries;use Mix\Concurrent\CoroutinePool\AbstractWorker;use Mix\Concurrent\CoroutinePool\WorkerInterface;/* * Class MailerWorker * @package Daemon\Libraries * @author liu,jian <coder.keda@gmail.com> /class MailerWorker extends AbstractWorker implements WorkerInterface{ /* * 邮件发送器 * @var Mailer / public $mailer; /* * 初始化事件 / public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 实例化一些需重用的对象 $this->mailer = new Mailer(); } /* * 处理 * @param $data / public function handle($data) { // TODO: Implement handle() method. $data = unserialize($data); if (empty($data)) { return; } try { $this->mailer->send($data[’to’], $data[‘subject’], $data[‘body’]); app()->log->info(“Mail sent successfully:to {to} subject {subject}”, $data); } catch (\Throwable $e) { app()->log->error(“Mail failed to send:to {to} subject {subject} error {error}”, array_merge($data, [’error’ => $e->getMessage()])); } }}由以上代码可见,Worker 在初始化时,新增了一个 Mailer 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Mailer 发送程序:applications/daemon/src/Libraries/Mailer.php<?phpnamespace Daemon\Libraries;use Mix\Core\Coroutine;/* * Class Mailer * @package Daemon\Libraries * @author liu,jian <coder.keda@gmail.com> /class Mailer{ /* * 配置信息 / const HOST = ‘smtpdm.aliyun.com’; const PORT = 465; const SECURITY = ‘ssl’; const USERNAME = ‘’; const PASSWORD = ‘’; /* * Mailer constructor. / public function __construct() { // 开启协程钩子 Coroutine::enableHook(); } /* * 发送 * @param $to * @param $subject * @param $body * @return int */ public function send($to, $subject, $body) { // Create the Transport $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new \Swift_Mailer($transport); // Create a message $message = (new \Swift_Message($subject)) ->setFrom([self::USERNAME => ‘**网’]) ->setTo($to) ->setBody($body); // Send the message return $mailer->send($message); }}在 Mailer 发送程序中我们使用了前面 composer 安装的 swiftmailer 库来发送邮件,以上就完成了全部的代码逻辑,现在我们开始测试。先启动消费者守护程序:[root@localhost bin]# ./mix-daemon mailer将上文的生产者脚本命名为 push.php 然后在 CLI 中执行 (开一个新终端):[root@localhost bin]# php /tmp/push.php消费者守护程序结果:[root@localhost bin]# ./mix-daemon mailer[info] 2019-04-15 11:48:36 [message] Mail sent successfully:to ***@qq.com subject The title content命令行终端打印了发送成功的日志,发送完成。 ...

April 15, 2019 · 3 min · jiezi

swoole进程结构

一、进程的基本知识什么是进程,所谓进程其实就是操作系统中一个正在运行的程序,我们在一个终端当中,通过php,运行一个php文件,这个时候就相当于我们创建了一个进程,这个进程会在系统中驻存,申请属于它自己的内存空间系统资源并且运行相应的程序对于一个进程来说,它的核心内容分为两个部分,一个是它的内存,这个内存是这进程创建之初从系统分配的,它所有创建的变量都会存储在这一片内存环境当中一个是它的上下文环境我们知道进程是运行在操作系统的,那么对于程序来说,它的运行依赖操作系统分配给它的资源,操作系统的一些状态。在操作系统中可以运行多个进程的,对于一个进程来说,它可以创建自己的子进程,那么当我们在一个进程中创建出若干个子进程的时候那么可以看到如图,子进程和父进程一样,拥有自己的内存空间和上下文环境二、Swoole进程结构Swoole的高效不仅仅于底层使用c编写,他的进程结构模型也使其可以高效的处理业务,我们想要深入学习,并且在实际的场景当中使用必须了解,下面我们先看一下结构图:首先先介绍下swoole的这几种进程分别是干什么的:从这些层级的名字,我们先大概说一下,下面这些层级分别是干什么的,做一个详细的说明。Master进程:主进程Manger进程:管理进程Worker进程:工作进程Task进程:异步任务工作进程1、Master进程第一层,Master进程,这个是swoole的主进程,这个进程是用于处理swoole的核心事件驱动的,那么在这个进程当中可以看到它拥有一个MainReactor[线程]以及若干个Reactor[线程],swoole所有对于事件的监听都会在这些线程中实现,比如来自客户端的连接,信号处理等。每一个线程都有自己的用途,下面多每个线程有一个了解MainReactor(主线程)主线程会负责监听server socket,如果有新的连接accept,主线程会评估每个Reactor线程的连接数量。将此连接分配给连接数最少的reactor线程,做一个负载均衡。Reactor线程组Reactor线程负责维护客户端机器的TCP连接、处理网络IO、收发数据完全是异步非阻塞的模式。swoole的主线程在Accept新的连接后,会将这个连接分配给一个固定的Reactor线程,在socket可读时读取数据,并进行协议解析,将请求投递到Worker进程。在socket可写时将数据发送给TCP客户端。心跳包检测线程(HeartbeatCheck)Swoole配置了心跳检测之后,心跳包线程会在固定时间内对所有之前在线的连接发送检测数据包UDP收包线程(UdpRecv)接收并且处理客户端udp数据包2、管理进程ManagerSwoole想要实现最好的性能必须创建出多个工作进程帮助处理任务,但Worker进程就必须fork操作,但是fork操作是不安全的,如果没有管理会出现很多的僵尸进程,进而影响服务器性能,同时worker进程被误杀或者由于程序的原因会异常退出,为了保证服务的稳定性,需要重新创建worker进程。Swoole在运行中会创建一个单独的管理进程,所有的worker进程和task进程都是从管理进程Fork出来的。管理进程会监视所有子进程的退出事件,当worker进程发生致命错误或者运行生命周期结束时,管理进程会回收此进程,并创建新的进程。换句话也就是说,对于worker、task进程的创建、回收等操作全权有“保姆”Manager进程进行管理。再来一张图梳理下Manager进程和Worker/Task进程的关系。3、Worker进程worker 进程属于swoole的主逻辑进程,用户处理客户端的一系列请求,接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端可以是异步非阻塞模式,也可以是同步阻塞模式4、Task进程taskWorker进程这一进城是swoole提供的异步工作进程,这些进程主要用于处理一些耗时较长的同步任务,在worker进程当中投递过来。三、进程查看及流程梳理当启动一个Swoole应用时,一共会创建2 + n + m个进程,2为一个Master进程和一个Manager进程,其中n为Worker进程数。m为TaskWorker进程数。默认如果不设置,swoole底层会根据当前机器有多少CPU核数,启动对应数量的Reactor线程和Worker进程。我机器为1核的。Worker为1。所以现在默认我启动了1个Master进程,1个Manager进程,和1个worker进程,TaskWorker没有设置也就是为0,当前server会产生3个进程。在启动了server之后,在命令行查看当前产生的进程这三个进程中,所有进程的根进程,也就是例子中的2123进程,就是所谓的Master进程;而2212进程,则是Manager进程;最后的2321进程,是Worker进程。client跟server的交互1、client请求到达 Main Reactor,Client实际上是与Master进程中的某个Reactor线程发生了连接。2、Main Reactor根据Reactor的情况,将请求注册给对应的Reactor (每个Reactor都有epoll。用来监听客户端的变化) 3、客户端有变化时Reactor将数据交给worker来处理4、worker处理完毕,通过进程间通信(比如管道、共享内存、消息队列)发给对应的reactor。 5、reactor将响应结果发给相应的连接请求处理完成示意图:后续准备本文是在自己学习Swoole接触到的一些知识,在初步整理后发送出来,希望能与大家一起学习,文章不足等问题大家可以一起讨论学习,欢迎骚扰~~。后面准备从网络模型入手更好的理解swoole的实现原理,比较与传统PHP-FPM工作模式的问题,之前出过一篇关于(一)如何实现一个单进程阻塞的网络服务器大家可以先了解下,如何一步步演变为多进程master-worker模型。欢迎大家指正文章问题~

April 13, 2019 · 1 min · jiezi

一个简单混合协议通讯列子,物联网和互联网通讯。

这个列子主要讨论Tcp,WebSocket和http之间的通讯。长连接和长连接通讯,长连接和短连接通讯。其他协议同理可得Tcp: 代表硬件设备WebSocket: 代表客户端 http: 代表网页 本列子是基于one框架 (https://github.com/lizhichao/one) 开发.配置协议 监听端口由于swoole的模型 WebSocket server 包含 http server , http server 包含 tcp server 。所以我们配置主服务为 WebSocket server ,添加两个http 和 tcp 监听。配置文件如下:return [ ‘server’ => [ ‘server_type’ => \One\Swoole\OneServer::SWOOLE_WEBSOCKET_SERVER, ‘port’ => 8082, ‘action’ => \App\Test\MixPro\Ws::class, ‘mode’ => SWOOLE_PROCESS, ‘sock_type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, ‘set’ => [ ‘worker_num’ => 5 ] ], ‘add_listener’ => [ // http 监听 [ ‘port’ => 8081, ‘action’ => \App\Server\AppHttpPort::class, ’type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, ‘set’ => [ ‘open_http_protocol’ => true, ‘open_websocket_protocol’ => false ] ], // tcp 监听 [ ‘port’ => 8083, ‘pack_protocol’ => \One\Protocol\Text::class, // tcp 打包,解包协议,方便在终端调试 我们使用 text 协议. 换行符 表示一个包的结束 ‘action’ => \App\Test\MixPro\TcpPort::class, ’type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, ‘set’ => [ ‘open_http_protocol’ => false, ‘open_websocket_protocol’ => false ] ] ]];接下来去 \App\Test\MixPro\Ws 和 \App\Test\MixPro\TcpPort 实现各种事件处理。\App\Server\AppHttpPort 是框架内置的,通过路由处理http请求的,配置路由即可。配置路由// 首页Router::get(’/mix’, [ ‘use’ => HttpController::class . ‘@index’, ‘middle’ => [\App\Test\MixPro\TestMiddle::class . ‘@isLogin’] // 中间件 如果用户登录了 直接跳转到相应的页面]);Router::group([ ‘middle’ => [\App\Test\MixPro\TestMiddle::class . ‘@checkSession’] // 中间件 让用户登录后 才能进入聊天页面 http websocket 都能获取到这个 session ], function () { // websocket 页面 Router::get(’/mix/ws’, HttpController::class . ‘@ws’); // http 页面 Router::get(’/mix/http’, HttpController::class . ‘@http’); // http 轮训消息接口 Router::post(’/mix/http/loop’, HttpController::class . ‘@httpLoop’); // http 发送消息接口 Router::post(’/mix/http/send’, HttpController::class . ‘@httpSend’);});配置的都是 http 协议路由。 websocket和tpc我们直接在回调action处理。如果你的项目复杂也可以配置相应的路由。one框架的路由支持任何协议,使用方法也是统一的。处理tcp协议其中__construct,onConnect,onClose 不是必须的。 如果你想在服务器运行开始时最一些事情就写到 __construct里面。 onConnect 当有客户端连接时触发,每个客户端触发一次 onClose 当有客户端连接断开时触发,每个客户端触发一次class TcpPort extends Tcp{ use Funs; private $users = []; /** * @var Ws / protected $server; /* * @var Client / protected $global_data; public function __construct($server, $conf) { parent::__construct($server, $conf); $this->global_data = $this->server->global_data; } // 终端连接上服务器时 public function onConnect(\swoole_server $server, $fd, $reactor_id) { $name = uuid(); $this->users[$fd] = $name; $this->sendTo(‘all’, json_encode([‘v’ => 1, ’n’ => $name])); $this->sendToTcp($fd, json_encode([‘v’ => 4, ’n’ => $this->getAllName()])); $this->global_data->bindId($fd, $name); $this->send($fd, “你的名字是:” . $name); } // 消息处理 像某个name 发送消息 public function onReceive(\swoole_server $server, $fd, $reactor_id, $data) { $arr = explode(’ ‘, $data); if (count($arr) !== 3 || $arr[0] !== ‘send’) { $this->send($fd, “格式不正确”); return false; } $n = $arr[1]; $d = $arr[2]; $this->sendTo($n, json_encode([‘v’ => 3, ’n’ => $d])); } // 下线 通知所有其他终端,解除与fd的关系绑定。 public function onClose(\swoole_server $server, $fd, $reactor_id) { echo “tcp close {$fd} \n”; $this->global_data->unBindFd($fd); $this->sendTo(‘all’, json_encode([‘v’ => 2, ’n’ => $this->users[$fd]])); unset($this->users[$fd]); }}定义了一个公共的traitFuns主要实现两个方法,获取所有的终端(tcp,ws,http),和向某个用户发送消息 。在ws、http都会用到这个 在构造函数我们初始化了一个 global_data 用来保存,名称和fd的关系。你也可以使用方式储存。因为fd没次连接都不同。global_data是one框架内置的。 终端连接上服务器时触发事件 onConnect ,我们给这个终端取个名字,并把关系保存在 global_data。 通知所有终端有个新终端加入,并告诉刚加入的终端当前有哪些终端在线。处理 websocket 协议其中__construct,onHandShake,onOpen,onClose 不是必须的。onHandShake,onOpen 是配合使用的,如果onOpen返回false服务器会拒绝连接。在 onOpen,onMessage,onClose可以拿到当前用户的session信息和http是相通的。class Ws extends WsServer{ use Funs; private $users = []; /* * @var Client / public $global_data = null; public function __construct(\swoole_server $server, array $conf) { parent::__construct($server, $conf); $this->global_data = new Client(); } // 初始化session public function onHandShake(\swoole_http_request $request, \swoole_http_response $response) { return parent::onHandShake($request, $response); } // ws 发送消息 public function onMessage(\swoole_websocket_server $server, \swoole_websocket_frame $frame) { $data = $frame->data; $arr = json_decode($data, true); $n = $arr[’n’]; $d = $arr[’d’]; $this->sendTo($n, json_encode([‘v’ => 3, ’n’ => $d])); } // 判断用户是否登录 如果没有登录拒绝连接 public function onOpen(\swoole_websocket_server $server, \swoole_http_request $request) { $name = $this->session[$request->fd]->get(’name’); if ($name) { $this->users[$request->fd] = $name; $this->sendTo(‘all’, json_encode([‘v’ => 1, ’n’ => $name])); $this->global_data->bindId($request->fd, $name); return true; } else { return false; } } // ws 断开清除信息 public function onClose(\swoole_server $server, $fd, $reactor_id) { echo “ws close {$fd} \n”; $this->global_data->unBindFd($fd); $this->sendTo(‘all’, json_encode([‘v’ => 2, ’n’ => $this->users[$fd]])); unset($this->users[$fd]); }}处理 http 协议主要是 httpLoop 方法,轮训获取消息。因为http是短连接,发给http的信息我们是先存放在$global_data,然后直接这里读取。防止连接间隙丢信息。class HttpController extends Controller{ use Funs; /* * @var Ws / protected $server; /* * @var Client / protected $global_data; public function __construct($request, $response, $server = null) { parent::__construct($request, $response, $server); $this->global_data = $this->server->global_data; } /* * 首页 / public function index() { $code = sha1(uuid()); $this->session()->set(‘code’, $code); return $this->display(‘index’, [‘code’ => $code]); } /* * ws页面 / public function ws() { $name = $this->session()->get(’name’); if (!$name) { $name = uuid(); $this->session()->set(’name’, $name); } return $this->display(‘ws’,[‘users’ => $this->getAllName(),’name’ => $name]); } /* * http 页面 / public function http() { $name = $this->session()->get(’name’); if (!$name) { $name = uuid(); $this->session()->set(’name’, $name); } $this->global_data->set(“http.{$name}”, 1, time() + 60); $this->sendTo(‘all’, json_encode([‘v’ => 1, ’n’ => $name])); return $this->display(‘http’, [’list’ => $this->getAllName(), ’name’ => $name]); } /* * http轮训 / public function httpLoop() { $name = $this->session()->get(’name’); $this->global_data->set(“http.{$name}”, 1, time() + 60); $i = 0; do { $data = $this->global_data->getAndDel(“data.{$name}”); $i++; \co::sleep(0.1); } while ($data === null && $i < 300); if ($data) { foreach ($data as &$v) { $v = json_decode($v, true); } } else { $data = []; } return $this->json($data); } /* * http发送消息 */ public function httpSend() { $n = $this->request->post(’n’); $d = $this->request->post(’d’); if ($n && $d) { $this->sendTo($n, json_encode([‘v’ => 3, ’n’ => $d])); return ‘1’; } return ‘0’; } public function __destruct() { } public function __call($name, $arguments) { return $this->server->$name(…$arguments); }}到此基本就完成了。你可以去看完整的代码 : 点这里 其他的一些列子 : https://github.com/lizhichao/… ...

April 10, 2019 · 4 min · jiezi

Mix PHP V2 新特性:协程、定时器

协程Mix PHP V2 基于 Swoole 4 的 PHP Stream Hook 协程技术开发,协程使用方式与 Golang 几乎一致,包括框架封装的协程池、连接池、命令行处理都大量参考了 Golang 的系统库风格。除了缺少 select case 外,Mix PHP 与 Golang 的协程几乎一致,框架还提供了连接池、协程池、命令行处理这些开箱即用的封装。xgo + Channelxgo 类似 Golang 的 go 关键字,可启动一个新的协程,Channel 等于 Golang 的 chan 类,负责在不同协程中传递数据。<?phpnamespace Console\Commands;use Mix\Core\Coroutine\Channel;use Mix\Core\Event;/** * Class CoroutineCommand * @package Console\Commands * @author liu,jian <coder.keda@gmail.com> /class CoroutineCommand{ /* * 主函数 / public function main() { xgo(function () { $time = time(); $chan = new Channel(); for ($i = 0; $i < 2; $i++) { xgo([$this, ‘foo’], $chan); } for ($i = 0; $i < 2; $i++) { $result = $chan->pop(); } println(‘Total time: ’ . (time() - $time)); }); Event::wait(); } /* * 查询数据 * @param Channel $chan / public function foo(Channel $chan) { $db = app()->dbPool->getConnection(); $result = $db->createCommand(‘select sleep(5)’)->queryAll(); $db->release(); // 不手动释放的连接不会归还连接池,会在析构时丢弃 $chan->push($result); }}执行结果为 5s,说明是并行执行的。WaitGroup + xdeferWaitGroup 与 Golang 的完全一致,xdefer 方法也等同于 Golang 的 defer 关键字。当并行执行且不需要返回结果时,可以使用 WaitGroup + xdefer,xdefer 即使在方法抛出异常时,仍然会执行,这样能避免一直处于阻塞状态。<?phpnamespace Console\Commands;use Mix\Concurrent\Sync\WaitGroup;use Mix\Core\Event;/* * Class WaitGroupCommand * @package Console\Commands * @author liu,jian <coder.keda@gmail.com> /class WaitGroupCommand{ /* * 主函数 / public function main() { xgo(function () { $wg = WaitGroup::new(); for ($i = 0; $i < 2; $i++) { $wg->add(1); xgo([$this, ‘foo’], $wg); } $wg->wait(); println(‘All done!’); }); Event::wait(); } /* * 查询数据 * @param WaitGroup $wg / public function foo(WaitGroup $wg) { xdefer(function () use ($wg) { $wg->done(); }); println(‘work’); throw new \RuntimeException(‘ERROR’); }}即便抛出了 RuntimeException 异常,仍然能执行到 println(‘All done!’);,没有导致 wg 内的 chan 一直处于阻塞状态。定时器异步编程中,定时器的使用非常频繁。Timer::new() 可获得一个实例after 方法可设置一次性定时tick 方法可设置持续定时停止当前定时期,只需只需对象的 $timer->clear(); 方法。<?phpnamespace Console\Commands;use Mix\Core\Event;use Mix\Core\Timer;/* * Class TimerCommand * @package Console\Commands * @author liu,jian <coder.keda@gmail.com> /class TimerCommand{ /* * 主函数 */ public function main() { // 一次性定时 Timer::new()->after(1000, function () { println(time()); }); // 持续定时 $timer = new Timer(); $timer->tick(1000, function () { println(time()); }); // 停止定时 Timer::new()->after(10000, function () use ($timer) { $timer->clear(); }); Event::wait(); }} ...

April 10, 2019 · 2 min · jiezi

【宇润日常疯测-007】Swoole 协程与传统 fpm 同步模式比较

如果说数组是 PHP 的精髓,数组玩得不6的,根本不能算是会用PHP。那协程对于 Swoole 也是同理,不理解协程去用 Swoole,那就是在瞎用。首先,Swoole 只能运行在命令行(Cli)模式下,所以我们开发调试都是使用命令行,而不是 php-fpm/apache 等。在 Swoole 中,我们可以使用\Swoole\Coroutine::create()创建协程,或者你也可以使用简写go()。初识 Swoole 协程go(function(){ go(function(){ echo 0, PHP_EOL; }); echo 1, PHP_EOL;});go(function(){ echo 2, PHP_EOL;});go(function(){ echo 3, PHP_EOL;});执行结果:0123Swoole 协程与同步模式比较我们一直在说 Swoole 协程适合用于 I/O 密集场景,在同样的硬件配置环境下,它会比传统的同步模式承载更多的访问量。我们熟悉的文件读写、网络通讯请求(MySQL、Redis、Http等)都是属于 I/O 密集型场景。假设一次 SQL 查询为 100ms,在传统同步模式下,当前进程在这 100ms 的时间里,是不能做其它操作的。如果要执行十次这个 SQL,可能需要耗费 1s 以上。而如果用协程,虽然不同协程之间也是按顺序执行,但是在前一个等待 100ms 期间,底层会调度 CPU,去执行其它协程的操作。也就是说,可能第一个查询还没返回结果,其它几个查询就已经发送给了 MySQL 并正在执行中了。如果开启十个协程,分别执行这个 SQL,可能只需要耗费 100+ms 即可完成。测试代码如下:Swoole\Runtime::enableCoroutine(); // 开启一键协程化function work(){ $pdo = new \PDO(‘mysql:host=127.0.0.1;dbname=db_test’, ‘root’, ‘root’); $pdo->exec(‘select SLEEP(0.1)’); // 模拟sql需要执行 100ms 的情况}$time = microtime(true);for($i = 0; $i < 10; ++$i){ work();}echo ’time: ‘, (microtime(true) - $time), ’s’, PHP_EOL;$time = microtime(true);for($i = 0; $i < 10; ++$i){ go(‘work’);}swoole_event_wait(); // 等待所有协程执行完echo ’time: ‘, (microtime(true) - $time), ’s’, PHP_EOL;执行结果:time: 1.0326268672943stime: 0.10734605789185s上面的代码可以假想为,单进程处理 10 个请求所需的时间。每个请求需要执行一次耗费 100ms 的 SQL 语句。同步模式,耗费 1s 左右的是 fpm。可以看出,在等待 100ms 期间是不能做任何事情的。协程模型,耗费 0.1s 左右的是 Swoole。在等待 100ms 期间会挂起当前协程,底层调度会让 CPU 去执行其它协程的操作。 ...

April 9, 2019 · 1 min · jiezi

PHP并发IO编程之路

并发 IO 问题一直是服务器端编程中的技术难题,从最早的同步阻塞直接 Fork 进程,到 Worker 进程池/线程池,到现在的异步IO、协程。PHP 程序员因为有强大的 LAMP 框架,对这类底层方面的知识知之甚少,本文目的就是详细介绍 PHP 进行并发 IO 编程的各种尝试,最后再介绍 Swoole 的使用,深入浅出全面解析并发 IO 问题。多进程/多线程同步阻塞最早的服务器端程序都是通过多进程、多线程来解决并发IO的问题。进程模型出现的最早,从 Unix 系统诞生就开始有了进程的概念。最早的服务器端程序一般都是 Accept 一个客户端连接就创建一个进程,然后子进程进入循环同步阻塞地与客户端连接进行交互,收发处理数据。多线程模式出现要晚一些,线程与进程相比更轻量,而且线程之间是共享内存堆栈的,所以不同的线程之间交互非常容易实现。比如聊天室这样的程序,客户端连接之间可以交互,比聊天室中的玩家可以任意的其他人发消息。用多线程模式实现非常简单,线程中可以直接向某一个客户端连接发送数据。而多进程模式就要用到管道、消息队列、共享内存,统称进程间通信(IPC)复杂的技术才能实现。代码实例:多进程/线程模型的流程是创建一个 socket,绑定服务器端口(bind),监听端口(listen),在PHP中用stream_socket_server一个函数就能完成上面3个步骤,当然也可以使用更底层的sockets扩展分别实现。进入while循环,阻塞在accept操作上,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起connect到服务器,操作系统会唤醒此进程。accept函数返回客户端连接的socket主进程在多进程模型下通过fork(php: pcntl_fork)创建子进程,多线程模型下使用pthread_create(php: new Thread)创建子线程。下文如无特殊声明将使用进程同时表示进程/线程。子进程创建成功后进入while循环,阻塞在recv(php: fread)调用上,等待客户端向服务器发送数据。收到数据后服务器程序进行处理然后使用send(php: fwrite)向客户端发送响应。长连接的服务会持续与客户端交互,而短连接服务一般收到响应就会close。当客户端连接关闭时,子进程退出并销毁所有资源。主进程会回收掉此子进程。这种模式最大的问题是,进程/线程创建和销毁的开销很大。所以上面的模式没办法应用于非常繁忙的服务器程序。对应的改进版解决了此问题,这就是经典的 Leader-Follower 模型。代码实例:它的特点是程序启动后就会创建N个进程。每个子进程进入 Accept,等待新的连接进入。当客户端连接到服务器时,其中一个子进程会被唤醒,开始处理客户端请求,并且不再接受新的TCP连接。当此连接关闭时,子进程会释放,重新进入 Accept ,参与处理新的连接。这个模型的优势是完全可以复用进程,没有额外消耗,性能非常好。很多常见的服务器程序都是基于此模型的,比如 Apache 、PHP-FPM。多进程模型也有一些缺点。这种模型严重依赖进程的数量解决并发问题,一个客户端连接就需要占用一个进程,工作进程的数量有多少,并发处理能力就有多少。操作系统可以创建的进程数量是有限的。启动大量进程会带来额外的进程调度消耗。数百个进程时可能进程上下文切换调度消耗占CPU不到1%可以忽略不计,如果启动数千甚至数万个进程,消耗就会直线上升。调度消耗可能占到 CPU 的百分之几十甚至 100%。另外有一些场景多进程模型无法解决,比如即时聊天程序(IM),一台服务器要同时维持上万甚至几十万上百万的连接(经典的C10K问题),多进程模型就力不从心了。还有一种场景也是多进程模型的软肋。通常Web服务器启动100个进程,如果一个请求消耗100ms,100个进程可以提供1000qps,这样的处理能力还是不错的。但是如果请求内要调用外网Http接口,像QQ、微博登录,耗时会很长,一个请求需要10s。那一个进程1秒只能处理0.1个请求,100个进程只能达到10qps,这样的处理能力就太差了。有没有一种技术可以在一个进程内处理所有并发IO呢?答案是有,这就是IO复用技术。IO复用/事件循环/异步非阻塞其实IO复用的历史和多进程一样长,Linux很早就提供了 select 系统调用,可以在一个进程内维持1024个连接。后来又加入了poll系统调用,poll做了一些改进,解决了 1024 限制的问题,可以维持任意数量的连接。但select/poll还有一个问题就是,它需要循环检测连接是否有事件。这样问题就来了,如果服务器有100万个连接,在某一时间只有一个连接向服务器发送了数据,select/poll需要做循环100万次,其中只有1次是命中的,剩下的99万9999次都是无效的,白白浪费了CPU资源。直到Linux 2.6内核提供了新的epoll系统调用,可以维持无限数量的连接,而且无需轮询,这才真正解决了 C10K 问题。现在各种高并发异步IO的服务器程序都是基于epoll实现的,比如Nginx、Node.js、Erlang、Golang。像 Node.js 这样单进程单线程的程序,都可以维持超过1百万TCP连接,全部归功于epoll技术。IO复用异步非阻塞程序使用经典的Reactor模型,Reactor顾名思义就是反应堆的意思,它本身不处理任何数据收发。只是可以监视一个socket句柄的事件变化。Reactor有4个核心的操作:add添加socket监听到reactor,可以是listen socket也可以使客户端socket,也可以是管道、eventfd、信号等set修改事件监听,可以设置监听的类型,如可读、可写。可读很好理解,对于listen socket就是有新客户端连接到来了需要accept。对于客户端连接就是收到数据,需要recv。可写事件比较难理解一些。一个SOCKET是有缓存区的,如果要向客户端连接发送2M的数据,一次性是发不出去的,操作系统默认TCP缓存区只有256K。一次性只能发256K,缓存区满了之后send就会返回EAGAIN错误。这时候就要监听可写事件,在纯异步的编程中,必须去监听可写才能保证send操作是完全非阻塞的。del从reactor中移除,不再监听事件callback就是事件发生后对应的处理逻辑,一般在add/set时制定。C语言用函数指针实现,JS可以用匿名函数,PHP可以用匿名函数、对象方法数组、字符串函数名。Reactor只是一个事件发生器,实际对socket句柄的操作,如connect/accept、send/recv、close是在callback中完成的。具体编码可参考下面的伪代码:Reactor模型还可以与多进程、多线程结合起来用,既实现异步非阻塞IO,又利用到多核。目前流行的异步服务器程序都是这样的方式:如Nginx:多进程ReactorNginx+Lua:多进程Reactor+协程Golang:单线程Reactor+多线程协程Swoole:多线程Reactor+多进程Worker协程是什么协程从底层技术角度看实际上还是异步IO Reactor模型,应用层自行实现了任务调度,借助Reactor切换各个当前执行的用户态线程,但用户代码中完全感知不到Reactor的存在。PHP并发IO编程实践PHP相关扩展Stream:PHP内核提供的socket封装Sockets:对底层Socket API的封装Libevent:对libevent库的封装Event:基于Libevent更高级的封装,提供了面向对象接口、定时器、信号处理的支持Pcntl/Posix:多进程、信号、进程管理的支持Pthread:多线程、线程管理、锁的支持PHP还有共享内存、信号量、消息队列的相关扩展PECL:PHP的扩展库,包括系统底层、数据分析、算法、驱动、科学计算、图形等都有。如果PHP标准库中没有找到,可以在PECL寻找想要的功能。PHP语言的优劣势PHP的优点:第一个是简单,PHP比其他任何的语言都要简单,入门的话PHP真的是可以一周就入门。C++有一本书叫做《21天深入学习C++》,其实21天根本不可能学会,甚至可以说C++没有3-5年不可能深入掌握。但是PHP绝对可以7天入门。所以PHP程序员的数量非常多,招聘比其他语言更容易。PHP的功能非常强大,因为PHP官方的标准库和扩展库里提供了做服务器编程能用到的99%的东西。PHP的PECL扩展库里你想要的任何的功能。另外PHP有超过20年的历史,生态圈是非常大的,在Github可以找到很多代码。PHP的缺点:性能比较差,因为毕竟是动态脚本,不适合做密集运算,如果同样的 PHP 程序使用 C/C++ 来写,PHP 版本要比它差一百倍。函数命名规范差,这一点大家都是了解的,PHP更讲究实用性,没有一些规范。一些函数的命名是很混乱的,所以每次你必须去翻PHP的手册。提供的数据结构和函数的接口粒度比较粗。PHP只有一个Array数据结构,底层基于HashTable。PHP的Array集合了Map,Set,Vector,Queue,Stack,Heap等数据结构的功能。另外PHP有一个SPL提供了其他数据结构的类封装。所以PHPPHP更适合偏实际应用层面的程序,业务开发、快速实现的利器PHP不适合开发底层软件使用C/C++、JAVA、Golang等静态编译语言作为PHP的补充,动静结合借助IDE工具实现自动补全、语法提示PHP的Swoole扩展基于上面的扩展使用纯PHP就可以完全实现异步网络服务器和客户端程序。但是想实现一个类似于多IO线程,还是有很多繁琐的编程工作要做,包括如何来管理连接,如何来保证数据的收发原子性,网络协议的处理。另外PHP代码在协议处理部分性能是比较差的,所以我启动了一个新的开源项目Swoole,使用C语言和PHP结合来完成了这项工作。灵活多变的业务模块使用PHP开发效率高,基础的底层和协议处理部分用C语言实现,保证了高性能。它以扩展的方式加载到了PHP中,提供了一个完整的网络通信的框架,然后PHP的代码去写一些业务。它的模型是基于多线程Reactor+多进程Worker,既支持全异步,也支持半异步半同步。Swoole的一些特点:Accept线程,解决Accept性能瓶颈和惊群问题多IO线程,可以更好地利用多核提供了全异步和半同步半异步2种模式处理高并发IO的部分用异步模式复杂的业务逻辑部分用同步模式底层支持了遍历所有连接、互发数据、自动合并拆分数据包、数据发送原子性。Swoole的进程/线程模型:Swoole程序的执行流程:使用PHP+Swoole扩展实现异步通信编程实例代码在https://github.com/swoole/swo… 主页查看。TCP服务器与客户端异步TCP服务器:在这里new swoole_server对象,然后参数传入监听的HOST和PORT,然后设置了3个回调函数,分别是onConnect有新的连接进入、onReceive收到了某一个客户端的数据、onClose某个客户端关闭了连接。最后调用start启动服务器程序。swoole底层会根据当前机器有多少CPU核数,启动对应数量的Reactor线程和Worker进程。异步客户端:客户端的使用方法和服务器类似只是回调事件有4个,onConnect成功连接到服务器,这时可以去发送数据到服务器。onError连接服务器失败。onReceive服务器向客户端连接发送了数据。onClose连接关闭。设置完事件回调后,发起connect到服务器,参数是服务器的IP,PORT和超时时间。同步客户端:同步客户端不需要设置任何事件回调,它没有Reactor监听,是阻塞串行的。等待IO完成才会进入下一步。异步任务:异步任务功能用于在一个纯异步的Server程序中去执行一个耗时的或者阻塞的函数。底层实现使用进程池,任务完成后会触发onFinish,程序中可以得到任务处理的结果。比如一个IM需要广播,如果直接在异步代码中广播可能会影响其他事件的处理。另外文件读写也可以使用异步任务实现,因为文件句柄没办法像socket一样使用Reactor监听。因为文件句柄总是可读的,直接读取文件可能会使服务器程序阻塞,使用异步任务是非常好的选择。异步毫秒定时器这2个接口实现了类似JS的setInterval、setTimeout函数功能,可以设置在n毫秒间隔实现一个函数或 n毫秒后执行一个函数。异步MySQL客户端swoole还提供一个内置连接池的MySQL异步客户端,可以设定最大使用MySQL连接数。并发SQL请求可以复用这些连接,而不是重复创建,这样可以保护MySQL避免连接资源被耗尽。异步Redis客户端异步的Web程序程序的逻辑是从Redis中读取一个数据,然后显示HTML页面。使用ab压测性能如下:同样的逻辑在php-fpm下的性能测试结果如下:WebSocket程序swoole内置了websocket服务器,可以基于此实现Web页面主动推送的功能,比如WebIM。有一个开源项目可以作为参考。https://github.com/matyhtf/ph…

April 8, 2019 · 1 min · jiezi

Swoole跟thinkphp5结合开发WebSocket在线聊天通讯系统

ThinkPHP使用Swoole需要安装 think-swoole Composer包,前提系统已经安装好了Swoole PECL 拓展tp5的项目根目录下执行composer命令安装think-swoole:composer require topthink/think-swoole话不多说,直接上代码:新建WebSocket.php控制器:(监听端口要确认服务器放行,宝塔环境还需要添加安全组规则)<?php namespace app\home\controller;use think\swoole\Server;class WebSocket extends Server{ protected $host = ‘0.0.0.0’; //监听所有地址 protected $port = 9501; //监听9501端口 protected $serverType = ‘socket’; protected $option = [ ‘worker_num’=> 4, //设置启动的Worker进程数 ‘daemonize’ => false, //守护进程化(上线改为true) ‘backlog’ => 128, //Listen队列长度 ‘dispatch_mode’ => 2, //固定模式,保证同一个连接发来的数据只会被同一个worker处理 //心跳检测:每60秒遍历所有连接,强制关闭10分钟内没有向服务器发送任何数据的连接 ‘heartbeat_check_interval’ => 60, ‘heartbeat_idle_time’ => 600 ]; //建立连接时回调函数 public function onOpen($server,$req) { $fd = $req->fd;//客户端标识 $uid = $req->get[‘uid’];//客户端传递的用户id $token = $req->get[’token’];//客户端传递的用户登录token //省略token验证逻辑…… if (!$token) { $arr = array(‘status’=>2,‘message’=>’token已过期’); $server->push($fd, json_encode($arr)); $server->close($fd); return; } //省略给用户绑定fd逻辑…… echo “用户{$uid}建立了连接,标识为{$fd}\n”; } //接收数据时回调函数 public function onMessage($server,$frame) { $fd = $frame->fd; $message = $frame->data; //省略通过fd查询用户uid逻辑…… $uid = 666; $data[‘uid’] = $uid; $data[‘message’] = ‘用户’.$uid.‘发送了:’.$message; $data[‘post_time’] = date(“m/d H:i”,time()); $arr = array(‘status’=>1,‘message’=>‘success’,‘data’=>$data); //仅推送给当前连接用户 //$server->push($fd, json_encode($arr)); //推送给全部连接用户 foreach($server->connections as $fd) { $server->push($fd, json_encode($arr)); } } //连接关闭时回调函数 public function onClose($server,$fd) { echo “标识{$fd}关闭了连接\n”; }}前端演示页面:(省略控制器判断登录状态、分配数据逻辑……)<!DOCTYPE html><html lang=“en”><head><meta charset=“UTF-8” /><meta http-equiv=“X-UA-Compatible” content=“IE=edge,chrome=1” /><meta name=“viewport” content=“width=device-width,initial-scale=1,minimum-scale=1,maximum-scale=1,user-scalable=no” /><title>Chat</title><link rel=“stylesheet” type=“text/css” href="/static/liaotian/chat.css" /><script src="/static/liaotian/js/jquery.min.js"></script><script src="/static/liaotian/js/flexible.js"></script></head><body> <header class=“header”> <a class=“back” href=“javascript:history.back()"></a> <h5 class=“tit”>在线聊天</h5> <a href=”"><div class=“right”>退出</div></a> </header> <!– 聊天内容 start–> <div class=“message”> </div> <!– 聊天内容 end–> <!– 底部 start–> <div class=“footer”> <img id=“setbtn” src="/static/liaotian/images/hua.png" alt="" /> <img src="/static/liaotian/images/xiaolian.png" alt="" /> <input type=“text” id=“msg” value="" maxlength=“300”> <p style=“background: rgb(17, 79, 142);” id=“sendBtn”>发送</p> </div> <!– 底部 end–></body></html><script src=“http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script><script src=“https://cdn.bootcss.com/layer/3.1.0/layer.js"></script><script type=“text/javascript”>$(function () { var uid = 666;//当前用户id var token = ‘abcdefg’;//用户token //判断浏览器是否支持WebSocket var supportsWebSockets = ‘WebSocket’ in window || ‘MozWebSocket’ in window; if (supportsWebSockets) { //建立WebSocket连接(ip地址换成自己主机ip) var ws = new WebSocket(“ws://127.0.0.1:9501?uid="+uid+"&token="+token); ws.onopen = function () { layer.msg(‘服务器连接成功’,{shade:0.1,icon:1,time:600}); }; ws.onerror = function () { layer.msg(‘服务器连接失败’,{shade:0.1,icon:2,time:600}); }; ws.onmessage = function (evt) { var data = $.parseJSON(evt.data); //错误提示 if(data.status != 1){ layer.alert(data.message,{icon:2}); return; } //消息返回 if (data.status==1 && data.data.message!=’’) { var html = “”; if (data.data.uid == uid) { html += “<div style=‘word-break:break-all’ class="show"><div class="time">"+data.data.post_time+"</div><div class="msg"><img src=""+data.data.head_img+”" alt="" /><p><i clas="msg_input"></i>"+data.data.message+"</p></div></div>”; }else{ html += “<div style=‘word-break:break-all’ class="send"><div class="time">"+data.data.post_time+"</div><div class="msg"><img src=""+data.data.head_img+”" alt="" /><p><i clas="msg_input"></i>"+data.data.message+"</p></div></div>”; } } $(".message”).append(html); setTimeout(function () { ($(’.message’).children(“div:last-child”)[0]).scrollIntoView();//向上滚动 },100); }; ws.onclose = function (res) { }; //按钮发送 $("#sendBtn").click(function () { var contents = $("#msg").val().trim(); if(contents == null || contents == “”){ layer.msg(‘内容为空’,{shade:0.1,icon:2,time:600}); return false; }else{ ws.send(contents); $("#msg").val(""); } }); //回车发送 $("#msg").keydown(function (evel) { var that = $(this); if (evel.keyCode == 13) { evel.cancelBubble = true; evel.preventDefault(); evel.stopPropagation(); var contents = that.val().trim(); if(contents == null || contents == “”){ layer.msg(‘内容为空’,{shade:0.1,icon:2,time:600}); return false; }else{ ws.send(contents); that.val(""); } } }); }else{ layer.alert(“您的浏览器不支持 WebSocket!”); }});</script>服务器移到项目根目录开启服务:php public/index.php Websocket/start这里的路径,是因为我绑定了home模块为默认模块,tp5默认情况是:php public/index.php index/Websocket/start)开启成功,查看端口已经被监听:lsof -i:9501演示效果如下:到了这里很多朋友想深入学习swoole和laravel、thinkphp,swoft微服务在使用中遇到很多困难,我为大家准备了一套精品PHP中高级进阶学习教程,需要可看下图详细内容,还可加入大牛学习圈子,分享tp,laravel,swoole,swoft微服务、SQL性能优化,分布式、高并发等教程,各种大牛都是1-7年PHP开发者,每天还有11年的架构师做课程讲解,助你进阶中高级PHP程序员,增值涨薪!需要可看上图内容,还可加入大牛学习圈子,分享tp,laravel,swoole,swoft微服务、SQL性能优化,分布式、高并发等教程,各种大牛都是1-7年PHP开发者,每天还有11年的架构师做课程讲解,助你进阶中高级PHP程序员,增值涨薪! ...

April 2, 2019 · 2 min · jiezi

协程 C/C++ 扩展开发指南(1):内存安全

Swoole4 协程的出现使得 PHP 底层上从原来串行模式变成了并发模式。有很多 PHP 的C/C++扩展在开发时未能考虑到并发性、可重入问题,导致无法在Swoole协程中使用。本文会详细讲解如何编写协程并发安全的C/C++代码。可重入性示例代码:int t;void test1(int *x, int *y) { t = *x; *x = *y; //fun1 函数中可能会存在协程切换 fun1(); //错误代码 y = t;}t是一个全局变量或者static静态变量在协程A中调用了test1函数,使用了全局变量t当函数内调用了fun1(),这个函数中如果发生了协程切换,这时假如另外一个协程B也执行了test1函数,那么t的值可能会被修改协程B挂起时,重新回到协程A,这时y = t,会得到一个错误的值引用栈内存这也是一个严重的风险点。协程1将自身栈内存的指针发送给另外一个协程2,协程1退出时会释放协程栈内存。协程2的生命周期长于1,继续读写此内存,就会导致segment fault。示例:void co1() { char buf[2048]; //这里启动一个新的协程,buf 是协程1栈上内存 co2(buf); //协程1 退出时会释放栈内存}void co2(char *buf) { for(int i=0; i<2048; i++) { Coroutine::sleep(1); //这里 buf 内存可能已经释放了 buf[i] = 1; }}协程安全代码为了保证安全性,在Swoole4协程编程中:不要使用static变量和全局变量,坚持只用局部变量若必须访问全局变量,必须保证只用于计算逻辑,不得存在任何IO或Sleep等引起协程切换的操作不调用其它任何不可重入的函数不要引用栈上内存

March 17, 2019 · 1 min · jiezi

基于Swoole的通用连接池 - 数据库连接池

连接池open-smf/connection-pool 是一个基于Swoole的通用连接池,常被用作数据库连接池。依赖依赖版本PHP>=7.0.0Swoole>=4.2.9 Recommend 4.2.13+安装通过Composer安装。composer require “open-smf/connection-pool:~1.0"使用更多示例。基本用法use Smf\ConnectionPool\ConnectionPool;use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;use Swoole\Coroutine\MySQL;go(function () { // MySQL连接数区间:[10, 30] $pool = new ConnectionPool( [ ‘minActive’ => 10, ‘maxActive’ => 30, ‘maxWaitTime’ => 5, ‘maxIdleTime’ => 20, ‘idleCheckInterval’ => 10, ], new CoroutineMySQLConnector, // 指明连接器实例,这里使用协程MySQL连接器,这样就可以创建一个协程MySQL的数据库连接池 [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘3306’, ‘user’ => ‘root’, ‘password’ => ‘xy123456’, ‘database’ => ‘mysql’, ’timeout’ => 10, ‘charset’ => ‘utf8mb4’, ‘strict_type’ => true, ‘fetch_mode’ => true, ] ); echo “初始化连接池…\n”; $pool->init(); defer(function () use ($pool) { echo “关闭连接池…\n”; $pool->close(); }); echo “从连接池中借出连接…\n”; /@var MySQL $connection */ $connection = $pool->borrow(); defer(function () use ($pool, $connection) { echo “向连接池归还连接…\n”; $pool->return($connection); }); // 执行查询语句 $status = $connection->query(‘SHOW STATUS LIKE “Threads_connected”’); var_dump($status);});在Swoole Server中的用法use Smf\ConnectionPool\ConnectionPool;use Smf\ConnectionPool\ConnectionPoolTrait;use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;use Smf\ConnectionPool\Connectors\PhpRedisConnector;use Swoole\Coroutine\MySQL;use Swoole\Http\Request;use Swoole\Http\Response;use Swoole\Http\Server;class HttpServer{ use ConnectionPoolTrait; protected $swoole; public function __construct(string $host, int $port) { $this->swoole = new Server($host, $port); $this->setDefault(); $this->bindWorkerEvents(); $this->bindHttpEvent(); } protected function setDefault() { $this->swoole->set([ ‘daemonize’ => false, ‘dispatch_mode’ => 1, ‘max_request’ => 8000, ‘open_tcp_nodelay’ => true, ‘reload_async’ => true, ‘max_wait_time’ => 60, ’enable_reuse_port’ => true, ’enable_coroutine’ => true, ‘http_compression’ => false, ’enable_static_handler’ => false, ‘buffer_output_size’ => 4 * 1024 * 1024, ‘worker_num’ => 4, // 每个Worker持有一个独立的连接池 ]); } protected function bindHttpEvent() { $this->swoole->on(‘Request’, function (Request $request, Response $response) { $pool1 = $this->getConnectionPool(‘mysql’); /@var MySQL $mysql */ $mysql = $pool1->borrow(); defer(function () use ($pool1, $mysql) { $pool1->return($mysql); }); $status = $mysql->query(‘SHOW STATUS LIKE “Threads_connected”’); $pool2 = $this->getConnectionPool(‘redis’); /**@var Redis $redis */ $redis = $pool2->borrow(); defer(function () use ($pool2, $redis) { $this->pools[‘redis’]->return($redis); }); $clients = $redis->info(‘Clients’); $json = [ ‘status’ => $status, ‘clients’ => $clients, ]; $response->header(‘Content-Type’, ‘application/json’); $response->end(json_encode($json)); }); } protected function bindWorkerEvents() { $createPools = function () { // 所有的MySQL连接数区间:[4 workers * 2 = 8, 4 workers * 10 = 40] $pool1 = new ConnectionPool( [ ‘minActive’ => 2, ‘maxActive’ => 10, ], new CoroutineMySQLConnector, [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘3306’, ‘user’ => ‘root’, ‘password’ => ‘xy123456’, ‘database’ => ‘mysql’, ’timeout’ => 10, ‘charset’ => ‘utf8mb4’, ‘strict_type’ => true, ‘fetch_mode’ => true, ]); $pool1->init(); $this->addConnectionPool(‘mysql’, $pool1); // 所有Redis连接数区间:[4 workers * 5 = 20, 4 workers * 20 = 80] $pool2 = new ConnectionPool( [ ‘minActive’ => 5, ‘maxActive’ => 20, ], new PhpRedisConnector, [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘6379’, ‘database’ => 0, ‘password’ => null, ]); $pool2->init(); $this->addConnectionPool(‘redis’, $pool2); }; $closePools = function () { $this->closeConnectionPools(); }; // Worker启动时创建MySQL和Redis连接池 $this->swoole->on(‘WorkerStart’, $createPools); // Worker正常退出或错误退出时,关闭连接池,释放连接 $this->swoole->on(‘WorkerStop’, $closePools); $this->swoole->on(‘WorkerError’, $closePools); } public function start() { $this->swoole->start(); }}// 启用协程Runtime来让PhpRedis扩展一键协程化Swoole\Runtime::enableCoroutine(true);$server = new HttpServer(‘0.0.0.0’, 5200);$server->start();贡献Github,欢迎 Star & PR。 ...

March 16, 2019 · 2 min · jiezi

one 1.4.6 新增参数验证器

one 1.4.6 新增参数验证器一个极简的高性能框架。支持在swoole协程环境和常规的apache,php-fpm下运行 github:https://github.com/lizhichao/one基本使用方法$_POST = [ ’email’ => ‘xxxx’, ‘age’ => 2];$vt = new Validator();$result = $vt->setAliases([ ’name’ => ‘用户名’, ’email’ => ‘邮箱’, ‘age’ => ‘年龄’])->validate($_POST, [ ’name’ => ‘required|min_len:5,max_len:10’, // 必填 5<= strlen(name) <=10 ’email’ => ‘required|email’, // 必填 email 格式 ‘age’ => ‘unsigned_int|min:18|max:200’ // 选填 正整数 18<= age <=200])->isOk();if ($result === false) { print_r($vt->getErrs());}//Array//(// [0] => 用户名不能为空// [1] => 邮箱格式不正确// [2] => 年龄不能小于18//)内置规则required 必填numeric 数字包括浮点数min 不能小于max 不能大于min_len 不能短于max_len 不能长于unsigned_int 正整数email 邮箱格式ip ip格式ip4 ip4格式ip6 ip6格式如果不够? 可自定义规则$vt->addRule(‘between’, [ ‘msg’ => ‘:attribute只能在:arg1-:arg2之间’, ‘fn’ => function ($value, $arg1, $arg2) { return $value >= $arg1 && $value <= $arg2; }]);$vt->validate([‘a’ => 10], [ ‘a’ => ‘required|between:3,10’ //必填 只能在3-10]); ...

March 8, 2019 · 1 min · jiezi

Swoole难上手?从EasySwoole开始

前言我一生的文章都会放在这里,我的博客,我希望每一行代码,每一段文字都能帮助你。https://github.com/CrazyCodes…大家好,我是CrazyCodes,我没有消失,最近在准备考试,所以文章出的比较慢,请见谅有些童鞋感觉对Swoole不从下手,也不知在什么业务上使用它,看它这么火却学不会也是挺让人捉急的一件事情。Swoole:面向生产环境的 PHP 异步网络通信引擎啥是异步网络通信?异步通信简单点来说,就是一个人和一群人的关系,一个人去做十件事,需要一件一件去做,一群人去做10件事,可以分配每个人做每件事。我们用Swoole可以做什么?聊天室并发的处理 (读大文件)异步MySQL异步Redis等等当然不去做,只在看,很难理解为何选择使用Swoole去做这些。从现在开始,我们暂时不关心上面的这些概念,啥话都不说,就是干EasySwooleEasySwoole ? 名副其实,作者为了让开发者更便捷的使用Swoole 而封装的开发框架,地址在下方EasySwoole https://www.easyswoole.com/使用EasySwoole你会发现有很多很难理解的概念及用法。没关系,跟着我,慢慢来~安装EasySwoole的环境要求保证 PHP 版本大于等于 7.1保证 Swoole 拓展版本大于等于 4.3.0需要 pcntl 拓展的任意版本使用 Linux / FreeBSD / MacOS 这三类操作系统使用 Composer 作为依赖管理工具如果你感觉以上要求太苛刻,你可以选择使用Docker快速部署一套开发环境或者使用更简单的 homestead在使用EasySwoole之前我们要安装Swoole,Swoole是PHP扩展,我们可以通过pecl install swoole快速安装,或者使用源码编译的形式安装安装完扩展后,接下来我们就使用万能composer来安装EasySwoolecomposer require easyswoole/easyswoole=3.xphp vendor/bin/easyswoole install服务管理EasySwoole(Swoole)与其他框架不同,他不擅长开发Web,请将目标定位在后端服务上。以下内容为引用官方文档php easyswoole start Hello World以下为官方文档内容 https://www.easyswoole.com/Ma…在项目根目录下创建如下的目录结构,这个目录是编写业务逻辑的应用目录,编辑 Index.php 文件,添加基础控制器的代码project 项目部署目录———————————-├─App 应用目录│ └─HttpController 应用的控制器目录│ └─Index.php 默认控制器文件———————————-<?phpnamespace App\HttpController;use EasySwoole\Http\AbstractInterface\Controller;class Index extends Controller{ function index() { // TODO: Implement index() method. $this->response()->write(‘hello world’); }}然后编辑根目录下的 composer.json 文件,注册应用的命名空间{ “autoload”: { “psr-4”: { “App\”: “App/” } }, “require”: { “easyswoole/easyswoole”: “3.x-dev” }}最后执行 composer dumpautoload 命令更新命名空间,框架已经可以自动加载 App 目录下的文件了,此时框架已经安装完毕,可以开始编写业务逻辑# 更新命名空间映射composer dumpautoload# 启动框架php easyswoole start启动框架后,访问 http://localhost:9501即可看到 Hello World 。组件EasySwoole提供了很多实用的组件包括控制台组件定时器触发器日志处理等等…致谢从下一章开始,我们逐步使用EasySwoole的各项功能并开发一个简单的并发版爬虫系统,感谢你看到这里,希望本文可以帮到你,谢谢 ...

March 7, 2019 · 1 min · jiezi

基于Swoole和Redis实现的并发队列处理系统

背景由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。在我们的系统中,主系统作为生产者,任务系统作为消费者。具体的工作流程如下: 1、主系统将需要需要处理的任务名称+任务参数push到队列中。 2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。具体代码如下:/** * 启动守护进程 /public function runAction() { Tools::log_message(‘ERROR’, ‘daemon/run’ . ’ | action: restart’, ‘daemon-’); while (true) { $this->fork_process(); } exit;} /* * 创建子进程 /private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子进程 $pid = posix_getpid(); //echo “ Process {$pid} was created \n\n”; $this->mq_process(); exit; } else {//主进程 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 if (pcntl_wifexited($status)) { //echo “\n\n* Sub process: {$pid} exited with {$status}”; //Tools::log_message(‘INFO’, ‘daemon/run succ’ . ‘|status:’ . $status . ‘|pid:’ . $ppid . ‘|childpid:’ . $pid ); } else { Tools::log_message(‘ERROR’, ‘daemon/run fail’ . ‘|status:’ . $status . ‘|pid:’ . $ppid . ‘|childpid:’ . $pid, ‘daemon-’); } }} /** * 业务任务队列处理 */private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = ‘task’ . $data[‘worker’]; $class_name = isset($data[‘class’]) ? $data[‘class’] : ‘TaskproModel’; $params = $data[‘params’]; $class = new $class_name(); $class->$worker($params); return TRUE;}这是一个简单的任务处理系统。通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。这样很稳定。但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!第一个问题还好,但第二个问题就很严重。当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。新的设计为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。因为在PHP7之前不支持多线程,所以我们采用多进程。从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。显然这是不合适的。我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。遇到的问题1、如何控制最大进程数这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!所以,这里就需要了解一个知识点:信号。具体的可以自行Google,这里直接看代码。// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, “sig_handler”));这就安装了一个信号处理器。当然还缺少一点。declare(ticks = 1);declare是一个控制结构语句,具体的用法也请去Google。这句代码的意思就是每执行一条低级语句就调用一次信号处理器。这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。2、如何解决进程残留在多进程开发中,如果处理不当就会导致进程残留。为了解决进程残留,必须得将子进程回收。那么如何对子进程进行回收就是一个技术点了。在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。但我们是基于Redis的brpop的,而brpop是阻塞的。这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。进程回收也放到信号处理器中去。新系统的评估pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。所以这里采用Swoole扩展中的Process。具体代码如下:declare(ticks = 1);class JobDaemonController extends Yaf_Controller_Abstract{ use Trait_Redis; private $maxProcesses = 800; private $child; private $masterRedis; private $redis_task_wing = ’task:wing’; //待处理队列 public function init(){ // install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, “sig_handler”)); set_time_limit(0); ini_set(‘default_socket_timeout’, -1); //队列处理不超时,解决redis报错:read error on connection } private function redis_client(){ $rds = new Redis(); $rds->connect(‘redis.master.host’,6379); return $rds; } public function process(swoole_process $worker){// 第一个处理 $GLOBALS[‘worker’] = $worker; swoole_event_add($worker->pipe, function($pipe) { $worker = $GLOBALS[‘worker’]; $recv = $worker->read(); //send data to master sleep(rand(1, 3)); echo “From Master: $recv\n”; $worker->exit(0); }); exit; } public function testAction(){ for ($i = 0; $i < 10000; $i++){ $data = [ ‘abc’ => $i, ’timestamp’ => time().rand(100,999) ]; $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); } exit; } public function runAction(){ while (1){// echo “\t now we de have $this->child child processes\n”; if ($this->child < $this->maxProcesses){ $rds = $this->redis_client(); $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待 if (!$data_pop){ continue; } echo “\t Starting new child | now we de have $this->child child processes\n”; $this->child++; $process = new swoole_process([$this, ‘process’]); $process->write(json_encode($data_pop)); $pid = $process->start(); } } } private function sig_handler($signo) {// echo “Recive: $signo \r\n”; switch ($signo) { case SIGCHLD: while($ret = swoole_process::wait(false)) {// echo “PID={$ret[‘pid’]}\n”; $this->child–; } } }}最终,经过测试,单核1G的服务器执行1到3秒的任务可以做到800的并发。 ...

March 6, 2019 · 2 min · jiezi

swoole_process父子进程管道通信案例

话不多说直接上代码创建的子进程:public function __construct() { $this->redis = Container::get(SwooleRedis::class);//获取异步redis获取更高性能 $this->process = new swoole_process(function (swoole_process $process) { return $this->process($process); }, false, SOCK_DGRAM); $this->process->name(‘Test_Gateway’); $this->process->useQueue(); $this->process->start();//启动子进程 } /** * 子进程处理逻辑 * @param swoole_process $process / private function process(swoole_process $process) { $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); //异步非阻塞 $client->on(“connect”, function (swoole_client $cli) use ($process) { $process->write(‘connected’); }); $client->on(“receive”, function (swoole_client $cli, $data) use ($process) { $process->write($data); }); $client->on(“error”, function (swoole_client $cli) use ($process) { $process->write(’error’); }); $client->on(“close”, function (swoole_client $cli) use ($process) { $process->write(‘close’); }); if ($client->connect(‘127.0.0.1’, 90, -1)) { } else { $process->write(‘网关连接失败’); } swoole_event_add($process->pipe, function ($pipe) use ($process, $client) {//读取父进程管道消息 $client->send($process->read()); }); }父进程onWorkerStart:/* * @param swoole_server $serv * @param $worker_id */ public function onWorkerStart(\swoole_server $serv, $worker_id) { if ($worker_id === 0) { swoole_timer_tick(1000, function () { $this->process->write(‘ping’); }); $process = $this->process; swoole_event_add($process->pipe, function ($pipe) use ($process) {//获取子进程的管道消息 echo “子进程消息:” . $process->read() . PHP_EOL; }); } }子进程的client客户端可以忽略不计,本demo只是掩饰管道通信的例子使用管道就不可以使用消息队列:$process_push()和$process->pop();理论上在父子进程各注册一个event_loop即可实现一边发消息一边接收其他的后续补充 ...

March 4, 2019 · 1 min · jiezi

(一)如何实现一个单进程阻塞的网络服务器

概述想要更好的理解,网络编程,写出一个高性能的服务,我们需要花点时间来理解下对于服务器处理客户端的整个流程并且理解一些关键的术语,本来想在本文中补充一些基础理论知识,担心篇幅过长不利于阅读,所以以后补发一些基础知识,接下来进入正题。理论主要介绍下实现一个网络服务器的基本步骤,代码会在实践环节复现一次。第一步我们需要创建一个socket,绑定服务器端口(bind),监听端口(listen),在PHP中用stream_socket_server一个函数就能完成上面3个步骤。第二步进入while循环,阻塞在accept操作上,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起connect到服务器,操作系统会唤醒此进程。accept函数返回客户端连接的socket第三步利用fread读取客户端socket当中的数据收到数据后服务器程序进行处理然后使用fwrite向客户端发送响应。长连接的服务会持续与客户端交互,而短连接服务一般收到响应就会close。实践在这里我们用代码来实现下基本一个流程,在开始写代码之前介绍介几个php函数,是我们代码中可能会用到的,方便大家理解。函数stream_socket_serverstream_socket_acceptcall_user_funcis_callablefread点击函数了解用法代码废话少说直接开撸~<?php class Worker{ //监听socket protected $socket = NULL; //连接事件回调 public $onConnect = NULL; //接收消息事件回调 public $onMessage = NULL; public function __construct($socket_address) { } public function run(){ } }$worker = new Worker(’tcp://0.0.0.0:9810’);//提前注册了一个连接事件回调$worker->onConnect = function ($data) { echo ‘新的连接来了’, $data, PHP_EOL;};//提前注册了一个接收消息事件回调$worker->onMessage = function ($conn, $message) {};$worker->run();按照之前的流程我们需要监听端口+地址public function __construct($socket_address) { //监听地址+端口 $this->socket=stream_socket_server($socket_address); }下一步就需要阻塞在accept操作,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起connect到服务器,操作系统会唤醒此进程public function run(){ while (true) { //循环监听 $client = stream_socket_accept($this->socket);//在服务端阻塞监听 } }当新的连接进入唤醒进程并且触发连接事件回调 public function run(){ while (true) { //循环监听 $client = stream_socket_accept($this->socket);//在服务端阻塞监听 if(!empty($client) && is_callable($this->onConnect)){//socket连接成功并且是我们的回调 //触发事件的连接的回调 call_user_func($this->onConnect,$client); } } }这里的连接回调实际上触发的就是之前准备好类库的这里下面这段代码$worker->onConnect = function ($data) { echo ‘连接事件:’, $data, PHP_EOL;};当连接成功后利用fread获取到客户端的内容,并触发接收消息事件 public function run(){ while (true) { //循环监听 $client = stream_socket_accept($this->socket);//在服务端阻塞监听 if(!empty($client) && is_callable($this->onConnect)){//socket连接成功并且是我们的回调 //触发事件的连接的回调 call_user_func($this->onConnect,$client); } //从连接中读取客户端内容 $buffer=fread($client,65535);//参数2:在缓冲区当中读取的最大字节数 //正常读取到数据。触发消息接收事件,进行响应 if(!empty($buffer) && is_callable($this->onMessage)){ //触发时间的消息接收事件 call_user_func($this->onMessage,$this,$client,$buffer);//传递到接收消息事件》当前对象、当前连接、接收到的消息 } } }到此处基本的一个网络服务接收基本完成,还需要对请求做出一个响应,以HTTP请求为例,这里封装了一个http响应的方法(http://127.0.0.1:9810) class Worker{ … … … public function send($conn,$content){ $http_resonse = “HTTP/1.1 200 OK\r\n”; $http_resonse .= “Content-Type: text/html;charset=UTF-8\r\n”; $http_resonse .= “Connection: keep-alive\r\n”; $http_resonse .= “Server: php socket server\r\n”; $http_resonse .= “Content-length: “.strlen($content)."\r\n\r\n”; $http_resonse .= $content; fwrite($conn, $http_resonse); } }当触发接收消息事件时对http请求做出响应$worker->onMessage = function ($server,$conn, $message) { echo ‘来自客户端消息:’,$message,PHP_EOL; $server->send($conn,‘来自服务端消息’);};到这就结束了~,完整代码直通车缺点一次只能处理一个连接,不支持多个连接同时处理 ...

February 27, 2019 · 1 min · jiezi

Swoole跟thinkphp5结合开发WebSocket在线聊天通讯系统教程

ThinkPHP使用Swoole需要安装 think-swoole Composer包,前提系统已经安装好了Swoole PECL 拓展tp5的项目根目录下执行composer命令安装think-swoole:composer require topthink/think-swoole话不多说,直接上代码:新建WebSocket.php控制器:(监听端口要确认服务器放行,宝塔环境还需要添加安全组规则)<?php namespace app\home\controller;use think\swoole\Server;class WebSocket extends Server{ protected $host = ‘0.0.0.0’; //监听所有地址 protected $port = 9501; //监听9501端口 protected $serverType = ‘socket’; protected $option = [ ‘worker_num’=> 4, //设置启动的Worker进程数 ‘daemonize’ => false, //守护进程化(上线改为true) ‘backlog’ => 128, //Listen队列长度 ‘dispatch_mode’ => 2, //固定模式,保证同一个连接发来的数据只会被同一个worker处理 //心跳检测:每60秒遍历所有连接,强制关闭10分钟内没有向服务器发送任何数据的连接 ‘heartbeat_check_interval’ => 60, ‘heartbeat_idle_time’ => 600 ]; //建立连接时回调函数 public function onOpen($server,$req) { $fd = $req->fd;//客户端标识 $uid = $req->get[‘uid’];//客户端传递的用户id $token = $req->get[’token’];//客户端传递的用户登录token //省略token验证逻辑…… if (!$token) { $arr = array(‘status’=>2,‘message’=>’token已过期’); $server->push($fd, json_encode($arr)); $server->close($fd); return; } //省略给用户绑定fd逻辑…… echo “用户{$uid}建立了连接,标识为{$fd}\n”; } //接收数据时回调函数 public function onMessage($server,$frame) { $fd = $frame->fd; $message = $frame->data; //省略通过fd查询用户uid逻辑…… $uid = 666; $data[‘uid’] = $uid; $data[‘message’] = ‘用户’.$uid.‘发送了:’.$message; $data[‘post_time’] = date(“m/d H:i”,time()); $arr = array(‘status’=>1,‘message’=>‘success’,‘data’=>$data); //仅推送给当前连接用户 //$server->push($fd, json_encode($arr)); //推送给全部连接用户 foreach($server->connections as $fd) { $server->push($fd, json_encode($arr)); } } //连接关闭时回调函数 public function onClose($server,$fd) { echo “标识{$fd}关闭了连接\n”; }}前端演示页面:(省略控制器判断登录状态、分配数据逻辑……)<!DOCTYPE html><html lang=“en”><head><meta charset=“UTF-8” /><meta http-equiv=“X-UA-Compatible” content=“IE=edge,chrome=1” /><meta name=“viewport” content=“width=device-width,initial-scale=1,minimum-scale=1,maximum-scale=1,user-scalable=no” /><title>Chat</title><link rel=“stylesheet” type=“text/css” href="/static/liaotian/chat.css" /><script src="/static/liaotian/js/jquery.min.js"></script><script src="/static/liaotian/js/flexible.js"></script></head><body> <header class=“header”> <a class=“back” href=“javascript:history.back()"></a> <h5 class=“tit”>在线聊天</h5> <a href=”"><div class=“right”>退出</div></a> </header> <!– 聊天内容 start–> <div class=“message”> </div> <!– 聊天内容 end–> <!– 底部 start–> <div class=“footer”> <img id=“setbtn” src="/static/liaotian/images/hua.png" alt="" /> <img src="/static/liaotian/images/xiaolian.png" alt="" /> <input type=“text” id=“msg” value="" maxlength=“300”> <p style=“background: rgb(17, 79, 142);” id=“sendBtn”>发送</p> </div> <!– 底部 end–></body></html><script src=“http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script><script src=“https://cdn.bootcss.com/layer/3.1.0/layer.js"></script><script type=“text/javascript”>$(function () { var uid = 666;//当前用户id var token = ‘abcdefg’;//用户token //判断浏览器是否支持WebSocket var supportsWebSockets = ‘WebSocket’ in window || ‘MozWebSocket’ in window; if (supportsWebSockets) { //建立WebSocket连接(ip地址换成自己主机ip) var ws = new WebSocket(“ws://127.0.0.1:9501?uid="+uid+"&token="+token); ws.onopen = function () { layer.msg(‘服务器连接成功’,{shade:0.1,icon:1,time:600}); }; ws.onerror = function () { layer.msg(‘服务器连接失败’,{shade:0.1,icon:2,time:600}); }; ws.onmessage = function (evt) { var data = $.parseJSON(evt.data); //错误提示 if(data.status != 1){ layer.alert(data.message,{icon:2}); return; } //消息返回 if (data.status==1 && data.data.message!=’’) { var html = “”; if (data.data.uid == uid) { html += “<div style=‘word-break:break-all’ class="show"><div class="time">"+data.data.post_time+"</div><div class="msg"><img src=""+data.data.head_img+”" alt="" /><p><i clas="msg_input"></i>"+data.data.message+"</p></div></div>”; }else{ html += “<div style=‘word-break:break-all’ class="send"><div class="time">"+data.data.post_time+"</div><div class="msg"><img src=""+data.data.head_img+”" alt="" /><p><i clas="msg_input"></i>"+data.data.message+"</p></div></div>”; } } $(".message”).append(html); setTimeout(function () { ($(’.message’).children(“div:last-child”)[0]).scrollIntoView();//向上滚动 },100); }; ws.onclose = function (res) { }; //按钮发送 $("#sendBtn").click(function () { var contents = $("#msg").val().trim(); if(contents == null || contents == “”){ layer.msg(‘内容为空’,{shade:0.1,icon:2,time:600}); return false; }else{ ws.send(contents); $("#msg").val(""); } }); //回车发送 $("#msg").keydown(function (evel) { var that = $(this); if (evel.keyCode == 13) { evel.cancelBubble = true; evel.preventDefault(); evel.stopPropagation(); var contents = that.val().trim(); if(contents == null || contents == “”){ layer.msg(‘内容为空’,{shade:0.1,icon:2,time:600}); return false; }else{ ws.send(contents); that.val(""); } } }); }else{ layer.alert(“您的浏览器不支持 WebSocket!”); }});</script>服务器移到项目根目录开启服务:php public/index.php Websocket/start这里的路径,是因为我绑定了home模块为默认模块,tp5默认情况是:php public/index.php index/Websocket/start)开启成功,查看端口已经被监听:lsof -i:9501演示效果如下:到了这里很多朋友想深入学习swoole和laravel、thinkphp,swoft微服务在使用中遇到很多困难,我为大家准备了一套精品PHP中高级进阶学习教程,需要可看下图详细内容,还可加入大牛学习圈子,分享tp,laravel,swoole,swoft微服务、SQL性能优化,分布式、高并发等教程,各种大牛都是1-7年PHP开发者,每天还有11年的架构师做课程讲解,助你进阶中高级PHP程序员,增值涨薪!需要可看上图内容,还可加入大牛学习圈子,分享tp,laravel,swoole,swoft微服务、SQL性能优化,分布式、高并发等教程,各种大牛都是1-7年PHP开发者,每天还有11年的架构师做课程讲解,助你进阶中高级PHP程序员,增值涨薪! ...

February 26, 2019 · 2 min · jiezi

Swoft 系列教程:(1)使用 Docker 安装部署 Swoft

之前有写过一篇 Docker 安装部署 Swoft 的文章,但有些冗余混乱,故重写作为教程的开篇。要不读读看?Swoft项目:https://github.com/swoft-clou…Swoft文档:https://doc.swoft.org/Swoft镜像:https://hub.docker.com/r/swof…Swoft 简介首个基于 Swoole 原生协程的新时代 PHP 高性能协程全栈框架,内置协程网络服务器及常用的协程客户端,常驻内存,不依赖传统的 PHP-FPM,全异步非阻塞 IO 实现,以类似于同步客户端的写法实现异步客户端的使用,没有复杂的异步回调,没有繁琐的 yield, 有类似 Go 语言的协程、灵活的注解、强大的全局依赖注入容器、完善的服务治理、灵活强大的 AOP、标准的 PSR 规范实现等等,可以用于构建高性能的Web系统、API、中间件、基础服务等等。即异步非阻塞IO,EventLoop,事件驱动。cpu_num 个 worker 即可承载高并发请求,提供协程/异步IO客户端,数据库连接池,对象连接池,任务进程池。优雅的注解声明,IOC/DI容器,严格遵循PSR规范。Swoft 镜像的主要用途Swoft 官方提供了基于 Debine 的 Docker 镜像。镜像中已安装配置好运行 Swoft 的所需组件及依赖:PHP 7.0+ / Swoole / Composer / Pecl。虽然不使用镜像从头安装部署以上几项组件也不难,但镜像内置可以开箱即用,免去了这些略繁琐的工作,让我们尽可能快的投入到 Swoft 的开发中去。此外Swoft 镜像与开发的配合如果只是单纯的想快速体验 Swoft,使用 docker run -p 80:80 swoft/swoft 拉取创建容器访问即可。如何正确的在 Swoft 项目的开发中使用镜像呢?如果是要将镜像好好利用到开发工作中,则需要清楚一下几点。镜像内置完全安装的 Swoft 框架,但它只是用来快速演示的,并不是要你拿去修改,开发还是要对本地的 Swoft 项目开发。我们应该做的是将本地的 Swoft 框架 挂载到镜像的工作目录 /var/www/swoft 从而替换掉镜像自带的,这样启动 Swoft服务 就会启动映射到本地的 Swoft 项目了镜像的容器启动时默认会启动 Swoft 服务 作为前置进程,这就要求我们在挂载了本地 Swoft 项目时需要保证已完全安装了各项依赖(github 拉取下来的 Swoft 源码 并没有安装库依赖,需要使用 Composer install 一下)好像咬到尾巴了,为了开发需要挂载本地 Swoft 项目到镜像工作目录,因为容器启动时还会一并启动 Swoft 服务,所以要求挂载的本地 Swoft项目 必须使用 Composer 安装好依赖,嗯?这不还是得在本地装 PHP + Composer 嘛,镜像不是都提供了嘛,重复劳动了。修改 Swoft 镜像的 entrypoint 使得 Swoft 容器启动时不同时启动 Swoft 服务,这就不需要要求我们挂载的本地 Swoft 项目必须完全安装好依赖了。容器创建好后,登入容器 sh,使用镜像内置的 Composer 安装依赖启动 Swoft 服务这样就能充分利用镜像内置的环境和工具,快乐的开始 Swoft 的开发了工作了,下面给出具体的实例。Swoft 镜像的使用前面夸赞了那么多镜像的便利之处,下面如果不完全把镜像用到极致那就不太好了 O(∩_∩)O哈哈1、首先我们从 github 上拉取最新的 Swoft 源码到本地cd ~ && git clone git@github.com:swoft-cloud/swoft.git && cd swoft2、查看 swoft 镜像的 Dockerfile# 在文件尾设定了 entrypoint 命令为 启动 swoft服务ENTRYPOINT [“php”, “/var/www/swoft/bin/swoft”, “start”]entrypoint 就是我们后面需要改掉的参数3、直接使用镜像创建容器docker run -p 8081:80 \ #映射宿主机808-v $(pwd):/var/www/swoft #挂载本地 Swoft 项目到镜像工作目录-it -d \ #重要 开启 stdin tty 并以daemon模式运行–entrypoint="" #重要 覆盖掉镜像内设定的 entrypoint 参数–name my_swoft #容器命令–privileges=true #赋予权限swoft/swoft bash4、使用 docker-compose 更为简洁#编辑 docker-compose 编排文件vim docker-compose.yml#内容修改如下version: ‘3’services: swoft: image: swoft/swoft:latest container_name: my_swoft # 给容器自定义个名称便于管理 #build: ./ ports: - “8081:80” #端口映射 volumes: - ./:/var/www/swoft # 挂载本地swoft项目到镜像工作目录 stdin_open: true #打开标准输出 -i tty: true # 打开 tty 会话 -t privileged: true # 给与权限 比如创建文件夹之类的 #entrypoint: [“php”, “/var/www/swoft/bin/swoft”, “start”] # 入口启动命令 即启动 swoft 服务 entrypoint: [“bash”] 创建容器docker-compose up -d swoft ./5、登入容器,安装依赖,开启 Swoft 服务使用3或4创建的Swoft容器,便以 bash 作为启动的前置进程,而非启动 Swoft 服务,我们登入容器使用内置的 Composer 安装依赖后,启动Swoft服务即可。docker exec -it my_swoft bash# 安装框架依赖composer install# 启动/停止/重启 Swoft 服务php bin/swoft start|stop|restar6、开启热重载,关闭 daemon,让框架调试信息输出到 stderr 方便开发调试编辑本地的 Swoft 项目 .env 文件# ApplicationAPP_DEBUG=true# Server…AUTO_RELOAD=true…# Swoole Settings…DAEMONIZE=0…保存并重新启动 Swoft服务小提示:可以使用 PHPStorm IDE 配置 FTP/SFTP 文件改动自动上传的方式,开发起飞 ...

February 21, 2019 · 2 min · jiezi

php变量及参数传递

术语zval容器,符号表,is_ref,refcount,cow,引用分离<?php# 值传递$a =1; // 一个zval容器$b=$a; // 两个zval容器,由于cow优化,仍然是一个zval容器?><?php# cow$a = 1;$b =$a; // cow优化,一个容器$b=2; // 写操作,分离成两个zval容器了?><?php# 闭包use值传递$var = ‘Hello World’;$func=function ()use($var) { // 闭包use,值传递(copy) var_dump($var); // Hello World $var = “new value”; // 闭包内的值修改,不影响闭包外部的变量};$func();var_dump($var); // 不变,仍然是Hello World?><?php# 闭包use通过引用改变外部变量$var = ‘Hello World’;$func=function ()use(&$var) { // 闭包use,试试传引用 var_dump($var); // Hello World $var = new stdClass();};$func();var_dump($var); // 引用发生了作用,输出object(stdClass)#2 (0) {}?><?php# 函数传递默认是值传递,对于对象则使用引用传递$a = new stdClass();function test($a) { $a->b=1; var_dump($a);}test($a);var_dump($a); // $a被改变了,输出:object(stdClass)#1 (1) {[“b”]=>int(1) }?><?php# swoole下的参数传递class Pool { private $_pool; public function __construct() { $this->_pool = new SplQueue(); $this->_init(); } private function _init(){ for($i=0;$i<100;$i++) { $this->put($i); } } public function get() { return $this->_pool->dequeue(); } public function put($item){ $this->_pool->enqueue($item); } public function length(){ return $this->_pool->count(); }}$pool = new Pool();class Server { private $_server; private $_pool; public function __construct(Pool $_pool) { $this->_pool = $_pool; $this->_server = new Swoole\Http\Server(“0.0.0.0”, 9500); $this->_server->set( [ ‘worker_num’ => 2,// ‘daemonize’ => true, // daemon ’log_level’ => SWOOLE_LOG_INFO, ] ); $this->_server->on(“request”, [$this, “onRequest”]); $this->_server->start(); } public function onRequest(\Swoole\Http\Request $request, \Swoole\Http\Response $response) { echo $this->_pool->get().PHP_EOL; }}$server = new Server($pool);# 压测 ab -c 4 -n 4 http://127.0.0.1:9500/# 输出:#0#1#0#1# 分析:2个worker,4个并发,共4个请求,由于进程隔离,$pool对象发生了复制,而不是期望中输出0/1/2/3?>Refer:http://php.net/manual/en/func… ...

January 25, 2019 · 1 min · jiezi

php异步编程

前言我对 php 异步的知识还比较混乱,写这篇是为了整理,可能有错。传统的 php-fpm 一个进程执行一个请求,要达到多少并发,就要生成多少个进程。更糟糕的是每次请求都需要重新编译执行,导致并发一直上不来。因此出现了 Swoole 和 WorkerMan 两个国内流行的常驻内存框架[1]。这两个框架原理都是通过事件循环,让程序一直停留在内存,等待外部请求,达到高并发。为什么需要异步先来看一个例子在工作目录下新建文件 slowServer.php<?phpsleep(5); // 5秒后才能返回请求echo ‘done’;开启服务$ php -S localhost:8081 slowServer.php开另一个终端,安装依赖$ pecl install event # 安装 event 扩展$ composer require workerman/workerman$ composer require react/http-client:^0.5.9新建文件 worker.phprequire_once DIR . ‘/vendor/autoload.php’;use Workerman\Worker;use Workerman\Connection\AsyncTcpConnection;use Amp\Artax\Response;$http_worker = new Worker(“http://0.0.0.0:8082”);$http_worker->count = 1; // 只开一个进程$http_worker->onMessage = function($connection, $host) { echo 1; $data = file_get_contents(‘http://localhost:8081’); $connection->send($data);};Worker::runAll();开启服务器php worker.php start在浏览器开启两个标签,都打开网址 http://localhost:8082 。这时可以看到终端输出“1”,过了一会儿又输出“1”,原因是8081服务器在处理第一个请求的时候阻塞在了等待8081返回之中,等第一个请求结束后,才开始处理第二个请求。也就是说请求是一个一个执行的,要达到多少个并发,就要建立多少个进程,跟 php-fpm 一样。现在修改一下代码$http_worker->onMessage = function($connection, $host) { echo 1; $loop = Worker::getEventLoop(); $client = new \React\HttpClient\Client($loop); $request = $client->request(‘GET’, ‘http://localhost:8081’); $request->on(’error’, function(Exception $e) use ($connection) { $connection->send($e); }); $request->on(‘response’, function ($response) use ($connection) { $response->on(‘data’, function ($data) use ($connection) { $connection->send($data); }); }); $request->end();};现在打开服务,再在浏览器发起请求,发现第二个“1”在请求后就马上输出了,而这时第一个请求还没结束。这表明进程不再阻塞,并发量取决于 cpu 和 内存,而不是进程数。为什么需要异步通过上面的例子已经很明白了,reactphp 框架通过把 http 请求变成异步,让 onMessage 函数变成非阻塞,cpu 可以去处理下一个请求。即从 cpu 循环等待 8081 返回,变成了 epoll 等待。异步的意义在于把 cpu 从 io 等待中解放出来,可以处理其他计算任务。 如果你想知道怎么用框架实现异步,看到这里就可以了。WorkerMan 配合 ReactPHP 或者自身的 AsyncTcpConnection 已经可以满足很多 io 请求异步化的需求。下面继续讨论这些框架是怎么做到异步的。哪些地方应该被做成异步通过上面的例子已经知道一旦执行到不需要 cpu,但是要等待 io 的时候,应该把 io 的过程做成异步。实现事件循环上面的例子是通过 reactphp 把 http 请求变成了异步,其实 WorkerMan 框架本身也是异步的,下面来看看 WorkerMan 是怎么使 onMessage 函数可以异步接受请求。先来新建下面这个文件 react.php<?php$context = stream_context_create();$socket = stream_socket_server(’tcp://0.0.0.0:8081’, $errno, $errmsg, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context); // 注册一个 fd(file descriptor)function react($socket){ $new_socket = stream_socket_accept($socket, 0, $remote_address); echo 1;}$eventBase = new EventBase();$event = new Event($eventBase, $socket, Event::READ | Event::PERSIST, ‘react’, $socket); // 注册一个事件,检测 fd 有没有写入内容$event->add();$eventBase->loop(); // 开始循环开始执行$ php react.php在另一个终端执行telnet 127.0.0.1 8081这时就会看到第一个终端输出'1’。我之前写过一篇文章《php使用epoll》,是这篇文章的基础。那篇文章里事件回调是通过定时来实现,即$event->add($seconds);而这里,事件回调是通过检测 fd 是否有写入内容来实现,这个过程不需要 cpu 参与。当 fd 有内容写入时,会调函数 ‘react’,这时开始使用 cpu。如果这时候进程执行另一个异步请求,比如用 reactphp 框架请求一个网页,那么程序会让出 cpu,此时如果有另一个请求进来,就可以回调执行另一个 ‘react’ 函数。由此提高了并发量。协程生成器 Generater这是生成器的 PHP 官方文档 http://php.net/manual/zh/lang...<?phpfunction gen_one_to_three() { for ($i = 1; $i <= 3; $i++) { //注意变量$i的值在不同的yield之间是保持传递的。 yield $i; }}$generator = gen_one_to_three();foreach ($generator as $value) { echo “$value\n”;}生成器就是每次程序执行到 yield 的时候保存状态,然后返回 $i,是否继续执行 gen_one_to_three 里的循环,取决于主程序是否继续调用什么是协程上面的程序另一种写法是<?php$i = 1;function gen_one_to_three() { global $i; if ($i<=3){ return $i++; }}while ($value = gen_one_to_three()) { echo “$value\n”;}由此可见,协程就是一种对函数的封装,使其变成一种可以被中断的函数,行为更像是子进程或子线程,而不是函数。协程的具体写法这里不细写,因为协程的写法十分复杂,可能需要再做一层封装才能好用。协程与异步既然协程可以被中断,那么只要在程序发起请求后发起事件循环,然后用 yield 返回,然后程序继续执行主程序部分,等事件返回后触发函数,执行 Generatot::next() 或 Generator::send() 来继续执行协程部分。封装好后就好像没有异步回调函数一样,和同步函数很像。现在已经有 ampphp 和 swoole 两个框架封装了协程,有兴趣可以了解一下。国外还有 https://amphp.org 和 https://reactphp.org 这两个框架博客地址:http://b.ljj.pub ...

January 22, 2019 · 2 min · jiezi

Swoole 2019 :化繁为简、破茧成蝶

Swoole开源项目从2012年开始发布第一个版本,到现在已经有近7年的历史。在这七年的时间里:提交了8821次代码变更发布了287个版本收到并解决1161次issue反馈合并了603次pull request共有100位开发者贡献代码在GitHub收获了11940颗星协程2018年我们推出了全新的Swoole4版本,在此之前Swoole主要的编程方式还是同步阻塞模式或异步回调。新的基于协程实现的CSP编程逐渐成为我们唯一推荐使用的编程模式。协程将纷繁复杂异步编程大大简化。使用Swoole4协程,既简单又强大。在未来的Swoole5版本,我们计划删除非协程的相关特性和代码,减少历史包袱,提升稳定性,降低复杂度,减少不必要的选项,纯粹协程化。 过去前六我们的团队主要以兼职开发为主,团队成员大多来自于腾讯、阿里、滴滴、百度、360、小米等国内一线互联网企业,还有一部分是国外的PHP开发者,甚至PHP语言ZendVM内核作者Dmitry Stogov也曾向Swoole贡献了代码。除此之外,我们还招募了一些在校大学生为Swoole编写代码,逐步培养年轻一代开发者。在2018年7月份我们组建了全职开发团队,专注于Swoole内核以及Swoole Cloud云原生组件和生态链的开发。告别过去的草莽班子,转变为专业化的开源技术研发团队。我们的目标是让Swoole项目成为Node.js、Go这样的工业级技术,成为PHP编程语言的在异步IO和网络通信方面的基石。研发管理成立全职研发团队后,我们逐渐建立了非常完善的研发管理体系,提升Swoole的软件质量。主要包括以下几个方面:测试驱动(TDD)现在我们投入大量精力实现单元测试脚本、压测脚本、自动化测试,提升单元测试覆盖率。目前已有680项测试用例,17项压测项目,在Travis-CI平台可以看到每一次Commit和Pull Request的编译、测试结果。研发工作也基于TDD进行,在开发新特性、重构、Bug Fix时,会先编写对应的单元测试脚本,测试覆盖到代码变更的所有场景。代码审查(Code Review)团队成员之间进行代码交叉审查、互相Code Review,对于代码变更的细节进行充分的评估和讨论。重大变更,会进行团队Review,花费数小时甚至数天讨论每一行代码变更细节。RFC 机制对于非Bug Fix、非性能提升、非重构,新特性或有可能改变底层行为的变更,我们会分为4个步骤进行。发起RFC的提案,https://github.com/swoole/rfc…,提案内容会详细阐述此项变更的前因后果、相关配置项、影响的范围、使用方法、示例。提案讨论,我们会对提案进行充分的讨论,刨根问底,分析优劣,推敲细节。所有问题均讨论清楚后,最终立项,开始实现。开发负责人创建git分支,编写单元测试脚本,编写代码,实现提案中的所有内容,最终发起Pull Request交叉评审,检查代码,提出改进意见,反馈给开发负责人,继续完善细节。最终合并到主干。整个过程均是在GitHub平台公开进行的,对Swoole项目感兴趣的PHPer均可参与。灰度测试为了保证正式版本的稳定性,我们在发布前会在内部项目上进行灰度测试,检验新版本的稳定性。另外我们与大部分Swoole框架作者建立了联系,新版本会先发给各大框架的作者提前试用。有重大底层变更、或不兼容项会提前与其他Swoole之上的开源项目作者进行沟通。重构2018年下半年我们对底层的代码进行了多次重构,在代码结构、可读性、复用性、封装度方面进行了很多优化。使得Swoole软件更为简洁、优雅。编程语言方面,我们现在逐渐使用C++替代C语言。C++提供的面向对象、智能指针、容器、模板等特性能够帮助我们进一步提升我们团队的开发效率。在此也欢迎各位PHPer参与Swoole项目,贡献代码。文档Swoole的文档也是广为开发者诟病的一个方面。在2018年我们团队在文档方面逐渐加大投入。重新编写梳理文档,加入丰富的例子程序,加入更详细的配图,修复细节问题,删除带有感情色彩的语句,更加客观中立严谨。总结在过去的几年,Swoole项目做的并不是很专业,存在较多BUG和难用的地方,也让很多使用者踩到了不少坑。最近半年成立全职研发团队后,我们在研发管理方面进步飞快,Swoole的稳定性、成熟度方面已今非昔比。稳定性始终是第一位的,我们在未来将会更加谨慎、严谨,保证质量。2019 未来新的一年我们主要有3个方向上发力。做减法删除非协程的特性,删除不必要的模块,减少历史包袱,提升稳定性、降低复杂度,减少不必要的选项,化繁为简,更简单。Swoole内核层面仍然会继续不断重构、精简,减少代码行数,清理冗余代码,尽可能地实现代码复用。深入项目在2018年底,我们已经开始逐渐与在生产环境上大量使用Swoole的企业建立联系,包括腾讯云、阅文、好未来、陌陌、优信等企业。了解实际应用场景、业务模式,进行深度交流合作,提供建议,帮助企业技术团队更好的解决业务问题,接受反馈改进底层。生态链2019年我们会基于Swoole4协程开发一些配套的工具和组件,弥补PHP在Cloud Native时代生态链方面的不足。

January 21, 2019 · 1 min · jiezi

基于swoole的极简框架-1.4.1

one 1.4.1版本更新:优化优化uuid生成规则修复缓存驱动为file时 notice错误增加允许在模型自己创建查询构造器链式调用class Article extends Model{ CONST TABLE = ‘articles’; public function week() { return $this->where(‘create_at’, ‘>’, strtotime(’-1 week’)); } /** * 根据点赞排序 */ public function orderByLikeCount() { return $this->orderBy(’like_count’, ‘desc’); }}// 获取周排行榜 按照点赞数量Article::column([‘id’,’title’])->where(‘create_at’, ‘>’, strtotime(’-1 week’))->orderBy(’like_count’, ‘desc’)->limit(10)->findAll();// 通过自己创建的查询构造器Article::column([‘id’,’title’])->week()->orderByLikeCount()->limit(10)->findAll();添加rpc 方法 支持数组// 添加方法method1,method2 供远程客户端调用RpcServer::add(Abc::class,‘method1’);RpcServer::add(Abc::class,‘method2’);// 现在可以这么写RpcServer::add(Abc::class,[‘method1’,‘method2’]);队列固定长度$global_data = new \App\GlobalData\Client();// 设置队为固定长度$global_data->setQueueLimit(3);$arr = [1, 2, 3, 4, 5];foreach ($arr as $i) { $global_data->push(‘abc’, $i);}while (1) { $ret = $global_data->pop(‘abc’); if ($ret !== null) { echo $ret . PHP_EOL; } else { break; }}//以上输出//3//4//5//删除固定长度限制$global_data->delQueueLimit(‘abc’);github: https://github.com/lizhichao/one码云: https://gitee.com/vicself/one ...

January 21, 2019 · 1 min · jiezi

swoole之协程channel元素个数

前言channel用于进程内跨协程通讯,按照角色分为生产协程和消费协程。生产协程,在channel已满时,会被挂起;消费协程,在channel为空是,也会被挂起。看例子<?php$chan = new \Swoole\Coroutine\Channel(50);function t4(\Swoole\Coroutine\Channel $chan){ Co::sleep(0.005); $chan->push([METHOD => LINE]);}function t5(\Swoole\Coroutine\Channel $chan){ Co::sleep(0.005); $chan->push([METHOD => LINE]);}go(“t4”, $chan);go(“t5”, $chan);go(function () use ($chan) { // chan元素个数 $chanNum = 1; while ($chanNum > 0) { $item = $chan->pop(); var_dump($item); $chanNum–; }});分析上面的例子,如果赋值$chanNum=1,会导致channel中有数据未被消费;如果赋值$chanNum=3,由于channel数据不足,消费协程会挂起,程序无法正常退出。准确设置channel元素个数,是很重要的事。实践中,有些场景无法预测channel元素个数(例如请求第三方接口,如果有数据则push到channel,无数据则不push),那有什么解决办法嘛?有!保证生产者协程不挂起的前提下,在php的register_shutdown_function()函数中,去实现未完成的消费者功能<?phpregister_shutdown_function(function() use ($chan) { go(function()use($chan){ $queue_num = $chan->stats()[“queue_num”]; for($i=0;$i<$queue_num;$i++) { var_dump($chan->pop()); } });});这个办法能解决问题,但是显然不是那么优雅消费者函数要重复写一遍,由于swoole协程的语法,无法复用这个问题,应该在swoole协程层面来处理,在register_shutdown_function()中处理,也只是临时解决办法总结swoole协程的push/pop机制,决定了需要设置一个合理的channel元素个数。实践中某些场景,又无法准确评估这个值,只能用临时办法解决,希望swoole能提供更优雅的解决方式。

January 18, 2019 · 1 min · jiezi

swoole协程之channel

前言通过swoole协程入门,了解到协程的基本写法。更准确一点,是独立、无执行顺序的任务。那有依赖关系或者执行顺序有关的任务怎么办呢?靠channel了!Channel特点与容量有关如果channel未满,push不阻塞,如果已满,push让出控制流;如果channel为空,pop让出控制流看例子:depend_co.php<?php// 设置一个容量为50的channel$chan = new \Swoole\Coroutine\Channel(50);function t4(\Swoole\Coroutine\Channel $chan) { Co::sleep(0.005); #1 $chan->push([METHOD=>LINE]); #2}function t5(\Swoole\Coroutine\Channel $chan) { Co::sleep(0.005); #3 $chan->push([METHOD=>LINE]); #4}function t6(\Swoole\Coroutine\Channel $chan) { Co::sleep(0.005); #5 $chan->push([METHOD=>LINE]); #6}go(“t4”, $chan);go(“t5”, $chan);go(“t6”, $chan);// cousume协程:c1go(function() use($chan) { // chan元素个数 $chanNum = 3; // chan有数据时 while($chanNum>0) { #7 $item = $chan->pop(); #8 var_dump($item); #9 $chanNum –; }});分析3个生产者协程(t4/t5/t6),1个消费者协程(用c1描述)#1 t4遇到Co::sleep,让出控制流,挂起;#3 t5,类似于t4,挂起#5 t6,类似于t4,挂起#7 c1开始执行,while循环为真,执行channel::pop()#8 可能情况:channel为空,c1让出控制流,挂起;#8 可能情况:channel非空,pop弹出数据,while循环继续如果while循环为假,c1执行结束如果while循环为真,进入channel::pop()流程…5ms后…t4恢复执行(t4/t5/t6 sleep时间相同,因此都有可能先恢复执行,但同时只能有一个恢复,为描述简单以t4为例)#2 t4写入channel数据,此时channel非空#8 如果有消费者协程,控制流发生变化,消费者协程c1恢复执行(思考:如果c1的while循环为假已经结束,会发生什么呢?)c1协程运行直到让出控制流或者结束;控制流回到t4协程,t4协程没有后续代码,执行结束;(问题:控制流有可能回到其他协程t5/t6嘛?还是一定会回到t4协程)t5(t6)恢复执行,流程类似于t4的执行流程注意事项channel的容量很重要,过小的容量导致生产者自动让出控制流而不能执行;消费者,需要判断生产者个数,来确定循环次数或循环结束边界,如果判断错误(太小,导致channel数据未消费;太大,消费者会让出控制流),会带来意外的结果。总结回到文章的开头,好像我们是想介绍有依赖关系、调用顺序的任务怎么写?你肯定已经猜到了,那就是先导任务放在生产者协程,后续任务放在消费者协程通过channel的机制,保障任务的先后执行顺序channel解决了协程间通信的问题,同时也提供了一种任务调度的方式。

January 18, 2019 · 1 min · jiezi

swoole协程入门

看了韩天峰PHP 协程:Go + Chan + Defer的文章,觉得swoole大大降低了协程的开发门槛,同时提供了php yield原生协程不具备的功能,尝试着写一些简单的例子。前言:协程是什么?协程是协作式任务(用户来决定控制流),相对的是抢占式任务(例如cpu来调度资源)。简单点说,协程的精髓就是控制流的让出与恢复。环境:ubuntu18.04.1 LTSphp7.3.0swoole4.2.10看例子:simple_co.php<?php// simple cofunction t1() { Co::sleep(0.05); echo METHOD.PHP_EOL;}function t2() { Co::sleep(0.05); echo METHOD.PHP_EOL;}function t3() { Co::sleep(0.05); echo METHOD.PHP_EOL;}go(“t1”);go(“t2”);go(“t3”);$ time php simple_co.php # 执行时间在0.07s左右分析这个简单的例子,定义了三个任务(t1/t2/t3),通过短名称go来运行三个任务。运行过程如下:t1执行,遇到Co::sleep,主动让出控制流,t1暂停(等待Co::sleep结束后恢复执行)t2执行,类似t1情况,暂停t3执行,类似t1情况,暂停没有其他程序了,3个协程的执行都处于暂停状态…0.05s后…协程上t1的sleep先结束,恢复执行t1t2/t3的恢复执行类似t1然后整个程序结束特别说明t1/t2/t3的恢复执行并非严格按照挂起顺序恢复(本地测试中恢复执行顺序是t1/t3/t2)恢复执行没有确定的顺序适用于无序、独立的任务并行执行结束这个例子,主要来描述控制流的让出和恢复执行,这也是协程的精髓。当然,程序的实现依赖于swoole的封装,感谢swoole!

January 18, 2019 · 1 min · jiezi

你真的了解RPC吗?

现微服务盛行,服务之间通信大概就两种方式Api和Rpc。下面两个列子来让你了解Api和Rpc的区别。列子一 文章的增删改查。Api 实现:Router::get(’/article/{id}’,‘ArticleController@get’);Router::post(’/article’,‘ArticleController@create’);Router::put(’/article/{id}’,‘ArticleController@edit’);Router::delete(’/article/{id}’,‘ArticleController@delete’);然后在控制器Article调用模型return Article::find($id)->toArray();Rpc 实现RpcServer::add(‘Article’);没错就一行代码列子二 计算器假如机器A上面一个计算器 Counter,以Rpc的方式提供给其他机器使用.计算器Counter代码class Counter{ private $i = 0; public function __construct($i = 0) { $this->i = $i; } // 加法 public function add($v) { $this->i += $v; return $this; } // 减法 public function sub($v) { $this->i -= $v; return $this; } // 乘法 public function mul($v) { $this->i *= $v; return $this; } // 除法 public function div($v) { $this->i /= $v; return $this; } // 获取结果 public function get() { return $this->i; }}Rpc 实现RpcServer::add(‘Counter’);Rpc客户端调用$c = new ClientCounter(10);echo $c->add(3)->mul(2)->sub(10)->div(5)->get();Api 实现:你觉得 Api 应该怎么实现?以上代码是我在设计 one框架的一些思考?如你喜欢请star https://github.com/lizhichao/one如其他观点,欢迎留言讨论。 ...

December 24, 2018 · 1 min · jiezi

初识 swoole【上】

前言:都是为了生存有收获的话请加颗小星星,没有收获的话可以 反对 没有帮助 举报三连代码仓库初始swoole【上】一、什么是swooleSwoole:面向生产环境的 PHP 异步网络通信引擎使 PHP 开发人员可以编写高性能的异步并发 TCP、UDP、Unix Socket、HTTP,WebSocket 服务。Swoole 可以广泛应用于互联网、移动通信、企业软件、云计算、网络游戏、物联网(IOT)、车联网、智能家居等领域。 使用 PHP + Swoole 作为网络通信框架,可以使企业 IT 研发团队的效率大大提升,更加专注于开发创新产品。二、安装1、peclpecl install swoole2、源码安装curl -O https://pecl.php.net/get/swoole-4.2.5.tgztar -zxvf swoole-4.2.5.tgzcd swoole-4.2.5.tgzphpize./configuremake && make install# 加入到php.ini中php –iniextension=swoole.so三、基本入门注意:示例代码都引入了"symfony/var-dumper"包进行美化打印Swoole的绝大部分功能只能用于cli命令行环境1、TCP服务器<?php// new$server = new swoole_server(“0.0.0.0”, “9501”,SWOOLE_PROCESS, SWOOLE_SOCK_TCP);// connect 连接$server->on(“connect”, function (swoole_server $server, $fd) { dump("{$fd}连接"); $server->send($fd, “欢迎{$fd}大山驴\n”);});// receive 回调$server->on(“receive”, function (swoole_server $server, $fd, $from_id, $data) { $server->send($fd, “服务端回复:{$data}\n”); foreach ($server->connections as $connection) { if ($connection != $fd){ $server->send($connection, “{$fd}说{$data}”); } }});// close$server->on(“close”, function (swoole_server $server, $fd) { dump("{$fd}关闭"); foreach ($server->connections as $connection) { if ($connection != $fd){ $server->send($connection, “{$fd}断开连接”); } }});// start$server->start();php tcp_server.phptelnet 127.0.0.1 95012、UDP服务器// new<?php$server = new swoole_server(“0.0.0.0”, “9502”, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);// packet$server->on(“Packet”, function (swoole_server $server, $data, $clientInfo) { $server->sendto($clientInfo[‘address’], $clientInfo[‘port’], “服务器回复: {$data}”); dump($clientInfo);});// start$server->start();php udp_server.phpnetcat -u 127.0.0.1 95023、Web服务器<?php// new$http = new swoole_http_server(HOST, “9503”);// request$http->on(“request”, function (\Swoole\Http\Request $request, \Swoole\Http\Response $response) { dump($request->get, $request->post); $response->header(“Content-Type”, “text/html; charset=utf-8”); $response->end("<h1>Hello Swoole. #" . mt_rand(1000, 9999) . “</h1>\n”);});// start$http->start();php web_server.phpcurl -XGET “127.0.0.1:9503?id=1&name=aa&age=26"curl -XPOST “127.0.0.1:9503?id=1&name=aa&age=26” -d “love=like"4、WebSocket服务器服务端<?php// new$ws = new swoole_websocket_server(HOST, “9504”);// open$ws->on(“open”, function (Swoole\WebSocket\Server $ws, \Swoole\Http\Request $request) { dump($request->fd, $request->get, $request->server); $ws->push($request->fd, “你是大山驴\n”);});// message$ws->on(“message”, function (\Swoole\WebSocket\Server $ws, $frame) { dump(“消息: {$frame->data}\n”); $ws->push($frame->fd, “服务端回复: {$frame->data}\n”);});// close$ws->on(“close”, function (Swoole\WebSocket\Server $ws, $fd) { dump(”{$fd}关闭”);});// start$ws->start();客户端let ws = new WebSocket(“ws://127.0.0.1:9504”);ws.onopen = function (ws) { console.log(“连接服务器”);};ws.onclose = function (ws) { console.log(“断开连接”);};ws.onmessage = function (ws) { console.log(‘接收来自服务器的消息:’ + ws.data);};ws.onerror = function (ws, event) { console.log(‘错误了:’ + ws.data);};5、定时器<?php// 每2秒执行一次swoole_timer_tick(2000, function ($time_id){ dump($time_id);});// 3秒后执行swoole_timer_after(3000, function (){ dump(“这是3s后”);}); ...

December 20, 2018 · 2 min · jiezi

初识 swoole【下】

前言:接初识swoole【上】,这篇主要是异步问题有收获的话请加颗小星星,没有收获的话可以 反对 没有帮助 举报三连代码仓库初识swoole【下】6、异步任务<?php// new$server = new swoole_server(HOST, “9505”,SWOOLE_PROCESS, SWOOLE_SOCK_TCP);//设置异步任务的工作进程数量$server->set([’task_worker_num’ => 4]);// connect 连接$server->on(“connect”, function (swoole_server $server, $fd) { dump("{$fd}连接"); $server->send($fd, “欢迎{$fd}大山驴\n”);});// receive 回调$server->on(“receive”, function (swoole_server $server, $fd, $from_id, $data) { //投递异步任务 $task_id = $server->task($data); dump(“触发异步任务ID={$task_id}”); $server->send($fd, “服务端回复:{$data}\n”); foreach ($server->connections as $connection) { if ($connection != $fd){ $server->send($connection, “{$fd}说{$data}”); } }});// task 处理异步任务$server->on(“task”, function (swoole_server $server, $task_id, $from_id, $data){ dump(“新的异步任务[ID={$task_id}]”); //返回任务执行的结果 $server->finish("{$data}完成了");});// finish 处理异步任务的结果$server->on(“finish”, function (swoole_server $server, $task_id, $data){ dump(“异步任务[{$task_id}]已经完成[{$data}]”);});// close$server->on(“close”, function (swoole_server $server, $fd) { dump("{$fd}关闭"); foreach ($server->connections as $connection) { if ($connection != $fd){ $server->send($connection, “{$fd}断开连接”); } }});// start$server->start();php async.phptelnet 127.0.0.1 95057、同步TCP客户端<?php// new 同步$client = new \Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);// connectif (! $client->connect(HOST, 9501, 0.5)){ dump(“连接失败”);}// sendif (! $client->send(“你个山驴逼\n”)) { dump(“发送失败”);}// receiveif (! $data = $client->recv()) { dump(“接收失败”);}dump($data);// close$client->close();php tcp_server.phpphp tcp_sync_client.php8、异步TCP客户端<?php// new 异步$client = new \Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);// connect$client->on(“connect”, function (\Swoole\Client $cli){ $cli->send(“你个山驴逼\n”);});// receive$client->on(“receive”, function (\Swoole\Client $cli, $data){ dump(“接收:{$data}”);});// error$client->on(“error”, function (\Swoole\Client $cli){ dump(“连接失败”);});// close$client->on(“close”, function (\Swoole\Client $cli){ dump(“连接关闭”);});$client->connect(HOST, 9501, 0.5);php tcp_server.phpphp tcp_async_client.php9、异步客户端mysql// 异步mysql客户端$db = new \Swoole\Mysql();$config = [ ‘host’ => ‘127.0.0.1’, ‘user’ => ‘root’, ‘password’ => ‘root’, ‘database’ => ‘mac’, ‘port’ => ‘3307’,];$db->connect($config, function (\Swoole\Mysql $db, $rs) { $db->query(‘SELECT * FROM user’, function (\Swoole\Mysql $db, $rs) { dump($rs); $db->close(); });});http// 异步http客户端$cli = new Swoole\Http\Client(‘127.0.0.1’, 80);$cli->setHeaders(array(‘User-Agent’ => ‘swoole-http-client’));$cli->setCookies(array(’test’ => ‘value’));$cli->get(’/login’, function (\Swoole\Http\Client $cli) { dump($cli->statusCode); dump($cli->cookies); dump($cli->headers);});10、协程客户端// 协程mysql客户端$http = new swoole_http_server(HOST, 9506);$http->on(‘request’, function ($request, $response) { $db = new \Swoole\Coroutine\Mysql(); $db->connect([ ‘host’ => ‘127.0.0.1’, ‘user’ => ‘root’, ‘password’ => ‘root’, ‘database’ => ‘mac’, ‘port’ => ‘3307’, ]); $data = $db->query(‘select * from user’); dump($data); $response->end(json_encode($data));});$http->start();php coroutine_client.phpcurl -XGET “127.0.0.1:9506” ...

December 20, 2018 · 2 min · jiezi

实战 swoole【聊天室】

前言:了解概念之后就应该练练手啦,不然就是巨婴有收获的话请加颗小星星,没有收获的话可以 反对 没有帮助 举报三连代码仓库实战swoole【聊天室】在线体验准备工作需要先看初识swoole【上】,了解基本的服务端WebSocket使用js WebSocket客户端简单使用使用# 命令行1php src/websocket/run.php# 命令行2cd public && php -S localhost:8000# 客户端,多开几个查看效果访问http://localhost:8000/WebSocket官方示例$server = new swoole_websocket_server(“0.0.0.0”, 9501);$server->on(‘open’, function (swoole_websocket_server $server, $request) { echo “server: handshake success with fd{$request->fd}\n”; });$server->on(‘message’, function (swoole_websocket_server $server, $frame) { echo “receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n”; $server->push($frame->fd, “this is server”); });$server->on(‘close’, function ($ser, $fd) { echo “client {$fd} closed\n”; });$server->on(‘request’, function (swoole_http_request $request, swoole_http_response $response) { global $server;//调用外部的server // $server->connections 遍历所有websocket连接用户的fd,给所有用户推送 foreach ($server->connections as $fd) { $server->push($fd, $request->get[‘message’]); } });$server->start();详解:swoole_websocket_server 继承自 swoole_http_server设置了onRequest回调,websocket服务器也可以同时作为http服务器未设置onRequest回调,websocket服务器收到http请求后会返回http 400错误页面如果想通过接收http触发所有websocket的推送,需要注意作用域的问题,面向过程请使用global对swoole_websocket_server进行引用,面向对象可以把swoole_websocket_server设置成一个成员属性function onOpen(swoole_websocket_server $svr, swoole_http_request $req);当WebSocket客户端与服务器建立连接并完成握手后会回调此函数。$req 是一个Http请求对象,包含了客户端发来的握手请求信息onOpen事件函数中可以调用push向客户端发送数据或者调用close关闭连接onOpen事件回调是可选的function onMessage(swoole_websocket_server $server, swoole_websocket_frame $frame)当服务器收到来自客户端的数据帧时会回调此函数。$frame 是swoole_websocket_frame对象,包含了客户端发来的数据帧信息onMessage回调必须被设置,未设置服务器将无法启动客户端发送的ping帧不会触发onMessage,底层会自动回复pong包swoole_websocket_frame 属性$frame->fd,客户端的socket id,使用$server->push推送数据时需要用到$frame->data,数据内容,可以是文本内容也可以是二进制数据,可以通过opcode的值来判断$frame->opcode,WebSocket的OpCode类型,可以参考WebSocket协议标准文档$frame->finish, 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送(底层已经实现了自动合并数据帧,现在不用担心接收到的数据帧不完整)聊天室服务端示例目录结构:configsocket.phpsrcwebsocketConfig.phprun.phpWebSocketServer.php 内存表版本WsRedisServer.php redis版本WebSocketServer.php 内存表版本<?phpnamespace App\WebSocket;class WebSocketServer{ private $config; private $table; private $server; public function __construct() { // 内存表 实现进程间共享数据,也可以使用redis替代 $this->createTable(); // 实例化配置 $this->config = Config::getInstance(); } public function run() { $this->server = new \swoole_websocket_server( $this->config[‘socket’][‘host’], $this->config[‘socket’][‘port’] ); $this->server->on(‘open’, [$this, ‘open’]); $this->server->on(‘message’, [$this, ‘message’]); $this->server->on(‘close’, [$this, ‘close’]); $this->server->start(); } public function open(\swoole_websocket_server $server, \swoole_http_request $request) { $user = [ ‘fd’ => $request->fd, ’name’ => $this->config[‘socket’][’name’][array_rand($this->config[‘socket’][’name’])] . $request->fd, ‘avatar’ => $this->config[‘socket’][‘avatar’][array_rand($this->config[‘socket’][‘avatar’])] ]; // 放入内存表 $this->table->set($request->fd, $user); $server->push($request->fd, json_encode( array_merge([‘user’ => $user], [‘all’ => $this->allUser()], [’type’ => ‘openSuccess’]) ) ); } private function allUser() { $users = []; foreach ($this->table as $row) { $users[] = $row; } return $users; } public function message(\swoole_websocket_server $server, \swoole_websocket_frame $frame) { $this->pushMessage($server, $frame->data, ‘message’, $frame->fd); } /** * 推送消息 * * @param \swoole_websocket_server $server * @param string $message * @param string $type * @param int $fd / private function pushMessage(\swoole_websocket_server $server, string $message, string $type, int $fd) { $message = htmlspecialchars($message); $datetime = date(‘Y-m-d H:i:s’, time()); $user = $this->table->get($fd); foreach ($this->table as $item) { // 自己不用发送 if ($item[‘fd’] == $fd) { continue; } $server->push($item[‘fd’], json_encode([ ’type’ => $type, ‘message’ => $message, ‘datetime’ => $datetime, ‘user’ => $user ])); } } /* * 客户端关闭的时候 * * @param \swoole_websocket_server $server * @param int $fd / public function close(\swoole_websocket_server $server, int $fd) { $user = $this->table->get($fd); $this->pushMessage($server, “{$user[’name’]}离开聊天室”, ‘close’, $fd); $this->table->del($fd); } /* * 创建内存表 / private function createTable() { $this->table = new \swoole_table(1024); $this->table->column(‘fd’, \swoole_table::TYPE_INT); $this->table->column(’name’, \swoole_table::TYPE_STRING, 255); $this->table->column(‘avatar’, \swoole_table::TYPE_STRING, 255); $this->table->create(); }}WsRedisServer.php redis版本<?phpnamespace App\WebSocket;use Predis\Client;/* * 使用redis代替table,并存储历史聊天记录 * * Class WsRedisServer * @package App\WebSocket /class WsRedisServer{ private $config; private $server; private $client; private $key = “socket:user”; public function __construct() { // 实例化配置 $this->config = Config::getInstance(); // redis $this->initRedis(); // 初始化,主要是服务端自己关闭不会清空redis foreach ($this->allUser() as $item) { $this->client->hdel("{$this->key}:{$item[‘fd’]}", [‘fd’, ’name’, ‘avatar’]); } } public function run() { $this->server = new \swoole_websocket_server( $this->config[‘socket’][‘host’], $this->config[‘socket’][‘port’] ); $this->server->on(‘open’, [$this, ‘open’]); $this->server->on(‘message’, [$this, ‘message’]); $this->server->on(‘close’, [$this, ‘close’]); $this->server->start(); } public function open(\swoole_websocket_server $server, \swoole_http_request $request) { $user = [ ‘fd’ => $request->fd, ’name’ => $this->config[‘socket’][’name’][array_rand($this->config[‘socket’][’name’])] . $request->fd, ‘avatar’ => $this->config[‘socket’][‘avatar’][array_rand($this->config[‘socket’][‘avatar’])] ]; // 放入redis $this->client->hmset("{$this->key}:{$user[‘fd’]}", $user); // 给每个人推送,包括自己 foreach ($this->allUser() as $item) { $server->push($item[‘fd’], json_encode([ ‘user’ => $user, ‘all’ => $this->allUser(), ’type’ => ‘openSuccess’ ])); } } private function allUser() { $users = []; $keys = $this->client->keys("{$this->key}:"); // 所有的key foreach ($keys as $k => $item) { $users[$k][‘fd’] = $this->client->hget($item, ‘fd’); $users[$k][’name’] = $this->client->hget($item, ’name’); $users[$k][‘avatar’] = $this->client->hget($item, ‘avatar’); } return $users; } public function message(\swoole_websocket_server $server, \swoole_websocket_frame $frame) { $this->pushMessage($server, $frame->data, ‘message’, $frame->fd); } /** * 推送消息 * * @param \swoole_websocket_server $server * @param string $message * @param string $type * @param int $fd / private function pushMessage(\swoole_websocket_server $server, string $message, string $type, int $fd) { $message = htmlspecialchars($message); $datetime = date(‘Y-m-d H:i:s’, time()); $user[‘fd’] = $this->client->hget("{$this->key}:{$fd}", ‘fd’); $user[’name’] = $this->client->hget("{$this->key}:{$fd}", ’name’); $user[‘avatar’] = $this->client->hget("{$this->key}:{$fd}", ‘avatar’); foreach ($this->allUser() as $item) { // 自己不用发送 if ($item[‘fd’] == $fd) { continue; } $is_push = $server->push($item[‘fd’], json_encode([ ’type’ => $type, ‘message’ => $message, ‘datetime’ => $datetime, ‘user’ => $user ])); // 删除失败的推送 if (!$is_push) { $this->client->hdel("{$this->key}:{$item[‘fd’]}", [‘fd’, ’name’, ‘avatar’]); } } } /* * 客户端关闭的时候 * * @param \swoole_websocket_server $server * @param int $fd / public function close(\swoole_websocket_server $server, int $fd) { $user[‘fd’] = $this->client->hget("{$this->key}:{$fd}", ‘fd’); $user[’name’] = $this->client->hget("{$this->key}:{$fd}", ’name’); $user[‘avatar’] = $this->client->hget("{$this->key}:{$fd}", ‘avatar’); $this->pushMessage($server, “{$user[’name’]}离开聊天室”, ‘close’, $fd); $this->client->hdel("{$this->key}:{$fd}", [‘fd’, ’name’, ‘avatar’]); } /* * 初始化redis */ private function initRedis() { $this->client = new Client([ ‘scheme’ => $this->config[‘socket’][‘redis’][‘scheme’], ‘host’ => $this->config[‘socket’][‘redis’][‘host’], ‘port’ => $this->config[‘socket’][‘redis’][‘port’], ]); }}config.php<?phpnamespace App\WebSocket;class Config implements \ArrayAccess{ private $path; private $config; private static $instance; public function __construct() { $this->path = DIR . ‘/../../config/’; } // 单例模式 public static function getInstance() { if (!self::$instance) { self::$instance = new self(); } return self::$instance; } public function offsetSet($offset, $value) { // 阉割 } public function offsetGet($offset) { if (empty($this->config)) { $this->config[$offset] = require $this->path . $offset . “.php”; } return $this->config[$offset]; } public function offsetExists($offset) { return isset($this->config[$offset]); } public function offsetUnset($offset) { // 阉割 } // 禁止克隆 final private function __clone(){}}config/socket.php<?phpreturn [ ‘host’ => ‘0.0.0.0’, ‘port’ => 9501, ‘redis’ => [ ‘scheme’ => ’tcp’, ‘host’ => ‘0.0.0.0’, ‘port’ => 6380 ], ‘avatar’ => [ ‘./images/avatar/1.jpg’, ‘./images/avatar/2.jpg’, ‘./images/avatar/3.jpg’, ‘./images/avatar/4.jpg’, ‘./images/avatar/5.jpg’, ‘./images/avatar/6.jpg’ ], ’name’ => [ ‘科比’, ‘库里’, ‘KD’, ‘KG’, ‘乔丹’, ‘邓肯’, ‘格林’, ‘汤普森’, ‘伊戈达拉’, ‘麦迪’, ‘艾弗森’, ‘卡哇伊’, ‘保罗’ ]];run.php<?phprequire DIR . ‘/../bootstrap.php’;$server = new App\WebSocket\WebSocketServer();$server->run();总结完整示例:聊天室学完后发现生活中所谓的聊天室其实也不过如此,当然这只是简单的demo,很多功能都没有实现,想进一步学习的话可以去github上找完整的项目进行深入学习参考swoolePHP + Swoole 实现的简单聊天室 ...

December 20, 2018 · 4 min · jiezi

一个基于 swoole 的异步 mqtt 客户端库,可用于接收或者发送 mqtt 协议的消息

一个基于 swoole 的异步 mqtt 客户端库,可用于接收或者发送 mqtt 协议的消息。支持 QoS 0、QoS 1、QoS 2。支持 MQTT 3.1 和 3.1.1 版本.安装composer require try-to/swoole_mqttExamplesubscribe.php<?phpuse TrytoMqtt\Client;require_once DIR . ‘/vendor/autoload.php’;$options = [ ‘clean_session’ => false, ‘client_id’ => ‘demo-subscribe-123456’, ‘username’ => ‘’, ‘password’ => ‘’,];$mqtt = new Client(‘127.0.0.1’, 1883, $options);$mqtt->onConnect = function ($mqtt) { $mqtt->subscribe(’/World’);};$mqtt->onMessage = function ($topic, $content) { var_dump($topic, $content);};$mqtt->onError = function ($exception) use ($mqtt) { echo “error\n”; // $mqtt->reconnect(1000);};$mqtt->onClose = function () { echo “close\n”;};$mqtt->connect();命令行运行 php subscribe.php 启动publish.php<?phpuse TrytoMqtt\Client;require_once DIR . ‘/../vendor/autoload.php’;$options = [ ‘clean_session’ => false, ‘client_id’ => ‘demo-publish-123456’, ‘username’ => ‘’, ‘password’ => ‘’,];$mqtt = new Client(‘127.0.0.1’, 1883, $options);$mqtt->onConnect = function ($mqtt) { $mqtt->publish(’/World’, ‘hello swoole mqtt’);};$mqtt->onError = function ($exception) { echo “error\n”;};$mqtt->onClose = function () { echo “close\n”;};$mqtt->connect();命令行运行 php publish.php 启动实现的接口Client::__construct()Client::connect()Client::reconnect()Client::publish()Client::subscribe()Client::unsubscribe()Client::disconnect()Client::close()callback onConnectcallback onMessagecallback onErrorcallback onClose地址github地址码云地址 ...

December 20, 2018 · 1 min · jiezi

编程语言的心智负担

很多编程语言对比的文章,总喜欢比较各种编程语言的性能、语法、IO模型。本文将从心智负担这个角度去比较下不同的编程语言和技术。因本人所擅长的编程语言有限,如有不对的地方,欢迎指正。内存越界如:C语言、C++(C with class)C/C++可以直接操作内存,但编程必须要面对内存越界问题。发生内存越界后,程序会直接core dump,开发者需要使用gdb工具分析内存错误的原因,如果内存越界是偶发的,比如由于数据同步问题造成,数亿次中会出现一次,解决起来非常困难,甚至需要顶级专家才能找到问题原因。心智负担:10现代C++提供了STL库包含大量容器,另外C++支持引用语法,不再需要直接操作指针,降低了内存错误读写的风险。使用现代C++的编程风格可以避免此问题。但由于C++没有完全从语法层面移除指针,不够彻底。宏C/C++程序中经常使用预定义宏实现一些逻辑,导致可读性变差。有些情况下会嵌套多次宏的使用,展开后变得极其难读。心智负担:6因此在C/C++中建议使用enum或static inline函数代替宏。内存管理如:C语言、C++C/C++语言,需要手工管理内存,malloc/new申请的内存要与free/delete成对使用。申请的内存忘记释放,就会出现内存泄漏。心智负担:8Java/PHP/Go等有GC的编程语言,不需要手工管理内存,不会因为代码错误引起内存泄漏。心智负担:0数值类型C/C++/GO等编程语言,提供了有符号、无符号整型和浮点型,8/16/32/64不同尺寸的整型。编程时需要额外处理,避免数值溢出。心智负担:6PHP/Java等编程语言,默认整数为有符号int64,降低了心智负担。一般业务项目中很难有超过2^63的数字,不会遇到问题。但如果是做科学计算,int64就难以满足需求了。在PHP中超过2^63底层会转为浮点型,计算将丢失精度。心智负担:1而Python整数是不限长度的,可以做任意位数的数值计算。心智负担:0类型约束Java是静态强类型编程语言,因此在编程中存在类型约束,某些情况下可能不是特别方便。如JSON序列化。不同类型的变量互相操作时可能需要进行显式类型转换。心智负担:2PHP/JS是动态弱类型编程语言,底层自动进行隐式类型转换。编程更方便。心智负担:0。但在大型项目,或对项目进行代码重构,以及项目代码更换开发者时,弱类型带来可维护性、可读性的难题,反而与Java/Go/C++这样的静态强类型编程语言,增加了大量心智负担。心智负担:5多线程编程Java/C++/Go提供了多线程并行编程、无锁编程,在编程中会存在数据同步问题。因此需要对临界资源进行加锁。而错误的锁操作又会带来,死锁和热点争抢问题。需要开发者具备极高的素质,否则难以做到正确无误并性能良好,这可能需要耗费大量心智。心智负担:10内存泄漏除PHP(php-fpm)之外的其他编程语言和技术(包括PHP + Swoole),在服务器端程序中均为长生命周期。对全局/静态变量操作可能会导致内存或资源句柄泄漏。编程时需要注意。心智负担:3而PHP(php-fpm)是短生命周期的,在请求结束后会立即释放所有内存和句柄,无需担心泄漏。心智负担:0IO 超时同步阻塞IO模型的编程语言和技术,在遇到某个慢IO会导致整个进程或线程挂起。极端情况下会出现所有进程/线程挂起,引起线上服务不可用。开发者需要格外注意设置IO操作的超时时间,避免慢请求带来进程/线程阻塞。心智负担:2而且异步IO的Go/Node.js/Swoole等无需担心此问题。心智负担:0汇总

December 18, 2018 · 1 min · jiezi

一个极简的基于swoole常驻内存框架

背景在用过laravel框架,发现它的路由和数据库ORM确实非常好用,但是整体确实有点慢,执行到控制器大于需要耗时60ms左右。于是打算做一个拥有非常好用的路由和orm又非常简单的框架。所以你会发现one框的路由和ORM有laravel的影子。但也有一些自己的特色,例如ORM支持自动化缓存(自动化读、写、刷新)保持与数据库同步,对外使用无感知。one框架也支持在fpm下运行,在fpm下框架自身整体耗时在1ms左右。hello world安装composer create-project lizhichao/one-app appcd appphp App/swoole.php 测试curl http://127.0.0.1:8081/主要功能RESTful路由中间件websocket/tcp/http……任意协议路由ORM模型统一的session处理mysql连接池redis连接池tcp连接池HTTP/TCP/WEBOSCKET/UDP服务器缓存进程间内存共享RPC(http,tcp,udp)日志RequestId跟踪路由Router::get(’/’, \App\Controllers\IndexController::class . ‘@index’);// 带参数路由Router::get(’/user/{id}’, \App\Controllers\IndexController::class . ‘@user’);// 路由分组 Router::group([’namespace’=>‘App\Test\WebSocket’],function (){ // websocket 路由 Router::set(‘ws’,’/a’,‘TestController@abc’); Router::set(‘ws’,’/b’,‘TestController@bbb’); });// 中间件Router::group([ ‘middle’ => [ \App\Test\MixPro\TestMiddle::class . ‘@checkSession’ ]], function () { Router::get(’/mix/ws’, HttpController::class . ‘@ws’); Router::get(’/mix/http’, HttpController::class . ‘@http’); Router::post(’/mix/http/loop’, HttpController::class . ‘@httpLoop’); Router::post(’/mix/http/send’, HttpController::class . ‘@httpSend’);});orm 模型定义模型namespace App\Model;use One\Database\Mysql\Model;// 模型里面不需要指定主键,框架会缓存数据库结构// 自动匹配主键,自动过滤非表结构里的字段class User extends Model{ // 定义模型对应的表名 CONST TABLE = ‘users’; // 定义关系 public function articles() { return $this->hasMany(‘id’,Article::class,‘user_id’); } // 定义事件 // 是否开启自动化缓存 // ……}使用模型在fpm下数据库连接为单列,在swoole模式下数据库连接自动切换为连接池// 查询一条记录$user = User::find(1);// 关联查询$user_list = User::whereIn(‘id’,[1,2,3])->with(‘articles’)->findAll()->toArray();// 更新$r = $user->update([’name’ => ‘aaa’]);// 或者$r = user::where(‘id’,1)->update([’name’ => ‘aaa’]);// $r 为影响记录数量缓存// 设置缓存Cache::set(‘ccc’,1);// 获取Cache::get(‘ccc’);// 或者 缓存ccc 过期10s 在tag1下面Cache::get(‘ccc’,function (){ return ‘缓存的信息’;},10,[’tag1’]);// 刷新tag1下的所有缓存Cache::flush(’tag1’);HTTP/TCP/WEBOSCKET/UDP服务器启动一个websocket服务器,添加http服务监听,添加tcp服务监听[ // 主服务器 ‘server’ => [ ‘server_type’ => \One\Swoole\OneServer::SWOOLE_WEBSOCKET_SERVER, ‘port’ => 8082, // 事件回调 ‘action’ => \One\Swoole\Server\WsServer::class, ‘mode’ => SWOOLE_PROCESS, ‘sock_type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, // swoole 服务器设置参数 ‘set’ => [ ‘worker_num’ => 5 ] ], // 添加监听 ‘add_listener’ => [ [ ‘port’ => 8081, // 事件回调 ‘action’ => \App\Server\AppHttpPort::class, ’type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, // 给监听设置参数 ‘set’ => [ ‘open_http_protocol’ => true, ‘open_websocket_protocol’ => false ] ], [ ‘port’ => 8083, // 打包 解包协议 ‘pack_protocol’ => \One\Protocol\Text::class, // 事件回调 ‘action’ => \App\Test\MixPro\TcpPort::class, ’type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, // 给监听设置参数 ‘set’ => [ ‘open_http_protocol’ => false, ‘open_websocket_protocol’ => false ] ] ]];RPC像调用本项目的方法一样调用远程服务器的方法。跨语言,跨机器。服务端启动rpc服务,框架已经内置了各个协议的rpc服务,添加到到上面配置文件的action即可。列如: 支持http调用,又支持tpc调用。// http 协议 rpc服务[ ‘port’ => 8082, ‘action’ => \App\Server\RpcHttpPort::class, ’type’ => SWOOLE_SOCK_TCP, ‘ip’ => ‘0.0.0.0’, ‘set’ => [ ‘open_http_protocol’ => true, ‘open_websocket_protocol’ => false ]],// tpc 协议 rpc服务[ ‘port’ => 8083, ‘action’ => \App\Server\RpcTcpPort::class, ’type’ => SWOOLE_SOCK_TCP, ‘pack_protocol’ => \One\Protocol\Frame::class, // tcp 打包 解包协议 ‘ip’ => ‘0.0.0.0’, ‘set’ => [ ‘open_http_protocol’ => false, ‘open_websocket_protocol’ => false, ‘open_length_check’ => 1, ‘package_length_func’ => ‘\One\Protocol\Frame::length’, ‘package_body_offset’ => \One\Protocol\Frame::HEAD_LEN, ]]添加具体服务到rpc,例如有个类Abcclass Abc{ private $a; // 初始值 public function __construct($a = 0) { $this->a = $a; } // 加法 public function add($a, $b) { return $this->a + $a + $b; } public function time() { return date(‘Y-m-d H:i:s’); } // 重新设初始值 public function setA($a) { $this->a = $a; return $this; }}把Abc添加到rpc服务// 添加Abc到rpc服务RpcServer::add(Abc::class);// 如果你不希望把Abc下的所有方法都添加到rpc服务,也可以指定添加。// 未指定的方法客户端无法调用.//RpcServer::add(Abc::class,‘add’);// 分组添加//RpcServer::group([// // 中间件 在这里可以做 权限验证 数据加解密 等等// ‘middle’ => [// TestMiddle::class . ‘@aa’// ],// // 缓存 如果设置了 当以同样的参数调用时 会返回缓存信息 不会真正调用 单位:秒// ‘cache’ => 10//], function () {// RpcServer::add(Abc::class);// RpcServer::add(User::class);//});客户端调用为了方便调用我们建立一个映射类(one框架可自动生成)class ClientAbc extends RpcClientHttp { // rpc服务器地址 protected $_rpc_server = ‘http://127.0.0.1:8082/’; // 远程的类 不设置 默认为当前类名 protected $_remote_class_name = ‘Abc’;}调用rpc服务的远程方法, 和调用本项目的方法一样的。你可以想象这个方法就在你的项目里面。$abc = new ClientAbc(5);// $res === 10$res = $abc->add(2,3);// 链式调用 $res === 105$res = $abc->setA(100)->add(2,3);// 如果把上面的模型的User添加到rpc// RpcServer::add(User::class);// 下面运行结果和上面一样// $user_list = User::whereIn(‘id’,[1,2,3])->with(‘articles’)->findAll()->toArray();上面是通过http协议调用的。你也可以通过其他协议调用。例如Tpc协议class ClientAbc extends RpcClientTcp { // rpc服务器地址 protected $_rpc_server = ’tcp://127.0.0.1:8083/’; // 远程的类 不设置 默认为当前类名 protected $_remote_class_name = ‘Abc’;}其中类 RpcClientHttp,RpcClientTcp在框架里。 你也可以复制到任何其他地方使用。githubQQ交流群: 731475644 ...

December 17, 2018 · 2 min · jiezi

swoole安装

swoole安装git clone https://gitee.com/swoole/swoole.gitcd swoole/phpize./configure –with-php-config=/usr/local/php/bin/php-configmake && make install修改php配置文件,使swoole生效#编辑vim /usr/local/php/etc/php.ini#添加extension=swoole.so

December 14, 2018 · 1 min · jiezi

使用 Docker / Docker Compose 部署 Swoft 应用

Swoft首个基于 Swoole 原生协程的新时代 PHP 高性能协程全栈框架,内置协程网络服务器及常用的协程客户端,常驻内存,不依赖传统的 PHP-FPM,全异步非阻塞 IO 实现,以类似于同步客户端的写法实现异步客户端的使用,没有复杂的异步回调,没有繁琐的 yield, 有类似 Go 语言的协程、灵活的注解、强大的全局依赖注入容器、完善的服务治理、灵活强大的 AOP、标准的 PSR 规范实现等等,可以用于构建高性能的Web系统、API、中间件、基础服务等等。Swoft 的 Docker 镜像突然白话文 使用 docker 安装 swoft 其实听起来比较怪怪的,swoft 是一套 php 框架,依赖 swoole 扩展,说 docker 安装 swoft,其实是 docker 安装 swoft 运行所需的组件依赖和环境。swoft 框架运行环境所需的依赖和环境挨个安装搭建还是需要一些时间的,比如 php 版本 >= 7.1, swoole 版本 >= 2.1, 而且还要安装 hiredis 来协助开启 swoole 的异步 redis 客户端,同时要求 swoole 开启协程模式等。所以呢,为了节省我们的时间,官方提供了一个 docker 镜像包,里面包含了 swoft 运行环境所需要的各项组件:php 7.1+swoole 2.1+ –enable-async-redis-client –enable-coroutinecomposerpecl我们只需要下载镜像并新建一个容器,这个容器就提供了 swoft 框架所需的所有依赖和环境,我们只需要将本地的 sowft 项目挂载到镜像的 swoft 工作目录 /var/www/swoft 下,就可以继续我们的开发或生产工作了。让你从 swoft 略繁琐的依赖和环境搭建中解放出来,直接进入业务开发工作中去。一开始我没理解好这个 swoft 镜像,镜像里自带的框架其实是单纯的用来体验的,我一直误以为要编辑镜像的 swoft 框架源码做开发….需要特别注意的是,sowft 镜像的 entrypoint 命令(运行初始化命令)是ENTRYPOINT [“php”, “/var/www/swoft/bin/swoft”, “start”]即容器启动时会同时启动 swoft 框架,这就需要如果我们挂载本地 swoft 项目到镜像工作目录时,本地的 swoft 项目需已安装好框架的各依赖组件。我们从 git 上直接拉取的 swoft 项目是没有安装这些组件的,需使用 composer install [–no-dev] 安装,框架才能正常启动,这就要求宿主机上至少要有基础的 php + composer 的环境。当然,镜像内是工作目录下是有一套完全安装的 swoft 框架的,如果你只是为了体验,直接启动容器就好,但没什么直接的意义。在后面我们将给出一个只需要在宿主机上安装运维所需的 docker docker-compose git 即可完全借助 swoft 镜像去部署开发或生产环境的方法(修改镜像 entrypoint 到 bash 模式,然后进入镜像后使用 composer 安装依赖,启动 swoft)Docker 部署 swoft宿主机仍需安装基本的 php / composer(或者你把自己本地开发的项目cp过来,但这样可能会导致部分组件版本不一致,还是提交业务代码 + composer.json + composer.lock 文件,排除 vendor 目录,在线上服务器再 composer install 一遍最为规范)1、在宿主机创建 swoft 项目(宿主机需实安装基础的 php 环境来使用 composer)composer create-project –prefer-dist swoft/swoft swoft [–dev] && cd swoft或者git clone git@github.com:swoft-cloud/swoft.git && cd swoft && composer install && cd swoft2、拉取 swoft 镜像 创建 swoft 容器 并将宿主机上安装好的 swoft 项目挂载到 swoft 容器的工作目录// 拉取 swoft 镜像// 关联本地 swoft 项目目录到镜像的项目目录(/var/www/swoft)// 映射主机 8081 端口 到 容器 80 端口// 容器命名为 mySwoft// 守护模式启动docker run -v $(pwd):/var/www/swoft -p 8081:80 –name mySwoft -d swoft/swoft// 查看容器是否运行docker ps// 查看容器日志docker logs mySwoft3、进入 swoft 容器 shell// 交互模式执行 mySwoft 容器的 bashdocker exec -it mySwoft bash// stop 会停止容器所以会退出 shell 后用 docker start mySwoft 启动就好root@cce12db9add3:/var/www/swoft# php bin/swoft start|stop|reload// 因我们将宿主机上的swoft项目挂载到了swoft容器的项目目录/var/www/swoft 所以后期开发修改宿主机上的项目即可// 可以使用PS的FTP同步工具可以在 swoft 的容器 shell 里通过命令查看相应的组件版本root@cce12db9add3:/var/www/swoft# php -v root@cce12db9add3:/var/www/swoft# php –ri swooleroot@cce12db9add3:/var/www/swoft# composer -V root@cce12db9add3:/var/www/swoft# pecl -VDocker Composer 部署 Swoft宿主机仍需安装基本的 php / composer(或者你把自己本地开发的项目cp过来,但这样可能会导致部分组件版本不一致,还是提交业务代码 + composer.json + composer.lock 文件,排除 vendor 目录,在线上服务器再 composer install 一遍最为规范)swoft 项目中是有 docker-compose.yml 文件的version: ‘3’services: swoft: image: swoft/swoft:latest# build: ./ ports: - “80:80” #端口映射 volumes: - ./:/var/www/swoft # 挂载当前路径下的本地swoft项目到镜像项目路径 stdin_open: true #打开标准输出 tty: true # 打开 tty 会话 privileged: true # 给与权限 比如创建文件夹之类的 entrypoint: [“php”, “/var/www/swoft/bin/swoft”, “start”] # 入口启动命令 即启动 swoft 服务使用方法自然比直接用 docker 方便些,不过依旧是要在宿主机上先创建一个 swoft 项目1、在宿主机创建 swoft 项目(宿主机需实安装基础的 php 环境来使用 composer)composer create-project –prefer-dist swoft/swoft swoft [–dev] && cd swoft或者git clone git@github.com:swoft-cloud/swoft.git && cd swoft && composer install && cd swoft2、使用 docker-compose 来编排启动容器编辑 docker-compose.yaml 文件 给容器自定义个名字version: ‘3’services: swoft: image: swoft/swoft:latest container_name: mySwoft # 给容器自定义个名称便于管理# build: ./ ports: - “80:80” #端口映射 volumes: - ./:/var/www/swoft # 挂载当前路径下的本地swoft项目到镜像项目路径 stdin_open: true #打开标准输出 tty: true # 打开 tty 会话 privileged: true # 给与权限 比如创建文件夹之类的 entrypoint: [“php”, “/var/www/swoft/bin/swoft”, “start”] # 入口启动命令 即启动 swoft 服务# 启动容器docker-compose up -d swoft# 查看容器是否成功运行docker ps# 进入容器shelldocker exec -it mySwoft bash如何在未安装 PHP 环境的宿主机上部署 swoft前面两种部署 swoft 的方法都需要在宿主机上安装 php 基础环境来使用 composer 安装好本地 swoft 项目的依赖组件,才能与 swoft 镜像的工作目录挂载,启动容器(因为容器的入口命令就是直接启动 swoft,如果我们挂载本地未安装好依赖的 swoft 项目到镜像工作目录,那容器就会启动失败退出了),下面我们介绍一种不需要在宿主机上安装 php / composer 的方法。git+ docker + docker-composer1、拉取 swoft(拉取就好,不需要安装)git clone git@github.com:swoft-cloud/swoft.git && cd swoft && cd swoft2、编辑 docker-composer.yaml 文件修改容器启动的 entrypoint 命令,让容器启动,但不启动 swoft,等我们进入容器,使用容器中自带的 composer 安装好框架组件依赖后再启动 swoftversion: ‘3’services: swoft: container_name: mySwoft image: swoft/swoft:latest# build: ./ ports: - “8082:80” # 映射宿主机 8082 端口到 容器 80 volumes: - ./:/var/www/swoft stdin_open: true # 一定要开启此项 否则容器会因 bash 执行完退出 tty: true privileged: true# entrypoint: [“php”, “/var/www/swoft/bin/swoft”, “start”] entrypoint: [“bash”] # 改为此命令后 启动容器时默认不会启动 swoft 所以即使框架依赖未安装 也不会影响容器启动3、保存 docker-composer.yaml 后启动容器docker-composer up -d swoft4、进入容器 shell 使用容器种的 composer 安装框架依赖docker exec -it mySwoft bashcomposer install [–no-dev]5、启动 swoftphp bin/swoft start|stop|restart这种方法使得宿主机完全省去了还要事先简单安装下 php / composer 的工作,完全发挥镜像的功能~ ...

December 7, 2018 · 3 min · jiezi

PHP 协程:Go + Chan + Defer

Swoole4提供了强大的PHP CSP协程编程模式。底层提供了3个关键词,可以方便地实现各类功能。本文基于Swoole-4.2.9和PHP-7.2.9版本关键词go :创建一个协程chan :创建一个通道defer :延迟任务,在协程退出时执行,先进后出这3个功能底层实现全部为内存操作,没有任何IO资源消耗。就像PHP的Array一样是非常廉价的。如果有需要就可以直接使用。这与socket和file操作不同,后者需要向操作系统申请端口和文件描述符,读写可能会产生阻塞的IO等待。协程并发使用go函数可以让一个函数并发地去执行。在编程过程中,如果某一段逻辑可以并发执行,就可以将它放置到go协程中执行。顺序执行function test1() { sleep(1); echo “b”;} function test2() { sleep(2); echo “c”;}test1();test2();执行结果:htf@LAPTOP-0K15EFQI:$ time php b1.phpbcreal 0m3.080suser 0m0.016ssys 0m0.063shtf@LAPTOP-0K15EFQI:$上述代码中,test1和test2会顺序执行,需要3秒才能执行完成。并发执行使用go创建协程,可以让test1和test2两个函数变成并发执行。Swoole\Runtime::enableCoroutine();go(function () { sleep(1); echo “b”;}); go(function () { sleep(2); echo “c”;});Swoole\Runtime::enableCoroutine()作用是将PHP提供的stream、sleep、pdo、mysqli、redis等功能从同步阻塞切换为协程的异步IO执行结果:bchtf@LAPTOP-0K15EFQI:$ time php co.phpbcreal 0m2.076suser 0m0.000ssys 0m0.078shtf@LAPTOP-0K15EFQI:$可以看到这里只用了2秒就执行完成了。顺序执行耗时等于所有任务执行耗时的总和 :t1+t2+t3…)并发执行耗时等于所有任务执行耗时的最大值 :max(t1, t2, t3, …)协程通信有了go关键词之后,并发编程就简单多了。与此同时又带来了新问题,如果有2个协程并发执行,另外一个协程,需要依赖这两个协程的执行结果,如果解决此问题呢?答案就是使用通道(Channel),在Swoole4协程中使用new chan就可以创建一个通道。通道可以理解为自带协程调度的队列。它有两个接口push和pop:push:向通道中写入内容,如果已满,它会进入等待状态,有空间时自动恢复pop:从通道中读取内容,如果为空,它会进入等待状态,有数据时自动恢复使用通道可以很方便地实现并发管理。$chan = new chan(2);# 协程1go (function () use ($chan) { $result = []; for ($i = 0; $i < 2; $i++) { $result += $chan->pop(); } var_dump($result);});# 协程2go(function () use ($chan) { $cli = new Swoole\Coroutine\Http\Client(‘www.qq.com’, 80); $cli->set([’timeout’ => 10]); $cli->setHeaders([ ‘Host’ => “www.qq.com”, “User-Agent” => ‘Chrome/49.0.2587.3’, ‘Accept’ => ’text/html,application/xhtml+xml,application/xml’, ‘Accept-Encoding’ => ‘gzip’, ]); $ret = $cli->get(’/’); // $cli->body 响应内容过大,这里用 Http 状态码作为测试 $chan->push([‘www.qq.com’ => $cli->statusCode]);});# 协程3go(function () use ($chan) { $cli = new Swoole\Coroutine\Http\Client(‘www.163.com’, 80); $cli->set([’timeout’ => 10]); $cli->setHeaders([ ‘Host’ => “www.163.com”, “User-Agent” => ‘Chrome/49.0.2587.3’, ‘Accept’ => ’text/html,application/xhtml+xml,application/xml’, ‘Accept-Encoding’ => ‘gzip’, ]); $ret = $cli->get(’/’); // $cli->body 响应内容过大,这里用 Http 状态码作为测试 $chan->push([‘www.163.com’ => $cli->statusCode]);});执行结果:htf@LAPTOP-0K15EFQI:/swoole-src/examples/5.0$ time php co2.phparray(2) { [“www.qq.com”]=> int(302) [“www.163.com”]=> int(200)}real 0m0.268suser 0m0.016ssys 0m0.109shtf@LAPTOP-0K15EFQI:/swoole-src/examples/5.0$这里使用go创建了3个协程,协程2和协程3分别请求qq.com和163.com主页。协程1需要拿到Http请求的结果。这里使用了chan来实现并发管理。协程1循环两次对通道进行pop,因为队列为空,它会进入等待状态协程2和协程3执行完成后,会push数据,协程1拿到了结果,继续向下执行延迟任务在协程编程中,可能需要在协程退出时自动实行一些任务,做清理工作。类似于PHP的register_shutdown_function,在Swoole4中可以使用defer实现。Swoole\Runtime::enableCoroutine();go(function () { echo “a”; defer(function () { echo “a”; }); echo “b”; defer(function () { echo “b”; }); sleep(1); echo “c”;});执行结果:htf@LAPTOP-0K15EFQI:/swoole-src/examples/5.0$ time php defer.phpabcbareal 0m1.068suser 0m0.016ssys 0m0.047shtf@LAPTOP-0K15EFQI:/swoole-src/examples/5.0$结语Swoole4提供的Go + Chan + Defer为PHP带来了一种全新的CSP并发编程模式。灵活使用Swoole4提供的各项特性,可以解决工作中各类复杂功能的设计和开发。 ...

December 4, 2018 · 1 min · jiezi

PHP协程:并发 shell_exec

在PHP程序中经常需要用shell_exec执行一些命令,而普通的shell_exec是阻塞的,如果命令执行时间过长,那可能会导致进程完全卡住。在Swoole4协程环境下可以用Co::exec并发地执行很多命令。本文基于Swoole-4.2.9和PHP-7.2.9版本协程示例<?php$c = 10;while($c–) { go(function () { //这里使用 sleep 5 来模拟一个很长的命令 co::exec(“sleep 5”); });}协程结果htf@htf-ThinkPad-T470p:/workspace/debug$ time php t.phpreal 0m5.089suser 0m0.067ssys 0m0.038shtf@htf-ThinkPad-T470p:/workspace/debug$只用了 5秒,程序就跑完了。下面换成 PHP 的 shell_exec 来试试。阻塞代码<?php$c = 10;while($c–) { //这里使用 sleep 5 来模拟一个很长的命令 shell_exec(“sleep 5”);}阻塞结果htf@htf-ThinkPad-T470p:/workspace/debug$ time php s.php real 0m50.119suser 0m0.066ssys 0m0.058shtf@htf-ThinkPad-T470p:/workspace/debug$ 可以看到阻塞版本花费了50秒才完成。Swoole4提供的协程,是并发编程的利器。在工作中很多地方都可以使用协程,实现并发程序,大大提升程序性能。

November 29, 2018 · 1 min · jiezi

Swoole 4.1.0 正式版发布,支持原生 Redis/PDO/MySQLi 协程化

重大新特性支持 Redis/PDO/MySQLi从4.1.0版本开始支持了对PHP原生Redis、PDO、MySQLi协程化的支持。可使用SwooleRuntime::enableCorotuine()将普通的同步阻塞Redis、PDO、MySQLi操作变为协程调度的异步非阻塞IOSwooleRuntime::enableCoroutine();go(function () { $redis = new redis; $retval = $redis->connect(“127.0.0.1”, 6379); var_dump($retval, $redis->getLastError()); var_dump($redis->get(“key”)); var_dump($redis->set(“key”, “value”)); $redis->close();});协程跟踪新版本增加了两个方法用于跟踪协程运行。Coroutine::listCoroutines()可遍历当前所有协程Coroutine::getBackTrace($cid)可获取某个协程的函数调用栈function test1() { test2();}function test2() { while(true) { co::sleep(10); echo FUNCTION." n"; }}$cid = go(function () { test1();});go(function () use ($cid) { while(true) { echo “BackTrace[$cid]:n———————————————–n”; //返回数组,需要自行格式化输出 var_dump(co::getBackTrace($cid)).“n”; co::sleep(3); }});BackTrace[1]:———————————————–#0 SwooleCoroutine::sleep(10) called at [/home/htf/workspace/swoole/examples/coroutine/backtrace.php:8]#1 test2() called at [/home/htf/workspace/swoole/examples/coroutine/backtrace.php:3]#2 test1() called at [/home/htf/workspace/swoole/examples/coroutine/backtrace.php:14]其他修改重构 CoChannel C底层代码为C++, 解决复杂场景的非预期结果, 实现高稳定重构 CoHttpClient C底层代码为C++协程模式, 解决异步时序问题, 实现高稳定支持在协程和Server中使用exit, 此时将会抛出可捕获的SwooleExitException异常移除所有迭代器(table/connection/coroutine_list)的PCRE依赖限制增加open_websocket_close_frame配置, 可以在onMessage事件中接收close帧废弃HttpResponse->gzip()方法,改为使用http_compression配置项。底层会自动判断客户端传入的Accept-Encoding选择合适的压缩方法, 新增谷歌BR压缩支持增加CoHttpClient->addData()方法,可将内存中的数据作为上传文件内容进行发送Solaris系统支持Http2支持MAX_FRAME_SIZE分帧发送和MAX_HEADER_LIST_SIZE处理, 客户端增加isStreamExist方法检测是否存在对应流swoole_http_response->status增加reason参数修复MySQL prepare 中无符号参数使用了有符号值导致数值溢出的问题修复HTTP2的onRequest回调中没有协程的问题修复tasking_num某些特殊情况下变为-1的问题修复HTTP2-server的window-update帧构造错误修复所有PHP版本下的所有级别的编译warningGCC版本小于4.8时将会产生编译错误修复MySQL使用prepare时未使用参数绑定导致的内存分配不断增长修复HTTP2重连时旧stream内存丢失泄露底层开发相关统一文件命名 #970CoHttpClient使用了create_obj和free_obj保证内存安全, 防止错误的PHP代码引发内存问题 ...

September 1, 2018 · 1 min · jiezi