imcloud分布式中间件分析二cloud节点实现

56次阅读

共计 5950 个字符,预计需要花费 15 分钟才能阅读完成。

http://github.com/brewlin/im-…

  • im-cloud 基于 swoole 原生协程构建分布式推送中间件
  • im-cloud 分布式中间件的安装部署
  • im-cloud <> goim 分布式中间件并发压测对比
  • im-cloud 分布式中间件分析(一)- 通讯协议
  • im-cloud 分布式中间件分析(二)-cloud 节点实现
  • im-cloud 分布式中间件分析(三)-job 节点实现
  • im-cloud 分布式中间件分析(四)-logic 节点实现

1. 概述

cloud 节点对外提供websockettcp client 注册。并维护每个连接对应的客户端信息。作为 Grpc server,接受 grpc 推送数据,并推送到 client 端

  • 数据流程图

2.@Grpc server

grpc server 基于 swoole 的 http2 协议,然后通过 config/router.php 配置项注册路由既可以使用如 rest 模式下的交互流程

grpc 路由注册

配置文件 config/router.php

<?php
//Grpc server router
HttpRouter::post('/im.cloud.Cloud/Ping', '/Grpc/Cloud/ping');
HttpRouter::post('/im.cloud.Cloud/Close', '/Grpc/Cloud/close');
HttpRouter::post('/im.cloud.Cloud/PushMsg', '/Grpc/Cloud/pushMsg');
HttpRouter::post('/im.cloud.Cloud/Broadcast', '/Grpc/Cloud/broadcast');
HttpRouter::post('/im.cloud.Cloud/Rooms', '/Grpc/Cloud/rooms');

和 rest 路由一样只需要注册路由到对应的方法即可,当使用 grpc-client 进行请求时,就能分发到最远的控制器去,

grpc 参数解析

当接收到请求后,可以根据协程上下文获取当前连接的请求参数,grpc传输的协议是二进制,所以不能通过 get,post 方法直接获得对应的参数,需要采用 grpc 提供的方法进行解包

use Grpc\Parser;
use Core\Context\Context;
public function pushMsg()
{$rawbody = Context::get()->getRequest()->getRawBody();
    /** @var PushMsgReq $pushMsgReq */
    $pushMsgReq = Parser::deserializeMessage([PushMsgReq::class,null],
            $rawbody
    );
}
  • 获取请求参数可以通过协程上下文获取

    • Context::get()->getRequest()->getRawBody();
    • request()->getRawBody();
  • Grpc\Parser 方法使用的是 swoolegrpc-client 组件包提供的方法,使用 swoole 对原生 grpc 进行了封装

3.@websocket server

基于 websocket 协议注册到 cloud 节点,cloud 进行认证,通过 grpc 将注册信息传递到 logic 统一管理,认证成功后 cloud 节点将保存改连接的基础信息

握手阶段

命名空间:App/Websocket/HandshakeListener.class

该事件为 swoole 监听事件,所以需要注册监听回调函数, 配置文件为config/event.php

use \Core\Swoole\SwooleEvent;
use \App\Websocket\HandshakeListener;

return [
    //websocket 握手事件
    SwooleEvent::HANDSHAKE        => new HandshakeListener(),];

接下来是握手流程

    /**
     * token check '{"mid":123,"room_id":"live://1000","platform":"web","accepts":[1000,1001,1002]}'
     * @param Request $request
     * @param Response $response
     * @return bool
     */
    public function onHandshake(Request $request, Response $response): bool
    {$httpRequest = HttpRequest::new($request);
        // 握手失败
        if($httpRequest->getUriPath() != self::upgradeUrl){$response->end();
            return false;
        }
        // websocket 握手连接算法验证
        $secWebSocketKey = $request->header['sec-websocket-key'];
        $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
        if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) {$response->end();
            return false;
        }
        $key = base64_encode(sha1($request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
            true
        ));

        $headers = [
            'Upgrade' => 'websocket',
            'Connection' => 'Upgrade',
            'Sec-WebSocket-Accept' => $key,
            'Sec-WebSocket-Version' => '13',
        ];

        // WebSocket connection to 'ws://127.0.0.1:9502/'
        // failed: Error during WebSocket handshake:
        // Response must not include 'Sec-WebSocket-Protocol' header if not present in request: websocket
        if (isset($request->header['sec-websocket-protocol'])) {$headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol'];
        }

        foreach ($headers as $key => $val) {$response->header($key, $val);
        }
        $response->status(101);
        $response->end();
        return true;
    }

该方法在握手阶段对于 http 请求进行校验,如果路径不为 ‘/sub’ 则认证失败关闭连接,成功后校验 websocekt 协议并升级为 websocket,

主事件处理

同样需要注册 websocket 的 onmessage 事件 配置文件:config/envent.php

@step1 解包

使用 App\Packet\Packet::class 进行解包,
通讯协议为二进制传输,会有单独一章分析 im-cloud 通讯协议的设计

@step2 处理分发(注册)

根据协议,如果为注册请求,则进行注册流程,心跳则进行心跳流程
im-cloud 暂时不支持双向推送,也就是该连接不支持接受推送消息,推送请走 logic 节点 push

@step3 注册

  • 1. 进行 auth 参数校验
  • 2. 通过 grpc 注册到 logic 节点
$server = LogicClient::getLogicClient();
if(empty($server))
    throw  new \Exception("not find any logic node");
$connectReq = new ConnectReq();
/** @var \Im\Logic\LogicClient $rpcClient */
$rpcClient  = null;
$serverId = env("APP_HOST","127.0.0.1").":".env("GRPC_PORT",9500);
$connectReq->setServer($serverId);
$connectReq->setCookie("");
$connectReq->setToken(json_encode($data));
/** @var ConnectReply $rpy */
$rpy = GrpcLogicClient::Connect($server,$connectReq)[0];
  • 3. 注册成功后将当前用户信息 写入 bucket 进程,独立维护所有的用户信息和连接

[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body);
/** @var Task $task */
\bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);

4.@tcp server

tcp 处理流程和 websocket 大致相似,走同样的流程, 只是监听对应的 api 有些区别

5. 自定义进程

cloud 节点 默认启动了两个自定义进程伴随 swoole 启动而启动

discoveryProcess 注册发现进程

该进程 在启动时注册到 注册中心(默认 consul,可以扩展其他的注册中心), 然后进行事件轮训,获取健康状态的实例节点

  • 配置文件

    • config/process.php 注册进程到进程管理器
    • config/consul.php 配置发现中心的配置
  • 获取到实例节点后 更新 swoole 所有的 worker 进程里的实例节点信息使用 sendMessage() 进行进程间通信
/**
* 自定义子进程 执行入口
* @param Process $process
*/
public function run(Process $process)
{provider()->select()->registerService();
    $config = config("discovery");
    $discovery = $config["consul"]["discovery"]["name"];
    while (true){$services = provider()->select()->getServiceList($discovery);
        if(empty($services)){CLog::error("not find any instance node:$discovery");
            goto SLEEP;
        }
        for($i = 0; $i < (int)env("WORKER_NUM",4);$i++)
        {
            // 将可以用的服务同步到所有的 worker 进程
            Cloud::server()->getSwooleServer()->sendMessage($services,$i);
        }
SLEEP:
        sleep(10);
    }
}

bucketProcess 用户缓存池

配置文件 config/process.php 注册该进程

该进程两个任务:

  • 1 注册成功后缓存用户信息,管理用户连接
//step 1
[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body);
//step 2
/** @var Task $task */
\bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);
使用 deliver 进程间通信,发送到 bucketProcess 进程处理
  • 2. 作为主要的推送进程

当 cloud 节点 grpcserver 接收到推送请求,则创建一个协程写入 bucketprocess 进程,当前进程消费管道里的数据,每个数据创建一个协程,处理推送问题

  • 3. 使用自定义进程管理用户信息的选择
出版采用的 redis 缓存用户信息,在实际压测的时候发现即使是 redis 缓存还是会影响并发处理。导致慢了 4 - 5 倍,而采用自定义进程处理的好处有如下两点,多进程下对数据不需要加锁。针对每个请求单独创建一个协程反而效率要高些

5. 监听事件,生命周期管理

swoole 相关生命周期执行管理都依赖监听事件,例如 进程启动 请求事件 握手连接 关闭连接 等等。。

/**
 * set event to base swoole
 * 给 swoole 设置基础的监听事件,*/
use \Core\Swoole\SwooleEvent;
use \App\Event\PipeMessageListener;
use \App\Event\WorkerStopListener;
use \App\Event\ShutdownListener;
use \App\Websocket\MessageListener;
use \App\Websocket\HandshakeListener;
use App\Tcp\ReceiveListener;
use App\Event\OnCloseListener;
use App\Event\WorkerStartListener;

return [
    // 监听 onpipmessage 事件
    SwooleEvent::PIPE_MESSAGE => new PipeMessageListener(),
    
    // 监听进程启动事件
    SwooleEvent::WORKER_START => new WorkerStartListener(),
    
    // 监听进程关闭事件
    SwooleEvent::WORKER_STOP  => new WorkerStopListener(),
    SwooleEvent::SHUTDOWN     => new ShutdownListener(),

    // 监听 tcp 事件
    SwooleEvent::RECEIVE      => new ReceiveListener(),

    // 监听 websocket 事件
    SwooleEvent::MESSAGE      => new MessageListener(),
    //websocket 握手事件
    SwooleEvent::HANDSHAKE    => new HandshakeListener(),

    //server 监听关闭连接事件然后 grpc 通知 logic 销毁连接信息
    SwooleEvent::CLOSE        => new OnCloseListener(),];

正文完
 0