共计 3820 个字符,预计需要花费 10 分钟才能阅读完成。
gRPC 系列(二) 异步服务应用
相干文章:gRPC 系列(一)装置和入门
异步的实现次要围绕的是 grpc 提供的队列:grpc::CompletionQueue
。
客户端代码
异步客户端代码绝对于同步客户端来说并没有简单多少,简略来说,就是同步 rpc 调用是调用完不会立即返回,而是能够异步从队列中取得返回后果,实现调用的解耦,咱们来看代码。
#include <iostream>
#include <grpcpp/completion_queue.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/grpcpp.h>
#include "../protos/simple/simple.grpc.pb.h"
using grpc::Status;
using grpc::Channel;
using grpc::CompletionQueue;
using grpc::ClientContext;
using grpc::ClientAsyncResponseReader;
using Simple::EchoRequest;
using Simple::EchoResponse;
int main()
{std::shared_ptr<Channel> chan = grpc::CreateChannel("localhost:12345",grpc::InsecureChannelCredentials());
std::unique_ptr<Simple::Server::Stub> stub(Simple::Server::NewStub(chan));
ClientContext context;
EchoRequest req;
req.set_msg("hello world!");
EchoResponse resp;
CompletionQueue cq;
// 实现 rpc 调用会将 tag 增加到 cq 队列中
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> rpc(stub->AsyncEcho(&context, req, &cq));
Status status;
// 第三个参数是一个上下文标签,用于帮咱们标识这个申请
// grpc 框架只会将其保存起来
rpc->Finish(&resp, &status, (void*)1);
void* got_tag;
bool ok = false;
// 从队列中获取,申请的标签以及状态
cq.Next(&got_tag, &ok);
if(ok && got_tag == (void*)1){
// check 一下后果
std::cout << resp.msg() << std::endl;}
return 0;
}
服务端代码
异步服务端不不便了解,能够参考:grpc 应用记录 (三) 简略异步服务实例
这里次要波及到的类包含 grpc::ServerCompletionQueue
、grpc::ServerAsyncResponseWriter
、grpc::ServerAsyncResponseWriter
、Simple::Server::AsyncService
。
次要的解决流程是,
- 注册一个申请,传入上下文内容,包含 context、req、resp 以及你本人定义的上下文数据对象(能够作为 tag)。
- 主循环生产队列,取出一个数据对象,调用解决逻辑。如果是还未解决,则进行 rpc 逻辑解决,而后用
grpc::ServerAsyncResponseWriter
异步写响应,并且调配一个新的数据对象(结构的时候会调用 Proceed 函数注册解决的申请);如果是以及解决好的申请,开释数据对象的空间。
能够这么说,cq 的作用就是寄存 CallData,而后主循环不断读取 CallData,而后依据其中的上下文信息,做出相应的解决。
RequestXXX()其实就是注册一个 rpc 申请,而后咱们会传入 CallData 的地址参数,是为了再承受到指定 rpc 之后写入数据对象到音讯队列中。
Finish()是异步写回响应,这里的传入 this 指针也是为了让其实现写回后讲 this 增加到音讯队列中。
代码如下:
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/async_unary_call.h>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include "../protos/simple/simple.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerBuilder;
using Simple::EchoRequest;
using Simple::EchoResponse;
class ServerImpl final {
public:
~ServerImpl(){_server->Shutdown();
_cq->Shutdown();}
void Run(){std::string server_address("localhost:12345");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&_service);
_cq = builder.AddCompletionQueue();
_server = builder.BuildAndStart();
std::cout << "Serfer listening on" << server_address << std::endl;
// main loop
HandleRpcs();}
private:
class CallData {
public:
CallData(Simple::Server::AsyncService* service, ServerCompletionQueue* cq)
:_service(service), _cq(cq), _responder(&_ctx), _status(CREATE) {Proceed();
}
void Proceed() {if (_status == CREATE) {
_status = PROCESS;
// 注册申请解决
// RequestEcho 其实就三确定注册办法 Echo
// 而后传入解决申请的数据对象
// ctx、req、responder_、_cq、this
// this 对于队列来说是标签
_service->RequestEcho(&_ctx, &_req,&_responder,_cq,_cq,this);
} else if (_status == PROCESS) {
// 曾经开始解决一个申请了,生成一个新对象供下一个应用
new CallData(_service, _cq); // 会调用 proceed 注册申请
_resp.set_msg(_req.msg());
_status = FINISH;
_responder.Finish(_resp, Status::OK, this);
} else {GPR_ASSERT(_status == FINISH);
delete this;
}
}
private:
Simple::Server::AsyncService* _service;
ServerCompletionQueue* _cq;
ServerContext _ctx;
EchoRequest _req;
EchoResponse _resp;
ServerAsyncResponseWriter<EchoResponse> _responder;
enum CallStatus {CREATE, PROCESS, FINISH};
CallStatus _status;
};
void HandleRpcs() {new CallData(&_service, _cq.get());
void* tag;
bool ok;
while(true){GPR_ASSERT(_cq->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();}
}
std::unique_ptr<ServerCompletionQueue> _cq;
Simple::Server::AsyncService _service;
std::unique_ptr<grpc::Server> _server;
};
int main()
{
ServerImpl server;
server.Run();
return 0;
}
正文完