大家好,我是dandyhuang。brpc在C++界是比较优秀的RPC框架,本次带来brpc server端的源码分析。分析源码前,建议大家先搭建好环境,有利于代码调试和理解。本文以brpc框架中example的echo_c++为例,并且将protobuf编译生成的中间文件echo.pb.cc和echo.pb.h保留下来,有助于我们更好地理解代码。

server端的使用

namespace example {class EchoServiceImpl : public EchoService {public:    EchoServiceImpl() {};    virtual ~EchoServiceImpl() {};    virtual void Echo(google::protobuf::RpcController* cntl_base,                      const EchoRequest* request,                      EchoResponse* response,                      google::protobuf::Closure* done) {        brpc::ClosureGuard done_guard(done);        brpc::Controller* cntl =            static_cast<brpc::Controller*>(cntl_base);        response->set_message(request->message());        if (FLAGS_echo_attachment) {            cntl->response_attachment().append(cntl->request_attachment());        }    }};int main(int argc, char* argv[]) {    // Parse gflags. We recommend you to use gflags as well.    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);    brpc::Server server;      // 继承proto文件    example::EchoServiceImpl echo_service_impl;    if (server.AddService(&echo_service_impl,                           brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {        LOG(ERROR) << "Fail to add service";        return -1;    }    butil::EndPoint point;    point = butil::EndPoint(butil::IP_ANY, FLAGS_port);    // Start the server.    brpc::ServerOptions options;    options.idle_timeout_sec = FLAGS_idle_timeout_s;    if (server.Start(point, &options) != 0) {        LOG(ERROR) << "Fail to start EchoServer";        return -1;    }    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.    server.RunUntilAskedToQuit();    return 0;}

总体代码还是比较简洁,如果能够根据IDL自动生成代码就更完美了。话不多说,直接开始阅读源码。

brpc::Server::AddService初始化各种数据

int Server::AddServiceInternal(google::protobuf::Service* service,                               bool is_builtin_service,                               const ServiceOptions& svc_opt) {    // 如果idl中imp没有定义方法,那么校验失败    const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();    if (sd->method_count() == 0) {        LOG(ERROR) << "service=" << sd->full_name()                   << " does not have any method.";        return -1;    }        // 初始化并注册:NamingService,LoadBalancer,CompressHandler,protocols等    // 后续的收发包校验函数都会应用    if (InitializeOnce() != 0) {        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';        return -1;    }    // InitializeOnce初始化失败则退出    if (status() != READY) {        LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server["                   << version() << "] which is " << status_str(status());        return -1;    }    // defined `option (idl_support) = true' or not.    const bool is_idl_support = sd->file()->options().GetExtension(idl_support);            Tabbed* tabbed = dynamic_cast<Tabbed*>(service);    // 初始化定义的service    for (int i = 0; i < sd->method_count(); ++i) {        const google::protobuf::MethodDescriptor* md = sd->method(i);        MethodProperty mp;        mp.is_builtin_service = is_builtin_service;        mp.own_method_status = true;        mp.params.is_tabbed = !!tabbed;        mp.params.allow_default_url = svc_opt.allow_default_url;        mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;        mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;        mp.service = service;        mp.method = md;        mp.status = new MethodStatus;        _method_map[md->full_name()] = mp;        if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) {            MethodProperty mp2 = mp;            mp2.own_method_status = false;            // have to map service_name + method_name as well because ubrpc            // does not send the namespace before 
service_name.            std::string full_name_wo_ns;            full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());            full_name_wo_ns.append(sd->name());            full_name_wo_ns.push_back('.');            full_name_wo_ns.append(md->name());            if (_method_map.seek(full_name_wo_ns) == NULL) {                _method_map[full_name_wo_ns] = mp2;            } else {                LOG(ERROR) << '`' << full_name_wo_ns << "' already exists";                RemoveMethodsOf(service);                return -1;            }        }    }    const ServiceProperty ss = {        is_builtin_service, svc_opt.ownership, service, NULL };    _fullname_service_map[sd->full_name()] = ss;    _service_map[sd->name()] = ss;    if (is_builtin_service) {        ++_builtin_service_count;    } else {        if (_first_service == NULL) {            _first_service = service;        }    }    butil::StringPiece restful_mappings = svc_opt.restful_mappings;    restful_mappings.trim_spaces();    // restful_mappings解析咱们临时就不剖析了,次要就是匹配办法    if (!restful_mappings.empty()) {        // Parse the mappings.        ···        ···        ···    }        // AddBuiltinService 实例化。idl生成的服务,不属于tabbed    if (tabbed) {        if (_tab_info_list == NULL) {            _tab_info_list = new TabInfoList;        }        const size_t last_size = _tab_info_list->size();        tabbed->GetTabInfo(_tab_info_list);        const size_t cur_size = _tab_info_list->size();        for (size_t i = last_size; i != cur_size; ++i) {            const TabInfo& info = (*_tab_info_list)[i];            if (!info.valid()) {                LOG(ERROR) << "Invalid TabInfo: path=" << info.path                           << " tab_name=" << info.tab_name;                _tab_info_list->resize(last_size);                RemoveService(service);                return -1;            }        }    }    return 0;}

AddServiceInternal既用于echo.proto生成的服务,也用于内置服务(BuiltinService),负责服务的校验和初始化:例如检查服务是否定义了方法,并把映射关系存入_method_map、_service_map、_fullname_service_map等。

StartInternal启动服务,内部端口(internal_port)上的内置服务也按同样的方式调用该流程

int Server::StartInternal(const butil::ip_t& ip,                          const PortRange& port_range,                          const ServerOptions *opt) {    std::unique_ptr<Server, RevertServerStatus> revert_server(this);    if (_failed_to_set_max_concurrency_of_method) {        _failed_to_set_max_concurrency_of_method = false;        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "            "fix it before starting server";        return -1;    }    // addserver已初始化    if (InitializeOnce() != 0) {        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';        return -1;    }    const Status st = status();    // addserver初始化胜利,设置为ready就绪状态    if (st != READY) {        if (st == RUNNING) {            LOG(ERROR) << "Server[" << version() << "] is already running on "                       << _listen_addr;        } else {            LOG(ERROR) << "Can't start Server[" << version()                       << "] which is " << status_str(status());        }        return -1;    }    if (opt) {        _options = *opt;    } else {        _options = ServerOptions();    }    // Init _keytable_pool always. If the server was stopped before, the pool    // should be destroyed in Join().    _keytable_pool = new bthread_keytable_pool_t;    if (bthread_keytable_pool_init(_keytable_pool) != 0) {        LOG(ERROR) << "Fail to init _keytable_pool";        delete _keytable_pool;        _keytable_pool = NULL;        return -1;    }      _tl_options = ThreadLocalOptions();    _concurrency = 0;    if (_options.has_builtin_services &&        _builtin_service_count <= 0 &&        AddBuiltinServices() != 0) {        LOG(ERROR) << "Fail to add builtin services";        return -1;    }    // If a server is started/stopped for mutiple times and one of the options    // sets has_builtin_service to true, builtin services will be enabled for    // any later re-start. Check this case and report to user.    
if (!_options.has_builtin_services && _builtin_service_count > 0) {        LOG(ERROR) << "A server started/stopped for multiple times must be "            "consistent on ServerOptions.has_builtin_services";        return -1;    }    // Prepare all restful maps    for (ServiceMap::const_iterator it = _fullname_service_map.begin();         it != _fullname_service_map.end(); ++it) {        if (it->second.restful_map) {            it->second.restful_map->PrepareForFinding();        }    }    if (_global_restful_map) {        _global_restful_map->PrepareForFinding();    }        // cpu核数+1,设置协程数量    if (_options.num_threads > 0) {        if (FLAGS_usercode_in_pthread) {            _options.num_threads += FLAGS_usercode_backup_threads;        }        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {            _options.num_threads = BTHREAD_MIN_CONCURRENCY;        }        bthread_setconcurrency(_options.num_threads);    }        // 设置限流auto、constant、unlimited    for (MethodMap::iterator it = _method_map.begin();        it != _method_map.end(); ++it) {        if (it->second.is_builtin_service) {            it->second.status->SetConcurrencyLimiter(NULL);        } else {            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {                amc = &_options.method_max_concurrency;            }            ConcurrencyLimiter* cl = NULL;            if (!CreateConcurrencyLimiter(*amc, &cl)) {                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";                return -1;            }            it->second.status->SetConcurrencyLimiter(cl);        }    }    _listen_addr.ip = ip;    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {        _listen_addr.port = port;        // 创立listen_fd套接字        butil::fd_guard sockfd(tcp_listen(_listen_addr));        if (sockfd < 0) {            if (port != port_range.max_port) { // not the last port, try 
next                continue;            }            if (port_range.min_port != port_range.max_port) {                LOG(ERROR) << "Fail to listen " << ip                           << ":[" << port_range.min_port << '-'                           << port_range.max_port << ']';            } else {                LOG(ERROR) << "Fail to listen " << _listen_addr;            }            return -1;        }        if (_listen_addr.port == 0) {            // port=0 makes kernel dynamically select a port from            // https://en.wikipedia.org/wiki/Ephemeral_port            _listen_addr.port = get_port_from_fd(sockfd);            if (_listen_addr.port <= 0) {                LOG(ERROR) << "Fail to get port from fd=" << sockfd;                return -1;            }        }        if (_am == NULL) {           // 创立接收器,将解析,打包等协定都存储起来            _am = BuildAcceptor();            if (NULL == _am) {                LOG(ERROR) << "Fail to build acceptor";                return -1;            }        }        // Set `_status' to RUNNING before accepting connections        // to prevent requests being rejected as ELOGOFF        _status = RUNNING;        time(&_last_start_time);÷        // 记录一些信息到version中        GenerateVersionIfNeeded();        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);        // 开启协程,创立epoll_create,接管客户端的连贯、申请        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,                             _default_ssl_ctx) != 0) {            LOG(ERROR) << "Fail to start acceptor";            return -1;        }        sockfd.release();        break; // stop trying    }    // 内置服务accept创立的信息    if (_options.internal_port >= 0 && _options.has_builtin_services) {       // 同样的解决形式       ...    
}        // pid写入文件    PutPidFileIfNeeded();    // 更新指标连贯信息等    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);    if (bthread_start_background(&_derivative_thread, NULL,                                 UpdateDerivedVars, this) != 0) {        LOG(ERROR) << "Fail to create _derivative_thread";        return -1;    }    // Print tips to server launcher.    int http_port = _listen_addr.port;    std::ostringstream server_info;    server_info << "Server[" << version() << "] is serving on port="                << _listen_addr.port;    if (_options.internal_port >= 0 && _options.has_builtin_services) {        http_port = _options.internal_port;        server_info << " and internal_port=" << _options.internal_port;    }    LOG(INFO) << server_info.str() << '.';    if (_options.has_builtin_services) {        LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'                  << http_port << " in web browser.";    } else {        LOG(WARNING) << "Builtin services are disabled according to "            "ServerOptions.has_builtin_services";    }    // 设置链路追踪的地址    SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));    revert_server.release();    return 0;}

StartInternal中创建了监听套接字和接收器(Acceptor),核心逻辑还是在StartAccept中。

StartAccept:开始接收连接请求

// Starts accepting connections on an already-listening fd.
// Wraps `listened_fd` in a Socket whose edge-triggered callback is
// OnNewConnections, then marks the acceptor RUNNING.
// Returns 0 on success, -1 if the fd is invalid or the Socket cannot be
// created. (Part of the body is elided in the article.)
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
                          const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
    // Reject an invalid listening fd.
    if (listened_fd < 0) {
        LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
        return -1;
    }
    ...
    // Creation of _acception_id is inside lock so that OnNewConnections
    // (which may run immediately) should see sane fields set below.
    SocketOptions options;
    options.fd = listened_fd;
    options.user = this;
    // Callback invoked when the event dispatcher reports a connection request
    // on the listening fd.
    options.on_edge_triggered_events = OnNewConnections;
    if (Socket::Create(options, &_acception_id) != 0) {
        // Close-idle-socket thread will be stopped inside destructor
        LOG(FATAL) << "Fail to create _acception_id";
        return -1;
    }

    _listened_fd = listened_fd;
    _status = RUNNING;
    return 0;
}

// Obtains a Socket from the resource pool, initializes its fields from
// `options`, binds the fd via ResetFileDescriptor and stores the new id in
// `*id`. Returns 0 on success, -1 on failure.
int Socket::Create(const SocketOptions& options, SocketId* id) {
    butil::ResourceId<Socket> slot;
    // Get a Socket object (and its slot) from the resource pool.
    Socket* const m = butil::get_resource(&slot, Forbidden());
    if (m == NULL) {
        LOG(FATAL) << "Fail to get_resource<Socket>";
        return -1;
    }
    // Initialize the socket's fields.
    g_vars->nsocket << 1;
    CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
    m->_nevent.store(0, butil::memory_order_relaxed);
    m->_keytable_pool = options.keytable_pool;
    m->_tos = 0;
    m->_remote_side = options.remote_side;
    m->_on_edge_triggered_events = options.on_edge_triggered_events;
    // The id combines the version taken from _versioned_ref with the slot.
    m->_this_id = MakeSocketId(
            VersionOfVRef(m->_versioned_ref.fetch_add(
                    1, butil::memory_order_release)), slot);
    m->_preferred_index = -1;
    m->_hc_count = 0;
    CHECK(m->_read_buf.empty());
    const int64_t cpuwide_now = butil::cpuwide_time_us();
    m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->reset_parsing_context(options.initial_parsing_context);
    m->_correlation_id = 0;
    m->_health_check_interval_s = options.health_check_interval_s;
    m->_ninprocess.store(1, butil::memory_order_relaxed);
    m->_auth_flag_error.store(0, butil::memory_order_relaxed);
    const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
    if (rc2) {
        LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
        m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
        return -1;
    }
    // NOTE: last two params are useless in bthread > r32787
    const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
    if (rc) {
        LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
        m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
        return -1;
    }
    m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
    CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
    // Must be last one! Internal fields of this Socket may be access
    // just after calling ResetFileDescriptor.
    // Applies the fd settings and registers the fd with the event dispatcher.
    if (m->ResetFileDescriptor(options.fd) != 0) {
        // Save errno before logging can overwrite it.
        const int saved_errno = errno;
        PLOG(ERROR) << "Fail to ResetFileDescriptor";
        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
                     berror(saved_errno));
        return -1;
    }
    *id = m->_this_id;
    return 0;
}

调用Socket::Create来创建Socket对象,无论是accept还是connect得到的套接字,都通过这个函数创建。on_edge_triggered_events是事件回调:这里当有客户端连接事件发生时,会回调OnNewConnections。

ResetFileDescriptor设置socket信息,并注册到epoll中

int Socket::ResetFileDescriptor(int fd) {    // Reset message sizes when fd is changed.    _last_msg_size = 0;    _avg_msg_size = 0;    _fd.store(fd, butil::memory_order_release);    _reset_fd_real_us = butil::gettimeofday_us();    // 校验fd是否非法    if (!ValidFileDescriptor(fd)) {        return 0;    }    // 获取本地ip信息    if (butil::get_local_side(fd, &_local_side) != 0) {        _local_side = butil::EndPoint();    }    // 敞开一些子过程等无用文件描述符    butil::make_close_on_exec(fd);    // 设置非阻塞    if (butil::make_non_blocking(fd) != 0) {        PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";        return -1;    }    // 敞开Nagle算法    butil::make_no_delay(fd);    if (_tos > 0 &&        setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {        PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;    }        // 设置发送缓冲区    if (FLAGS_socket_send_buffer_size > 0) {        int buff_size = FLAGS_socket_send_buffer_size;        socklen_t size = sizeof(buff_size);        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {            PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to "                         << buff_size;        }    }    // 设置接收缓冲区大小            if (FLAGS_socket_recv_buffer_size > 0) {        int buff_size = FLAGS_socket_recv_buffer_size;        socklen_t size = sizeof(buff_size);        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {            PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to "                         << buff_size;        }    }        // 第一次创立epoll_create,并将接管fd退出到epoll_ctl_add队列中    if (_on_edge_triggered_events) {        if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {            PLOG(ERROR) << "Fail to add SocketId=" << id()                         << " into EventDispatcher";            _fd.store(-1, butil::memory_order_release);            return -1;        }    }    return 0;}

这里设置了fd,并且启动协程、创建epoll队列,并将连接设置为ET(边缘触发)模式。至此,我们就可以接收来自client端的请求了。

IO多路复用EventDispatcher::Run(),事件监听

void EventDispatcher::Run() {    while (!_stop) {        epoll_event e[32];        // 立即返回        int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);        // 没有事件的时候        if (n == 0) {            // 阻塞期待事件到来            n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);        }        if (_stop) {            break;        }        if (n < 0) {            if (EINTR == errno) {                // We've checked _stop, no wake-up will be missed.                continue;            }            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;            break;        }        // 事件EPOLLIN        for (int i = 0; i < n; ++i) {            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)                || (e[i].events & has_epollrdhup)                ) {                // We don't care about the return value.                Socket::StartInputEvent(e[i].data.u64, e[i].events,                                        _consumer_thread_attr);            }        }        // 事件EPOLLOUT        for (int i = 0; i < n; ++i) {            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {                // We don't care about the return value.                Socket::HandleEpollOut(e[i].data.u64);            }        }    }}

当有连接事件时,触发EPOLLIN,调用StartInputEvent。

int Socket::StartInputEvent(SocketId id, uint32_t events,                            const bthread_attr_t& thread_attr) {    SocketUniquePtr s;    // 依据之前AddConsumer的id,找到对应Socket::Create创立的套接字映射    if (Address(id, &s) < 0) {        return -1;    }      // 校验回调函数是否为空,这里是OnNewConnections    if (NULL == s->_on_edge_triggered_events) {        return 0;    }    if (s->fd() < 0) {        CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;        return -1;    }    // 这里头,如果有多个事件产生,每个fd只保障只创立一个协程来解决。来保障并发平安问题    if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {        g_vars->neventthread << 1;        bthread_t tid;        // transfer ownership as well, don't use s anymore!        Socket* const p = s.release();        bthread_attr_t attr = thread_attr;        attr.keytable_pool = p->_keytable_pool;        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {            LOG(FATAL) << "Fail to start ProcessEvent";            ProcessEvent(p);        }    }    return 0;}void* Socket::ProcessEvent(void* arg) {    SocketUniquePtr s(static_cast<Socket*>(arg));    // 调用OnNewConnections    s->_on_edge_triggered_events(s.get());    return NULL;}

Address根据SocketId找到对应的Socket,再启动协程,调用回调OnNewConnections

OnNewConnections接收连接请求

// Edge-triggered callback of the listening socket: accepts every pending
// connection and wraps each one in a Socket whose read events are handled by
// InputMessenger::OnNewMessages.
//
// The inner loop drains accept() until EAGAIN. The outer loop then asks
// MoreReadEvents() whether more events arrived meanwhile, because only one
// bthread at a time handles a given fd (see Socket::StartInputEvent).
//
// The inner loop must leave with `break`, not `return`: returning directly
// would skip the Failed() check and the MoreReadEvents() handshake below.
void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        while (1) {
            struct sockaddr in_addr;
            socklen_t in_len = sizeof(in_addr);
            // Accept one pending client connection; fd_guard closes the fd
            // unless it is released to a Socket below.
            butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
            if (in_fd < 0) {
                // Non-blocking fd: EAGAIN means every pending connection has
                // been taken.
                if (errno == EAGAIN) {
                    break;
                }
                // Any other error: keep consuming events until EAGAIN.
                continue;
            }

            Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
            if (NULL == am) {
                LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
                acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
                // The Failed() check after the loop ends the callback.
                break;
            }

            SocketId socket_id;
            SocketOptions options;
            options.keytable_pool = am->_keytable_pool;
            options.fd = in_fd;
            options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
            options.user = acception->user();
            // Read events on the accepted connection go to OnNewMessages.
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
            options.initial_ssl_ctx = am->_ssl_ctx;
            // Same path as for the listening fd: Socket::Create registers the
            // accepted fd with the event dispatcher.
            if (Socket::Create(options, &socket_id) != 0) {
                LOG(ERROR) << "Fail to create Socket";
                continue;
            }
            in_fd.release(); // transfer ownership to socket_id

            SocketUniquePtr sock;
            // Look the new socket up again (AddressFailedAsWell, by its name,
            // also succeeds for a socket already marked failed) so it can be
            // recorded in the acceptor's map.
            if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
                bool is_running = true;
                {
                    BAIDU_SCOPED_LOCK(am->_map_mutex);
                    is_running = (am->status() == RUNNING);
                    // Track the connection for statistics / checking.
                    am->_socket_map.insert(socket_id, ConnectStatistics());
                }
                if (!is_running) {
                    LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                        << " has been stopped, discard newly created " << *sock;
                    sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
                            "discard newly created %s", acception->fd(),
                            sock->description().c_str());
                    break;
                }
            }
        }

        if (acception->Failed()) {
            return;
        }
    // Keep going while more events were reported during this pass.
    } while (acception->MoreReadEvents(&progress));
}

这里通过accept获取客户端发起的连接请求,并将新连接的fd也注册到epoll队列中。

OnNewMessages读事件调用

// Edge-triggered read callback of an accepted connection.
//
// Repeatedly reads from socket `m`, cuts the buffered bytes into messages with
// CutInputMessage and hands each message to its protocol's process function
// through QueueMessage. The most recently parsed message is kept in `last_msg`
// and handled by the RunLastMessage deleter when this function exits (per the
// article, it calls ProcessInputMessage).
void InputMessenger::OnNewMessages(Socket* m) {
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;

    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        const int64_t received_us = butil::cpuwide_time_us();
        const int64_t base_realtime = butil::gettimeofday_us() - received_us;

        // Size this read from the running average message size, clamped to
        // [MIN_ONCE_READ, MAX_ONCE_READ].
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
            once_read = MIN_ONCE_READ;
        } else if (once_read > MAX_ONCE_READ) {
            once_read = MAX_ONCE_READ;
        }

        // Read from the fd into the socket's read buffer.
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
            if (0 == nr) {
                // The peer closed the connection; finish this pass, then stop.
                read_eof = true;
            } else if (errno != EAGAIN) {
                if (errno == EINTR) {
                    continue;  // just retry
                }
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to read from " << *m;
                m->SetFailed(saved_errno, "Fail to read from %s: %s",
                             m->description().c_str(), berror(saved_errno));
                return;
            } else if (!m->MoreReadEvents(&progress)) {
                // EAGAIN / EWOULDBLOCK: nothing left to read and no new event.
                return;
            } else {
                // More events arrived meanwhile; since one bthread serves the
                // fd, keep reading here.
                continue;
            }
        }

        // Statistics.
        m->AddInputBytes(nr);

        // Avoid this socket to be closed due to idle_timeout_s
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);

        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
            // Placeholder value; CutInputMessage stores the index of the
            // matching protocol handler here.
            size_t index = 8888;
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            if (!pr.is_ok()) {
                if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
                    // Incomplete message: account for the consumed bytes and
                    // go back to reading.
                    m->_last_msg_size += (last_size - m->_read_buf.length());
                    break;
                } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
                    // No protocol recognised the data: close the connection.
                    LOG(WARNING)
                        << "Close " << *m << " due to unknown message: "
                        << butil::ToPrintable(m->_read_buf);
                    m->SetFailed(EINVAL, "Close %s due to unknown message",
                                 m->description().c_str());
                    return;
                } else {
                    // Any other parse error: close the connection.
                    LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
                    m->SetFailed(EINVAL, "Close %s: %s",
                                 m->description().c_str(), pr.error_str());
                    return;
                }
            }

            m->AddInputMessages(1);
            // Calculate average size of messages
            const size_t cur_size = m->_read_buf.length();
            if (cur_size == 0) {
                m->_read_buf.return_cached_blocks();
            }
            // Size of the message just cut: last_size - cur_size.
            m->_last_msg_size += (last_size - cur_size);
            last_size = cur_size;
            const size_t old_avg = m->_avg_msg_size;
            if (old_avg != 0) {
                m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
                / MSG_SIZE_WINDOW;
            } else {
                m->_avg_msg_size = m->_last_msg_size;
            }
            m->_last_msg_size = 0;

            if (pr.message() == NULL) { // the Process() step can be skipped.
                continue;
            }
            pr.message()->_received_us = received_us;
            pr.message()->_base_real_us = base_realtime;

            // Owns the new message until it is handed to last_msg or queued.
            DestroyingPtr<InputMessageBase> msg(pr.message());
            // Dispatch the previously parsed message, if any. last_msg is
            // empty on the first pass and QueueMessage ignores NULL.
            QueueMessage(last_msg.release(), &num_bthread_created,
                             m->_keytable_pool);
            if (handlers[index].process == NULL) {
                LOG(ERROR) << "process of index=" << index << " is NULL";
                continue;
            }
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;

            if (handlers[index].verify != NULL) {
                int auth_error = 0;
                if (0 == m->FightAuthentication(&auth_error)) {
                    // Get the right to authenticate
                    if (handlers[index].verify(msg.get())) {
                        m->SetAuthentication(0);
                    } else {
                        m->SetAuthentication(ERPCAUTH);
                        LOG(WARNING) << "Fail to authenticate " << *m;
                        m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
                                     m->description().c_str());
                        return;
                    }
                } else {
                    LOG_IF(FATAL, auth_error != 0) <<
                      "Impossible! Socket should have been "
                      "destroyed when authentication failed";
                }
            }
            if (!m->is_read_progressive()) {
                // Hold the message as the new last_msg.
                last_msg.reset(msg.release());
            } else {
                // Progressive reading: dispatch right away and flush.
                QueueMessage(msg.release(), &num_bthread_created,
                                 m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
            }
        }
        if (num_bthread_created) {
            bthread_flush();
        }
    }

    if (read_eof) {
        m->SetEOF();
    }
}

DoRead读取缓冲区的数据,CutInputMessage按协议解析并校验读到的数据是否完整、正确(下文以http协议为例)。解析成功后,通过QueueMessage启动协程处理消息。

包校验CutInputMessage

// Cuts one message out of the read buffer of `m`.
//
// Tries the protocol that matched last time on this socket first (its index is
// kept in the socket; -1 initially), then every other registered handler in
// index order.
//
//   m        - socket whose `_read_buf` is parsed.
//   index    - out: index of the handler whose parse() returned OK or
//              PARSE_ERROR_NOT_ENOUGH_DATA.
//   read_eof - forwarded to the handlers' parse functions.
//
// Returns the handler's ParseResult, or PARSE_ERROR_TRY_OTHERS when no handler
// recognises the data.
ParseResult InputMessenger::CutInputMessage(
        Socket* m, size_t* index, bool read_eof) {
    const int last_hit = m->preferred_index();
    const int handler_limit = (int)_max_index.load(butil::memory_order_acquire);

    const bool can_try_last_hit =
        last_hit >= 0 && last_hit <= handler_limit
        && _handlers[last_hit].parse != NULL;
    if (can_try_last_hit) {
        // With HTTP, for example, the article notes this calls
        // ParseHttpMessage.
        ParseResult first_try =
            _handlers[last_hit].parse(&m->_read_buf, m, read_eof, _handlers[last_hit].arg);
        const bool settled =
            first_try.is_ok() ||
            first_try.error() == PARSE_ERROR_NOT_ENOUGH_DATA;
        if (settled) {
            // Parsed, or the protocol matches but the message is incomplete.
            *index = last_hit;
            return first_try;
        }
        if (first_try.error() != PARSE_ERROR_TRY_OTHERS) {
            // Hard error: report it as is.
            return first_try;
        }

        // Sockets created by connect (client side), other than baidu_std,
        // never fall through to other protocols.
        if (m->CreatedByConnect() &&
            (ProtocolType)last_hit != PROTOCOL_BAIDU_STD) {
            // The protocol is fixed at client-side, no need to try others.
            LOG(ERROR) << "Fail to parse response from " << m->remote_side()
                       << " by " << _handlers[last_hit].name
                       << " at client-side";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }

        // Clear context before trying next protocol which probably has
        // an incompatible context with the current one.
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        m->set_preferred_index(-1);
    }

    // The remembered protocol did not match (or there was none): try each
    // registered handler from index 0.
    for (int i = 0; i <= handler_limit; ++i) {
        // Don't try preferred handler(already tried) or invalid handler
        const bool skip = (i == last_hit) || (_handlers[i].parse == NULL);
        if (skip) {
            continue;
        }

        ParseResult attempt = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
        const bool settled =
            attempt.is_ok() ||
            attempt.error() == PARSE_ERROR_NOT_ENOUGH_DATA;
        if (settled) {
            // Remember the matching protocol for the next call.
            m->set_preferred_index(i);
            *index = i;
            return attempt;
        }
        if (attempt.error() != PARSE_ERROR_TRY_OTHERS) {
            return attempt;
        }

        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        // Try other protocols.
    }
    return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}

对收到的包,按已注册的协议逐个尝试解析,确认包是否正确。

QueueMessage:对收到的包,开启协程进行逻辑处理

static void QueueMessage(InputMessageBase* to_run_msg,                         int* num_bthread_created,                         bthread_keytable_pool_t* keytable_pool) {    if (!to_run_msg) {        return;    }    bthread_t th;    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?                          BTHREAD_ATTR_PTHREAD :                          BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;    tmp.keytable_pool = keytable_pool;    // 启动协程执行 ProcessInputMessage    if (bthread_start_background(            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {        ++*num_bthread_created;    } else {        ProcessInputMessage(to_run_msg);    }}void* ProcessInputMessage(void* void_arg) {    InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);    // 调用ProcessHttpRequest    msg->_process(msg);    return NULL;}

启动协程,调用ProcessInputMessage

以http协议为例,ProcessHttpRequest的执行调用

// Processes one parsed HTTP / HTTP2 request on the server side (excerpt; the
// `...` lines mark code elided by the article).
//
// Visible steps: take ownership of the message and its socket, create the
// Controller and the response sender, move header/body out of the message,
// resolve the target method from the URI, apply concurrency / overload
// checks, optionally convert the body into the request protobuf, and finally
// invoke the service's CallMethod.
//
//   msg - the parsed message; it is cast to HttpContext (and to
//         H2StreamContext when the header reports HTTP/2).
void ProcessHttpRequest(InputMessageBase *msg) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    // Per the article: ParseHttpMessage (via ParseFromIOBuf) already stored
    // the parsed data in this HttpContext.
    DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
    SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg->arg());
    ScopedNonServiceError non_service_error(server);

    // Create the controller for this request.
    Controller* cntl = new (std::nothrow) Controller;
    if (NULL == cntl) {
        LOG(FATAL) << "Fail to new Controller";
        return;
    }
    // Per the article, the HTTP response is sent from this object's
    // destructor; presumably that is why the early `return`s below only
    // call SetFailed().
    HttpResponseSender resp_sender(cntl);
    resp_sender.set_received_us(msg->received_us());

    // Whether the request arrived over HTTP/2 (gRPC is detected later from
    // the content type).
    const bool is_http2 = imsg_guard->header().is_http2();
    if (is_http2) {
        H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
        resp_sender.set_h2_stream_id(h2_sctx->stream_id());
    }
    ControllerPrivateAccessor accessor(cntl);
    HttpHeader& req_header = cntl->http_request();
    // HTTP header part: swapped into the controller's request header.
    imsg_guard->header().Swap(req_header);
    // HTTP body part: still referenced inside the message.
    butil::IOBuf& req_body = imsg_guard->body();
    // Prefer the address carried in the header; otherwise use the socket's
    // remote side.
    butil::EndPoint user_addr;
    if (!GetUserAddressFromHeader(req_header, &user_addr)) {
        user_addr = socket->remote_side();
    }
    ServerPrivateAccessor server_accessor(server);
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(user_addr)
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);
    // Header-driven handling such as setting log_id and trace_id (elided by
    // the article; `path` and `common` used below come from this part).
    ...
    ...
    // Resolve the HTTP URI against the method map built from the registered
    // services.
    const Server::MethodProperty* const sp =
        FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
    if (NULL == sp) {
        if (security_mode) {
            // In security mode the path is web-escaped before being echoed
            // back in the error text.
            std::string escape_path;
            WebEscape(path, &escape_path);
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
        } else {
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
        }
        return;
    } else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
        // The URI resolved to BadMethodService: call it directly with the
        // first '/'-separated field of the path as the service name.
        BadMethodRequest breq;
        BadMethodResponse bres;
        butil::StringSplitter split(path.c_str(), '/');
        breq.set_service_name(std::string(split.field(), split.length()));
        sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
        return;
    }
    // Switch to service-specific error.
    non_service_error.release();
    MethodStatus* method_status = sp->status;
    resp_sender.set_method_status(method_status);
    if (method_status) {
        int rejected_cc = 0;
        if (!method_status->OnRequested(&rejected_cc)) {
            cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                            sp->method->full_name().c_str(), rejected_cc);
            return;
        }
    }
    // Overload protection; applied only to services that are neither builtin
    // nor tabbed.
    if (!sp->is_builtin_service && !sp->params.is_tabbed) {
        // The connection itself is overcrowded.
        if (socket->is_overcrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            return;
        }
        // Server-level concurrency limit.
        if (!server_accessor.AddConcurrency(cntl)) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            return;
        }
        // Too much user code already running while -usercode_in_pthread is on.
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            return;
        }
    } else if (security_mode) {
        cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
                        "ServerOptions.internal_port=%d instead if you're in"
                        " internal network", server->options().internal_port);
        return;
    }

    // Pick the RPC method matching the request and create its request and
    // response messages from the service's prototypes; resp_sender takes
    // ownership of both.
    google::protobuf::Service* svc = sp->service;
    const google::protobuf::MethodDescriptor* method = sp->method;
    accessor.set_method(method);
    google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
    resp_sender.own_request(req);
    google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
    resp_sender.own_response(res);
    if (__builtin_expect(!req || !res, 0)) {
        PLOG(FATAL) << "Fail to new req or res";
        cntl->SetFailed("Fail to new req or res");
        return;
    }
    // Convert the HTTP body into the request protobuf.
    if (sp->params.allow_http_body_to_pb &&
        method->input_type()->field_count() > 0) {
        // A protobuf service. No matter if Content-type is set to
        // application/json or body is empty, we have to treat body as a json
        // and try to convert it to pb, which guarantees that a protobuf
        // service is always accessed with valid requests.
        if (req_body.empty()) {
            // Treat empty body specially since parsing it results in error
            if (!req->IsInitialized()) {
                cntl->SetFailed(EREQUEST, "%s needs to be created from a"
                                " non-empty json, it has required fields.",
                                req->GetDescriptor()->full_name().c_str());
                return;
            } // else all fields of the request are optional.
        } else {
            bool is_grpc_ct = false;
            const HttpContentType content_type =
                ParseContentType(req_header.content_type(), &is_grpc_ct);
            const std::string* encoding = NULL;
            // For gRPC (HTTP/2 with a gRPC content type) the body prefix,
            // compression flag, encoding and timeout headers are handled
            // here; plain HTTP only looks at the content-encoding header.
            if (is_http2) {
                if (is_grpc_ct) {
                    bool grpc_compressed = false;
                    // NOTE(review): ERESPONSE / "response" wording is used on
                    // a request path here - confirm against upstream.
                    if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
                        cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
                        return;
                    }
                    if (grpc_compressed) {
                        encoding = req_header.GetHeader(common->GRPC_ENCODING);
                        if (encoding == NULL) {
                            cntl->SetFailed(
                                EREQUEST, "Fail to find header `grpc-encoding'"
                                " in compressed gRPC request");
                            return;
                        }
                    }
                    // A non-negative gRPC timeout becomes an absolute
                    // deadline relative to now.
                    int64_t timeout_value_us =
                        ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
                    if (timeout_value_us >= 0) {
                        accessor.set_deadline_us(
                                butil::gettimeofday_us() + timeout_value_us);
                    }
                }
            } else {
                encoding = req_header.GetHeader(common->CONTENT_ENCODING);
            }
            // Un-gzip the body in place when the encoding says gzip.
            if (encoding != NULL && *encoding == common->GZIP) {
                TRACEPRINTF("Decompressing request=%lu",
                            (unsigned long)req_body.size());
                butil::IOBuf uncompressed;
                if (!policy::GzipDecompress(req_body, &uncompressed)) {
                    cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
                    return;
                }
                req_body.swap(uncompressed);
            }
            if (content_type == HTTP_CONTENT_PROTO) {
                // Body is a serialized protobuf.
                if (!ParsePbFromIOBuf(req, req_body)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
                                    req->GetDescriptor()->full_name().c_str());
                    return;
                }
            } else {
                // Every other content type is parsed as JSON.
                butil::IOBufAsZeroCopyInputStream wrapper(req_body);
                std::string err;
                json2pb::Json2PbOptions options;
                options.base64_to_bytes = sp->params.pb_bytes_to_base64;
                cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
                if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
                                    req->GetDescriptor()->full_name().c_str(), err.c_str());
                    return;
                }
            }
        }
    } else {
        // The body is not converted to pb: keep the raw data as the request
        // attachment.
        cntl->request_attachment().swap(req_body);
    }

    // Create the `done` closure handed to the service method.
    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
    imsg_guard.reset();  // optional, just release resource ASAP
    // From here the protoc-generated service code is invoked (e.g. the
    // CallMethod generated from echo.proto).
    if (!FLAGS_usercode_in_pthread) {
        return svc->CallMethod(method, cntl, req, res, done);
    }
    if (BeginRunningUserCode()) {
        svc->CallMethod(method, cntl, req, res, done);
        return EndRunningUserCodeInPlace();
    } else {
        return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
    }
}

将收到的数据进行转换,创建controller、req、resp、done等对象,最终调用CallMethod方法。

echo.proto生成的echo.pb.cc中的CallMethod方法

// protoc-generated dispatcher for EchoService: forwards the generic call to
// the typed Echo() method selected by `method`.
//
//   method     - descriptor of the method to invoke; only index 0 is valid.
//   controller - passed through to Echo().
//   request    - down-cast to ::example::EchoRequest.
//   response   - down-cast to ::example::EchoResponse.
//   done       - passed through to Echo().
void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(),
                   protobuf_echo_2eproto::file_level_service_descriptors[0]);
  if (method->index() != 0) {
    GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
    return;
  }
  // Index 0 is Echo: this reaches the Echo() implemented by the user's
  // subclass (EchoServiceImpl in this article).
  const ::example::EchoRequest* const echo_request =
      ::google::protobuf::down_cast<const ::example::EchoRequest*>(request);
  ::example::EchoResponse* const echo_response =
      ::google::protobuf::down_cast< ::example::EchoResponse*>(response);
  Echo(controller, echo_request, echo_response, done);
}

调用Echo方法,就是调用了EchoServiceImpl::Echo

EchoServiceImpl::Echo

class EchoServiceImpl : public EchoService {public:    EchoServiceImpl() {};    virtual ~EchoServiceImpl() {};    virtual void Echo(google::protobuf::RpcController* cntl,                      const EchoRequest* request,                      EchoResponse* response,                      google::protobuf::Closure* done) {      // 写本人的业务逻辑    }};

至此,我们已经收到数据,可以做业务逻辑处理了

总结

brpc作为服务端,整个收包过程基本就是这样。相信认真看完这篇文章,你对服务端一定会有一定的收获。后面我们再写文章剖析介绍brpc服务端回包、client收发包、brpc协程、socket套接字资源管理等。关注我,我是dandyhuang。也可wx搜dandyhuang_,有什么问题我们可以一起探讨交流。