乐趣区

关于grpc:gRPC系列二-异步服务使用

gRPC 系列(二) 异步服务应用

相干文章:gRPC 系列(一)装置和入门

异步的实现次要围绕的是 grpc 提供的队列:grpc::CompletionQueue

客户端代码

异步客户端代码绝对于同步客户端来说并没有简单多少,简略来说,就是同步 rpc 调用是调用完不会立即返回,而是能够异步从队列中取得返回后果,实现调用的解耦,咱们来看代码。

#include <iostream>
#include <grpcpp/completion_queue.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/grpcpp.h>
#include "../protos/simple/simple.grpc.pb.h"
using grpc::Status;
using grpc::Channel;
using grpc::CompletionQueue;
using grpc::ClientContext;
using grpc::ClientAsyncResponseReader;
using Simple::EchoRequest;
using Simple::EchoResponse;

int main()
{std::shared_ptr<Channel> chan = grpc::CreateChannel("localhost:12345",grpc::InsecureChannelCredentials());
    std::unique_ptr<Simple::Server::Stub> stub(Simple::Server::NewStub(chan));

    ClientContext context;
    EchoRequest req;
    req.set_msg("hello world!");
    EchoResponse resp;
    CompletionQueue cq;
    // 实现 rpc 调用会将 tag 增加到 cq 队列中
    std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> rpc(stub->AsyncEcho(&context, req, &cq));
    Status status;
    // 第三个参数是一个上下文标签,用于帮咱们标识这个申请
    // grpc 框架只会将其保存起来
    rpc->Finish(&resp, &status, (void*)1);
    void* got_tag;
    bool ok = false;
    // 从队列中获取,申请的标签以及状态
    cq.Next(&got_tag, &ok);
    if(ok && got_tag == (void*)1){
        // check 一下后果
        std::cout << resp.msg() << std::endl;}
    return 0;
}

服务端代码

异步服务端不不便了解,能够参考:grpc 应用记录 (三) 简略异步服务实例
这里次要波及到的类包含 grpc::ServerCompletionQueuegrpc::ServerAsyncResponseWritergrpc::ServerAsyncResponseWriterSimple::Server::AsyncService
次要的解决流程是,

  1. 注册一个申请,传入上下文内容,包含 context、req、resp 以及你本人定义的上下文数据对象(能够作为 tag)。
  2. 主循环生产队列,取出一个数据对象,调用解决逻辑。如果是还未解决,则进行 rpc 逻辑解决,而后用 grpc::ServerAsyncResponseWriter 异步写响应,并且调配一个新的数据对象(结构的时候会调用 Proceed 函数注册解决的申请);如果是以及解决好的申请,开释数据对象的空间。
    能够这么说,cq 的作用就是寄存 CallData,而后主循环不断读取 CallData,而后依据其中的上下文信息,做出相应的解决。
    RequestXXX()其实就是注册一个 rpc 申请,而后咱们会传入 CallData 的地址参数,是为了再承受到指定 rpc 之后写入数据对象到音讯队列中。
    Finish()是异步写回响应,这里的传入 this 指针也是为了让其实现写回后讲 this 增加到音讯队列中。

代码如下:

#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/async_unary_call.h>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include "../protos/simple/simple.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerBuilder;
using Simple::EchoRequest;
using Simple::EchoResponse;

class ServerImpl final {
    public:
        ~ServerImpl(){_server->Shutdown();
            _cq->Shutdown();}
        void Run(){std::string server_address("localhost:12345");
            ServerBuilder builder;
            builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
            builder.RegisterService(&_service);
            _cq = builder.AddCompletionQueue();
            _server = builder.BuildAndStart();
            std::cout << "Serfer listening on" << server_address << std::endl;
            // main loop
            HandleRpcs();}
    private:
        class CallData {
            public:
                CallData(Simple::Server::AsyncService* service, ServerCompletionQueue* cq)
                    :_service(service), _cq(cq), _responder(&_ctx), _status(CREATE) {Proceed();
                    }
                void Proceed() {if (_status == CREATE) {
                        _status = PROCESS;
                        // 注册申请解决
                        // RequestEcho 其实就三确定注册办法 Echo
                        // 而后传入解决申请的数据对象
                        // ctx、req、responder_、_cq、this
                        // this 对于队列来说是标签
                        _service->RequestEcho(&_ctx, &_req,&_responder,_cq,_cq,this);
                    } else if (_status == PROCESS) {
                        // 曾经开始解决一个申请了,生成一个新对象供下一个应用
                        new CallData(_service, _cq); // 会调用 proceed 注册申请
                        _resp.set_msg(_req.msg());
                        _status = FINISH;
                        _responder.Finish(_resp, Status::OK, this);
                    } else {GPR_ASSERT(_status == FINISH);
                        delete this;
                    }
                }
            private:
                Simple::Server::AsyncService* _service;
                ServerCompletionQueue* _cq;
                ServerContext _ctx;
                EchoRequest _req;
                EchoResponse _resp;
                ServerAsyncResponseWriter<EchoResponse> _responder;
                enum CallStatus {CREATE, PROCESS, FINISH};
                CallStatus _status;
        };

        void HandleRpcs() {new CallData(&_service, _cq.get());
            void* tag;
            bool ok;
            while(true){GPR_ASSERT(_cq->Next(&tag, &ok));
                GPR_ASSERT(ok);
                static_cast<CallData*>(tag)->Proceed();}
        }
        std::unique_ptr<ServerCompletionQueue> _cq;
        Simple::Server::AsyncService _service;
        std::unique_ptr<grpc::Server> _server;
};

int main()
{
    ServerImpl server;
    server.Run();
    return 0;
}

退出移动版