最近使用 swoole 做了项目,里面设计推送信息给界面前端,和无登陆用户的状态监控,以下是本人从中获取的一点心得,有改进的地方请留言评论。

需求分析

我们假设有一个需求,我在后端点击按钮1,首页弹出“后端触发了按钮1”。后端点了按钮2,列表页弹出“后端触发了按钮2”。做到根据不同场景推送到不同页面。

代码思路

  • Swoole fd

客户端浏览器打开或者刷新界面,在swoole服务会生成一个进程句柄 fd ,每次浏览器页面有打开链接websocket的js代码,便会生成,每次刷新的时候,会关闭之前打开的 fd,重新生成一个新的,关闭界面的时候会生成一个新的。swoole的 fd生成规则是从1开始递增。

  • Redis Hash存储 fd

我们建立一个key为swoole:fds redis哈希类型数据,fd 为hash的字段,每个字段的值我们存储前端websocket请求的url参数信息(根据业务复杂度自己灵活变通,我在项目中会在url带上sessionId)。每次链接打开swoole服务的时候我们存储其信息,每次关闭页面时候我们清除其字段。在redis存储如下

  • 触发分场景推送

    在界面上当进行了触发操作的时候,通过后台curl请求swoole http服务,swoole http服务根据你向我传递的参数分发给对应的逻辑处理。如curl请求127.0.0.1:9502page=back&func=pushHomeLogic&token=123456 我们可以根据传入的func参数,在后台分发给对应逻辑处理。如分发给pushHomeLogic方法。在其里面实现自己的逻辑。为防止过多的if else 以及 foreach 操作,我们采用的是闭包,call_user_func等方法实现如下

    public function onRequest($request,$response)   {       if ($this->checkAccess("", $request)) {           $param = $request->get;           // 分发处理请求逻辑           if (isset($param['func'])) {               if (method_exists($this,$param['func'])) {                   call_user_func([$this,$param['func']],$request);               }           }       }   }// 往首页推送逻辑处理   public function pushHomeLogic($request)   {       $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {           if ($aContent && $aContent['page'] == "home") {               $aRes['message'] = "后端按了按钮1";               $aRes['code'] = "200";               $oSwoole::$server->push($fd,xss_json($aRes));           }       };       $this->eachFdLogic($callback);   }

完整代码

swool脚本代码逻辑

<?phpnamespace App\Console\Commands;use Closure;use Illuminate\Console\Command;use Illuminate\Support\Facades\Redis;class SwooleDemo extends Command{    // 命令名称    protected $signature = 'swoole:demo';    // 命令说明    protected $description = '这是关于swoole websocket的一个测试demo';    // swoole websocket服务    private static $server = null;    public function __construct()    {        parent::__construct();    }    // 入口    public function handle()    {        $this->redis = Redis::connection('websocket');        $server = self::getWebSocketServer();        $server->on('open',[$this,'onOpen']);        $server->on('message', [$this, 'onMessage']);        $server->on('close', [$this, 'onClose']);        $server->on('request', [$this, 'onRequest']);        $this->line("swoole服务启动成功 ...");        $server->start();    }    // 获取服务    public static function getWebSocketServer()    {        if (!(self::$server instanceof \swoole_websocket_server)) {            self::setWebSocketServer();        }        return self::$server;    }    // 服务处始设置    protected static function setWebSocketServer():void    {        self::$server  = new \swoole_websocket_server("0.0.0.0", 9502);        self::$server->set([            'worker_num' => 1,            'heartbeat_check_interval' => 60,    // 60秒检测一次            'heartbeat_idle_time' => 121,        // 121秒没活动的        ]);    }    // 打开swoole websocket服务回调代码    public function onOpen($server, $request)    {        if ($this->checkAccess($server, $request)) {            self::$server->push($request->fd,xss_json(["code"=>200,"message"=>"打开swoole服务成功"]));        }    }    // 给swoole websocket 发送消息回调代码    public function onMessage($server, $frame)    {    }    // http请求swoole websocket 回调代码    public function onRequest($request,$response)    {        if ($this->checkAccess("", $request)) {            $param = $request->get;            // 分发处理请求逻辑            if (isset($param['func'])) {                if (method_exists($this,$param['func'])) {                    call_user_func([$this,$param['func']],$request);                }            }        }    }    // websocket 关闭回调代码    public function onClose($serv,$fd)    {        $this->redis->hdel('swoole:fds', $fd);        $this->line("客户端 {$fd} 关闭");    }    // 校验客户端连接的合法性,无效的连接不允许连接    public function checkAccess($server, $request):bool    {        $bRes = true;        if (!isset($request->get) || !isset($request->get['token'])) {            self::$server->close($request->fd);            $this->line("接口验证字段不全");            $bRes = false;        } else if ($request->get['token'] != 123456) {            $this->line("接口验证错误");            $bRes = false;        }        $this->storeUrlParamToRedis($request);        return $bRes;    }    // 将每个界面打开websocket的url 存储起来    public function storeUrlParamToRedis($request):void    {        // 存储请求url带的信息        $sContent = json_encode(            [                'page' => $request->get['page'],                'fd' => $request->fd,            ], true);        $this->redis->hset("swoole:fds", $request->fd, $sContent);    }    /**     * @param $request     * @see 循环逻辑处理     */    public function eachFdLogic(Closure $callback = null)    {        foreach (self::$server->connections as $fd) {            if (self::$server->isEstablished($fd)) {                $aContent = json_decode($this->redis->hget("swoole:fds",$fd),true);                $callback($aContent,$fd,$this);            } else {                $this->redis->hdel("swoole:fds",$fd);            }        }    }    // 往首页推送逻辑处理    public function pushHomeLogic($request)    {        $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {            if ($aContent && $aContent['page'] == "home") {                $aRes['message'] = "后端按了按钮1";                $aRes['code'] = "200";                $oSwoole::$server->push($fd,xss_json($aRes));            }        };        $this->eachFdLogic($callback);    }    // 往列表页推送逻辑处理    public function pushListLogic($request)    {        $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {            if ($aContent && $aContent['page'] == "list") {                $aRes['message'] = "后端按了按钮2";                $aRes['code'] = "200";                $oSwoole::$server->push($fd,xss_json($aRes));            }        };        $this->eachFdLogic($callback);    }    // 启动websocket服务    public function start()    {        self::$server->start();    }}

控制器代码

<?phpnamespace App\Http\Controllers;use Illuminate\Http\Request;use Illuminate\Support\Facades\Redis;class TestController extends Controller{    // 首页    public function home()    {        return view("home");    }    // 列表    public function list()    {        return view("list");    }    // 后端控制    public function back()    {        if (request()->method() == 'POST') {           $this->curl_get($this->getUrl());           return json_encode(['code'=>200,"message"=>"成功"]);        } else {            return view("back");        }    }    // 获取要请求swoole websocet服务地址    public function getUrl():string    {        // 域名 端口 请求swoole服务的方法        $sBase = request()->server('HTTP_HOST');        $iPort = 9502;        $sFunc = request()->post('func');        $sPage = "back";        return $sBase.":".$iPort."?func=".$sFunc."&token=123456&page=".$sPage;    }    // curl 推送    public function curl_get(string $url):string    {        $ch_curl = curl_init();        curl_setopt ($ch_curl, CURLOPT_TIMEOUT_MS, 3000);        curl_setopt($ch_curl, CURLOPT_SSL_VERIFYPEER, 0);        curl_setopt ($ch_curl, CURLOPT_HEADER,false);        curl_setopt($ch_curl, CURLOPT_HTTPGET, 1);        curl_setopt($ch_curl, CURLOPT_RETURNTRANSFER,true);        curl_setopt ($ch_curl, CURLOPT_URL,$url);        $str  = curl_exec($ch_curl);        curl_close($ch_curl);        return $str;    }}

页面js代码

  • 后端控制页

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <title>后端界面</title>    <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no"></head><body><button class="push" data-func="pushHomeLogic">按钮1</button><button class="push" data-func="pushListLogic">按钮2</button></body><script src="{{ asset("/vendor/tw/global/jQuery/jquery-2.2.3.min.js")}} "></script><script>    $(function () {        $(".push").on('click',function(){            var func = $(this).attr('data-func').trim();            ajaxGet(func)        })        function ajaxGet(func) {            url = "{{route('back')}}";            token = "{{csrf_token()}}";            $.ajax({                url: url,                type: 'post',                dataType: "json",                data:{func:func,_token:token},                error: function (data) {                    alert("服务器繁忙, 请联系管理员!");                    return;                },                success: function (result) {                },            })        }    })</script></html>
  • 首页

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <title>swoole首页</title>    <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no"></head><body><h1>这是首页</h1></body><script>    var ws;//websocket实例    var lockReconnect = false;//避免重复连接    var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=home&token=123456';    function initEventHandle() {        ws.onclose = function () {            reconnect(wsUrl);        };        ws.onerror = function () {            reconnect(wsUrl);        };        ws.onopen = function () {            //心跳检测重置            heartCheck.reset().start();        };        ws.onmessage = function (event) {            //如果获取到消息,心跳检测重置            //拿到任何消息都说明当前连接是正常的            var data = JSON.parse(event.data);            if (data.code == 200) {                console.log(data.message)            }            heartCheck.reset().start();        }    }    createWebSocket(wsUrl);    /**     * 创建链接     * @param url     */    function createWebSocket(url) {        try {            ws = new WebSocket(url);            initEventHandle();        } catch (e) {            reconnect(url);        }    }    function reconnect(url) {        if(lockReconnect) return;        lockReconnect = true;        //没连接上会一直重连,设置延迟避免请求过多        setTimeout(function () {            createWebSocket(url);            lockReconnect = false;        }, 2000);    }    //心跳检测    var heartCheck = {        timeout: 60000,//60秒        timeoutObj: null,        serverTimeoutObj: null,        reset: function(){            clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this;        },        start: function(){            var self = this;            this.timeoutObj = setTimeout(function(){                //这里发送一个心跳,后端收到后,返回一个心跳消息,                //onmessage拿到返回的心跳就说明连接正常                ws.send("heartbeat");                self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了                    ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次                }, self.timeout);            }, this.timeout);        },        header:function(url) {            window.location.href=url        }    }</script></html>
  • 列表页面

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <title>swoole列表页</title>    <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no"></head><body><h1>swoole列表页</h1></body><script>    var ws;//websocket实例    var lockReconnect = false;//避免重复连接    var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=list&token=123456';    function initEventHandle() {        ws.onclose = function () {            reconnect(wsUrl);        };        ws.onerror = function () {            reconnect(wsUrl);        };        ws.onopen = function () {            //心跳检测重置            heartCheck.reset().start();        };        ws.onmessage = function (event) {            //如果获取到消息,心跳检测重置            //拿到任何消息都说明当前连接是正常的            var data = JSON.parse(event.data);            if (data.code == 200) {                console.log(data.message)            }            heartCheck.reset().start();        }    }    createWebSocket(wsUrl);    /**     * 创建链接     * @param url     */    function createWebSocket(url) {        try {            ws = new WebSocket(url);            initEventHandle();        } catch (e) {            reconnect(url);        }    }    function reconnect(url) {        if(lockReconnect) return;        lockReconnect = true;        //没连接上会一直重连,设置延迟避免请求过多        setTimeout(function () {            createWebSocket(url);            lockReconnect = false;        }, 2000);    }    //心跳检测    var heartCheck = {        timeout: 60000,//60秒        timeoutObj: null,        serverTimeoutObj: null,        reset: function(){            clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this;        },        start: function(){            var self = this;            this.timeoutObj = setTimeout(function(){                //这里发送一个心跳,后端收到后,返回一个心跳消息,                //onmessage拿到返回的心跳就说明连接正常                ws.send("heartbeat");                self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了                    ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次                }, self.timeout);            }, this.timeout);        },        header:function(url) {            window.location.href=url        }    }</script></html>

界面效果

后台控制点击按钮1


后端界面点击按钮2