关于golang:muduo源码分析之TcpServer模块

这次咱们开始muduo源代码的理论编写,首先咱们晓得muduoLT模式,Reactor模式,下图为Reactor模式的流程图[起源1]

而后咱们来看下muduo的整体架构[起源1]

首先muduo有一个主反应堆mainReactor以及几个子反应堆subReactor,其中子反应堆的个数由用户应用setThreadNum函数设置,mainReactor中次要有一个Acceptor,当用户建设新的连贯的时候,Acceptor会将connfd和对应的事件打包为一个channel而后采纳轮询的算法,指定将该channel给所抉择的subReactor,当前该subReactor就负责该channel的所有工作。

TcpServer类

咱们依照从上到下的思路进行解说,以下内容咱们依照一个简略的EchoServer的实现思路来解说,咱们晓得当咱们本人实现一个Server的时候,会在构造函数中实例化一个TcpServer

EchoServer(EventLoop *loop,
           const InetAddress &addr, 
           const std::string &name)
    : server_(loop, addr, name)
        , loop_(loop)
    {
        // 注册回调函数
        server_.setConnectionCallback(
            std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
        );

        server_.setMessageCallback(
            std::bind(&EchoServer::onMessage, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
        );

        // 设置适合的loop线程数量 loopthread 不包含baseloop
        server_.setThreadNum(3);
    }

于是咱们去看下TcpServer的构造函数是在干什么

TcpServer::TcpServer(EventLoop *loop,
                const InetAddress &listenAddr,
                const std::string &nameArg,
                Option option)
                : loop_(CheckLoopNotNull(loop))
                , ipPort_(listenAddr.toIpPort())
                , name_(nameArg)
                , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
                , threadPool_(new EventLoopThreadPool(loop, name_))
                , connectionCallback_()
                , messageCallback_()
                , nextConnId_(1)
                , started_(0)
{
    // 当有新用户连贯时候,会执行该回调函数
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, 
        std::placeholders::_1, std::placeholders::_2));
}

咱们只须要关注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))threadPool_(new EventLoopThreadPool(loop, name_))
首先很明确的一点,结构了一个Acceptor,咱们首先要晓得Acceptor次要就是连贯新用户并打包为一个Channel,所以咱们就应该晓得Acceptor按情理应该实现socketbindlistenaccept这四个函数。

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop), acceptSocket_(createNonblocking()) // socket
      ,
      acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr); // 绑定套接字
    // 有新用户的连贯,执行一个回调(打包为channel)
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

其中Acceptor中有个acceptSocket_,其实就是咱们平时所用的listenfd,构造函数中实现了socketbind,而其余的两个函数的应用在其余代码

// 开启服务器监听
void TcpServer::start()
{
    // 避免一个TcpServer被start屡次
    if (started_++ == 0) 
    {
        threadPool_->start(threadInitCallback_); // 启动底层的loop线程池,这里会依照设定了threadnum设置pool的数量
        loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

咱们晓得,当咱们设置了threadnum之后,就会有一个mainloop,那么这个loop_就是那个mainloop,其中能够看见这个loop_就只做一个事件Acceptor::listen

void Acceptor::listen()
{
    listenning_ = true;
    acceptSocket_.listen();         // listen
    acceptChannel_.enableReading(); // acceptChannel_ => Poller
}

这里就实现了listen函数,还有最初一个函数accept,咱们缓缓向下剖析,从代码能够晓得acceptChannel_.enableReading()之后就会使得这个listenfd所在的channel对读事件感兴趣,那什么时候会有读事件呢,就是当用户建设新连贯的时候,那么咱们应该想一下,那当感兴趣的事件产生之后,listenfd应该干什么呢,应该执行一个回调函数呀。留神Acceptor构造函数中有这样一行代码acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));这就是那个回调,咱们去看下handleRead在干嘛。

// listenfd有事件产生了,就是有新用户连贯了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // 若用户实现定义了,则执行,否则阐明用户对新到来的连贯没有须要执行的,所以间接敞开
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,散发以后的新客户端的Channel
        }
        else
        {
            ::close(connfd);
        }
    }
    ...
}

这里是不是就实现了accept函数,至此当用户建设一个新的连贯时候,Acceptor就会失去一个connfd和其对应的peerAddr返回给mainloop,这时候咱们在留神到TcpServer构造函数中有这样一行代码acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));咱们给acceptor_设置了一个newConnectionCallback_,于是由下面的代码就能够晓得,if (newConnectionCallback_)为真,就会执行这个回调函数,于是就会执行TcpServer::newConnection,咱们去看下这个函数是在干嘛。

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    // 轮询算法抉择一个subloop来治理对应的这个新连贯
    EventLoop *ioLoop = threadPool_->getNextLoop(); 
    char buf[64] = {0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    // 通过sockfd获取其绑定的本地ip和端口
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);

    // 依据连贯胜利的sockfd,创立TcpConnection
    TcpConnectionPtr conn(new TcpConnection(
                            ioLoop,
                            connName,
                            sockfd,   // Socket Channel
                            localAddr,
                            peerAddr));
    connections_[connName] = conn;
    // 上面的回调时用户设置给TcpServer,TcpServer又设置给TcpConnection,TcpConnetion又设置给Channel,Channel又设置给Poller,Poller告诉channel调用这个回调
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);

    // 设置了如何敞开连贯的回调
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
    );

    // 间接调用connectEstablished
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

这里就比拟长了,我先说下大略他干了啥事件:首先通过轮询找到下一个subloop,而后将刚刚返回的connfd和对应的peerAddr以及localAddr结构为一个TcpConnectionsubloop,而后给这个conn设置了一系列的回调函数,比方读回调,写回调,断开回调等等。下一章咱们来说下下面的代码最初几行在干嘛。

本人的网址:www.shicoder.top
欢送加群聊天 452380935
本文由博客一文多发平台 OpenWrite 公布!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理