forked from L1w-Y/muduo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventLoop.cc
More file actions
151 lines (131 loc) · 3.63 KB
/
Copy pathEventLoop.cc
File metadata and controls
151 lines (131 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include"EventLoop.h"
#include"Logger.h"
#include"Poller.h"
#include"Channel.h"
#include<sys/eventfd.h>
#include<unistd.h>
#include<fcntl.h>
#include<errno.h>
#include<memory>
__thread EventLoop* t_loopInThisThread = nullptr;
const int KPollTimeMs = 10000;
int createEventfd(){
int evtfd = ::eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC);
if(evtfd<0){
LOG_FATAL("eventfd error: %d",errno);
}
return evtfd;
}
void EventLoop::loop(){
looping_ = true;
quit_=false;
LOG_INFO("eventloop %p start looping \n",this);
while(!quit_){
activeChannels_.clear();
//poller监听两类fd,一类是来自client,一类是用于loop间通信的eventfd(统一会封装为channel)
pollReturnTime_ = poller_->poll(KPollTimeMs,&activeChannels_);
for(auto ch : activeChannels_)
{
ch->handleEvent(pollReturnTime_);
}
//执行当前loop循环需要处理的回调操作
dopendingFunctors();
}
LOG_INFO("eventloop %p stop looping \n",this);
looping_=false;
}
EventLoop::EventLoop()
: looping_(false)
,quit_(false)
,callingPendingFunctors_(false)
,threadId_(CurrentThread::tid())
,poller_(Poller::newDefaultPoller(this))
,wakeupFd_(createEventfd())
,wakeupChannel_(new Channel(this,wakeupFd_))
,currentActiveChannel_(nullptr)
{
LOG_DEBUG("EventLoop created %p in thread %d \n",this,threadId_);
if(t_loopInThisThread){
LOG_FATAL("Another loop %p exists in this thread %d \n",t_loopInThisThread,threadId_);
}
else
{
t_loopInThisThread=this;
}
//设置wakeupfd的事件类型以及发生事件后的回调
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this));
//每一个eventloop都将监听wakeupchannel的EPOLLIN读事件
wakeupChannel_->enableReading();
}
/*
退出事件循环,两种情况
1.loop在自己的线程中调用quit
2.在其他线程中调用quit,比如subloop调用mainloop的quit
*/
void EventLoop::quit(){
quit_=true;
if(!isINLoopThread()){
wakeup();
}
}
void EventLoop::handleRead()
{
uint64_t one =1;
ssize_t n = read(wakeupFd_,&one,sizeof one);
if(n !=sizeof one){
LOG_ERROR("eventloop::handleRead() reads %zd bytes instead of 8",n);
}
}
EventLoop::~EventLoop(){
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
void EventLoop::runInLoop(Functor cb){
if(isINLoopThread()){
cb();
}
else{
queueInLoop(std::move(cb));
}
}
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
//唤醒执行回调的loop线程
if(!isINLoopThread()||callingPendingFunctors_){
wakeup();
}
}
void EventLoop::updateChannel(Channel *channel){
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel){
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *Channel){
return poller_->hasChannel(Channel);
}
//向wakeupfd_写一个数据
void EventLoop::wakeup(){
uint64_t one =1;
ssize_t n = write(wakeupFd_,&one,sizeof one);
if(n!=sizeof one){
LOG_ERROR("eventloop::wakeup() writes %lu bytes instead of 8\n",n);
}
}
void EventLoop::dopendingFunctors(){
std::vector<Functor> Functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
Functors.swap(pendingFunctors_);
}
for(auto &f : Functors){
f();//执行当前loop回调
}
callingPendingFunctors_ = false;
}