大家好,我是dandyhuang。brpc在C++界算是比较出色的RPC框架,本次带来brpc server端的源码分析。分析源码前,大家先搭建好环境,有利于代码调试和理解。本文以brpc框架中example的echo_c++为例,并且将protobuf编译出的中间文件echo.pb.cc和echo.pb.h保留,有利于我们更好地理解代码。
server端的使用示例
namespace example {class EchoServiceImpl : public EchoService {public: EchoServiceImpl() {}; virtual ~EchoServiceImpl() {}; virtual void Echo(google::protobuf::RpcController* cntl_base, const EchoRequest* request, EchoResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); response->set_message(request->message()); if (FLAGS_echo_attachment) { cntl->response_attachment().append(cntl->request_attachment()); } }};int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); brpc::Server server; // 继承proto文件 example::EchoServiceImpl echo_service_impl; if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { LOG(ERROR) << "Fail to add service"; return -1; } butil::EndPoint point; point = butil::EndPoint(butil::IP_ANY, FLAGS_port); // Start the server. brpc::ServerOptions options; options.idle_timeout_sec = FLAGS_idle_timeout_s; if (server.Start(point, &options) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; } // Wait until Ctrl-C is pressed, then Stop() and Join() the server. server.RunUntilAskedToQuit(); return 0;}
总体代码还是比较简洁,如果能够根据IDL自动生成代码那就更完美了。话不多说,直接开始看源码。
brpc::Server::AddService初始化各种数据
int Server::AddServiceInternal(google::protobuf::Service* service, bool is_builtin_service, const ServiceOptions& svc_opt) { // 如果idl中imp没有定义方法,那么校验失败 const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor(); if (sd->method_count() == 0) { LOG(ERROR) << "service=" << sd->full_name() << " does not have any method."; return -1; } // 初始化并注册:NamingService,LoadBalancer,CompressHandler,protocols等 // 后续的收发包校验函数都会应用 if (InitializeOnce() != 0) { LOG(ERROR) << "Fail to initialize Server[" << version() << ']'; return -1; } // InitializeOnce初始化失败则退出 if (status() != READY) { LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server[" << version() << "] which is " << status_str(status()); return -1; } // defined `option (idl_support) = true' or not. const bool is_idl_support = sd->file()->options().GetExtension(idl_support); Tabbed* tabbed = dynamic_cast<Tabbed*>(service); // 初始化定义的service for (int i = 0; i < sd->method_count(); ++i) { const google::protobuf::MethodDescriptor* md = sd->method(i); MethodProperty mp; mp.is_builtin_service = is_builtin_service; mp.own_method_status = true; mp.params.is_tabbed = !!tabbed; mp.params.allow_default_url = svc_opt.allow_default_url; mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb; mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64; mp.service = service; mp.method = md; mp.status = new MethodStatus; _method_map[md->full_name()] = mp; if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) { MethodProperty mp2 = mp; mp2.own_method_status = false; // have to map service_name + method_name as well because ubrpc // does not send the namespace before service_name. 
std::string full_name_wo_ns; full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size()); full_name_wo_ns.append(sd->name()); full_name_wo_ns.push_back('.'); full_name_wo_ns.append(md->name()); if (_method_map.seek(full_name_wo_ns) == NULL) { _method_map[full_name_wo_ns] = mp2; } else { LOG(ERROR) << '`' << full_name_wo_ns << "' already exists"; RemoveMethodsOf(service); return -1; } } } const ServiceProperty ss = { is_builtin_service, svc_opt.ownership, service, NULL }; _fullname_service_map[sd->full_name()] = ss; _service_map[sd->name()] = ss; if (is_builtin_service) { ++_builtin_service_count; } else { if (_first_service == NULL) { _first_service = service; } } butil::StringPiece restful_mappings = svc_opt.restful_mappings; restful_mappings.trim_spaces(); // restful_mappings解析咱们临时就不剖析了,次要就是匹配办法 if (!restful_mappings.empty()) { // Parse the mappings. ··· ··· ··· } // AddBuiltinService 实例化。idl生成的服务,不属于tabbed if (tabbed) { if (_tab_info_list == NULL) { _tab_info_list = new TabInfoList; } const size_t last_size = _tab_info_list->size(); tabbed->GetTabInfo(_tab_info_list); const size_t cur_size = _tab_info_list->size(); for (size_t i = last_size; i != cur_size; ++i) { const TabInfo& info = (*_tab_info_list)[i]; if (!info.valid()) { LOG(ERROR) << "Invalid TabInfo: path=" << info.path << " tab_name=" << info.tab_name; _tab_info_list->resize(last_size); RemoveService(service); return -1; } } } return 0;}
AddServiceInternal既用于echo.proto生成的服务,也用于BuiltinService,负责服务的校验、初始化等。例如检查服务是否定义了方法,并存储映射关系:_method_map、_service_map、_fullname_service_map等。
StartInternal:内部其他服务也调用该函数
// Starts the server: validates its state, applies `opt', prepares restful
// maps and per-method concurrency limiters, listens on the first usable
// port in `port_range', starts the acceptor and the derived-vars bthread.
//
// ip:         address to listen on.
// port_range: inclusive range of ports to try, in increasing order.
// opt:        server options; defaults are used when NULL.
// Returns 0 on success, -1 on failure.
int Server::StartInternal(const butil::ip_t& ip,
                          const PortRange& port_range,
                          const ServerOptions *opt) {
    // Released only on the success path at the bottom of the function.
    // NOTE(review): RevertServerStatus presumably rolls the server status
    // back on the early-return paths -- confirm against its definition.
    std::unique_ptr<Server, RevertServerStatus> revert_server(this);
    if (_failed_to_set_max_concurrency_of_method) {
        _failed_to_set_max_concurrency_of_method = false;
        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
            "fix it before starting server";
        return -1;
    }
    // Per the article: normally already done when the service was added.
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    const Status st = status();
    // The server can only be started from the READY state.
    if (st != READY) {
        if (st == RUNNING) {
            LOG(ERROR) << "Server[" << version() << "] is already running on "
                       << _listen_addr;
        } else {
            LOG(ERROR) << "Can't start Server[" << version()
                       << "] which is " << status_str(status());
        }
        return -1;
    }
    if (opt) {
        _options = *opt;
    } else {
        _options = ServerOptions();
    }

    // Init _keytable_pool always. If the server was stopped before, the pool
    // should be destroyed in Join().
    _keytable_pool = new bthread_keytable_pool_t;
    if (bthread_keytable_pool_init(_keytable_pool) != 0) {
        LOG(ERROR) << "Fail to init _keytable_pool";
        delete _keytable_pool;
        _keytable_pool = NULL;
        return -1;
    }

    _tl_options = ThreadLocalOptions();
    _concurrency = 0;

    if (_options.has_builtin_services &&
        _builtin_service_count <= 0 &&
        AddBuiltinServices() != 0) {
        LOG(ERROR) << "Fail to add builtin services";
        return -1;
    }
    // If a server is started/stopped for multiple times and one of the options
    // sets has_builtin_service to true, builtin services will be enabled for
    // any later re-start. Check this case and report to user.
    if (!_options.has_builtin_services && _builtin_service_count > 0) {
        LOG(ERROR) << "A server started/stopped for multiple times must be "
            "consistent on ServerOptions.has_builtin_services";
        return -1;
    }

    // Prepare all restful maps
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
         it != _fullname_service_map.end(); ++it) {
        if (it->second.restful_map) {
            it->second.restful_map->PrepareForFinding();
        }
    }
    if (_global_restful_map) {
        _global_restful_map->PrepareForFinding();
    }

    // Apply the requested bthread concurrency, never going below
    // BTHREAD_MIN_CONCURRENCY.
    if (_options.num_threads > 0) {
        if (FLAGS_usercode_in_pthread) {
            _options.num_threads += FLAGS_usercode_backup_threads;
        }
        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
            _options.num_threads = BTHREAD_MIN_CONCURRENCY;
        }
        bthread_setconcurrency(_options.num_threads);
    }

    // Install a concurrency limiter on every method (per the article the
    // kinds are auto, constant and unlimited). Builtin services get none;
    // a method whose own setting is UNLIMITED falls back to the
    // server-wide method_max_concurrency.
    for (MethodMap::iterator it = _method_map.begin();
         it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);
        }
    }

    _listen_addr.ip = ip;
    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
        _listen_addr.port = port;
        // Create the listening socket.
        butil::fd_guard sockfd(tcp_listen(_listen_addr));
        if (sockfd < 0) {
            if (port != port_range.max_port) { // not the last port, try next
                continue;
            }
            if (port_range.min_port != port_range.max_port) {
                LOG(ERROR) << "Fail to listen " << ip
                           << ":[" << port_range.min_port << '-'
                           << port_range.max_port << ']';
            } else {
                LOG(ERROR) << "Fail to listen " << _listen_addr;
            }
            return -1;
        }
        if (_listen_addr.port == 0) {
            // port=0 makes kernel dynamically select a port from
            // https://en.wikipedia.org/wiki/Ephemeral_port
            _listen_addr.port = get_port_from_fd(sockfd);
            if (_listen_addr.port <= 0) {
                LOG(ERROR) << "Fail to get port from fd=" << sockfd;
                return -1;
            }
        }
        if (_am == NULL) {
            // Build the acceptor; per the article it stores the parse/pack
            // callbacks of the registered protocols.
            _am = BuildAcceptor();
            if (NULL == _am) {
                LOG(ERROR) << "Fail to build acceptor";
                return -1;
            }
        }
        // Set `_status' to RUNNING before accepting connections
        // to prevent requests being rejected as ELOGOFF
        _status = RUNNING;
        time(&_last_start_time);
        // Per the article: records some information into the version string.
        GenerateVersionIfNeeded();
        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);

        // Start accepting on the listening fd. Per the article this is
        // where epoll is created and the bthread that receives client
        // connections/requests is started.
        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                             _default_ssl_ctx) != 0) {
            LOG(ERROR) << "Fail to start acceptor";
            return -1;
        }
        // The fd has been handed to StartAccept; stop the guard from
        // managing it.
        sockfd.release();
        break; // stop trying
    }

    // Listening for the builtin services on the internal port.
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        // Handled the same way as above.
        // ... (omitted in this walkthrough)
    }

    // Write the pid into a file if configured.
    PutPidFileIfNeeded();

    // Start the background bthread running UpdateDerivedVars (per the
    // article: updates metrics such as connection information).
    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
    if (bthread_start_background(&_derivative_thread, NULL,
                                 UpdateDerivedVars, this) != 0) {
        LOG(ERROR) << "Fail to create _derivative_thread";
        return -1;
    }

    // Print tips to server launcher.
    int http_port = _listen_addr.port;
    std::ostringstream server_info;
    server_info << "Server[" << version() << "] is serving on port="
                << _listen_addr.port;
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        http_port = _options.internal_port;
        server_info << " and internal_port=" << _options.internal_port;
    }
    LOG(INFO) << server_info.str() << '.';

    if (_options.has_builtin_services) {
        LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
                  << http_port << " in web browser.";
    } else {
        LOG(WARNING) << "Builtin services are disabled according to "
            "ServerOptions.has_builtin_services";
    }
    // Per the article: sets the address used for tracking.
    SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
    revert_server.release();
    return 0;
}
StartInternal中创建了监听套接字和接收器(Acceptor),核心逻辑还是在StartAccept中。
StartAccept:接收连接请求的套接字
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec, const std::shared_ptr<SocketSSLContext>& ssl_ctx) { // 校验套接字合法性 if (listened_fd < 0) { LOG(FATAL) << "Invalid listened_fd=" << listened_fd; return -1; } ... // Creation of _acception_id is inside lock so that OnNewConnections // (which may run immediately) should see sane fields set below. SocketOptions options; options.fd = listened_fd; options.user = this; // 设置回调,到时候epoll收到连贯的申请的时候,会调用这个函数 options.on_edge_triggered_events = OnNewConnections; if (Socket::Create(options, &_acception_id) != 0) { // Close-idle-socket thread will be stopped inside destructor LOG(FATAL) << "Fail to create _acception_id"; return -1; } _listened_fd = listened_fd; _status = RUNNING; return 0;}int Socket::Create(const SocketOptions& options, SocketId* id) { butil::ResourceId<Socket> slot; // 获取创立套接字 Socket* const m = butil::get_resource(&slot, Forbidden()); if (m == NULL) { LOG(FATAL) << "Fail to get_resource<Socket>"; return -1; } // 初始化socket g_vars->nsocket << 1; CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed)); m->_nevent.store(0, butil::memory_order_relaxed); m->_keytable_pool = options.keytable_pool; m->_tos = 0; m->_remote_side = options.remote_side; m->_on_edge_triggered_events = options.on_edge_triggered_events; m->_this_id = MakeSocketId( VersionOfVRef(m->_versioned_ref.fetch_add( 1, butil::memory_order_release)), slot); m->_preferred_index = -1; m->_hc_count = 0; CHECK(m->_read_buf.empty()); const int64_t cpuwide_now = butil::cpuwide_time_us(); m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed); m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); if (rc2) { LOG(ERROR) << "Fail to create auth_id: " << berror(rc2); 
m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2)); return -1; } // NOTE: last two params are useless in bthread > r32787 const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512); if (rc) { LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc); m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc)); return -1; } m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); m->_unwritten_bytes.store(0, butil::memory_order_relaxed); CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed)); // Must be last one! Internal fields of this Socket may be access // just after calling ResetFileDescriptor. // 这里设置套接字的一些信息,并且把fd注册到epoll队列 if (m->ResetFileDescriptor(options.fd) != 0) { const int saved_errno = errno; PLOG(ERROR) << "Fail to ResetFileDescriptor"; m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", berror(saved_errno)); return -1; } *id = m->_this_id; return 0;}
调用Socket::Create来创建套接字对象,accept、connect等场景下的套接字都是用这个函数创建的。on_edge_triggered_events是事件回调:这里当有客户端连接事件发生时,会调用回调OnNewConnections。
ResetFileDescriptor设置socket信息,并注册到epoll中
int Socket::ResetFileDescriptor(int fd) { // Reset message sizes when fd is changed. _last_msg_size = 0; _avg_msg_size = 0; _fd.store(fd, butil::memory_order_release); _reset_fd_real_us = butil::gettimeofday_us(); // 校验fd是否非法 if (!ValidFileDescriptor(fd)) { return 0; } // 获取本地ip信息 if (butil::get_local_side(fd, &_local_side) != 0) { _local_side = butil::EndPoint(); } // 敞开一些子过程等无用文件描述符 butil::make_close_on_exec(fd); // 设置非阻塞 if (butil::make_non_blocking(fd) != 0) { PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking"; return -1; } // 敞开Nagle算法 butil::make_no_delay(fd); if (_tos > 0 && setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) { PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos; } // 设置发送缓冲区 if (FLAGS_socket_send_buffer_size > 0) { int buff_size = FLAGS_socket_send_buffer_size; socklen_t size = sizeof(buff_size); if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) { PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to " << buff_size; } } // 设置接收缓冲区大小 if (FLAGS_socket_recv_buffer_size > 0) { int buff_size = FLAGS_socket_recv_buffer_size; socklen_t size = sizeof(buff_size); if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) { PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to " << buff_size; } } // 第一次创立epoll_create,并将接管fd退出到epoll_ctl_add队列中 if (_on_edge_triggered_events) { if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; _fd.store(-1, butil::memory_order_release); return -1; } } return 0;}
这里设置了fd,并且启动协程、创建epoll,再把该fd以ET(边缘触发)模式注册进去。至此,我们就可以接收来自client端的请求了。
IO多路复用EventDispatcher::Run(),事件监听
void EventDispatcher::Run() { while (!_stop) { epoll_event e[32]; // 立即返回 int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0); // 没有事件的时候 if (n == 0) { // 阻塞期待事件到来 n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); } if (_stop) { break; } if (n < 0) { if (EINTR == errno) { // We've checked _stop, no wake-up will be missed. continue; } PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd; break; } // 事件EPOLLIN for (int i = 0; i < n; ++i) { if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP) || (e[i].events & has_epollrdhup) ) { // We don't care about the return value. Socket::StartInputEvent(e[i].data.u64, e[i].events, _consumer_thread_attr); } } // 事件EPOLLOUT for (int i = 0; i < n; ++i) { if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { // We don't care about the return value. Socket::HandleEpollOut(e[i].data.u64); } } }}
当有连接事件时,触发EPOLLIN,调用StartInputEvent。
int Socket::StartInputEvent(SocketId id, uint32_t events, const bthread_attr_t& thread_attr) { SocketUniquePtr s; // 依据之前AddConsumer的id,找到对应Socket::Create创立的套接字映射 if (Address(id, &s) < 0) { return -1; } // 校验回调函数是否为空,这里是OnNewConnections if (NULL == s->_on_edge_triggered_events) { return 0; } if (s->fd() < 0) { CHECK(!(events & EPOLLIN)) << "epoll_events=" << events; return -1; } // 这里头,如果有多个事件产生,每个fd只保障只创立一个协程来解决。来保障并发平安问题 if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) { g_vars->neventthread << 1; bthread_t tid; // transfer ownership as well, don't use s anymore! Socket* const p = s.release(); bthread_attr_t attr = thread_attr; attr.keytable_pool = p->_keytable_pool; if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; ProcessEvent(p); } } return 0;}void* Socket::ProcessEvent(void* arg) { SocketUniquePtr s(static_cast<Socket*>(arg)); // 调用OnNewConnections s->_on_edge_triggered_events(s.get()); return NULL;}
Address根据listened_fd对应的SocketId获取Socket信息,再启动协程,调用回调OnNewConnections。
OnNewConnections接收连接请求
// Edge-triggered callback of the listening socket: accepts connections
// until accept() reports EAGAIN, wraps each accepted fd in a Socket whose
// callback is InputMessenger::OnNewMessages, and repeats as long as
// MoreReadEvents() reports further events.
//
// The accept loop is written inline, so leaving it on EAGAIN (or when the
// acceptor is found stopped) uses `break' rather than `return'. That way
// the Failed()/MoreReadEvents() check after the inner loop still runs.
// NOTE(review): StartInputEvent only starts a new bthread when `_nevent'
// goes from 0 to 1, so MoreReadEvents() is presumably what resets that
// counter; skipping it would stall later accepts -- confirm against
// Socket::MoreReadEvents.
void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        while (1) {
            struct sockaddr in_addr;
            socklen_t in_len = sizeof(in_addr);
            // Accept one pending connection from a client.
            butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
            if (in_fd < 0) {
                // The listening fd is non-blocking: EAGAIN means every
                // pending connection has been accepted.
                if (errno == EAGAIN) {
                    break;
                }
                // Any other error: keep accepting.
                continue;
            }

            Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
            if (NULL == am) {
                LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
                acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
                return;
            }

            SocketId socket_id;
            SocketOptions options;
            options.keytable_pool = am->_keytable_pool;
            options.fd = in_fd;
            options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
            options.user = acception->user();
            // Callback that reads the data arriving on the new connection.
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
            options.initial_ssl_ctx = am->_ssl_ctx;
            // Same path as for the listening fd: Socket::Create sets up
            // the Socket and registers the fd with the event dispatcher,
            // so a bthread is started when the client sends data.
            if (Socket::Create(options, &socket_id) != 0) {
                LOG(ERROR) << "Fail to create Socket";
                continue;
            }
            in_fd.release(); // transfer ownership to socket_id

            SocketUniquePtr sock;
            // Address the new socket (even if it has already failed) so it
            // can be recorded in the acceptor's socket map.
            if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
                bool is_running = true;
                {
                    BAIDU_SCOPED_LOCK(am->_map_mutex);
                    is_running = (am->status() == RUNNING);
                    // Record the connection for statistics/inspection.
                    am->_socket_map.insert(socket_id, ConnectStatistics());
                }
                if (!is_running) {
                    LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                                 << " has been stopped, discard newly created " << *sock;
                    sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
                                    "discard newly created %s", acception->fd(),
                                    sock->description().c_str());
                    break;
                }
            }
        }

        if (acception->Failed()) {
            return;
        }
        // Only one bthread handles the events of a given fd at a time, so
        // check here whether more events arrived in the meantime.
    } while (acception->MoreReadEvents(&progress));
}
这里通过accept获取客户端发起的连接请求,并将新连接的fd也注册到epoll中。
OnNewMessages读事件调用
// Edge-triggered callback of an accepted connection: reads from the socket
// until EOF or until no more events are pending, cuts complete messages
// out of the read buffer with CutInputMessage, and hands each message to
// its protocol's `process' callback (through QueueMessage, or through
// `last_msg' for the final message).
void InputMessenger::OnNewMessages(Socket* m) {
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;

    // Holds the most recently parsed message.
    // NOTE(review): per the article, the RunLastMessage deleter ends up
    // calling ProcessInputMessage on it when this function exits --
    // confirm against RunLastMessage's definition.
    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        const int64_t received_us = butil::cpuwide_time_us();
        const int64_t base_realtime = butil::gettimeofday_us() - received_us;

        // Size this read from the average size of previously received
        // messages, clamped to [MIN_ONCE_READ, MAX_ONCE_READ].
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
            once_read = MIN_ONCE_READ;
        } else if (once_read > MAX_ONCE_READ) {
            once_read = MAX_ONCE_READ;
        }

        // Read data from the socket.
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
            if (0 == nr) {
                // The peer closed the connection.
                read_eof = true;
            } else if (errno != EAGAIN) {
                if (errno == EINTR) {
                    continue;  // just retry
                }
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to read from " << *m;
                m->SetFailed(saved_errno, "Fail to read from %s: %s",
                             m->description().c_str(), berror(saved_errno));
                return;
            } else if (!m->MoreReadEvents(&progress)) {
                // EAGAIN/EWOULDBLOCK: nothing left to read and no new
                // events were reported.
                return;
            } else {
                // More events arrived meanwhile; as only one bthread
                // handles a given fd, keep reading here.
                continue;
            }
        }

        // Update input statistics.
        m->AddInputBytes(nr);

        // Avoid this socket to be closed due to idle_timeout_s
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);

        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
            size_t index = 8888;
            // Parse the data sent by the client; `index' receives the
            // index of the protocol handler that parsed it.
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            if (!pr.is_ok()) {
                if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
                    // Incomplete message: go back and read more.
                    m->_last_msg_size += (last_size - m->_read_buf.length());
                    break;
                } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
                    // No protocol recognized the data: close the socket.
                    LOG(WARNING)
                        << "Close " << *m << " due to unknown message: "
                        << butil::ToPrintable(m->_read_buf);
                    m->SetFailed(EINVAL, "Close %s due to unknown message",
                                 m->description().c_str());
                    return;
                } else {
                    LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
                    m->SetFailed(EINVAL, "Close %s: %s",
                                 m->description().c_str(), pr.error_str());
                    return;
                }
            }

            m->AddInputMessages(1);
            // Calculate average size of messages
            const size_t cur_size = m->_read_buf.length();
            if (cur_size == 0) {
                m->_read_buf.return_cached_blocks();
            }
            // Bytes consumed by this message: last_size - cur_size.
            m->_last_msg_size += (last_size - cur_size);
            last_size = cur_size;
            const size_t old_avg = m->_avg_msg_size;
            if (old_avg != 0) {
                m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
                / MSG_SIZE_WINDOW;
            } else {
                m->_avg_msg_size = m->_last_msg_size;
            }
            m->_last_msg_size = 0;

            if (pr.message() == NULL) { // the Process() step can be skipped.
                continue;
            }
            pr.message()->_received_us = received_us;
            pr.message()->_base_real_us = base_realtime;

            DestroyingPtr<InputMessageBase> msg(pr.message());
            // Queue the previously held message, which is null on the
            // first round. QueueMessage starts a bthread that runs
            // ProcessInputMessage.
            QueueMessage(last_msg.release(), &num_bthread_created,
                             m->_keytable_pool);
            if (handlers[index].process == NULL) {
                LOG(ERROR) << "process of index=" << index << " is NULL";
                continue;
            }
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;

            if (handlers[index].verify != NULL) {
                int auth_error = 0;
                if (0 == m->FightAuthentication(&auth_error)) {
                    // Get the right to authenticate
                    if (handlers[index].verify(msg.get())) {
                        m->SetAuthentication(0);
                    } else {
                        m->SetAuthentication(ERPCAUTH);
                        LOG(WARNING) << "Fail to authenticate " << *m;
                        m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
                                     m->description().c_str());
                        return;
                    }
                } else {
                    LOG_IF(FATAL, auth_error != 0) <<
                      "Impossible! Socket should have been "
                      "destroyed when authentication failed";
                }
            }
            if (!m->is_read_progressive()) {
                // Hand msg over to last_msg.
                last_msg.reset(msg.release());
            } else {
                QueueMessage(msg.release(), &num_bthread_created,
                                 m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
            }
        }
        if (num_bthread_created) {
            bthread_flush();
        }
    }

    if (read_eof) {
        m->SetEOF();
    }
}
DoRead读取缓冲区的数据,CutInputMessage校验每次获取的数据是否正确。我们拿http协议来举例:校验成功后,通过QueueMessage启动协程处理消息。
包校验CutInputMessage
// Tries to cut one message out of m->_read_buf. The handler at the
// socket's preferred index is tried first, then every other registered
// handler in index order.
//
// m:        socket whose read buffer is parsed.
// index:    set to the index of the handler whose parse() returned OK or
//           PARSE_ERROR_NOT_ENOUGH_DATA; left untouched otherwise.
// read_eof: forwarded to the protocol's parse().
// Returns the handler's ParseResult, or PARSE_ERROR_TRY_OTHERS when no
// handler recognizes the data.
ParseResult InputMessenger::CutInputMessage(
        Socket* m, size_t* index, bool read_eof) {
    // Index of the handler that last parsed data on this socket, so that
    // the full scan can usually be skipped. Socket::Create initializes it
    // to -1.
    const int preferred = m->preferred_index();
    const int max_index = (int)_max_index.load(butil::memory_order_acquire);
    // Try the previously successful protocol first.
    if (preferred >= 0 && preferred <= max_index
            && _handlers[preferred].parse != NULL) {
        // For HTTP the parse callback is ParseHttpMessage.
        ParseResult result =
            _handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg);
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            // Parsed, or the message is merely incomplete.
            *index = preferred;
            return result;
        } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
            // Any other error is final.
            return result;
        }
        // Other protocols are not tried when CreatedByConnect() is true
        // and the preferred protocol is not baidu_std.
        if (m->CreatedByConnect() &&
            (ProtocolType)preferred != PROTOCOL_BAIDU_STD) {
            // The protocol is fixed at client-side, no need to try others.
            LOG(ERROR) << "Fail to parse response from " << m->remote_side()
                << " by " << _handlers[preferred].name
                << " at client-side";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }
        // Clear context before trying next protocol which probably has
        // an incompatible context with the current one.
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        m->set_preferred_index(-1);
    }
    // The preferred protocol did not match: try every protocol from index 0.
    for (int i = 0; i <= max_index; ++i) {
        if (i == preferred || _handlers[i].parse == NULL) {
            // Don't try preferred handler(already tried) or invalid handler
            continue;
        }
        // Run this protocol's parse callback.
        ParseResult result = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            m->set_preferred_index(i);
            *index = i;
            return result;
        } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
            return result;
        }
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        // Try other protocols.
    }
    return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
对收到的包,按协议逐个尝试解析,确认包是否正确。
QueueMessage对收到的包,开启协程进行逻辑处理
static void QueueMessage(InputMessageBase* to_run_msg, int* num_bthread_created, bthread_keytable_pool_t* keytable_pool) { if (!to_run_msg) { return; } bthread_t th; bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; tmp.keytable_pool = keytable_pool; // 启动协程执行 ProcessInputMessage if (bthread_start_background( &th, &tmp, ProcessInputMessage, to_run_msg) == 0) { ++*num_bthread_created; } else { ProcessInputMessage(to_run_msg); }}void* ProcessInputMessage(void* void_arg) { InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg); // 调用ProcessHttpRequest msg->_process(msg); return NULL;}
启动协程,调用ProcessInputMessage
以http协议为例,ProcessHttpRequest执行调用
// Handles one parsed HTTP (or HTTP/2) request: creates the Controller,
// finds the target method from the URI, applies concurrency and overload
// checks, builds the request/response messages (converting the HTTP body
// to protobuf when allowed) and finally calls the service's CallMethod.
// `msg' is the HttpContext produced by the parse step.
void ProcessHttpRequest(InputMessageBase *msg) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    // Per the article: ParseHttpMessage (via ParseFromIOBuf) stored the
    // parsed data in this HttpContext.
    DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
    SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg->arg());
    ScopedNonServiceError non_service_error(server);

    // Create the controller.
    Controller* cntl = new (std::nothrow) Controller;
    if (NULL == cntl) {
        LOG(FATAL) << "Fail to new Controller";
        return;
    }
    // NOTE(review): per the article the HTTP response is sent from
    // resp_sender's destructor, which would cover every early `return'
    // below -- confirm against HttpResponseSender.
    HttpResponseSender resp_sender(cntl);
    resp_sender.set_received_us(msg->received_us());

    // Whether the request arrived over HTTP/2 (gRPC is detected further
    // down from the content type).
    const bool is_http2 = imsg_guard->header().is_http2();
    if (is_http2) {
        H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
        resp_sender.set_h2_stream_id(h2_sctx->stream_id());
    }

    ControllerPrivateAccessor accessor(cntl);
    HttpHeader& req_header = cntl->http_request();
    // Swap the parsed HTTP header into the controller.
    imsg_guard->header().Swap(req_header);
    // HTTP body.
    butil::IOBuf& req_body = imsg_guard->body();

    butil::EndPoint user_addr;
    if (!GetUserAddressFromHeader(req_header, &user_addr)) {
        user_addr = socket->remote_side();
    }
    ServerPrivateAccessor server_accessor(server);
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(user_addr)
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    // Header-driven handling such as setting log_id and trace_id.
    // ... (omitted in this walkthrough; `path', used below, is presumably
    // set up in the omitted part)

    // Resolve the URI against the methods registered on the server.
    const Server::MethodProperty* const sp =
        FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
    if (NULL == sp) {
        if (security_mode) {
            std::string escape_path;
            WebEscape(path, &escape_path);
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
        } else {
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
        }
        return;
    } else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
        BadMethodRequest breq;
        BadMethodResponse bres;
        butil::StringSplitter split(path.c_str(), '/');
        breq.set_service_name(std::string(split.field(), split.length()));
        sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
        return;
    }
    // Switch to service-specific error.
    non_service_error.release();
    MethodStatus* method_status = sp->status;
    resp_sender.set_method_status(method_status);
    if (method_status) {
        int rejected_cc = 0;
        if (!method_status->OnRequested(&rejected_cc)) {
            cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                            sp->method->full_name().c_str(), rejected_cc);
            return;
        }
    }

    // Overload protection, applied to services that are neither builtin
    // nor tabbed.
    if (!sp->is_builtin_service && !sp->params.is_tabbed) {
        // The connection is overcrowded.
        if (socket->is_overcrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            return;
        }
        // Server-level concurrency limit.
        if (!server_accessor.AddConcurrency(cntl)) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            return;
        }
        // Too much user code running in pthreads.
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            return;
        }
    } else if (security_mode) {
        cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
                        "ServerOptions.internal_port=%d instead if you're in"
                        " internal network", server->options().internal_port);
        return;
    }

    // Create the request and response messages from the prototypes of the
    // method being called.
    google::protobuf::Service* svc = sp->service;
    const google::protobuf::MethodDescriptor* method = sp->method;
    accessor.set_method(method);
    google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
    resp_sender.own_request(req);
    google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
    resp_sender.own_response(res);
    if (__builtin_expect(!req || !res, 0)) {
        PLOG(FATAL) << "Fail to new req or res";
        cntl->SetFailed("Fail to new req or res");
        return;
    }

    // Convert the HTTP body into the protobuf request.
    if (sp->params.allow_http_body_to_pb &&
        method->input_type()->field_count() > 0) {
        // A protobuf service. No matter if Content-type is set to
        // application/json or body is empty, we have to treat body as a json
        // and try to convert it to pb, which guarantees that a protobuf
        // service is always accessed with valid requests.
        if (req_body.empty()) {
            // Treat empty body specially since parsing it results in error
            if (!req->IsInitialized()) {
                cntl->SetFailed(EREQUEST, "%s needs to be created from a"
                                " non-empty json, it has required fields.",
                                req->GetDescriptor()->full_name().c_str());
                return;
            } // else all fields of the request are optional.
        } else {
            bool is_grpc_ct = false;
            const HttpContentType content_type =
                ParseContentType(req_header.content_type(), &is_grpc_ct);
            const std::string* encoding = NULL;
            // For gRPC over HTTP/2: strip the gRPC prefix, pick up the
            // encoding of compressed bodies and apply the grpc timeout.
            if (is_http2) {
                if (is_grpc_ct) {
                    bool grpc_compressed = false;
                    if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
                        cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
                        return;
                    }
                    if (grpc_compressed) {
                        encoding = req_header.GetHeader(common->GRPC_ENCODING);
                        if (encoding == NULL) {
                            cntl->SetFailed(
                                EREQUEST, "Fail to find header `grpc-encoding'"
                                          " in compressed gRPC request");
                            return;
                        }
                    }
                    int64_t timeout_value_us =
                        ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
                    if (timeout_value_us >= 0) {
                        accessor.set_deadline_us(
                                butil::gettimeofday_us() + timeout_value_us);
                    }
                }
            } else {
                encoding = req_header.GetHeader(common->CONTENT_ENCODING);
            }
            if (encoding != NULL && *encoding == common->GZIP) {
                TRACEPRINTF("Decompressing request=%lu",
                            (unsigned long)req_body.size());
                butil::IOBuf uncompressed;
                if (!policy::GzipDecompress(req_body, &uncompressed)) {
                    cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
                    return;
                }
                req_body.swap(uncompressed);
            }
            if (content_type == HTTP_CONTENT_PROTO) {
                if (!ParsePbFromIOBuf(req, req_body)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
                                    req->GetDescriptor()->full_name().c_str());
                    return;
                }
            } else {
                butil::IOBufAsZeroCopyInputStream wrapper(req_body);
                std::string err;
                json2pb::Json2PbOptions options;
                options.base64_to_bytes = sp->params.pb_bytes_to_base64;
                cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
                if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
                                    req->GetDescriptor()->full_name().c_str(), err.c_str());
                    return;
                }
            }
        }
    } else {
        // The body is not converted to pb: keep the raw data as the
        // request attachment.
        cntl->request_attachment().swap(req_body);
    }

    // Create the done closure passed to the service method.
    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
    imsg_guard.reset();  // optional, just release resource ASAP

    // Call the method generated from the proto file (echo.proto in the
    // example).
    if (!FLAGS_usercode_in_pthread) {
        return svc->CallMethod(method, cntl, req, res, done);
    }
    if (BeginRunningUserCode()) {
        svc->CallMethod(method, cntl, req, res, done);
        return EndRunningUserCodeInPlace();
    } else {
        return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
    }
}
将收到的数据进行转换,创建Controller、req、resp、done等对象,最终调用CallMethod方法。
echo.proto生成的echo.pb.cc中的CallMethod方法
// Dispatches a call to the service method identified by `method'.
// EchoService has a single method (index 0), Echo; request and response
// are down-cast to EchoRequest/EchoResponse before the call.
void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(), protobuf_echo_2eproto::file_level_service_descriptors[0]);
  switch(method->index()) {
    case 0:
      // Echo is virtual, so this calls the Echo of the implementation
      // class (EchoServiceImpl in the example).
      Echo(controller,
           ::google::protobuf::down_cast<const ::example::EchoRequest*>(request),
           ::google::protobuf::down_cast< ::example::EchoResponse*>(response),
           done);
      break;
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      break;
  }
}
调用Echo方法,也就是调用了EchoServiceImpl::Echo。
EchoServiceImpl::Echo
// Skeleton of the user-side service implementation: derive from the
// generated EchoService and override Echo().
class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {};
    virtual ~EchoServiceImpl() {};

    // Called through EchoService::CallMethod for every Echo request.
    virtual void Echo(google::protobuf::RpcController* cntl,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        // Write your own business logic here.
    }
};
至此,我们已经收到数据,可以做业务逻辑处理了。
总结
brpc作为服务端,整个收包过程基本就是这样。相信认真看完这篇文章,你对服务端一定会有一定的收获。后面我们再写文章分析介绍brpc服务端回包、client收发包、brpc协程、socket套接字资源管理等。关注我,我是dandyhuang。也可微信搜dandyhuang_,有什么问题我们可以一起探讨交流。