乐趣区

imcloud分布式中间件分析四logic节点实现

github:http://github.com/brewlin/im-…

  • im-cloud 基于 swoole 原生协程构建分布式推送中间件
  • im-cloud 分布式中间件的安装部署
  • im-cloud <> goim 分布式中间件并发压测对比
  • im-cloud 分布式中间件分析(一)- 通讯协议
  • im-cloud 分布式中间件分析(二)-cloud 节点实现
  • im-cloud 分布式中间件分析(三)-job 节点实现
  • im-cloud 分布式中间件分析(四)-logic 节点实现

1. 概述

logic 节点 作为生产者和 client 端,作为业务节点,提供 push 推送 resetapi 接口,可以扩容多个节点做 nginx 负载均衡

2.@Producer

默认启动 10 个消息队列连接池,在 task 进程为每个人物创建协程异步生产任务

异步 task 任务

直接调用组件 task 接口进行投递到 task 进程执行,task 投递为非阻塞操作,执行完毕会直接返回,大大的提升了 worker 处理并发请求的能力,唯一的影响是,如果多个 task 进程的消费能力更不上 worker 的投递速度也会影响 worker 的处理能力,所以需要做取舍

  • 具体的消息队列生成在 task 进程中执行
  • task 进程启用了协程模式,投递的每个任务都默认创建一个协程
use Task\Task;
/**
* @var LogicPush
*/
Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]);
  • 相关异步任务 存放在命名空间 App\Task

相关优化

容器化

单个请求流程执行的生命周期会调用生成多个对象,多达 10 多个。并发大的情况下 GC 几乎会首先挂掉,而且会耗时等待,所以 new 对象也有优化的空间

项目在初始化也就是主进程启动期间就扫描相关代码,有注解的就进行收集,然后实例化到容器 container 中,以后再多次使用的时候直接复用代码,而无需多次 new 对象,大大节省空间和时间,如下图为创建一个协程去执行任务,相关的对象都从容器中获取

Co::create(function ()use($op,$mids,$msg){
    /** @var RedisDao $servers */
    $servers = \container()->get(RedisDao::class)->getKeysByMids($mids);
    $keys = [];
    foreach($servers as $key => $server){$keys[$server][] = $key;}
    foreach($keys as $server => $key){
        // 丢到队列里去操做,让 job 去处理
        \container()->get(QueueDao::class)->pushMsg($op,$server,$key,$msg);
    }

},true);
// 第二个参数为 true 表示使用 Context::waitGroup() 等待任务执行完成

如上图所示可以调用组件提供的多个方法获取容器对象

  • container()->get(class)
  • bean(class)
  • 两种都可以获取容器对象,

提高并发性能

即使将主要的耗时任务放到 task 进程中执行,worker 进程中依然会有少量的等待时间,现在采取的方式,是请求到来时获取数据后,直接回复结束当前连接,然后在继续执行任务,这样就不用等到投递 task 任务后再结束当前连接,大大提高并发能力,虽然可能耗时性能没有发生太大的改变,但是并发能力大大的提升。如下所示:

    /**
     * @return \Core\Http\Response\Response|static
     */
    public function mids()
    {Context::get()->getResponse()->end();
        $post  = Context::get()->getRequest()->input();
        if(empty($post["operation"]) || empty($post["mids"]) ||empty($post["msg"])){return $this->error("缺少参数");
        }
        $arg = ["op" => $post["operation"],
            "mids" => is_array($post["mids"])?$post["mids"]:[$post["mids"]],
            "msg" => $post["msg"]
        ];
        Log::debug("push mids post data:".json_encode($arg));
        /**
         * @var LogicPush
         */
        Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]);
    }
  • 如上图直接使用 Context::get()->getResponse()->end(); 通过协程上下文获取 reponse 对象直接结束当前连接,然后在继续执行当前任务,并释放内存
退出移动版