testserver.cc测试例子解读

张开发
2026/4/13 4:00:08 15 分钟阅读

分享文章

testserver.cc测试例子解读
一、介绍这个测试例子是基于muduo-core 实现的 Echo 服务器客户端发什么服务器原样返回核心遵循「Reactor 模型Multi-Reactor」mainLoop主线程 EventLoop负责监听新连接AcceptorsubLoop线程池中的 EventLoop负责处理已连接客户端的读写事件所有回调通过「函数对象 智能指针」解耦事件驱动核心是EventLoop::loop()的 epoll_wait 循环。二、每一行发生了什么1、EventLoop loop;初始化epoll_fdEpollPoller用于监听事件初始化wakeup_fdeventfd用于线程间唤醒subLoop 唤醒 mainLoop或反之标记threadId_为当前主线程 ID保证 One Loop Per Thread2、InetAddress addr(8080);运行效果初始化服务端监听地址默认 IP 为127.0.0.1端口 8080网络字节序。调用InetAddress(uint16_t port, std::string ip 127.0.0.1)构造函数填充sockaddr_in结构体sin_familyAF_INETsin_porthtons(8080)sin_addrinet_addr(127.0.0.1)。3、EchoServer server(loop, addr, EchoServer);运行效果初始化 EchoServer 对象核心做 3 件事初始化TcpServer对象关联 mainLoop、监听地址、服务器名称注册连接回调onConnection到TcpServer注册消息回调onMessage到 TcpServer设置 subLoop 线程数为 3Multi-Reactor 模式。子步骤 1TcpServer 初始化EchoServer 构造函数server_(loop, addr, name)调用TcpServer构造函数TcpServer.cc 18-35 行校验 mainLoop 非空初始化Acceptor对象负责监听新连接绑定 mainLoop初始化EventLoopThreadPool线程池关联 mainLoop给Acceptor注册newConnection回调TcpServer::newConnection这是新连接的核心入口。子步骤 2注册连接回调onConnection子步骤 3注册消息回调onMessage子步骤4设置subLoop线程数4、server.start();运行效果启动服务器核心做 2 件事启动EventLoopThreadPool创建 3 个 subLoop 线程让Acceptor开始监听 8080 端口mainLoop 监听新连接。子步骤 1启动线程池关联代码void EventLoopThreadPool::start(const ThreadInitCallback cb) { running_ true; for (int i 0; i numThreads_; i) { // 创建线程运行threadFunc std::string name name_ std::to_string(i 1); threads_.emplace_back(new Thread( std::bind(EventLoopThreadPool::threadFunc, this, cb), name)); threads_[i]-start(); } } // 线程函数每个线程创建一个EventLoop进入loop循环 void EventLoopThreadPool::threadFunc(const ThreadInitCallback cb) { EventLoop loop; if (cb) cb(loop); { MutexLockGuard lock(mutex_); loops_.push_back(loop); cond_.notifyAll(); } loop.loop(); // subLoop进入事件循环 }子步骤 2Acceptor 开始监听Acceptor::listen5、loop.loop();运行效果mainLoop 进入事件循环Reactor 核心永久循环直到quit()被调用调用Poller::poll()epoll_wait阻塞等待事件新连接 / 唤醒事件遍历触发的事件列表调用对应 Channel 的handleEvent()执行pendingFunctors_跨线程待执行的函数比如 newConnection。void EventLoop::loop() { looping_ true; quit_ false; while (!quit_) { activeChannels_.clear(); // 1. epoll_wait阻塞返回触发的事件activeChannels_ pollReturnTime_ poller_-poll(kPollTimeMs, activeChannels_); // 2. 处理所有触发的Channel事件 for (Channel *channel : activeChannels_) { channel-handleEvent(pollReturnTime_); } // 3. 执行跨线程的待执行函数 doPendingFunctors(); } looping_ false; }三、onMessage 回调逐行解析核心业务逻辑当客户端发送数据时会触发onMessage回调先看回调的触发前置条件再逐行解析前置onMessage 的触发链路从客户端发数据到回调执行客户端调用send()发数据 → 服务端 8080 对应的客户端 fd 触发 EPOLLIN该 fd 所属的 subLoop 的 Pollerepoll_wait检测到事件返回activeChannels_调用Channel::handleEvent()→ 触发TcpConnection::handleRead()handleRead()从 fd 读取数据到inputBuffer_调用messageCallback_即 onMessage。// 1. Channel处理读事件Channel.cc void Channel::handleEventWithGuard(Timestamp receiveTime) { if (revents_ (EPOLLIN | EPOLLPRI)) { if (readCallback_) readCallback_(receiveTime); // 调用TcpConnection::handleRead } } // 2. TcpConnection读取数据TcpConnection.cc void TcpConnection::handleRead(Timestamp receiveTime) { int savedErrno 0; ssize_t n inputBuffer_.readFd(channel_-fd(), savedErrno); // 读数据到inputBuffer_ if (n 0) { // 调用消息回调即EchoServer::onMessage messageCallback_(shared_from_this(), inputBuffer_, receiveTime); } }std::string msg buf-retrieveAllAsString();conn-send(msg);// 1. TcpConnection::sendTcpConnection.cc void TcpConnection::send(const std::string buf) { if (state_ kConnected) { if (loop_-isInLoopThread()) { // 若在subLoop线程直接发 sendInLoop(buf.c_str(), buf.size()); } else { // 跨线程放入pendingFunctors_ loop_-runInLoop( std::bind(TcpConnection::sendInLoop, this, buf.c_str(), buf.size())); } } } // 2. 核心发送逻辑TcpConnection.cc void TcpConnection::sendInLoop(const void *data, size_t len) { ssize_t nwrote 0; size_t remaining len; bool faultError false; // 情况1无待发送数据直接write if (!channel_-isWriting() outputBuffer_.readableBytes() 0) { nwrote ::write(channel_-fd(), data, len); if (nwrote 0) { remaining len - nwrote; if (remaining 0 writeCompleteCallback_) { // 发送完成触发回调 loop_-queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } } } // 情况2write未发完剩余数据存入outputBuffer_注册EPOLLOUT if (!faultError remaining 0) { size_t oldLen outputBuffer_.readableBytes(); // 高水位回调超过64M触发 if (oldLen remaining highWaterMark_ oldLen highWaterMark_ highWaterMarkCallback_) { loop_-queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen remaining)); } outputBuffer_.append((char *)data nwrote, remaining); // 存入缓冲区 if (!channel_-isWriting()) { channel_-enableWriting(); // 注册EPOLLOUT事件 } } } // 3. EPOLLOUT触发时发送剩余数据TcpConnection.cc void TcpConnection::handleWrite() { if (channel_-isWriting()) { ssize_t n outputBuffer_.writeFd(channel_-fd(), savedErrno); // 写数据 if (n 0) { outputBuffer_.retrieve(n); // 清空已发送数据 if (outputBuffer_.readableBytes() 0) { channel_-disableWriting(); // 取消EPOLLOUT if (writeCompleteCallback_) { loop_-queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } } } } }// conn-shutdown();// 1. 关闭写端TcpConnection.cc void TcpConnection::shutdown() { if (state_ kConnected) { setState(kDisconnecting); loop_-runInLoop(std::bind(TcpConnection::shutdownInLoop, this)); } } void TcpConnection::shutdownInLoop() { if (!channel_-isWriting()) { // 发送缓冲区已空 socket_-shutdownWrite(); // 关闭写端 } } // 2. 处理EPOLLHUPChannel.cc void Channel::handleEventWithGuard(Timestamp receiveTime) { if ((revents_ EPOLLHUP) !(revents_ EPOLLIN)) { if (closeCallback_) closeCallback_(); // 调用TcpConnection::handleClose } } // 3. 销毁连接TcpConnection.cc void TcpConnection::handleClose() { setState(kDisconnected); channel_-disableAll(); TcpConnectionPtr connPtr(shared_from_this()); connectionCallback_(connPtr); // 触发onConnectionConnection DOWN closeCallback_(connPtr); // 调用TcpServer::removeConnection }四、回调实现的核心逻辑从注册到触发整个 muduo 的回调体系基于「函数对象std::function 绑定std::bind 传递」核心步骤1. 回调注册testserver.cc → TcpServer → TcpConnectiontestserver.cc将onConnection/onMessage绑定为函数对象通过TcpServer::setXxxCallback存入 TcpServerTcpServer::newConnection创建 TcpConnection 时将 TcpServer 的回调传递给 TcpConnectionTcpConnection将回调存入自身成员connectionCallback_/messageCallback_并关联到 Channel 的事件处理。2. 回调触发事件驱动连接事件TcpConnection 创建 / 销毁时调用connectionCallback_onConnection消息事件客户端发数据触发读事件调用messageCallback_onMessage所有回调最终由EventLoop的事件循环驱动保证在对应线程mainLoop/subLoop执行。五、总结这个测试例子的核心是Reactor 模型的落地mainLoop监听新连接subLoop处理读写实现多线程并发回调通过「注册 - 传递 - 触发」解耦业务逻辑和底层网络Buffer解决 TCP 粘包 / 拆包异步发送避免线程阻塞整个流程基于 epoll 的事件驱动所有操作异步非阻塞符合高性能网络库的设计思想。每一行代码的执行都联动了 muduo-core 的核心类EventLoop是事件循环核心TcpServer是对外封装的服务器入口Acceptor是新连接监听者TcpConnection是单个连接的封装Channel是 fd 和事件的桥梁Buffer是数据缓冲区解决读写效率问题。回调的本质是「将业务函数以函数对象的形式传递给底层底层在特定事件触发时调用」而 muduo 通过std::function和std::bind实现了回调的灵活绑定通过shared_ptr保证对象生命周期安全。

更多文章