這次我們開始muduo源代碼的實際撰寫,首先我們知道muduo是LT模式,Reactor模式,下圖為Reactor模式的流程圖[來源1]

然后我們來看下muduo的整體架構[來源1]

首先muduo有一個主反應堆mainReactor以及幾個子反應堆subReactor,其中子反應堆的個數由用戶使用setThreadNum函式設定,mainReactor中主要有一個Acceptor,當用戶建立新的連接的時候,Acceptor會將connfd和對應的事件打包為一個channel然后采用輪詢的演算法,指定將該channel給所選擇的subReactor,以后該subReactor就負責該channel的所有作業,
TcpServer類
我們按照從上到下的思路進行講解,以下內容我們按照一個簡單的EchoServer的實作思路來講解,我們知道當我們自己實作一個Server的時候,會在建構式中實體化一個TcpServer
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注冊回呼函式
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);
// 設定合適的loop執行緒數量 loopthread 不包括baseloop
server_.setThreadNum(3);
}
于是我們去看下TcpServer的建構式是在干什么
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
, threadPool_(new EventLoopThreadPool(loop, name_))
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
// 當有新用戶連接時候,會執行該回呼函式
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}
我們只需要關注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點,構造了一個Acceptor,我們首先要知道Acceptor主要就是連接新用戶并打包為一個Channel,所以我們就應該知道Acceptor按道理應該實作socket,bind,listen,accept這四個函式,
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop), acceptSocket_(createNonblocking()) // socket
,
acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr); // 系結套接字
// 有新用戶的連接,執行一個回呼(打包為channel)
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
其中Acceptor中有個acceptSocket_,其實就是我們平時所用的listenfd,建構式中實作了socket,bind,而其余的兩個函式的使用在其余代碼
// 開啟服務器監聽
void TcpServer::start()
{
// 防止一個TcpServer被start多次
if (started_++ == 0)
{
threadPool_->start(threadInitCallback_); // 啟動底層的loop執行緒池,這里會按照設定了threadnum設定pool的數量
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}
我們知道,當我們設定了threadnum之后,就會有一個mainloop,那么這個loop_就是那個mainloop,其中可以看見這個loop_就只做一個事情Acceptor::listen,
void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // acceptChannel_ => Poller
}
這里就實作了listen函式,還有最后一個函式accept,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()之后就會使得這個listenfd所在的channel對讀事件感興趣,那什么時候會有讀事件呢,就是當用戶建立新連接的時候,那么我們應該想一下,那當感興趣的事件發生之后,listenfd應該干什么呢,應該執行一個回呼函式呀,注意Acceptor建構式中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));這就是那個回呼,我們去看下handleRead在干嘛,
// listenfd有事件發生了,就是有新用戶連接了
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// 若用戶實作定義了,則執行,否則說明用戶對新到來的連接沒有需要執行的,所以直接關閉
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發當前的新客戶端的Channel
}
else
{
::close(connfd);
}
}
...
}
這里是不是就實作了accept函式,至此當用戶建立一個新的連接時候,Acceptor就會得到一個connfd和其對應的peerAddr回傳給mainloop,這時候我們在注意到TcpServer建構式中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));我們給acceptor_設定了一個newConnectionCallback_,于是由上面的代碼就可以知道,if (newConnectionCallback_)為真,就會執行這個回呼函式,于是就會執行TcpServer::newConnection,我們去看下這個函式是在干嘛,
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
// 輪詢演算法選擇一個subloop來管理對應的這個新連接
EventLoop *ioLoop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
// 通過sockfd獲取其系結的本地ip和埠
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);
// 根據連接成功的sockfd,創建TcpConnection
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd, // Socket Channel
localAddr,
peerAddr));
connections_[connName] = conn;
// 下面的回呼時用戶設定給TcpServer,TcpServer又設定給TcpConnection,TcpConnetion又設定給Channel,Channel又設定給Poller,Poller通知channel呼叫這個回呼
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 設定了如何關閉連接的回呼
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
);
// 直接呼叫connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
這里就比較長了,我先說下大概他干了啥事情:首先通過輪詢找到下一個subloop,然后將剛付訓傳的connfd和對應的peerAddr以及localAddr構造為一個TcpConnection給subloop,然后給這個conn設定了一系列的回呼函式,比如讀回呼,寫回呼,斷開回呼等等,下一章我們來說下上面的代碼最后幾行在干嘛,
自己的網址:www.shicoder.top
歡迎加群聊天 452380935
本文由博客一文多發平臺 OpenWrite 發布!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/463470.html
標籤:其他
下一篇:C++高性能網路服務保姆級教程
