基于swoole的swoolefy实现类似go的waitGroup多并发协程调度

48次阅读

共计 2428 个字符,预计需要花费 7 分钟才能阅读完成。

swoolefy 是一个基于 swoole 实现的轻量级高性能的常驻内存型的 API 和 Web 应用服务框架,高度封装了 http,websocket,udp 服务器,以及基于 tcp 实现可扩展的 rpc 服务,同时支持 composer 包方式安装部署项目。基于实用,swoolefy 抽象 Event 事件处理类,实现与底层的回调的解耦,支持协程调度,同步 | 异步调用,全局事件注册,心跳检查,异步任务,多进程 (池) 等,内置 view、log、session、mysql、redis、mongodb 等常用组件等。

目前 swoolefy4.2+ 版本完全支持 swoole4.2.13+ 的协程,推荐使用 swoole4.3+

GitHub:https://github.com/bingcool/s…

下面主要讲解一下如何实现了类似 go 的 waitGroup 的功能
1、定义 GoWaitGroup 的类:

<?php
/**
+----------------------------------------------------------------------
| swoolefy framework bases on swoole extension development, we can use it easily!
+----------------------------------------------------------------------
| Licensed (https://opensource.org/licenses/MIT)
+----------------------------------------------------------------------
| Author: bingcool <bingcoolhuang@gmail.com || 2437667702@qq.com>
+----------------------------------------------------------------------
 */

namespace Swoolefy\Core;

use Swoole\Coroutine\Channel;

class GoWaitGroup {
    /**
     * @var int
     */
    private $count = 0;

    /**
     * @var Channel
     */
    private $chan;

    /**
     * @var array
     */
    private $result = [];

    /**
     * WaitGroup constructor
     */
    public function __construct() {$this->chan = new Channel;}

    /**
     * add
     */
    public function go(\Closure $go_func = null) {
        $this->count++;
        if($go_func instanceof \Closure) {go($go_func);
        }
    }

    /**
     * start
     */
    public function start() {
        $this->count++;
        return $this->count;
    }

    /**
     * done
     */
    public function done(string $key, $data = null) {if(!empty($data)) {$this->result[$key] = $data;
        }
        $this->chan->push(1);
    }

    /**
     * wait
     */
    public function wait() {while($this->count--) {$this->chan->pop();
        }
        $result = $this->result;
        $this->result = [];
        $this->count = 0;
        return $result;
    }

}

2、在 swoolefy 中调用

class GroupController extends BController {public function waitgroup() {
        // 创建一个 waitGroup 实例
        $wg = new \Swoolefy\Core\GoWaitGroup();
         
         // 第一种方式,直接 $wg->go()函数中执行 go 的协程函数
        $wg->go(function() use ($wg) {
            // 挂起协程
            $fp = stream_socket_client("tcp://www.baidu.com:80", $errno, $errstr, 30);
            // 协程返回的数据
            $wg->done('mysql', 'mysql');
        });

        $wg->go(function() use ($wg) {sleep(1);
            $wg->done('tengxun', 'weixin and qq');
        });

        // 挂起当前协程,等待所有任务完成后恢复
        //$result = $wg->wait();
        // 这里 $result 包含了 1 个任务执行结果
        //var_dump($result);
        
        // 第二种方式,添加 $wg->start(),启动协程,然后使用 swoole 的原生 go 执行协程函数
        $wg->start();
        go(function () use ($wg) {
            // 挂起协程
            sleep(1);
            $wg->done('taobao', 'ali baba');
        });
        
         // 第二种方式,添加 $wg->start(),启动协程,然后使用 swoole 的原生 go 执行协程函数
        $wg->start();
        go(function () use ($wg) {
            // 挂起协程
            sleep(1);
            $wg->done('baidu', 'baidu');
        });
        // 以上三个协程将会并发调用,wait()函数实现等待三个协程数据返回
        // 挂起当前协程,让出 cpu 控制权,cpu 可以做其他的事情,直到待所有任务完成后恢复
        $result = $wg->wait();
        // 这里 $result 包含了 2 个任务执行结果
        var_dump($result);
    }
}

至此一个最简单的并发调用就完成了,你可以愉快使用 gowaitGroup 的协程调用了

正文完
 0