imcloud分布式中间件分析三job节点实现

9次阅读

共计 1189 个字符,预计需要花费 3 分钟才能阅读完成。

github:http://github.com/brewlin/im-…

  • im-cloud 基于 swoole 原生协程构建分布式推送中间件
  • im-cloud 分布式中间件的安装部署
  • im-cloud <> goim 分布式中间件并发压测对比
  • im-cloud 分布式中间件分析 (一)- 通讯协议
  • im-cloud 分布式中间件分析 (二)-cloud 节点实现
  • im-cloud 分布式中间件分析 (三)-job 节点实现
  • im-cloud 分布式中间件分析 (四)-logic 节点实现

1. 概述

job 节点 作为消费端,消费 logic 生产的数据,然后通过 grpc 推送至 cloud 节点,cloud 点真正处理客户端数据,job 节点默认多进程消费启动 4 个 worker 进程,以及默认 10 个 grpc 连接池

  • 数据流程图

2.@Consumer 消费中心

默认启动 4 个 worker 进程消费 logic 请求,耗时处理投放至 task 进程处理,并转发至 cloud 节点

监听 worker 启动事件

需要在 config/queue.php ,config/event.php 注册相应的事件和相关配置

use App\Consumer\Consumer;
use Core\App;
use Core\Swoole\WorkerStartInterface;
use Swoole\Server as SwooleServer;

class WorkerStartListener implements WorkerStartInterface
{
    const INIT_LOGIC = 1;

    public function onWorkerStart(SwooleServer $server, int $workerId): void
    {if(App::isWorkerStatus()){
            // 启动的 n 个 worker 进程 分别作为消费者进程消费,每个进程会直接阻塞直到消费到数据
            consumer()->consume(new Consumer());
        }
    }

}

消费主流程

  • 1. 为每个消费数据请求建立一个协程,处理相关数据
  • 2. 将每个数据投递至 worker 进程进行真正的 grpc 与 cloud 推送请求
Co::create(function()use($data){if(empty(CloudClient::$serviceList)){Log::error("cancle task deliver discovery cloud node is empty");
        return;
    }
    Task::deliver(Job::class,"push",[CloudClient::$serviceList,$data]);
},false);
return Result::ACK;
  • 3. 通过以上做法能加快并发是消费速度,task 进程也进行协程处理,增加并行处理能力,如果 task 进程阻塞也会造成 task 任务投递阻塞,所以在 worker 进程也需要加一个协程处理
正文完
 0