1 Star 1 Fork 0

SCIsMe/libco

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
co_routine.cpp 22.47 KB
一键复制 编辑 原始数据 按行查看 历史
leiffyli@tencent.com 提交于 2016-12-07 22:26 . update hook connect
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137
/*
* Tencent is pleased to support the open source community by making Libco available.
* Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "co_routine.h"
#include "co_routine_inner.h"
#include "co_epoll.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <string>
#include <map>
#include <poll.h>
#include <sys/time.h>
#include <errno.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/syscall.h>
#include <unistd.h>
extern "C"
{
extern void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");
};
using namespace std;
stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env );
struct stCoEpoll_t;
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;
stCoEpoll_t *pEpoll;
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* ocupy_co;
};
//int socket(int domain, int type, int protocol);
void co_log_err( const char *fmt,... )
{
}
#if defined( __LIBCO_RDTSCP__)
static unsigned long long counter(void)
{
register uint32_t lo, hi;
register unsigned long long o;
__asm__ __volatile__ (
"rdtscp" : "=a"(lo), "=d"(hi)
);
o = hi;
o <<= 32;
return (o | lo);
}
static unsigned long long getCpuKhz()
{
FILE *fp = fopen("/proc/cpuinfo","r");
if(!fp) return 1;
char buf[4096] = {0};
fread(buf,1,sizeof(buf),fp);
fclose(fp);
char *lp = strstr(buf,"cpu MHz");
if(!lp) return 1;
lp += strlen("cpu MHz");
while(*lp == ' ' || *lp == '\t' || *lp == ':')
{
++lp;
}
double mhz = atof(lp);
unsigned long long u = (unsigned long long)(mhz * 1000);
return u;
}
#endif
static unsigned long long GetTickMS()
{
#if defined( __LIBCO_RDTSCP__)
static uint32_t khz = getCpuKhz();
return counter() / khz;
#else
struct timeval now = { 0 };
gettimeofday( &now,NULL );
unsigned long long u = now.tv_sec;
u *= 1000;
u += now.tv_usec / 1000;
return u;
#endif
}
static pid_t GetPid()
{
static __thread pid_t pid = 0;
static __thread pid_t tid = 0;
if( !pid || !tid || pid != getpid() )
{
pid = getpid();
#if defined( __APPLE__ )
tid = syscall( SYS_gettid );
if( -1 == (long)tid )
{
tid = pid;
}
#else
tid = syscall( __NR_gettid );
#endif
}
return tid;
}
/*
static pid_t GetPid()
{
char **p = (char**)pthread_self();
return p ? *(pid_t*)(p + 18) : getpid();
}
*/
template <class T,class TLink>
void RemoveFromLink(T *ap)
{
TLink *lst = ap->pLink;
if(!lst) return ;
assert( lst->head && lst->tail );
if( ap == lst->head )
{
lst->head = ap->pNext;
if(lst->head)
{
lst->head->pPrev = NULL;
}
}
else
{
if(ap->pPrev)
{
ap->pPrev->pNext = ap->pNext;
}
}
if( ap == lst->tail )
{
lst->tail = ap->pPrev;
if(lst->tail)
{
lst->tail->pNext = NULL;
}
}
else
{
ap->pNext->pPrev = ap->pPrev;
}
ap->pPrev = ap->pNext = NULL;
ap->pLink = NULL;
}
template <class TNode,class TLink>
void inline AddTail(TLink*apLink,TNode *ap)
{
if( ap->pLink )
{
return ;
}
if(apLink->tail)
{
apLink->tail->pNext = (TNode*)ap;
ap->pNext = NULL;
ap->pPrev = apLink->tail;
apLink->tail = ap;
}
else
{
apLink->head = apLink->tail = ap;
ap->pNext = ap->pPrev = NULL;
}
ap->pLink = apLink;
}
template <class TNode,class TLink>
void inline PopHead( TLink*apLink )
{
if( !apLink->head )
{
return ;
}
TNode *lp = apLink->head;
if( apLink->head == apLink->tail )
{
apLink->head = apLink->tail = NULL;
}
else
{
apLink->head = apLink->head->pNext;
}
lp->pPrev = lp->pNext = NULL;
lp->pLink = NULL;
if( apLink->head )
{
apLink->head->pPrev = NULL;
}
}
template <class TNode,class TLink>
void inline Join( TLink*apLink,TLink *apOther )
{
//printf("apOther %p\n",apOther);
if( !apOther->head )
{
return ;
}
TNode *lp = apOther->head;
while( lp )
{
lp->pLink = apLink;
lp = lp->pNext;
}
lp = apOther->head;
if(apLink->tail)
{
apLink->tail->pNext = (TNode*)lp;
lp->pPrev = apLink->tail;
apLink->tail = apOther->tail;
}
else
{
apLink->head = apOther->head;
apLink->tail = apOther->tail;
}
apOther->head = apOther->tail = NULL;
}
/////////////////for copy stack //////////////////////////
stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->ocupy_co= NULL;
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size);
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
return stack_mem;
}
stShareStack_t* co_alloc_sharestack(int count, int stack_size)
{
stShareStack_t* share_stack = (stShareStack_t*)malloc(sizeof(stShareStack_t));
share_stack->alloc_idx = 0;
share_stack->stack_size = stack_size;
//alloc stack array
share_stack->count = count;
stStackMem_t** stack_array = (stStackMem_t**)calloc(count, sizeof(stStackMem_t*));
for (int i = 0; i < count; i++)
{
stack_array[i] = co_alloc_stackmem(stack_size);
}
share_stack->stack_array = stack_array;
return share_stack;
}
static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack)
{
if (!share_stack)
{
return NULL;
}
int idx = share_stack->alloc_idx % share_stack->count;
share_stack->alloc_idx++;
return share_stack->stack_array[idx];
}
// ----------------------------------------------------------------------------
struct stTimeoutItemLink_t;
struct stTimeoutItem_t;
struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = 1024 * 10;
struct stTimeout_t *pTimeout;
struct stTimeoutItemLink_t *pstTimeoutList;
struct stTimeoutItemLink_t *pstActiveList;
co_epoll_res *result;
};
typedef void (*OnPreparePfn_t)( stTimeoutItem_t *,struct epoll_event &ev, stTimeoutItemLink_t *active );
typedef void (*OnProcessPfn_t)( stTimeoutItem_t *);
struct stTimeoutItem_t
{
enum
{
eMaxTimeout = 40 * 1000 //40s
};
stTimeoutItem_t *pPrev;
stTimeoutItem_t *pNext;
stTimeoutItemLink_t *pLink;
unsigned long long ullExpireTime;
OnPreparePfn_t pfnPrepare;
OnProcessPfn_t pfnProcess;
void *pArg; // routine
bool bTimeout;
};
struct stTimeoutItemLink_t
{
stTimeoutItem_t *head;
stTimeoutItem_t *tail;
};
struct stTimeout_t
{
stTimeoutItemLink_t *pItems;
int iItemSize;
unsigned long long ullStart;
long long llStartIdx;
};
stTimeout_t *AllocTimeout( int iSize )
{
stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) );
lp->iItemSize = iSize;
lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize );
lp->ullStart = GetTickMS();
lp->llStartIdx = 0;
return lp;
}
void FreeTimeout( stTimeout_t *apTimeout )
{
free( apTimeout->pItems );
free ( apTimeout );
}
int AddTimeout( stTimeout_t *apTimeout,stTimeoutItem_t *apItem ,unsigned long long allNow )
{
if( apTimeout->ullStart == 0 )
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0;
}
if( allNow < apTimeout->ullStart )
{
co_log_err("CO_ERR: AddTimeout line %d allNow %llu apTimeout->ullStart %llu",
__LINE__,allNow,apTimeout->ullStart);
return __LINE__;
}
if( apItem->ullExpireTime < allNow )
{
co_log_err("CO_ERR: AddTimeout line %d apItem->ullExpireTime %llu allNow %llu apTimeout->ullStart %llu",
__LINE__,apItem->ullExpireTime,allNow,apTimeout->ullStart);
return __LINE__;
}
int diff = apItem->ullExpireTime - apTimeout->ullStart;
if( diff >= apTimeout->iItemSize )
{
co_log_err("CO_ERR: AddTimeout line %d diff %d",
__LINE__,diff);
return __LINE__;
}
AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
return 0;
}
inline void TakeAllTimeout( stTimeout_t *apTimeout,unsigned long long allNow,stTimeoutItemLink_t *apResult )
{
if( apTimeout->ullStart == 0 )
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0;
}
if( allNow < apTimeout->ullStart )
{
return ;
}
int cnt = allNow - apTimeout->ullStart + 1;
if( cnt > apTimeout->iItemSize )
{
cnt = apTimeout->iItemSize;
}
if( cnt < 0 )
{
return;
}
for( int i = 0;i<cnt;i++)
{
int idx = ( apTimeout->llStartIdx + i) % apTimeout->iItemSize;
Join<stTimeoutItem_t,stTimeoutItemLink_t>( apResult,apTimeout->pItems + idx );
}
apTimeout->ullStart = allNow;
apTimeout->llStartIdx += cnt - 1;
}
static int CoRoutineFunc( stCoRoutine_t *co,void * )
{
if( co->pfn )
{
co->pfn( co->arg );
}
co->cEnd = 1;
stCoRoutineEnv_t *env = co->env;
co_yield_env( env );
return 0;
}
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{
stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;
lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;
lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
void co_free( stCoRoutine_t *co )
{
free( co );
}
void co_release( stCoRoutine_t *co )
{
if( co->cEnd )
{
free( co );
}
}
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co);
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co;
co_swap( lpCurrRoutine, co );
}
void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}
void co_yield_ct()
{
co_yield_env( co_get_curr_thread_env() );
}
void co_yield( stCoRoutine_t *co )
{
co_yield_env( co->env );
}
void save_stack_buffer(stCoRoutine_t* ocupy_co)
{
///copy out
stStackMem_t* stack_mem = ocupy_co->stack_mem;
int len = stack_mem->stack_bp - ocupy_co->stack_sp;
if (ocupy_co->save_buffer)
{
free(ocupy_co->save_buffer), ocupy_co->save_buffer = NULL;
}
ocupy_co->save_buffer = (char*)malloc(len); //malloc buf;
ocupy_co->save_size = len;
memcpy(ocupy_co->save_buffer, ocupy_co->stack_sp, len);
}
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
char c;
curr->stack_sp= &c;
if (!pending_co->cIsShareStack)
{
env->pending_co = NULL;
env->ocupy_co = NULL;
}
else
{
env->pending_co = pending_co;
//get last occupy co on the same stack mem
stCoRoutine_t* ocupy_co = pending_co->stack_mem->ocupy_co;
//set pending co to ocupy thest stack mem;
pending_co->stack_mem->ocupy_co = pending_co;
env->ocupy_co = ocupy_co;
if (ocupy_co && ocupy_co != pending_co)
{
save_stack_buffer(ocupy_co);
}
}
//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_ocupy_co = curr_env->ocupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;
if (update_ocupy_co && update_pending_co && update_ocupy_co != update_pending_co)
{
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}
//int poll(struct pollfd fds[], nfds_t nfds, int timeout);
// { fd,events,revents }
struct stPollItem_t ;
struct stPoll_t : public stTimeoutItem_t
{
struct pollfd *fds;
nfds_t nfds; // typedef unsigned long int nfds_t;
stPollItem_t *pPollItems;
int iAllEventDetach;
int iEpollFd;
int iRaiseCnt;
};
struct stPollItem_t : public stTimeoutItem_t
{
struct pollfd *pSelf;
stPoll_t *pPoll;
struct epoll_event stEvent;
};
/*
* EPOLLPRI POLLPRI // There is urgent data to read.
* EPOLLMSG POLLMSG
*
* POLLREMOVE
* POLLRDHUP
* POLLNVAL
*
* */
static uint32_t PollEvent2Epoll( short events )
{
uint32_t e = 0;
if( events & POLLIN ) e |= EPOLLIN;
if( events & POLLOUT ) e |= EPOLLOUT;
if( events & POLLHUP ) e |= EPOLLHUP;
if( events & POLLERR ) e |= EPOLLERR;
if( events & POLLRDNORM ) e |= EPOLLRDNORM;
if( events & POLLWRNORM ) e |= EPOLLWRNORM;
return e;
}
static short EpollEvent2Poll( uint32_t events )
{
short e = 0;
if( events & EPOLLIN ) e |= POLLIN;
if( events & EPOLLOUT ) e |= POLLOUT;
if( events & EPOLLHUP ) e |= POLLHUP;
if( events & EPOLLERR ) e |= POLLERR;
if( events & EPOLLRDNORM ) e |= POLLRDNORM;
if( events & EPOLLWRNORM ) e |= POLLWRNORM;
return e;
}
static stCoRoutineEnv_t* g_arrCoEnvPerThread[ 204800 ] = { 0 };
void co_init_curr_thread_env()
{
pid_t pid = GetPid();
g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];
env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
self->cIsMain = 1;
env->pending_co = NULL;
env->ocupy_co = NULL;
coctx_init( &self->ctx );
env->pCallStack[ env->iCallStackSize++ ] = self;
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return g_arrCoEnvPerThread[ GetPid() ];
}
void OnPollProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}
void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active )
{
stPollItem_t *lp = (stPollItem_t *)ap;
lp->pSelf->revents = EpollEvent2Poll( e.events );
stPoll_t *pPoll = lp->pPoll;
pPoll->iRaiseCnt++;
if( !pPoll->iAllEventDetach )
{
pPoll->iAllEventDetach = 1;
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll );
AddTail( active,pPoll );
}
}
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
if( !ctx->result )
{
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
for(int i=0;i<ret;i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare )
{
item->pfnPrepare( item,result->events[i],active );
}
else
{
AddTail( active,item );
}
}
unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout,now,timeout );
stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
lp = active->head;
while( lp )
{
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if( lp->pfnProcess )
{
lp->pfnProcess( lp );
}
lp = active->head;
}
if( pfn )
{
if( -1 == pfn( arg ) )
{
break;
}
}
}
}
void OnCoroutineEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}
stCoEpoll_t *AllocEpoll()
{
stCoEpoll_t *ctx = (stCoEpoll_t*)calloc( 1,sizeof(stCoEpoll_t) );
ctx->iEpollFd = co_epoll_create( stCoEpoll_t::_EPOLL_SIZE );
ctx->pTimeout = AllocTimeout( 60 * 1000 );
ctx->pstActiveList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
ctx->pstTimeoutList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
return ctx;
}
void FreeEpoll( stCoEpoll_t *ctx )
{
if( ctx )
{
free( ctx->pstActiveList );
free( ctx->pstTimeoutList );
FreeTimeout( ctx->pTimeout );
co_epoll_res_free( ctx->result );
}
free( ctx );
}
stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env )
{
return env->pCallStack[ env->iCallStackSize - 1 ];
}
stCoRoutine_t *GetCurrThreadCo( )
{
stCoRoutineEnv_t *env = co_get_curr_thread_env();
if( !env ) return 0;
return GetCurrCo(env);
}
typedef int (*poll_pfn_t)(struct pollfd fds[], nfds_t nfds, int timeout);
int co_poll_inner( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
if( timeout > stTimeoutItem_t::eMaxTimeout )
{
timeout = stTimeoutItem_t::eMaxTimeout;
}
int epfd = ctx->iEpollFd;
stCoRoutine_t* self = co_self();
//1.struct change
stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
memset( &arg,0,sizeof(arg) );
arg.iEpollFd = epfd;
arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
arg.nfds = nfds;
stPollItem_t arr[2];
if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack)
{
arg.pPollItems = arr;
}
else
{
arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
}
memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) );
arg.pfnProcess = OnPollProcessEvent;
arg.pArg = GetCurrCo( co_get_curr_thread_env() );
//2. add epoll
for(nfds_t i=0;i<nfds;i++)
{
arg.pPollItems[i].pSelf = arg.fds + i;
arg.pPollItems[i].pPoll = &arg;
arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
struct epoll_event &ev = arg.pPollItems[i].stEvent;
if( fds[i].fd > -1 )
{
ev.data.ptr = arg.pPollItems + i;
ev.events = PollEvent2Epoll( fds[i].events );
int ret = co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );
if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL)
{
if( arg.pPollItems != arr )
{
free( arg.pPollItems );
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return pollfunc(fds, nfds, timeout);
}
}
//if fail,the timeout would work
}
//3.add timeout
unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
int ret = AddTimeout( ctx->pTimeout,&arg,now );
if( ret != 0 )
{
co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
ret,now,timeout,arg.ullExpireTime);
errno = EINVAL;
if( arg.pPollItems != arr )
{
free( arg.pPollItems );
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return -__LINE__;
}
co_yield_env( co_get_curr_thread_env() );
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &arg );
for(nfds_t i = 0;i < nfds;i++)
{
int fd = fds[i].fd;
if( fd > -1 )
{
co_epoll_ctl( epfd,EPOLL_CTL_DEL,fd,&arg.pPollItems[i].stEvent );
}
fds[i].revents = arg.fds[i].revents;
}
int iRaiseCnt = arg.iRaiseCnt;
if( arg.pPollItems != arr )
{
free( arg.pPollItems );
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return iRaiseCnt;
}
int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms )
{
return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL);
}
void SetEpoll( stCoRoutineEnv_t *env,stCoEpoll_t *ev )
{
env->pEpoll = ev;
}
stCoEpoll_t *co_get_epoll_ct()
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
return co_get_curr_thread_env()->pEpoll;
}
struct stHookPThreadSpec_t
{
stCoRoutine_t *co;
void *value;
enum
{
size = 1024
};
};
void *co_getspecific(pthread_key_t key)
{
stCoRoutine_t *co = GetCurrThreadCo();
if( !co || co->cIsMain )
{
return pthread_getspecific( key );
}
return co->aSpec[ key ].value;
}
int co_setspecific(pthread_key_t key, const void *value)
{
stCoRoutine_t *co = GetCurrThreadCo();
if( !co || co->cIsMain )
{
return pthread_setspecific( key,value );
}
co->aSpec[ key ].value = (void*)value;
return 0;
}
void co_disable_hook_sys()
{
stCoRoutine_t *co = GetCurrThreadCo();
if( co )
{
co->cEnableSysHook = 0;
}
}
bool co_is_enable_sys_hook()
{
stCoRoutine_t *co = GetCurrThreadCo();
return ( co && co->cEnableSysHook );
}
stCoRoutine_t *co_self()
{
return GetCurrThreadCo();
}
//co cond
struct stCoCond_t;
struct stCoCondItem_t
{
stCoCondItem_t *pPrev;
stCoCondItem_t *pNext;
stCoCond_t *pLink;
stTimeoutItem_t timeout;
};
struct stCoCond_t
{
stCoCondItem_t *head;
stCoCondItem_t *tail;
};
static void OnSignalProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}
stCoCondItem_t *co_cond_pop( stCoCond_t *link );
int co_cond_signal( stCoCond_t *si )
{
stCoCondItem_t * sp = co_cond_pop( si );
if( !sp )
{
return 0;
}
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout );
AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout );
return 0;
}
int co_cond_broadcast( stCoCond_t *si )
{
for(;;)
{
stCoCondItem_t * sp = co_cond_pop( si );
if( !sp ) return 0;
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout );
AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout );
}
return 0;
}
int co_cond_timedwait( stCoCond_t *link,int ms )
{
stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t));
psi->timeout.pArg = GetCurrThreadCo();
psi->timeout.pfnProcess = OnSignalProcessEvent;
if( ms > 0 )
{
unsigned long long now = GetTickMS();
psi->timeout.ullExpireTime = now + ms;
int ret = AddTimeout( co_get_curr_thread_env()->pEpoll->pTimeout,&psi->timeout,now );
if( ret != 0 )
{
free(psi);
return ret;
}
}
AddTail( link, psi);
co_yield_ct();
RemoveFromLink<stCoCondItem_t,stCoCond_t>( psi );
free(psi);
return 0;
}
stCoCond_t *co_cond_alloc()
{
return (stCoCond_t*)calloc( 1,sizeof(stCoCond_t) );
}
int co_cond_free( stCoCond_t * cc )
{
free( cc );
return 0;
}
stCoCondItem_t *co_cond_pop( stCoCond_t *link )
{
stCoCondItem_t *p = link->head;
if( p )
{
PopHead<stCoCondItem_t,stCoCond_t>( link );
}
return p;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/SunCeIsMe/libco.git
git@gitee.com:SunCeIsMe/libco.git
SunCeIsMe
libco
libco
master

搜索帮助