1 Star 0 Fork 1

向海/mymuduo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
EventLoop.cc 4.40 KB
一键复制 编辑 原始数据 按行查看 历史
xianghai 提交于 2023-02-25 00:44 . initialize
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>
// 防止一个线程创建多个eventloop
__thread EventLoop *t_loopInThisThread = nullptr;
// 定义默认poller io复用接口的超时时间 10s
const int kPollTimeMs = 10000;
// 创建wakeupfd, 用来notify唤醒sub loop
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_FATAL("eventfd error:%d\n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false), quit_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if (t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread\n", this);
}
else
{
t_loopInThisThread = this;
}
// 设置wakeupfd的事件类型,以及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// 每一个eventloop都将监听wakupchannel的EPOLLIN
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %d bytes! \n", n);
}
}
// 开启事件循环
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping\n", this);
while (!quit_)
{
activeChannels_.clear();
// 监听两类fd,一种是clientfd,一种是wakeupFd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
// poller监听哪些channel发生事件,上报给eventloop,通知channel处理事件
channel->handleEvent(pollReturnTime_);
}
// 执行当前eventloop事件循环需要处理的回调操作
// mainloop事先注册一个回调,需要subloop来执行
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping\n", this);
looping_ = false;
}
// 退出事件循环 1.loop在自己的线程中调用quit(), 2.在非loop的线程中调用了loop的quit,
void EventLoop::quit()
{
quit_ = true;
if (!isInLoopThread()) // 如果在其他线程中调用了quit,比如在subloop(worker)中调用了mainloop(IO)的quit
{
wakeup();
}
}
// 当前线程中执行
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread()) // 在当前的loop线程中执行cb
{
cb();
}
else // 在非当前loop线程中执行cb, 需要唤醒loop所在线程,执行cb
{
queueInLoop(cb);
}
}
// 把cb放入队列中,唤醒loop所在线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
// 唤醒响应的需要执行上面回调操作的loop线程
// || callingPendingFunctors : 当前loop正在执行回调,又有新的回调进来,需要在执行完回调后唤醒一次执行新的回调
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // 唤醒loop所在线程
}
}
// eventloop方法-> poller方法
void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel)
{
return poller_->hasChannel(channel);
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor &functor : functors)
{
functor(); // 执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n);
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/to-the-sea/mymuduo.git
git@gitee.com:to-the-sea/mymuduo.git
to-the-sea
mymuduo
mymuduo
master

搜索帮助