github:http://github.com/brewlin/im-…
- im-cloud 基于 swoole 原生协程构建分布式推送中间件
- im-cloud 分布式中间件的安装部署
- im-cloud <> goim 分布式中间件并发压测对比
- im-cloud 分布式中间件分析(一)- 通讯协议
- im-cloud 分布式中间件分析(二)-cloud 节点实现
- im-cloud 分布式中间件分析(三)-job 节点实现
im-cloud 分布式中间件分析(四)-logic 节点实现
1. 概述
logic 节点 作为生产者和 client 端,作为业务节点,提供 push 推送 resetapi 接口,可以扩容多个节点做 nginx 负载均衡
2.@Producer
默认启动 10 个消息队列连接池,在 task 进程为每个人物创建协程异步生产任务
异步 task 任务
直接调用组件 task 接口进行投递到 task 进程执行,task 投递为非阻塞操作,执行完毕会直接返回,大大的提升了 worker 处理并发请求的能力,唯一的影响是,如果多个 task 进程的消费能力更不上 worker 的投递速度也会影响 worker 的处理能力,所以需要做取舍
- 具体的消息队列生成在 task 进程中执行
- task 进程启用了协程模式,投递的每个任务都默认创建一个协程
use Task\Task;
/**
* @var LogicPush
*/
Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]);
- 相关异步任务 存放在命名空间
App\Task
下
相关优化
容器化
单个请求流程执行的生命周期会调用生成多个对象,多达 10 多个。并发大的情况下 GC 几乎会首先挂掉,而且会耗时等待,所以 new 对象也有优化的空间
项目在初始化也就是主进程启动期间就扫描相关代码,有注解的就进行收集,然后实例化到容器 container 中,以后再多次使用的时候直接复用代码,而无需多次 new 对象,大大节省空间和时间,如下图为创建一个协程去执行任务,相关的对象都从容器中获取
Co::create(function ()use($op,$mids,$msg){
/** @var RedisDao $servers */
$servers = \container()->get(RedisDao::class)->getKeysByMids($mids);
$keys = [];
foreach($servers as $key => $server){$keys[$server][] = $key;}
foreach($keys as $server => $key){
// 丢到队列里去操做,让 job 去处理
\container()->get(QueueDao::class)->pushMsg($op,$server,$key,$msg);
}
},true);
// 第二个参数为 true 表示使用 Context::waitGroup() 等待任务执行完成
如上图所示可以调用组件提供的多个方法获取容器对象
container()->get(class)
bean(class)
- 两种都可以获取容器对象,
提高并发性能
即使将主要的耗时任务放到 task 进程中执行,worker 进程中依然会有少量的等待时间,现在采取的方式,是请求到来时获取数据后,直接回复结束当前连接,然后在继续执行任务,这样就不用等到投递 task 任务后再结束当前连接,大大提高并发能力,虽然可能耗时性能没有发生太大的改变,但是并发能力大大的提升。如下所示:
/**
* @return \Core\Http\Response\Response|static
*/
public function mids()
{Context::get()->getResponse()->end();
$post = Context::get()->getRequest()->input();
if(empty($post["operation"]) || empty($post["mids"]) ||empty($post["msg"])){return $this->error("缺少参数");
}
$arg = ["op" => $post["operation"],
"mids" => is_array($post["mids"])?$post["mids"]:[$post["mids"]],
"msg" => $post["msg"]
];
Log::debug("push mids post data:".json_encode($arg));
/**
* @var LogicPush
*/
Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]);
}
- 如上图直接使用
Context::get()->getResponse()->end();
通过协程上下文获取 reponse 对象直接结束当前连接,然后在继续执行当前任务,并释放内存