gRPC系列(二) 异步服务应用
相干文章:gRPC系列(一)装置和入门
异步的实现次要围绕的是grpc提供的队列:grpc::CompletionQueue
。
客户端代码
异步客户端代码绝对于同步客户端来说并没有简单多少,简略来说,就是同步rpc调用是调用完不会立即返回,而是能够异步从队列中取得返回后果,实现调用的解耦,咱们来看代码。
#include <iostream>#include <grpcpp/completion_queue.h>#include <grpcpp/security/credentials.h>#include <grpcpp/support/async_unary_call.h>#include <grpcpp/grpcpp.h>#include "../protos/simple/simple.grpc.pb.h"using grpc::Status;using grpc::Channel;using grpc::CompletionQueue;using grpc::ClientContext;using grpc::ClientAsyncResponseReader;using Simple::EchoRequest;using Simple::EchoResponse;int main(){ std::shared_ptr<Channel> chan = grpc::CreateChannel("localhost:12345",grpc::InsecureChannelCredentials()); std::unique_ptr<Simple::Server::Stub> stub(Simple::Server::NewStub(chan)); ClientContext context; EchoRequest req; req.set_msg("hello world!"); EchoResponse resp; CompletionQueue cq; // 实现rpc调用会将tag增加到cq队列中 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> rpc(stub->AsyncEcho(&context, req, &cq)); Status status; // 第三个参数是一个上下文标签,用于帮咱们标识这个申请 // grpc框架只会将其保存起来 rpc->Finish(&resp, &status, (void*)1); void* got_tag; bool ok = false; // 从队列中获取,申请的标签以及状态 cq.Next(&got_tag, &ok); if(ok && got_tag == (void*)1){ // check一下后果 std::cout << resp.msg() << std::endl; } return 0;}
服务端代码
异步服务端不不便了解,能够参考:grpc应用记录(三)简略异步服务实例
这里次要波及到的类包含grpc::ServerCompletionQueue
、grpc::ServerAsyncResponseWriter
、grpc::ServerAsyncResponseWriter
、Simple::Server::AsyncService
。
次要的解决流程是,
- 注册一个申请,传入上下文内容,包含context、req、resp以及你本人定义的上下文数据对象(能够作为tag)。
- 主循环生产队列,取出一个数据对象,调用解决逻辑。如果是还未解决,则进行rpc逻辑解决,而后用
grpc::ServerAsyncResponseWriter
异步写响应,并且调配一个新的数据对象(结构的时候会调用Proceed函数注册解决的申请);如果是以及解决好的申请,开释数据对象的空间。
能够这么说,cq的作用就是寄存CallData,而后主循环不断读取CallData,而后依据其中的上下文信息,做出相应的解决。
RequestXXX()其实就是注册一个rpc申请,而后咱们会传入CallData的地址参数,是为了再承受到指定rpc之后写入数据对象到音讯队列中。
Finish()是异步写回响应,这里的传入this指针也是为了让其实现写回后讲this增加到音讯队列中。
代码如下:
#include <grpcpp/security/server_credentials.h>#include <grpcpp/support/async_unary_call.h>#include <iostream>#include <memory>#include <string>#include <thread>#include <grpc/support/log.h>#include <grpcpp/grpcpp.h>#include "../protos/simple/simple.grpc.pb.h"using grpc::Server;using grpc::ServerAsyncResponseWriter;using grpc::ServerBuilder;using grpc::ServerCompletionQueue;using grpc::ServerContext;using grpc::Status;using grpc::ServerBuilder;using Simple::EchoRequest;using Simple::EchoResponse;class ServerImpl final { public: ~ServerImpl(){ _server->Shutdown(); _cq->Shutdown(); } void Run(){ std::string server_address("localhost:12345"); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&_service); _cq = builder.AddCompletionQueue(); _server = builder.BuildAndStart(); std::cout << "Serfer listening on" << server_address << std::endl; // main loop HandleRpcs(); } private: class CallData { public: CallData(Simple::Server::AsyncService* service, ServerCompletionQueue* cq) :_service(service), _cq(cq), _responder(&_ctx), _status(CREATE) { Proceed(); } void Proceed() { if (_status == CREATE) { _status = PROCESS; // 注册申请解决 // RequestEcho其实就三确定注册办法Echo // 而后传入解决申请的数据对象 // ctx、req、responder_、_cq、this // this对于队列来说是标签 _service->RequestEcho(&_ctx, &_req,&_responder,_cq,_cq,this); } else if (_status == PROCESS) { // 曾经开始解决一个申请了,生成一个新对象供下一个应用 new CallData(_service, _cq); // 会调用proceed注册申请 _resp.set_msg(_req.msg()); _status = FINISH; _responder.Finish(_resp, Status::OK, this); } else { GPR_ASSERT(_status == FINISH); delete this; } } private: Simple::Server::AsyncService* _service; ServerCompletionQueue* _cq; ServerContext _ctx; EchoRequest _req; EchoResponse _resp; ServerAsyncResponseWriter<EchoResponse> _responder; enum CallStatus { CREATE, PROCESS, FINISH}; CallStatus _status; }; void HandleRpcs() { new CallData(&_service, _cq.get()); void* tag; bool ok; while(true){ GPR_ASSERT(_cq->Next(&tag, &ok)); GPR_ASSERT(ok); static_cast<CallData*>(tag)->Proceed(); } } std::unique_ptr<ServerCompletionQueue> _cq; Simple::Server::AsyncService _service; std::unique_ptr<grpc::Server> _server;};int main(){ ServerImpl server; server.Run(); return 0;}