gRPC系列(二) 异步服务应用

相干文章:gRPC系列(一)装置和入门

异步的实现次要围绕的是grpc提供的队列:grpc::CompletionQueue

客户端代码

异步客户端代码绝对于同步客户端来说并没有简单多少,简略来说,就是同步rpc调用是调用完不会立即返回,而是能够异步从队列中取得返回后果,实现调用的解耦,咱们来看代码。

#include <iostream>#include <grpcpp/completion_queue.h>#include <grpcpp/security/credentials.h>#include <grpcpp/support/async_unary_call.h>#include <grpcpp/grpcpp.h>#include "../protos/simple/simple.grpc.pb.h"using grpc::Status;using grpc::Channel;using grpc::CompletionQueue;using grpc::ClientContext;using grpc::ClientAsyncResponseReader;using Simple::EchoRequest;using Simple::EchoResponse;int main(){    std::shared_ptr<Channel> chan = grpc::CreateChannel("localhost:12345",grpc::InsecureChannelCredentials());    std::unique_ptr<Simple::Server::Stub> stub(Simple::Server::NewStub(chan));    ClientContext context;    EchoRequest req;    req.set_msg("hello world!");    EchoResponse resp;    CompletionQueue cq;    // 实现rpc调用会将tag增加到cq队列中    std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> rpc(stub->AsyncEcho(&context, req, &cq));    Status status;    // 第三个参数是一个上下文标签,用于帮咱们标识这个申请    // grpc框架只会将其保存起来    rpc->Finish(&resp, &status, (void*)1);    void* got_tag;    bool ok = false;    // 从队列中获取,申请的标签以及状态    cq.Next(&got_tag, &ok);    if(ok && got_tag == (void*)1){        // check一下后果        std::cout << resp.msg() << std::endl;    }    return 0;}

服务端代码

异步服务端不不便了解,能够参考:grpc应用记录(三)简略异步服务实例
这里次要波及到的类包含grpc::ServerCompletionQueuegrpc::ServerAsyncResponseWritergrpc::ServerAsyncResponseWriterSimple::Server::AsyncService
次要的解决流程是,

  1. 注册一个申请,传入上下文内容,包含context、req、resp以及你本人定义的上下文数据对象(能够作为tag)。
  2. 主循环生产队列,取出一个数据对象,调用解决逻辑。如果是还未解决,则进行rpc逻辑解决,而后用grpc::ServerAsyncResponseWriter异步写响应,并且调配一个新的数据对象(结构的时候会调用Proceed函数注册解决的申请);如果是以及解决好的申请,开释数据对象的空间。
    能够这么说,cq的作用就是寄存CallData,而后主循环不断读取CallData,而后依据其中的上下文信息,做出相应的解决。
    RequestXXX()其实就是注册一个rpc申请,而后咱们会传入CallData的地址参数,是为了再承受到指定rpc之后写入数据对象到音讯队列中。
    Finish()是异步写回响应,这里的传入this指针也是为了让其实现写回后讲this增加到音讯队列中。

代码如下:

#include <grpcpp/security/server_credentials.h>#include <grpcpp/support/async_unary_call.h>#include <iostream>#include <memory>#include <string>#include <thread>#include <grpc/support/log.h>#include <grpcpp/grpcpp.h>#include "../protos/simple/simple.grpc.pb.h"using grpc::Server;using grpc::ServerAsyncResponseWriter;using grpc::ServerBuilder;using grpc::ServerCompletionQueue;using grpc::ServerContext;using grpc::Status;using grpc::ServerBuilder;using Simple::EchoRequest;using Simple::EchoResponse;class ServerImpl final {    public:        ~ServerImpl(){            _server->Shutdown();            _cq->Shutdown();        }        void Run(){            std::string server_address("localhost:12345");            ServerBuilder builder;            builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());            builder.RegisterService(&_service);            _cq = builder.AddCompletionQueue();            _server = builder.BuildAndStart();            std::cout << "Serfer listening on" << server_address << std::endl;            // main loop            HandleRpcs();        }    private:        class CallData {            public:                CallData(Simple::Server::AsyncService* service, ServerCompletionQueue* cq)                    :_service(service), _cq(cq), _responder(&_ctx), _status(CREATE) {                        Proceed();                    }                void Proceed() {                    if (_status == CREATE) {                        _status = PROCESS;                        // 注册申请解决                        // RequestEcho其实就三确定注册办法Echo                        // 而后传入解决申请的数据对象                        // ctx、req、responder_、_cq、this                        // this对于队列来说是标签                        _service->RequestEcho(&_ctx, &_req,&_responder,_cq,_cq,this);                    } else if (_status == PROCESS) {                        // 曾经开始解决一个申请了,生成一个新对象供下一个应用                        new CallData(_service, _cq); // 会调用proceed注册申请                        _resp.set_msg(_req.msg());                        _status = FINISH;                        _responder.Finish(_resp, Status::OK, this);                    } else {                        GPR_ASSERT(_status == FINISH);                        delete this;                    }                }            private:                Simple::Server::AsyncService* _service;                ServerCompletionQueue* _cq;                ServerContext _ctx;                EchoRequest _req;                EchoResponse _resp;                ServerAsyncResponseWriter<EchoResponse> _responder;                enum CallStatus { CREATE, PROCESS, FINISH};                CallStatus _status;        };        void HandleRpcs() {            new CallData(&_service, _cq.get());            void* tag;            bool ok;            while(true){                GPR_ASSERT(_cq->Next(&tag, &ok));                GPR_ASSERT(ok);                static_cast<CallData*>(tag)->Proceed();            }        }        std::unique_ptr<ServerCompletionQueue> _cq;        Simple::Server::AsyncService _service;        std::unique_ptr<grpc::Server> _server;};int main(){    ServerImpl server;    server.Run();    return 0;}