关于c++:Brpc-服务端收包源码分析一

42次阅读

共计 30426 个字符,预计需要花费 77 分钟才能阅读完成。

大家好,我是 dandyhuang。brpc 在 C++ 界是比较出色的 RPC 框架,本次带来 brpc server 端的源码分析。分析源码前,建议大家先搭建好环境,便于调试和理解代码。本文以 brpc 框架 example 中的 echo_c++ 为例,并保留 protobuf 编译生成的中间文件 echo.pb.cc 和 echo.pb.h,便于我们更好地理解代码。

server 端使用

namespace example {
// Echo service used by the echo_c++ example: the response carries the same
// message as the request, and optionally the same attachment.
class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {}
    virtual ~EchoServiceImpl() {}

    // cntl_base: RPC controller; it is cast to brpc::Controller below.
    // request:   incoming message whose `message` field is echoed.
    // response:  outgoing message filled by this method.
    // done:      completion closure, handed to a brpc::ClosureGuard.
    virtual void Echo(google::protobuf::RpcController* cntl_base,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        // NOTE(review): ClosureGuard presumably runs `done` when it goes out
        // of scope - confirm against brpc/closure_guard.h.
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);
        response->set_message(request->message());
        if (FLAGS_echo_attachment) {
            // Send the request attachment back unchanged.
            cntl->response_attachment().append(cntl->request_attachment());
        }
    }
};
}  // namespace example
int main(int argc, char* argv[]) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    brpc::Server server;
      // 继承 proto 文件
    example::EchoServiceImpl echo_service_impl;
    if (server.AddService(&echo_service_impl, 
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {LOG(ERROR) << "Fail to add service";
        return -1;
    }

    butil::EndPoint point;
    point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
    // Start the server.
    brpc::ServerOptions options;
    options.idle_timeout_sec = FLAGS_idle_timeout_s;
    if (server.Start(point, &options) != 0) {LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }

    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
    server.RunUntilAskedToQuit();
    return 0;
}

总体代码还是比较简洁,如果能够根据 IDL 自动生成代码就更完美了。话不多说,直接开始看源码。

brpc::Server::AddService 初始化各种数据

// Registers every method of `service` in the server's lookup tables
// (_method_map, _fullname_service_map, _service_map) and collects tab info
// for services that implement Tabbed.
//
// service:            protobuf service to register.
// is_builtin_service: recorded in each MethodProperty; also decides whether
//                     _builtin_service_count or _first_service is updated.
// svc_opt:            per-service options (ownership, restful mappings, ...).
// Returns 0 on success, -1 on failure.
int Server::AddServiceInternal(google::protobuf::Service* service,
                               bool is_builtin_service,
                               const ServiceOptions& svc_opt) {
    // A service that declares no method is rejected.
    const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
    if (sd->method_count() == 0) {
        LOG(ERROR) << "service=" << sd->full_name()
                   << " does not have any method.";
        return -1;
    }
    // One-time initialization shared by all servers.
    // NOTE(review): presumably registers NamingService, LoadBalancer,
    // CompressHandler, protocols, etc. - confirm against InitializeOnce().
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    // Services may only be added while the server is READY.
    if (status() != READY) {
        LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server["
                   << version() << "] which is " << status_str(status());
        return -1;
    }
    // defined `option (idl_support) = true' or not.
    const bool is_idl_support = sd->file()->options().GetExtension(idl_support);

    Tabbed* tabbed = dynamic_cast<Tabbed*>(service);
    // Create one MethodProperty per method and index it by full method name.
    for (int i = 0; i < sd->method_count(); ++i) {
        const google::protobuf::MethodDescriptor* md = sd->method(i);
        MethodProperty mp;
        mp.is_builtin_service = is_builtin_service;
        mp.own_method_status = true;
        mp.params.is_tabbed = !!tabbed;
        mp.params.allow_default_url = svc_opt.allow_default_url;
        mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
        mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
        mp.service = service;
        mp.method = md;
        mp.status = new MethodStatus;
        _method_map[md->full_name()] = mp;
        if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) {
            // The alias entry shares `status` with `mp` but does not own it.
            MethodProperty mp2 = mp;
            mp2.own_method_status = false;
            // have to map service_name + method_name as well because ubrpc
            // does not send the namespace before service_name.
            std::string full_name_wo_ns;
            full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
            full_name_wo_ns.append(sd->name());
            full_name_wo_ns.push_back('.');
            full_name_wo_ns.append(md->name());
            if (_method_map.seek(full_name_wo_ns) == NULL) {
                _method_map[full_name_wo_ns] = mp2;
            } else {
                LOG(ERROR) << '`' << full_name_wo_ns << "' already exists";
                RemoveMethodsOf(service);
                return -1;
            }
        }
    }

    const ServiceProperty ss = {is_builtin_service, svc_opt.ownership, service, NULL};
    _fullname_service_map[sd->full_name()] = ss;
    _service_map[sd->name()] = ss;
    if (is_builtin_service) {
        ++_builtin_service_count;
    } else {
        // Remember the first non-builtin service that was added.
        if (_first_service == NULL) {
            _first_service = service;
        }
    }

    butil::StringPiece restful_mappings = svc_opt.restful_mappings;
    restful_mappings.trim_spaces();
    // Parsing of restful_mappings is not analysed here; it mainly matches
    // URL paths to methods.
    if (!restful_mappings.empty()) {
        // Parse the mappings.
        // ... (omitted in this excerpt)
    }
    // Only services implementing Tabbed (the dynamic_cast above succeeded)
    // contribute tab info.
    // NOTE(review): according to the article, IDL-generated services are not
    // Tabbed; this branch is for builtin services.
    if (tabbed) {
        if (_tab_info_list == NULL) {
            _tab_info_list = new TabInfoList;
        }
        const size_t last_size = _tab_info_list->size();
        tabbed->GetTabInfo(_tab_info_list);
        const size_t cur_size = _tab_info_list->size();
        // Validate only the entries appended by this service.
        for (size_t i = last_size; i != cur_size; ++i) {
            const TabInfo& info = (*_tab_info_list)[i];
            if (!info.valid()) {
                LOG(ERROR) << "Invalid TabInfo: path=" << info.path
                           << " tab_name=" << info.tab_name;
                // Drop the entries just appended and undo the registration.
                _tab_info_list->resize(last_size);
                RemoveService(service);
                return -1;
            }
        }
    }
    return 0;
}

AddServiceInternal 既用于 echo.proto 生成的服务,也用于 BuiltinService,负责服务的校验和初始化等工作:例如检查服务是否定义了方法,并把映射关系存入 _method_map、_service_map、_fullname_service_map 等。

StartInternal:内部其他服务也会调用该函数

// Starts the server: validates state, applies options, prepares restful maps
// and concurrency limiters, listens on the first usable port in `port_range`,
// starts the acceptor and the background thread running UpdateDerivedVars.
//
// ip:         address to listen on.
// port_range: inclusive range [min_port, max_port] tried in ascending order.
// opt:        options to copy into _options; NULL means default options.
// Returns 0 on success, -1 on failure.
int Server::StartInternal(const butil::ip_t& ip,
                          const PortRange& port_range,
                          const ServerOptions *opt) {
    // NOTE(review): RevertServerStatus presumably rolls the server state back
    // when this function returns early; it is released on success below.
    std::unique_ptr<Server, RevertServerStatus> revert_server(this);
    if (_failed_to_set_max_concurrency_of_method) {
        _failed_to_set_max_concurrency_of_method = false;
        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
            "fix it before starting server";
        return -1;
    }
    // AddServiceInternal() also calls InitializeOnce().
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    const Status st = status();
    // Only a READY server can be started.
    if (st != READY) {
        if (st == RUNNING) {
            LOG(ERROR) << "Server[" << version() << "] is already running on "
                       << _listen_addr;
        } else {
            LOG(ERROR) << "Can't start Server[" << version()
                       << "] which is " << status_str(status());
        }
        return -1;
    }
    if (opt) {
        _options = *opt;
    } else {
        _options = ServerOptions();
    }

    // Init _keytable_pool always. If the server was stopped before, the pool
    // should be destroyed in Join().
    _keytable_pool = new bthread_keytable_pool_t;
    if (bthread_keytable_pool_init(_keytable_pool) != 0) {
        LOG(ERROR) << "Fail to init _keytable_pool";
        delete _keytable_pool;
        _keytable_pool = NULL;
        return -1;
    }

    _tl_options = ThreadLocalOptions();
    _concurrency = 0;

    // Builtin services are added at most once (while the count is still 0).
    if (_options.has_builtin_services &&
        _builtin_service_count <= 0 &&
        AddBuiltinServices() != 0) {
        LOG(ERROR) << "Fail to add builtin services";
        return -1;
    }
    // If a server is started/stopped for mutiple times and one of the options
    // sets has_builtin_service to true, builtin services will be enabled for
    // any later re-start. Check this case and report to user.
    if (!_options.has_builtin_services && _builtin_service_count > 0) {
        LOG(ERROR) << "A server started/stopped for multiple times must be "
            "consistent on ServerOptions.has_builtin_services";
        return -1;
    }

    // Prepare all restful maps
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
         it != _fullname_service_map.end(); ++it) {
        if (it->second.restful_map) {
            it->second.restful_map->PrepareForFinding();
        }
    }
    if (_global_restful_map) {
        _global_restful_map->PrepareForFinding();
    }
    // Apply the requested number of bthread workers, if one was given.
    if (_options.num_threads > 0) {
        if (FLAGS_usercode_in_pthread) {
            _options.num_threads += FLAGS_usercode_backup_threads;
        }
        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
            _options.num_threads = BTHREAD_MIN_CONCURRENCY;
        }
        bthread_setconcurrency(_options.num_threads);
    }
    // Install a concurrency limiter per method. Builtin services get none;
    // a method whose own setting is UNLIMITED uses the server-wide option.
    for (MethodMap::iterator it = _method_map.begin();
        it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);
        }
    }

    _listen_addr.ip = ip;
    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
        _listen_addr.port = port;
        // Create the listening socket for this candidate port.
        butil::fd_guard sockfd(tcp_listen(_listen_addr));
        if (sockfd < 0) {
            if (port != port_range.max_port) { // not the last port, try next
                continue;
            }
            if (port_range.min_port != port_range.max_port) {
                LOG(ERROR) << "Fail to listen " << ip
                           << ":[" << port_range.min_port << '-'
                           << port_range.max_port << ']';
            } else {
                LOG(ERROR) << "Fail to listen " << _listen_addr;
            }
            return -1;
        }
        if (_listen_addr.port == 0) {
            // port=0 makes kernel dynamically select a port from
            // https://en.wikipedia.org/wiki/Ephemeral_port
            _listen_addr.port = get_port_from_fd(sockfd);
            if (_listen_addr.port <= 0) {
                LOG(ERROR) << "Fail to get port from fd=" << sockfd;
                return -1;
            }
        }
        if (_am == NULL) {
            // Build the acceptor once.
            // NOTE(review): per the article it stores the parse/pack handlers
            // of the supported protocols - confirm in BuildAcceptor().
            _am = BuildAcceptor();
            if (NULL == _am) {
                LOG(ERROR) << "Fail to build acceptor";
                return -1;
            }
        }
        // Set `_status' to RUNNING before accepting connections
        // to prevent requests being rejected as ELOGOFF
        _status = RUNNING;
        time(&_last_start_time);
        // NOTE(review): presumably fills in the version string when unset.
        GenerateVersionIfNeeded();
        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);

        // Hand the listening fd to the acceptor, which starts accepting
        // client connections on it.
        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                             _default_ssl_ctx) != 0) {
            LOG(ERROR) << "Fail to start acceptor";
            return -1;
        }
        // The acceptor now uses the fd; stop the guard from closing it.
        sockfd.release();
        break; // stop trying
    }
    // Acceptor for the internal port that serves builtin services.
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        // Handled the same way as the main port.
        // ... (omitted in this excerpt)
    }
    // Write the pid file if one is configured.
    PutPidFileIfNeeded();

    // Start the background bthread that runs UpdateDerivedVars.
    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
    if (bthread_start_background(&_derivative_thread, NULL,
                                 UpdateDerivedVars, this) != 0) {
        LOG(ERROR) << "Fail to create _derivative_thread";
        return -1;
    }

    // Print tips to server launcher.
    int http_port = _listen_addr.port;
    std::ostringstream server_info;
    server_info << "Server[" << version() << "] is serving on port="
                << _listen_addr.port;
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        http_port = _options.internal_port;
        server_info << " and internal_port=" << _options.internal_port;
    }
    LOG(INFO) << server_info.str() << '.';

    if (_options.has_builtin_services) {
        LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
                  << http_port << " in web browser.";
    } else {
        LOG(WARNING) << "Builtin services are disabled according to "
            "ServerOptions.has_builtin_services";
    }

    // NOTE(review): per the article this sets the address reported for
    // tracing/tracking - confirm in SetTrackMeAddress().
    SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
    // Success: keep the RUNNING state.
    revert_server.release();
    return 0;
}

StartInternal 中创建了监听套接字和接收器(Acceptor),核心逻辑在 StartAccept 中。

StartAccept:开始在监听套接字上接收连接请求

// Wraps the listening fd in a Socket whose edge-triggered callback is
// OnNewConnections, then marks the acceptor RUNNING.
//
// listened_fd:      listening file descriptor; must be >= 0.
// idle_timeout_sec: not used in the visible part of this excerpt.
// ssl_ctx:          not used in the visible part of this excerpt.
// Returns 0 on success, -1 on failure.
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
                          const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
   // Reject an invalid listening fd.
   if (listened_fd < 0) {LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
        return -1;
    }
    // (code elided by the article's author)
    ...
    // Creation of _acception_id is inside lock so that OnNewConnections
    // (which may run immediately) should see sane fields set below.
    SocketOptions options;
    options.fd = listened_fd;
    options.user = this;
    // Callback stored in the Socket; Socket::ProcessEvent invokes it when
    // input events arrive on this fd.
    options.on_edge_triggered_events = OnNewConnections;
    if (Socket::Create(options, &_acception_id) != 0) {
        // Close-idle-socket thread will be stopped inside destructor
        LOG(FATAL) << "Fail to create _acception_id";
        return -1;
    }
    
    _listened_fd = listened_fd;
    _status = RUNNING;
    return 0;
}

int Socket::Create(const SocketOptions& options, SocketId* id) {
    butil::ResourceId<Socket> slot;
    // 获取创立套接字
    Socket* const m = butil::get_resource(&slot, Forbidden());
    if (m == NULL) {LOG(FATAL) << "Fail to get_resource<Socket>";
        return -1;
    }
    // 初始化 socket
    g_vars->nsocket << 1;
    CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
    m->_nevent.store(0, butil::memory_order_relaxed);
    m->_keytable_pool = options.keytable_pool;
    m->_tos = 0;
    m->_remote_side = options.remote_side;
    m->_on_edge_triggered_events = options.on_edge_triggered_events;
    m->_this_id = MakeSocketId(
            VersionOfVRef(m->_versioned_ref.fetch_add(1, butil::memory_order_release)), slot);
    m->_preferred_index = -1;
    m->_hc_count = 0;
    CHECK(m->_read_buf.empty());
    const int64_t cpuwide_now = butil::cpuwide_time_us();
    m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->reset_parsing_context(options.initial_parsing_context);
    m->_correlation_id = 0;
    m->_health_check_interval_s = options.health_check_interval_s;
    m->_ninprocess.store(1, butil::memory_order_relaxed);
    m->_auth_flag_error.store(0, butil::memory_order_relaxed);
    const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
    if (rc2) {LOG(ERROR) << "Fail to create auth_id:" << berror(rc2);
        m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
        return -1;
    }
    // NOTE: last two params are useless in bthread > r32787
    const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
    if (rc) {LOG(ERROR) << "Fail to init _id_wait_list:" << berror(rc);
        m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
        return -1;
    }
    m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
    CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
    // Must be last one! Internal fields of this Socket may be access
    // just after calling ResetFileDescriptor.
    // 这里设置套接字的一些信息,并且把 fd 注册到 epoll 队列
    if (m->ResetFileDescriptor(options.fd) != 0) {
        const int saved_errno = errno;
        PLOG(ERROR) << "Fail to ResetFileDescriptor";
        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", 
                     berror(saved_errno));
        return -1;
    }
    *id = m->_this_id;
    return 0;
}

调用 Socket::Create 来创建 Socket 对象,accept、connect 等场景下的 Socket 都是用这个函数创建的。on_edge_triggered_events 是事件回调:这里当有客户端连接事件发生时,会回调 OnNewConnections。

ResetFileDescriptor 设置 socket 信息,并注册到 epoll 中

int Socket::ResetFileDescriptor(int fd) {
    // Reset message sizes when fd is changed.
    _last_msg_size = 0;
    _avg_msg_size = 0;
    _fd.store(fd, butil::memory_order_release);
    _reset_fd_real_us = butil::gettimeofday_us();
    // 校验 fd 是否非法
    if (!ValidFileDescriptor(fd)) {return 0;}
    // 获取本地 ip 信息
    if (butil::get_local_side(fd, &_local_side) != 0) {_local_side = butil::EndPoint();
    }
    // 敞开一些子过程等无用文件描述符
    butil::make_close_on_exec(fd);
    // 设置非阻塞
    if (butil::make_non_blocking(fd) != 0) {PLOG(ERROR) << "Fail to set fd=" << fd << "to non-blocking";
        return -1;
    }
    // 敞开 Nagle 算法
    butil::make_no_delay(fd);
    if (_tos > 0 &&
        setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {PLOG(FATAL) << "Fail to set tos of fd=" << fd << "to" << _tos;
    }
        // 设置发送缓冲区
    if (FLAGS_socket_send_buffer_size > 0) {
        int buff_size = FLAGS_socket_send_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << "to" 
                        << buff_size;
        }
    }
    // 设置接收缓冲区大小        
    if (FLAGS_socket_recv_buffer_size > 0) {
        int buff_size = FLAGS_socket_recv_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << "to" 
                        << buff_size;
        }
    }
        // 第一次创立 epoll_create,并将接管 fd 退出到 epoll_ctl_add 队列中
    if (_on_edge_triggered_events) {if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {PLOG(ERROR) << "Fail to add SocketId=" << id() 
                        << "into EventDispatcher";
            _fd.store(-1, butil::memory_order_release);
            return -1;
        }
    }
    return 0;
}

这里设置了 fd,并启动协程、创建 epoll 实例,同时将连接设置为 ET(边缘触发)模式。至此,我们就可以接收来自 client 端的请求了。

IO 多路复用 EventDispatcher::Run(),事件监听


// Event loop: waits on _epfd and dispatches ready events until _stop is set
// or epoll_wait fails with an error other than EINTR.
void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event ready[32];
        // Poll without blocking first; block only when nothing is pending.
        int nready = epoll_wait(_epfd, ready, ARRAY_SIZE(ready), 0);
        if (0 == nready) {
            nready = epoll_wait(_epfd, ready, ARRAY_SIZE(ready), -1);
        }
        if (_stop) {
            return;
        }
        if (nready < 0) {
            if (errno != EINTR) {
                PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
                return;
            }
            // We've checked _stop, no wake-up will be missed.
            continue;
        }
        // First pass: input-side events (EPOLLIN, errors, hang-ups).
        for (epoll_event* ev = ready; ev != ready + nready; ++ev) {
            const bool wants_input =
                (ev->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
                || (ev->events & has_epollrdhup);
            if (wants_input) {
                // The return value is deliberately ignored.
                Socket::StartInputEvent(ev->data.u64, ev->events,
                                        _consumer_thread_attr);
            }
        }
        // Second pass: output-side events (EPOLLOUT, errors, hang-ups).
        for (epoll_event* ev = ready; ev != ready + nready; ++ev) {
            const bool wants_output =
                ev->events & (EPOLLOUT | EPOLLERR | EPOLLHUP);
            if (wants_output) {
                // The return value is deliberately ignored.
                Socket::HandleEpollOut(ev->data.u64);
            }
        }
    }
}

当有连接事件时,触发 EPOLLIN,调用 StartInputEvent。

// Dispatches an input event to the Socket identified by `id`.
//
// id:          SocketId that was registered with the event dispatcher.
// events:      epoll event bits reported for the fd.
// thread_attr: attributes for the bthread that runs ProcessEvent.
// Returns -1 when the Socket cannot be addressed or its fd is negative,
// 0 otherwise.
int Socket::StartInputEvent(SocketId id, uint32_t events,
                            const bthread_attr_t& thread_attr) {
    SocketUniquePtr sock;
    // Resolve the id to the Socket made by Socket::Create.
    if (Address(id, &sock) < 0) {
        return -1;
    }
    // No callback installed: nothing to run.
    if (sock->_on_edge_triggered_events == NULL) {
        return 0;
    }
    if (sock->fd() < 0) {
        CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
        return -1;
    }

    // Only the caller that raises _nevent from 0 starts a bthread; other
    // callers just bump the counter and leave.
    if (sock->_nevent.fetch_add(1, butil::memory_order_acq_rel) != 0) {
        return 0;
    }
    g_vars->neventthread << 1;
    // Ownership moves to ProcessEvent; `sock` must not be used afterwards.
    Socket* const raw = sock.release();
    bthread_attr_t attr = thread_attr;
    attr.keytable_pool = raw->_keytable_pool;
    bthread_t th;
    if (bthread_start_urgent(&th, &attr, ProcessEvent, raw) != 0) {
        LOG(FATAL) << "Fail to start ProcessEvent";
        // Fall back to processing the event in the current thread.
        ProcessEvent(raw);
    }
    return 0;
}

void* Socket::ProcessEvent(void* arg) {SocketUniquePtr s(static_cast<Socket*>(arg));
    // 调用 OnNewConnections
    s->_on_edge_triggered_events(s.get());
    return NULL;
}

Address 根据 SocketId 找到监听 fd 对应的 Socket,再启动协程,调用回调 OnNewConnections。

OnNewConnections 接收连接请求

// Edge-triggered callback of the listening Socket: accepts every pending
// connection and wraps each accepted fd in a Socket whose callback is
// InputMessenger::OnNewMessages.
//
// acception: the listening Socket; its user() must be the owning Acceptor.
void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        // Accept until EAGAIN. The inner loop is left with `break` rather
        // than `return` so that the Failed()/MoreReadEvents() checks after
        // it are reached. Socket::StartInputEvent only starts a new bthread
        // when _nevent was 0.
        // NOTE(review): MoreReadEvents() presumably consumes the pending
        // event count - confirm.
        while (1) {
            struct sockaddr in_addr;
            socklen_t in_len = sizeof(in_addr);
            // Accept one pending connection from the listening fd.
            butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
            if (in_fd < 0) {
                // Non-blocking fd: EAGAIN means every pending connection has
                // been accepted.
                if (errno == EAGAIN) {
                    break;
                }
                // Any other error: keep accepting.
                continue;
            }

            Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
            if (NULL == am) {
                LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
                acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
                break;
            }

            SocketId socket_id;
            SocketOptions options;
            options.keytable_pool = am->_keytable_pool;
            options.fd = in_fd;
            options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
            options.user = acception->user();
            // Callback that reads and parses data arriving on the connection.
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
            options.initial_ssl_ctx = am->_ssl_ctx;
            // Same procedure as for the listening fd: create a Socket for
            // the accepted fd, which also registers it with the event
            // dispatcher.
            if (Socket::Create(options, &socket_id) != 0) {
                LOG(ERROR) << "Fail to create Socket";
                continue;
            }
            in_fd.release(); // transfer ownership to socket_id

            SocketUniquePtr sock;
            // NOTE(review): AddressFailedAsWell presumably resolves the id
            // even if the Socket has already been set failed - confirm.
            if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
                bool is_running = true;
                {
                    BAIDU_SCOPED_LOCK(am->_map_mutex);
                    is_running = (am->status() == RUNNING);
                    // Track the connection in the acceptor's socket map.
                    am->_socket_map.insert(socket_id, ConnectStatistics());
                }
                if (!is_running) {
                    LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                        << " has been stopped, discard newly created " << *sock;
                    sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
                            "discard newly created %s", acception->fd(),
                            sock->description().c_str());
                    break;
                }
            }
        }

        if (acception->Failed()) {
            return;
        }
    // Loop again while more events arrived during processing; only one
    // bthread handles a given fd at a time.
    } while (acception->MoreReadEvents(&progress));
}

这里通过 accept 获取客户端发起的连接请求,并将新连接的 fd 也注册到 epoll 中。

OnNewMessages 读事件调用

// Edge-triggered callback of a connection Socket: reads available bytes,
// cuts them into protocol messages with CutInputMessage(), and hands each
// message to its protocol's `process` function.
//
// m: the connection Socket; its user() must be the owning InputMessenger.
void InputMessenger::OnNewMessages(Socket* m) {
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;
    // NOTE(review): the RunLastMessage deleter presumably processes the last
    // message when `last_msg` goes out of scope - confirm.
    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        const int64_t received_us = butil::cpuwide_time_us();
        const int64_t base_realtime = butil::gettimeofday_us() - received_us;
        // Size this read from the average size of earlier messages, clamped
        // to [MIN_ONCE_READ, MAX_ONCE_READ].
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
            once_read = MIN_ONCE_READ;
        } else if (once_read > MAX_ONCE_READ) {
            once_read = MAX_ONCE_READ;
        }

        // Read from the fd into the Socket's read buffer.
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
            if (0 == nr) {
                // The peer closed the connection; still run the parse loop
                // below once with read_eof set.
                read_eof = true;
            } else if (errno != EAGAIN) {
                if (errno == EINTR) {
                    continue;  // just retry
                }
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to read from " << *m;
                m->SetFailed(saved_errno, "Fail to read from %s: %s",
                             m->description().c_str(), berror(saved_errno));
                return;
            } else if (!m->MoreReadEvents(&progress)) {
                // EAGAIN/EWOULDBLOCK: nothing left to read and no new events
                // arrived while processing.
                return;
            } else {
                // New events arrived while processing; this bthread is the
                // only one handling the fd, so keep reading.
                continue;
            }
        }
        // Traffic statistics.
        m->AddInputBytes(nr);
        // Avoid this socket to be closed due to idle_timeout_s
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);

        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
            size_t index = 8888;
            // Cut one message from the read buffer; `index` receives the
            // index of the handler (protocol) that parsed it.
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            if (!pr.is_ok()) {
                if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
                    // Incomplete message: account for the consumed bytes and
                    // go back to reading.
                    m->_last_msg_size += (last_size - m->_read_buf.length());
                    break;
                } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
                    // No protocol recognized the data: close the connection.
                    LOG(WARNING)
                        << "Close " << *m << " due to unknown message: "
                        << butil::ToPrintable(m->_read_buf);
                    m->SetFailed(EINVAL, "Close %s due to unknown message",
                                 m->description().c_str());
                    return;
                } else {
                    // Any other parse error also closes the connection.
                    LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
                    m->SetFailed(EINVAL, "Close %s: %s",
                                 m->description().c_str(), pr.error_str());
                    return;
                }
            }

            m->AddInputMessages(1);
            // Calculate average size of messages
            const size_t cur_size = m->_read_buf.length();
            if (cur_size == 0) {
                m->_read_buf.return_cached_blocks();
            }
            // Bytes consumed by this message: last_size - cur_size.
            m->_last_msg_size += (last_size - cur_size);
            last_size = cur_size;
            const size_t old_avg = m->_avg_msg_size;
            if (old_avg != 0) {
                m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
                / MSG_SIZE_WINDOW;
            } else {
                m->_avg_msg_size = m->_last_msg_size;
            }
            m->_last_msg_size = 0;

            if (pr.message() == NULL) {
                // the Process() step can be skipped.
                continue;
            }
            pr.message()->_received_us = received_us;
            pr.message()->_base_real_us = base_realtime;

            DestroyingPtr<InputMessageBase> msg(pr.message());
            // Queue the previous message, if any. On the first iteration
            // last_msg is null, which QueueMessage ignores. QueueMessage
            // runs ProcessInputMessage in a new bthread, or inline when the
            // bthread cannot be started.
            QueueMessage(last_msg.release(), &num_bthread_created,
                             m->_keytable_pool);
            if (handlers[index].process == NULL) {
                LOG(ERROR) << "process of index=" << index << " is NULL";
                continue;
            }
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;

            if (handlers[index].verify != NULL) {
                int auth_error = 0;
                if (0 == m->FightAuthentication(&auth_error)) {
                    // Get the right to authenticate
                    if (handlers[index].verify(msg.get())) {
                        m->SetAuthentication(0);
                    } else {
                        m->SetAuthentication(ERPCAUTH);
                        LOG(WARNING) << "Fail to authenticate " << *m;
                        m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
                                     m->description().c_str());
                        return;
                    }
                } else {
                    LOG_IF(FATAL, auth_error != 0) <<
                      "Impossible! Socket should have been "
                      "destroyed when authentication failed";
                }
            }
            if (!m->is_read_progressive()) {
                // Keep this message as last_msg; it is queued on the next
                // iteration or handled by the deleter of last_msg.
                last_msg.reset(msg.release());
            } else {
                QueueMessage(msg.release(), &num_bthread_created,
                                 m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
            }
        }
        // QueueMessage starts bthreads with BTHREAD_NOSIGNAL; flush once per
        // batch.
        if (num_bthread_created) {
            bthread_flush();
        }
    }

    if (read_eof) {
        m->SetEOF();
    }
}

DoRead 读取缓冲区的数据,CutInputMessage 校验每次读到的数据是否正确。我们以 http 协议为例:校验成功后,通过 QueueMessage 启动协程进行处理。

包校验 CutInputMessage

// Tries to cut one message from the read buffer of `m`, first with the
// handler that worked last time for this Socket, then with every other
// registered handler.
//
// m:        Socket whose _read_buf is parsed.
// index:    receives the index of the handler that returned OK or
//           PARSE_ERROR_NOT_ENOUGH_DATA.
// read_eof: forwarded to the handlers' parse functions.
// Returns the handler's ParseResult, or PARSE_ERROR_TRY_OTHERS when no
// handler recognized the data.
ParseResult InputMessenger::CutInputMessage(Socket* m, size_t* index, bool read_eof) {
    // Index of the handler that parsed this Socket's data last time; -1
    // (the initial value set in Socket::Create) means none. Trying it first
    // saves iterations.
    const int preferred = m->preferred_index();
    const int max_index = (int)_max_index.load(butil::memory_order_acquire);
    // Try the preferred handler first.
    if (preferred >= 0 && preferred <= max_index
            && _handlers[preferred].parse != NULL) {
        // For the HTTP protocol, the article states this `parse` is
        // ParseHttpMessage.
        ParseResult result =
            _handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg);
        // A complete message or an incomplete one: stay on this protocol.
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            *index = preferred;
            return result;
        // Any error other than "try others" is final.
        } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
            return result;
        }
        // Sockets created by Connect (client side) do not fall through to
        // other protocols, unless the preferred one is PROTOCOL_BAIDU_STD.
        if (m->CreatedByConnect() &&
            (ProtocolType)preferred != PROTOCOL_BAIDU_STD) {
            // The protocol is fixed at client-side, no need to try others.
            LOG(ERROR) << "Fail to parse response from " << m->remote_side()
                       << " by " << _handlers[preferred].name
                       << " at client-side";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }
        // Clear context before trying next protocol which probably has
        // an incompatible context with the current one.
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        m->set_preferred_index(-1);
    }
    // The preferred handler was absent or did not match: try the handlers
    // one by one, starting from index 0.
    for (int i = 0; i <= max_index; ++i) {
        if (i == preferred || _handlers[i].parse == NULL) {
            // Don't try preferred handler(already tried) or invalid handler
            continue;
        }
        // Run this protocol's parse function.
        ParseResult result = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            // Remember the matching handler for the next call.
            m->set_preferred_index(i);
            *index = i;
            return result;
        } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
            return result;
        }
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        // Try other protocols.
    }
    return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}

对收到的包,按协议逐个尝试解析,确认包是否正确。

QueueMessage:对收到的包开启协程进行逻辑处理

// Runs ProcessInputMessage(to_run_msg) in a new bthread; when the bthread
// cannot be started the message is processed inline instead.
//
// to_run_msg:          message to process; NULL is ignored.
// num_bthread_created: incremented when a bthread was started.
// keytable_pool:       stored in the attributes of the new bthread.
static void QueueMessage(InputMessageBase* to_run_msg,
                         int* num_bthread_created,
                         bthread_keytable_pool_t* keytable_pool) {
    if (to_run_msg == NULL) {
        return;
    }
    // Attributes depend on FLAGS_usercode_in_pthread; BTHREAD_NOSIGNAL is
    // always added.
    bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
                           BTHREAD_ATTR_PTHREAD :
                           BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
    attr.keytable_pool = keytable_pool;
    bthread_t tid;
    if (bthread_start_background(&tid, &attr, ProcessInputMessage, to_run_msg) != 0) {
        // Could not start a bthread: process in the current thread.
        ProcessInputMessage(to_run_msg);
        return;
    }
    ++*num_bthread_created;
}

void* ProcessInputMessage(void* void_arg) {InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
    // 调用 ProcessHttpRequest
    msg->_process(msg);
    return NULL;
}

启动协程,调用 ProcessInputMessage

以 http 协议为例:ProcessHttpRequest 的执行流程

// Server-side handler for one parsed HTTP (or HTTP/2) request.
//
// msg is the parsed input message; it is cast to HttpContext and destroyed
// through imsg_guard. The function:
//   1. creates a Controller and fills it from the socket and the HTTP header;
//   2. looks up the target method from the request URI;
//   3. applies the per-method, per-connection and per-server limits;
//   4. creates the request/response protobuf messages and, when allowed,
//      converts the HTTP body (protobuf or JSON, optionally gzip'ed) into the
//      request message;
//   5. calls the service's CallMethod() with a `done` closure built around
//      resp_sender.
// Every failure path records the error with cntl->SetFailed() and returns.
void ProcessHttpRequest(InputMessageBase *msg) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    // ParseHttpMessage/ParseFromIOBuf stored all parsed data in the HttpContext.
    DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
    SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg->arg());
    ScopedNonServiceError non_service_error(server);
    // Create the controller.
    Controller* cntl = new (std::nothrow) Controller;
    if (NULL == cntl) {
        LOG(FATAL) << "Fail to new Controller";
        return;
    }
    // Per the article's note, the HTTP response is sent from this object's
    // destructor; presumably that is why the early returns below only need to
    // call cntl->SetFailed().
    HttpResponseSender resp_sender(cntl);
    resp_sender.set_received_us(msg->received_us());
    // HTTP/2 requests additionally carry a stream id that the sender records.
    const bool is_http2 = imsg_guard->header().is_http2();
    if (is_http2) {
        H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
        resp_sender.set_h2_stream_id(h2_sctx->stream_id());
    }

    ControllerPrivateAccessor accessor(cntl);
    HttpHeader& req_header = cntl->http_request();
    // HTTP header part: swapped into the controller's request header.
    imsg_guard->header().Swap(req_header);
    // HTTP body part.
    butil::IOBuf& req_body = imsg_guard->body();

    // Use the address taken from the header when available, otherwise the
    // socket's remote side.
    butil::EndPoint user_addr;
    if (!GetUserAddressFromHeader(req_header, &user_addr)) {
        user_addr = socket->remote_side();
    }
    ServerPrivateAccessor server_accessor(server);
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(user_addr)
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);
    // Header-driven processing (e.g. log_id, trace_id) is elided in the
    // article; `path` used below is presumably defined in the elided part.
    ...
    ...
    // Resolve the HTTP URI against the method map built from the services.
    const Server::MethodProperty* const sp =
        FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
    if (NULL == sp) {
        if (security_mode) {
            // In security mode the path is web-escaped before being echoed back.
            std::string escape_path;
            WebEscape(path, &escape_path);
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
        } else {
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
        }
        return;
    } else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
        BadMethodRequest breq;
        BadMethodResponse bres;
        butil::StringSplitter split(path.c_str(), '/');
        breq.set_service_name(std::string(split.field(), split.length()));
        sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
        return;
    }
    // Switch to service-specific error.
    non_service_error.release();
    MethodStatus* method_status = sp->status;
    resp_sender.set_method_status(method_status);
    if (method_status) {
        int rejected_cc = 0;
        if (!method_status->OnRequested(&rejected_cc)) {
            cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                            sp->method->full_name().c_str(), rejected_cc);
            return;
        }
    }

    // Overload protection; skipped for builtin and tabbed services.
    if (!sp->is_builtin_service && !sp->params.is_tabbed) {
        // Connection is overcrowded.
        if (socket->is_overcrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            return;
        }
        // Server-level concurrency limit.
        if (!server_accessor.AddConcurrency(cntl)) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            return;
        }
        // Too much user code already running in pthreads.
        // The second literal starts with a space so the concatenated message
        // reads "... run when -usercode_in_pthread is on".
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            return;
        }
    } else if (security_mode) {
        // Adjacent literals are joined verbatim; each continuation carries its
        // own leading space.
        cntl->SetFailed(EPERM, "Not allowed to access builtin services, try"
                        " ServerOptions.internal_port=%d instead if you're in"
                        " internal network", server->options().internal_port);
        return;
    }
    // Create the request/response messages from the prototypes of the
    // resolved method; resp_sender takes ownership of both.
    google::protobuf::Service* svc = sp->service;
    const google::protobuf::MethodDescriptor* method = sp->method;
    accessor.set_method(method);
    google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
    resp_sender.own_request(req);
    google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
    resp_sender.own_response(res);

    if (__builtin_expect(!req || !res, 0)) {
        PLOG(FATAL) << "Fail to new req or res";
        cntl->SetFailed("Fail to new req or res");
        return;
    }
    // Convert the HTTP body into the protobuf request.
    if (sp->params.allow_http_body_to_pb &&
        method->input_type()->field_count() > 0) {
        // A protobuf service. No matter if Content-type is set to
        // application/json or body is empty, we have to treat body as a json
        // and try to convert it to pb, which guarantees that a protobuf
        // service is always accessed with valid requests.
        if (req_body.empty()) {
            // Treat empty body specially since parsing it results in error
            if (!req->IsInitialized()) {
                cntl->SetFailed(EREQUEST, "%s needs to be created from a"
                                " non-empty json, it has required fields.",
                                req->GetDescriptor()->full_name().c_str());
                return;
            } // else all fields of the request are optional.
        } else {
            bool is_grpc_ct = false;
            const HttpContentType content_type =
                ParseContentType(req_header.content_type(), &is_grpc_ct);
            const std::string* encoding = NULL;
            // For gRPC, validate the body prefix and pick up the encoding and
            // timeout from the gRPC-specific headers.
            if (is_http2) {
                if (is_grpc_ct) {
                    bool grpc_compressed = false;
                    if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
                        // This is the request body, so report a bad request,
                        // consistent with the other body failures below.
                        cntl->SetFailed(EREQUEST, "Invalid gRPC request");
                        return;
                    }
                    if (grpc_compressed) {
                        encoding = req_header.GetHeader(common->GRPC_ENCODING);
                        if (encoding == NULL) {
                            cntl->SetFailed(
                                EREQUEST, "Fail to find header `grpc-encoding'"
                                " in compressed gRPC request");
                            return;
                        }
                    }
                    int64_t timeout_value_us =
                        ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
                    if (timeout_value_us >= 0) {
                        accessor.set_deadline_us(butil::gettimeofday_us() + timeout_value_us);
                    }
                }
            } else {
                encoding = req_header.GetHeader(common->CONTENT_ENCODING);
            }
            if (encoding != NULL && *encoding == common->GZIP) {
                TRACEPRINTF("Decompressing request=%lu",
                            (unsigned long)req_body.size());
                butil::IOBuf uncompressed;
                if (!policy::GzipDecompress(req_body, &uncompressed)) {
                    cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
                    return;
                }
                req_body.swap(uncompressed);
            }
            if (content_type == HTTP_CONTENT_PROTO) {
                if (!ParsePbFromIOBuf(req, req_body)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
                                    req->GetDescriptor()->full_name().c_str());
                    return;
                }
            } else {
                // Any other content type is parsed as JSON.
                butil::IOBufAsZeroCopyInputStream wrapper(req_body);
                std::string err;
                json2pb::Json2PbOptions options;
                options.base64_to_bytes = sp->params.pb_bytes_to_base64;
                cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
                if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
                                    req->GetDescriptor()->full_name().c_str(), err.c_str());
                    return;
                }
            }
        }
    } else {
        // Body is not converted to pb: keep the raw data as the attachment.
        cntl->request_attachment().swap(req_body);
    }
    // Create the `done` closure handed to the service method.
    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
    imsg_guard.reset();  // optional, just release resource ASAP

    // Call the method of the generated service (e.g. from echo.proto).
    if (!FLAGS_usercode_in_pthread) {
        return svc->CallMethod(method, cntl, req, res, done);
    }
    if (BeginRunningUserCode()) {
        svc->CallMethod(method, cntl, req, res, done);
        return EndRunningUserCodeInPlace();
    } else {
        return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
    }
}

将收到的数据进行转化,创建 controller、req、resp、done 等对象,最终调用 CallMethod 方法。

echo.proto 生成的 echo.pb.cc 中的 CallMethod 方法

// Dispatches a generic protobuf service call to the typed method of this
// service.
//
// method:     descriptor of the method to invoke; its service is checked
//             against this file's service descriptor.
// controller: passed through to the typed method.
// request:    generic request, down-cast to ::example::EchoRequest.
// response:   generic response, down-cast to ::example::EchoResponse.
// done:       passed through to the typed method.
//
// Only method index 0 (Echo) exists; any other index logs a FATAL message.
void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(),
                   protobuf_echo_2eproto::file_level_service_descriptors[0]);

  if (method->index() != 0) {
    GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
    return;
  }

  // Index 0: forward to Echo(), which the service implementation overrides.
  const ::example::EchoRequest* const echo_request =
      ::google::protobuf::down_cast<const ::example::EchoRequest*>(request);
  ::example::EchoResponse* const echo_response =
      ::google::protobuf::down_cast< ::example::EchoResponse*>(response);
  Echo(controller, echo_request, echo_response, done);
}

调用 Echo 方法,就是调用了 EchoServiceImpl::Echo

EchoServiceImpl::Echo

// Skeleton of the user's implementation of the generated EchoService.
// EchoService::CallMethod() forwards method index 0 to Echo() below.
class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {}
    virtual ~EchoServiceImpl() {}

    // Handles one Echo call.
    //
    // cntl:     RPC controller passed through from CallMethod().
    // request:  typed request message.
    // response: typed response message to fill in.
    // done:     closure passed through from CallMethod(); the full example at
    //           the top of the article wraps it in brpc::ClosureGuard.
    virtual void Echo(google::protobuf::RpcController* cntl,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        // Write your own business logic here.
        // NOTE: this comment must stay on its own line; a `//` comment placed
        // before the closing brace on the same line would comment the brace out.
    }
};

至此我们已经收到数据,可以做业务逻辑处理了。

总结

brpc 作为服务端,整个收包过程基本就是这样。相信认真看完这篇文章,你对服务端一定会有一定的收获。后面我们会再写文章分析介绍 brpc 服务端回包、client 收发包、brpc 协程、socket 套接字资源管理等。关注我,我是 dandyhuang。也可 wx 搜 dandyhuang_,有什么问题我们可以一起探讨交流。

正文完
 0