代码拉取完成,页面将自动刷新
同步操作将从 张远浩/call_demo_server 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#include "EventDispatch.h"
#include "BaseSocket.h"
#define MIN_TIMER_DURATION 100 // 100 miliseconds
CEventDispatch* CEventDispatch::m_pEventDispatch = NULL;
CEventDispatch::CEventDispatch()
{
running = false;
#ifdef _WIN32
FD_ZERO(&m_read_set);
FD_ZERO(&m_write_set);
FD_ZERO(&m_excep_set);
#elif __APPLE__
m_kqfd = kqueue();
if (m_kqfd == -1)
{
log("kqueue failed");
}
#else
m_epfd = epoll_create(1024);
if (m_epfd == -1)
{
log("epoll_create failed");
}
#endif
}
CEventDispatch::~CEventDispatch()
{
#ifdef _WIN32
#elif __APPLE__
close(m_kqfd);
#else
close(m_epfd);
#endif
}
void CEventDispatch::AddTimer(callback_t callback, void* user_data, uint64_t interval)
{
list<TimerItem*>::iterator it;
for (it = m_timer_list.begin(); it != m_timer_list.end(); it++)
{
TimerItem* pItem = *it;
if (pItem->callback == callback && pItem->user_data == user_data)
{
pItem->interval = interval;
pItem->next_tick = get_tick_count() + interval;
return;
}
}
TimerItem* pItem = new TimerItem;
pItem->callback = callback;
pItem->user_data = user_data;
pItem->interval = interval;
pItem->next_tick = get_tick_count() + interval;
m_timer_list.push_back(pItem);
}
void CEventDispatch::RemoveTimer(callback_t callback, void* user_data)
{
list<TimerItem*>::iterator it;
for (it = m_timer_list.begin(); it != m_timer_list.end(); it++)
{
TimerItem* pItem = *it;
if (pItem->callback == callback && pItem->user_data == user_data)
{
m_timer_list.erase(it);
delete pItem;
return;
}
}
}
void CEventDispatch::_CheckTimer()
{
uint64_t curr_tick = get_tick_count();
list<TimerItem*>::iterator it;
for (it = m_timer_list.begin(); it != m_timer_list.end(); )
{
TimerItem* pItem = *it;
it++; // iterator maybe deleted in the callback, so we should increment it before callback
if (curr_tick >= pItem->next_tick)
{
pItem->next_tick += pItem->interval;
pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
}
}
}
void CEventDispatch::AddLoop(callback_t callback, void* user_data)
{
TimerItem* pItem = new TimerItem;
pItem->callback = callback;
pItem->user_data = user_data;
m_loop_list.push_back(pItem);
}
void CEventDispatch::_CheckLoop()
{
for (list<TimerItem*>::iterator it = m_loop_list.begin(); it != m_loop_list.end(); it++) {
TimerItem* pItem = *it;
pItem->callback(pItem->user_data, NETLIB_MSG_LOOP, 0, NULL);
}
}
CEventDispatch* CEventDispatch::Instance()
{
if (m_pEventDispatch == NULL)
{
m_pEventDispatch = new CEventDispatch();
}
return m_pEventDispatch;
}
#ifdef _WIN32
void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event)
{
CAutoLock func_lock(&m_lock);
if ((socket_event & SOCKET_READ) != 0)
{
FD_SET(fd, &m_read_set);
}
if ((socket_event & SOCKET_WRITE) != 0)
{
FD_SET(fd, &m_write_set);
}
if ((socket_event & SOCKET_EXCEP) != 0)
{
FD_SET(fd, &m_excep_set);
}
}
void CEventDispatch::RemoveEvent(SOCKET fd, uint8_t socket_event)
{
CAutoLock func_lock(&m_lock);
if ((socket_event & SOCKET_READ) != 0)
{
FD_CLR(fd, &m_read_set);
}
if ((socket_event & SOCKET_WRITE) != 0)
{
FD_CLR(fd, &m_write_set);
}
if ((socket_event & SOCKET_EXCEP) != 0)
{
FD_CLR(fd, &m_excep_set);
}
}
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
fd_set read_set, write_set, excep_set;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = wait_timeout * 1000; // 10 millisecond
if(running)
return;
running = true;
while (running)
{
_CheckTimer();
_CheckLoop();
if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
{
Sleep(MIN_TIMER_DURATION);
continue;
}
m_lock.lock();
memcpy(&read_set, &m_read_set, sizeof(fd_set));
memcpy(&write_set, &m_write_set, sizeof(fd_set));
memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
m_lock.unlock();
int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);
if (nfds == SOCKET_ERROR)
{
log("select failed, error code: %d", GetLastError());
Sleep(MIN_TIMER_DURATION);
continue; // select again
}
if (nfds == 0)
{
continue;
}
for (u_int i = 0; i < read_set.fd_count; i++)
{
//log("select return read count=%d\n", read_set.fd_count);
SOCKET fd = read_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnRead();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < write_set.fd_count; i++)
{
//log("select return write count=%d\n", write_set.fd_count);
SOCKET fd = write_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnWrite();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < excep_set.fd_count; i++)
{
//log("select return exception count=%d\n", excep_set.fd_count);
SOCKET fd = excep_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnClose();
pSocket->ReleaseRef();
}
}
}
}
void CEventDispatch::StopDispatch()
{
running = false;
}
#elif __APPLE__
void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event)
{
struct kevent ke;
if ((socket_event & SOCKET_READ) != 0)
{
EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(m_kqfd, &ke, 1, NULL, 0, NULL);
}
if ((socket_event & SOCKET_WRITE) != 0)
{
EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
kevent(m_kqfd, &ke, 1, NULL, 0, NULL);
}
}
void CEventDispatch::RemoveEvent(SOCKET fd, uint8_t socket_event)
{
struct kevent ke;
if ((socket_event & SOCKET_READ) != 0)
{
EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(m_kqfd, &ke, 1, NULL, 0, NULL);
}
if ((socket_event & SOCKET_WRITE) != 0)
{
EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(m_kqfd, &ke, 1, NULL, 0, NULL);
}
}
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
struct kevent events[1024];
int nfds = 0;
struct timespec timeout;
timeout.tv_sec = 0;
timeout.tv_nsec = wait_timeout * 1000000;
if(running)
return;
running = true;
while (running)
{
nfds = kevent(m_kqfd, NULL, 0, events, 1024, &timeout);
for (int i = 0; i < nfds; i++)
{
int ev_fd = events[i].ident;
CBaseSocket* pSocket = FindBaseSocket(ev_fd);
if (!pSocket)
continue;
if (events[i].filter == EVFILT_READ)
{
//log("OnRead, socket=%d\n", ev_fd);
pSocket->OnRead();
}
if (events[i].filter == EVFILT_WRITE)
{
//log("OnWrite, socket=%d\n", ev_fd);
pSocket->OnWrite();
}
pSocket->ReleaseRef();
}
_CheckTimer();
_CheckLoop();
}
}
void CEventDispatch::StopDispatch()
{
running = false;
}
#else
void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event)
{
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLPRI | EPOLLERR | EPOLLHUP;
ev.data.fd = fd;
if (epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
{
log("epoll_ctl() failed, errno=%d", errno);
}
}
void CEventDispatch::RemoveEvent(SOCKET fd, uint8_t socket_event)
{
if (epoll_ctl(m_epfd, EPOLL_CTL_DEL, fd, NULL) != 0)
{
log("epoll_ctl failed, errno=%d", errno);
}
}
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
struct epoll_event events[1024];
int nfds = 0;
if(running)
return;
running = true;
while (running)
{
nfds = epoll_wait(m_epfd, events, 1024, wait_timeout);
for (int i = 0; i < nfds; i++)
{
int ev_fd = events[i].data.fd;
CBaseSocket* pSocket = FindBaseSocket(ev_fd);
if (!pSocket)
continue;
//Commit by zhfu @2015-02-28
#ifdef EPOLLRDHUP
if (events[i].events & EPOLLRDHUP)
{
//log("On Peer Close, socket=%d, ev_fd);
pSocket->OnClose();
}
#endif
// Commit End
if (events[i].events & EPOLLIN)
{
//log("OnRead, socket=%d\n", ev_fd);
pSocket->OnRead();
}
if (events[i].events & EPOLLOUT)
{
//log("OnWrite, socket=%d\n", ev_fd);
pSocket->OnWrite();
}
if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))
{
//log("OnClose, socket=%d\n", ev_fd);
pSocket->OnClose();
}
pSocket->ReleaseRef();
}
_CheckTimer();
_CheckLoop();
}
}
void CEventDispatch::StopDispatch()
{
running = false;
}
#endif
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。