这次咱们开始 muduo
源代码的理论编写,首先咱们晓得 muduo
是LT
模式,Reactor
模式,下图为 Reactor
模式的流程图[起源 1]
而后咱们来看下 muduo
的整体架构[起源 1]
首先 muduo
有一个主反应堆 mainReactor
以及几个子反应堆 subReactor
,其中子反应堆的个数由用户应用setThreadNum
函数设置,mainReactor
中次要有一个 Acceptor
,当用户建设新的连贯的时候,Acceptor
会将 connfd
和对应的事件打包为一个 channel
而后采纳轮询的算法,指定将该 channel
给所抉择的 subReactor
,当前该subReactor
就负责该 channel
的所有工作。
TcpServer 类
咱们依照从上到下的思路进行解说,以下内容咱们依照一个简略的 EchoServer
的实现思路来解说,咱们晓得当咱们本人实现一个 Server
的时候,会在构造函数中实例化一个TcpServer
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注册回调函数
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);
// 设置适合的 loop 线程数量 loopthread 不包含 baseloop
server_.setThreadNum(3);
}
于是咱们去看下 TcpServer
的构造函数是在干什么
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
, threadPool_(new EventLoopThreadPool(loop, name_))
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
// 当有新用户连贯时候,会执行该回调函数
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}
咱们只须要关注 acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明确的一点,结构了一个 Acceptor
,咱们首先要晓得Acceptor
次要就是连贯新用户并打包为一个 Channel
,所以咱们就应该晓得Acceptor
按情理应该实现 socket
,bind
,listen
,accept
这四个函数。
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop), acceptSocket_(createNonblocking()) // socket
,
acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr); // 绑定套接字
// 有新用户的连贯,执行一个回调(打包为 channel)acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
其中 Acceptor
中有个acceptSocket_
,其实就是咱们平时所用的listenfd
,构造函数中实现了socket
,bind
,而其余的两个函数的应用在其余代码
// 开启服务器监听
void TcpServer::start()
{
// 避免一个 TcpServer 被 start 屡次
if (started_++ == 0)
{threadPool_->start(threadInitCallback_); // 启动底层的 loop 线程池, 这里会依照设定了 threadnum 设置 pool 的数量
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}
咱们晓得,当咱们设置了 threadnum
之后,就会有一个 mainloop
,那么这个loop_
就是那个 mainloop
,其中能够看见这个loop_
就只做一个事件Acceptor::listen
。
void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // acceptChannel_ => Poller}
这里就实现了 listen
函数,还有最初一个函数 accept
,咱们缓缓向下剖析,从代码能够晓得acceptChannel_.enableReading()
之后就会使得这个 listenfd
所在的 channel
对读事件感兴趣,那什么时候会有读事件呢,就是当用户建设新连贯的时候,那么咱们应该想一下,那当感兴趣的事件产生之后,listenfd
应该干什么呢,应该执行一个回调函数呀。留神 Acceptor
构造函数中有这样一行代码 acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
这就是那个回调,咱们去看下 handleRead
在干嘛。
// listenfd 有事件产生了,就是有新用户连贯了
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// 若用户实现定义了,则执行,否则阐明用户对新到来的连贯没有须要执行的,所以间接敞开
if (newConnectionCallback_)
{newConnectionCallback_(connfd, peerAddr); // 轮询找到 subLoop,唤醒,散发以后的新客户端的 Channel
}
else
{::close(connfd);
}
}
...
}
这里是不是就实现了 accept
函数,至此当用户建设一个新的连贯时候,Acceptor
就会失去一个 connfd
和其对应的 peerAddr
返回给 mainloop
,这时候咱们在留神到TcpServer
构造函数中有这样一行代码 acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
咱们给 acceptor_
设置了一个 newConnectionCallback_
,于是由下面的代码就能够晓得,if (newConnectionCallback_)
为真,就会执行这个回调函数,于是就会执行TcpServer::newConnection
,咱们去看下这个函数是在干嘛。
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
// 轮询算法抉择一个 subloop 来治理对应的这个新连贯
EventLoop *ioLoop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
// 通过 sockfd 获取其绑定的本地 ip 和端口
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
{LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);
// 依据连贯胜利的 sockfd, 创立 TcpConnection
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd, // Socket Channel
localAddr,
peerAddr));
connections_[connName] = conn;
// 上面的回调时用户设置给 TcpServer,TcpServer 又设置给 TcpConnection,TcpConnetion 又设置给 Channel,Channel 又设置给 Poller,Poller 告诉 channel 调用这个回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 设置了如何敞开连贯的回调
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
);
// 间接调用 connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
这里就比拟长了,我先说下大略他干了啥事件:首先通过轮询找到下一个 subloop
,而后将刚刚返回的connfd
和对应的 peerAddr
以及 localAddr
结构为一个 TcpConnection
给subloop
,而后给这个 conn
设置了一系列的回调函数,比方读回调,写回调,断开回调等等。下一章咱们来说下下面的代码最初几行在干嘛。
本人的网址:www.shicoder.top
欢送加群聊天 452380935
本文由博客一文多发平台 OpenWrite 公布!